Handle all ressource events with unified sqlalchemy event handlers.

This commit is contained in:
Patrick Jentsch 2021-08-20 14:41:40 +02:00
parent 98a43ec86f
commit 1a59b19124
5 changed files with 90 additions and 268 deletions

View File

@ -1,4 +1,4 @@
from .. import db, socketio
from .. import db
from ..decorators import background
from ..models import Corpus, CorpusFile, QueryResult
@ -12,11 +12,6 @@ def build_corpus(corpus_id, *args, **kwargs):
raise Exception('Corpus {} not found'.format(corpus_id))
corpus.build()
db.session.commit()
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/last_edited_date'.format(corpus.id), 'value': corpus.last_edited_date.timestamp()}, # noqa
{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
@background
@ -25,12 +20,8 @@ def delete_corpus(corpus_id, *args, **kwargs):
corpus = Corpus.query.get(corpus_id)
if corpus is None:
raise Exception('Corpus {} not found'.format(corpus_id))
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'remove', 'path': '/corpora/{}'.format(corpus.id)}]
room = 'user_{}'.format(corpus.user_id)
corpus.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
@ -39,13 +30,8 @@ def delete_corpus_file(corpus_file_id, *args, **kwargs):
corpus_file = CorpusFile.query.get(corpus_file_id)
if corpus_file is None:
raise Exception('Corpus file {} not found'.format(corpus_file_id))
event = 'user_{}_patch'.format(corpus_file.corpus.user_id)
jsonpatch = [{'op': 'remove', 'path': '/corpora/{}/files/{}'.format(corpus_file.corpus_id, corpus_file.id)}, # noqa
{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus_file.corpus_id), 'value': corpus_file.corpus.status}] # noqa
room = 'user_{}'.format(corpus_file.corpus.user_id)
corpus_file.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
@ -54,9 +40,5 @@ def delete_query_result(query_result_id, *args, **kwargs):
query_result = QueryResult.query.get(query_result_id)
if query_result is None:
raise Exception('QueryResult {} not found'.format(query_result_id))
event = 'user_{}_patch'.format(query_result.user_id)
jsonpatch = [{'op': 'remove', 'path': '/query_results/{}'.format(query_result.id)}] # noqa
room = 'user_{}'.format(query_result.user_id)
query_result.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)

View File

@ -8,7 +8,7 @@ from .forms import (AddCorpusFileForm, AddCorpusForm, AddQueryResultForm,
DisplayOptionsForm, InspectDisplayOptionsForm,
ImportCorpusForm)
from jsonschema import validate
from .. import db, socketio
from .. import db
from ..models import Corpus, CorpusFile, QueryResult
import json
import logging
@ -40,10 +40,6 @@ def add_corpus():
else:
db.session.commit()
flash('Corpus "{}" added!'.format(corpus.title), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return redirect(url_for('.corpus', corpus_id=corpus.id))
return render_template('corpora/add_corpus.html.j2', form=form,
title='Add corpus')
@ -106,10 +102,6 @@ def import_corpus():
db.session.commit()
os.remove(archive_file)
flash('Corpus "{}" imported!'.format(corpus.title), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response(
{'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201)
else:
@ -212,11 +204,6 @@ def add_corpus_file(corpus_id):
corpus.status = 'unprepared'
db.session.commit()
flash('Corpus file "{}" added!'.format(corpus_file.filename), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}, # noqa
{'op': 'add', 'path': '/corpora/{}/files/{}'.format(corpus.id, corpus_file.id), 'value': corpus_file.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response({'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201) # noqa
return render_template('corpora/add_corpus_file.html.j2', corpus=corpus,
form=form, title='Add corpus file')
@ -356,10 +343,6 @@ def add_query_result():
query_result_file_content.pop('cpos_lookup')
query_result.query_metadata = query_result_file_content
db.session.commit()
event = 'user_{}_patch'.format(query_result.user_id)
jsonpatch = [{'op': 'add', 'path': '/query_results/{}'.format(query_result.id), 'value': query_result.to_dict()}] # noqa
room = 'user_{}'.format(query_result.user_id)
socketio.emit(event, jsonpatch, room=room)
flash('Query result added!', 'result')
return make_response({'redirect_url': url_for('.query_result', query_result_id=query_result.id)}, 201) # noqa
return render_template('corpora/query_results/add_query_result.html.j2',

View File

@ -1,161 +1,112 @@
from datetime import datetime
from . import db, socketio
from .models import Job, JobInput, JobResult
import logging
from .models import Corpus, CorpusFile, Job, JobInput, JobResult
###############################################################################
# SQLAlchemy event handlers #
###############################################################################
@db.event.listens_for(Corpus, 'after_delete')
@db.event.listens_for(CorpusFile, 'after_delete')
@db.event.listens_for(Job, 'after_delete')
@db.event.listens_for(JobInput, 'after_delete')
@db.event.listens_for(JobResult, 'after_delete')
def ressource_after_delete(mapper, connection, ressource):
if isinstance(ressource, Corpus):
user_id = ressource.creator.id
path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
event = 'user_{}_patch'.format(user_id)
jsonpatch = [{'op': 'remove', 'path': path}]
room = 'user_{}'.format(user_id)
socketio.emit(event, jsonpatch, room=room)
###############################################################################
## Job events #
###############################################################################
@db.event.listens_for(Corpus, 'after_insert')
@db.event.listens_for(CorpusFile, 'after_insert')
@db.event.listens_for(Job, 'after_insert')
@db.event.listens_for(JobInput, 'after_insert')
@db.event.listens_for(JobResult, 'after_insert')
def ressource_after_insert_handler(mapper, connection, ressource):
if isinstance(ressource, Corpus):
user_id = ressource.creator.id
path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
event = 'user_{}_patch'.format(user_id)
jsonpatch = [
{
'op': 'add',
'path': path,
'value': ressource.to_dict(include_relationships=False)
}
]
room = 'user_{}'.format(user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(Corpus, 'after_update')
@db.event.listens_for(CorpusFile, 'after_update')
@db.event.listens_for(Job, 'after_update')
def after_job_update(mapper, connection, job):
@db.event.listens_for(JobInput, 'after_update')
@db.event.listens_for(JobResult, 'after_update')
def ressource_after_update_handler(mapper, connection, ressource):
if isinstance(ressource, Corpus):
user_id = ressource.creator.id
base_path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
base_path = '/corpora/{}/files/{}'.format(ressource.corpus.id,
ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
base_path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
base_path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
base_path = '/jobs/{}/results/{}'.format(ressource.job.id,
ressource.id)
jsonpatch = []
for attr in db.inspect(job).attrs:
for attr in db.inspect(ressource).attrs:
# We don't want to emit changes about relationship fields
if attr.key in ['inputs', 'results']:
if attr.key in ['files', 'inputs', 'results']:
continue
history = attr.load_history()
if not history.has_changes():
continue
new_value = history.added[0]
# DateTime attributes must be converted to a timestamp
if attr.key in ['creation_date', 'end_date']:
new_value = None if new_value is None else new_value.timestamp()
if isinstance(new_value, datetime):
new_value = new_value.timestamp()
jsonpatch.append(
{
'op': 'replace',
'path': '/jobs/{}/{}'.format(job.id, attr.key),
'path': '{}/{}'.format(base_path, attr.key),
'value': new_value
}
)
if jsonpatch:
event = 'user_{}_patch'.format(job.user_id)
room = 'user_{}'.format(job.user_id)
event = 'user_{}_patch'.format(user_id)
room = 'user_{}'.format(user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(Job, 'after_insert')
def after_job_insert(mapper, connection, job):
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [
{
'op': 'add',
'path': '/jobs/{}'.format(job.id),
'value': job.to_dict(include_relationships=False)
}
]
room = 'user_{}'.format(job.user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(Job, 'after_delete')
def after_job_delete(mapper, connection, job):
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
room = 'user_{}'.format(job.user_id)
socketio.emit(event, jsonpatch, room=room)
###############################################################################
## JobInput events #
###############################################################################
@db.event.listens_for(JobInput, 'after_update')
def after_job_input_update(mapper, connection, job_input):
jsonpatch = []
for attr in db.inspect(job_input).attrs:
history = attr.load_history()
if not history.has_changes():
continue
new_value = history.added[0]
jsonpatch.append(
{
'op': 'replace',
'path': '/jobs/{}/inputs/{}/{}'.format(job_input.job_id,
job_input.id,
attr.key),
'value': new_value
}
)
if jsonpatch:
event = 'user_{}_patch'.format(job_input.job.user_id)
room = 'user_{}'.format(job_input.job.user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(JobInput, 'after_insert')
def after_job_input_insert(mapper, connection, job_input):
event = 'user_{}_patch'.format(job_input.job.user_id)
jsonpatch = [
{
'op': 'add',
'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
job_input.id),
'value': job_input.to_dict(include_relationships=False)
}
]
room = 'user_{}'.format(job_input.job.user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(JobInput, 'after_delete')
def after_job_input_delete(mapper, connection, job_input):
event = 'user_{}_patch'.format(job_input.job.user_id)
jsonpatch = [
{
'op': 'remove',
'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
job_input.id)
}
]
room = 'user_{}'.format(job_input.job.user_id)
socketio.emit(event, jsonpatch, room=room)
###############################################################################
## JobResult events #
###############################################################################
@db.event.listens_for(JobResult, 'after_update')
def after_job_result_update(mapper, connection, job_result):
jsonpatch = []
for attr in db.inspect(job_result).attrs:
history = attr.load_history()
if not history.has_changes():
continue
new_value = history.added[0]
jsonpatch.append(
{
'op': 'replace',
'path': '/jobs/{}/results/{}/{}'.format(job_result.job_id,
job_result.id,
attr.key),
'value': new_value
}
)
if jsonpatch:
event = 'user_{}_patch'.format(job_result.job.user_id)
room = 'user_{}'.format(job_result.job.user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(JobResult, 'after_insert')
def after_job_result_insert(mapper, connection, job_result):
event = 'user_{}_patch'.format(job_result.job.user_id)
jsonpatch = [
{
'op': 'add',
'path': '/jobs/{}/results/{}'.format(job_result.job_id,
job_result.id),
'value': job_result.to_dict(include_relationships=False)
}
]
room = 'user_{}'.format(job_result.job.user_id)
socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(JobResult, 'after_delete')
def after_job_result_delete(mapper, connection, job_result):
event = 'user_{}_patch'.format(job_result.job.user_id)
jsonpatch = [
{
'op': 'remove',
'path': '/jobs/{}/results/{}'.format(job_result.job_id,
job_result.id)
}
]
room = 'user_{}'.format(job_result.job.user_id)
socketio.emit(event, jsonpatch, room=room)

View File

@ -7,48 +7,8 @@ import docker
class TaskRunner(CheckCorporaMixin, CheckJobsMixin):
def __init__(self):
self.docker = docker.from_env()
self._socketio_message_buffer = {}
def run(self):
self.check_corpora()
self.check_jobs()
db.session.commit()
self.flush_socketio_messages()
def buffer_socketio_message(self, event, payload, room,
msg_id=None, override_policy='replace'):
if room not in self._socketio_message_buffer:
self._socketio_message_buffer[room] = {}
if event not in self._socketio_message_buffer[room]:
self._socketio_message_buffer[room][event] = {}
if msg_id is None:
msg_id = len(self._socketio_message_buffer[room][event].keys())
if override_policy == 'append':
if msg_id in self._socketio_message_buffer[room][event]:
# If the current message value isn't a list, convert it!
if not isinstance(self._socketio_message_buffer[room][event][msg_id], list): # noqa
self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]] # noqa
else:
self._socketio_message_buffer[room][event][msg_id] = []
self._socketio_message_buffer[room][event][msg_id].append(payload)
elif override_policy == 'replace':
self._socketio_message_buffer[room][event][msg_id] = payload
else:
raise Exception('Unknown override policy: {}'.format(override_policy)) # noqa
return msg_id
def buffer_user_patch_operation(self, ressource, patch_operation):
self.buffer_socketio_message('user_{}_patch'.format(ressource.user_id),
patch_operation,
'user_{}'.format(ressource.user_id),
msg_id='_', override_policy='append')
def clear_socketio_message_buffer(self):
self._socketio_message_buffer = {}
def flush_socketio_messages(self):
for room in self._socketio_message_buffer:
for event in self._socketio_message_buffer[room]:
for message in self._socketio_message_buffer[room][event]:
socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room) # noqa
self.clear_socketio_message_buffer()

View File

@ -85,12 +85,6 @@ class CheckCorporaMixin:
)
else:
corpus.status = 'queued'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
def checkout_build_corpus_service(self, corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
@ -103,12 +97,6 @@ class CheckCorporaMixin:
+ '(corpus.status: {} -> failed)'.format(corpus.status)
)
corpus.status = 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" service raised '.format(service_name)
@ -128,12 +116,6 @@ class CheckCorporaMixin:
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
elif (corpus.status == 'running'
and task_state in ['complete', 'failed']):
try:
@ -148,12 +130,6 @@ class CheckCorporaMixin:
else:
corpus.status = \
'prepared' if task_state == 'complete' else 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
def create_cqpserver_container(self, corpus):
''' # Docker container settings # '''
@ -214,12 +190,6 @@ class CheckCorporaMixin:
+ 'non-zero exit code and detach is False.'
)
corpus.status = 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.ImageNotFound:
logging.error(
'Run "{}" container raised '.format(name)
@ -227,12 +197,6 @@ class CheckCorporaMixin:
+ 'exist.'
)
corpus.status = 'failed'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Run "{}" container raised '.format(name)
@ -241,12 +205,6 @@ class CheckCorporaMixin:
)
else:
corpus.status = 'analysing'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
def checkout_analysing_corpus_container(self, corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
@ -255,12 +213,6 @@ class CheckCorporaMixin:
except docker.errors.NotFound:
logging.error('Could not find "{}" but the corpus state is "analysing".') # noqa
corpus.status = 'prepared'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" container raised '.format(container_name)
@ -293,9 +245,3 @@ class CheckCorporaMixin:
)
return
corpus.status = 'prepared'
patch_operation = {
'op': 'replace',
'path': '/corpora/{}/status'.format(corpus.id),
'value': corpus.status
}
self.buffer_user_patch_operation(corpus, patch_operation)