From 5662140a8d132992ce8546b08f92a9ed61ea9b14 Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Wed, 18 Aug 2021 15:09:56 +0200
Subject: [PATCH 1/3] Add a flag to all db Model to_dict methods: include_relationships

---
 app/models.py | 119 ++++++++++++++++++++++++++++----------------------
 1 file changed, 67 insertions(+), 52 deletions(-)

diff --git a/app/models.py b/app/models.py
index ba139e91..a67c6975 100644
--- a/app/models.py
+++ b/app/models.py
@@ -151,25 +151,31 @@ class User(UserMixin, db.Model):
     def password(self, password):
         self.password_hash = generate_password_hash(password)

-    def to_dict(self):
-        return {'id': self.id,
-                'role_id': self.role_id,
-                'confirmed': self.confirmed,
-                'email': self.email,
-                'last_seen': self.last_seen.timestamp(),
-                'member_since': self.member_since.timestamp(),
-                'settings': {'dark_mode': self.setting_dark_mode,
-                             'job_status_mail_notifications':
-                             self.setting_job_status_mail_notifications,
-                             'job_status_site_notifications':
-                             self.setting_job_status_site_notifications},
-                'username': self.username,
-                'corpora': {corpus.id: corpus.to_dict()
-                            for corpus in self.corpora},
-                'jobs': {job.id: job.to_dict() for job in self.jobs},
-                'query_results': {query_result.id: query_result.to_dict()
-                                  for query_result in self.query_results},
-                'role': self.role.to_dict()}
+    def to_dict(self, include_relationships=True):
+        dict_user = {
+            'id': self.id,
+            'role_id': self.role_id,
+            'confirmed': self.confirmed,
+            'email': self.email,
+            'last_seen': self.last_seen.timestamp(),
+            'member_since': self.member_since.timestamp(),
+            'settings': {'dark_mode': self.setting_dark_mode,
+                         'job_status_mail_notifications':
+                         self.setting_job_status_mail_notifications,
+                         'job_status_site_notifications':
+                         self.setting_job_status_site_notifications},
+            'username': self.username,
+            'role': self.role.to_dict()
+        }
+        if include_relationships:
+            dict_user['corpora'] = {corpus.id: corpus.to_dict()
+                                    for corpus in self.corpora}
+            dict_user['jobs'] = {job.id: job.to_dict() for job in self.jobs}
+            dict_user['query_results'] = {
+                query_result.id: query_result.to_dict()
+                for query_result in self.query_results
+            }
+        return dict_user

     def __repr__(self):
         '''
@@ -301,7 +307,7 @@ class JobInput(db.Model):
         '''
         return '<JobInput {}>'.format(self.filename)

-    def to_dict(self):
+    def to_dict(self, include_relationships=True):
         return {'download_url': self.download_url,
                 'url': self.url,
                 'id': self.id,
@@ -341,7 +347,7 @@ class JobResult(db.Model):
         '''
         return '<JobResult {}>'.format(self.filename)

-    def to_dict(self):
+    def to_dict(self, include_relationships=True):
         return {'download_url': self.download_url,
                 'url': self.url,
                 'id': self.id,
@@ -420,22 +426,26 @@ class Job(db.Model):
         self.end_date = None
         self.status = 'submitted'

-    def to_dict(self):
-        return {'url': self.url,
-                'id': self.id,
-                'user_id': self.user_id,
-                'creation_date': self.creation_date.timestamp(),
-                'description': self.description,
-                'end_date': (self.end_date.timestamp() if self.end_date else
-                             None),
-                'service': self.service,
-                'service_args': self.service_args,
-                'service_version': self.service_version,
-                'status': self.status,
-                'title': self.title,
-                'inputs': {input.id: input.to_dict() for input in self.inputs},
-                'results': {result.id: result.to_dict()
-                            for result in self.results}}
+    def to_dict(self, include_relationships=True):
+        dict_job = {
+            'url': self.url,
+            'id': self.id,
+            'user_id': self.user_id,
+            'creation_date': self.creation_date.timestamp(),
+            'description': self.description,
+            'end_date': self.end_date.timestamp() if self.end_date else None,
+            'service': self.service,
+            'service_args': self.service_args,
+            'service_version': self.service_version,
+            'status': self.status,
+            'title': self.title,
+        }
+        if include_relationships:
+            dict_job['inputs'] = {input.id: input.to_dict()
+                                  for input in self.inputs}
+            dict_job['results'] = {result.id: result.to_dict()
+                                   for result in self.results}
+        return dict_job


 class CorpusFile(db.Model):
@@ -485,7 +495,7 @@ class CorpusFile(db.Model):
         db.session.delete(self)
         self.corpus.status = 'unprepared'

-    def to_dict(self):
+    def to_dict(self, include_relationships=True):
         return {'download_url': self.download_url,
                 'url': self.url,
                 'id': self.id,
@@ -539,19 +549,24 @@ class Corpus(db.Model):
     def url(self):
         return url_for('corpora.corpus', corpus_id=self.id)

-    def to_dict(self):
-        return {'analysis_url': self.analysis_url,
-                'url': self.url,
-                'id': self.id,
-                'user_id': self.user_id,
-                'creation_date': self.creation_date.timestamp(),
-                'current_nr_of_tokens': self.current_nr_of_tokens,
-                'description': self.description,
-                'status': self.status,
-                'last_edited_date': self.last_edited_date.timestamp(),
-                'max_nr_of_tokens': self.max_nr_of_tokens,
-                'title': self.title,
-                'files': {file.id: file.to_dict() for file in self.files}}
+    def to_dict(self, include_relationships=True):
+        dict_corpus = {
+            'analysis_url': self.analysis_url,
+            'url': self.url,
+            'id': self.id,
+            'user_id': self.user_id,
+            'creation_date': self.creation_date.timestamp(),
+            'current_nr_of_tokens': self.current_nr_of_tokens,
+            'description': self.description,
+            'status': self.status,
+            'last_edited_date': self.last_edited_date.timestamp(),
+            'max_nr_of_tokens': self.max_nr_of_tokens,
+            'title': self.title,
+        }
+        if include_relationships:
+            dict_corpus['files'] = {file.id: file.to_dict()
+                                    for file in self.files}
+        return dict_corpus

     def build(self):
         output_dir = os.path.join(self.path, 'merged')
@@ -628,7 +643,7 @@ class QueryResult(db.Model):
         shutil.rmtree(self.path, ignore_errors=True)
         db.session.delete(self)

-    def to_dict(self):
+    def to_dict(self, include_relationships=True):
         return {'download_url': self.download_url,
                 'url': self.url,
                 'id': self.id,

From 98a43ec86f9d31cf8cadd65064b04ece1056df7c Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Wed, 18 Aug 2021 15:11:11 +0200
Subject: [PATCH 2/3] Use sqlalchemy events to emit jsonpatches to the client.

---
 app/__init__.py                       |   3 +-
 app/corpora/events.py                 |   2 +-
 app/jobs/tasks.py                     |  11 +-
 app/services/views.py                 |   6 +-
 app/{events.py => socketio_events.py} |   6 +-
 app/sqlalchemy_events.py              | 161 ++++++++++++++++++++++++++
 app/tasks/job_utils.py                |  42 -------
 7 files changed, 169 insertions(+), 62 deletions(-)
 rename app/{events.py => socketio_events.py} (100%)
 create mode 100644 app/sqlalchemy_events.py

diff --git a/app/__init__.py b/app/__init__.py
index 75be4213..8a6b4325 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -32,7 +32,8 @@ def create_app(config_name):
         app, message_queue=app.config['NOPAQUE_SOCKETIO_MESSAGE_QUEUE_URI'])

     with app.app_context():
-        from . import events
+        from . import socketio_events
+        from . import sqlalchemy_events
         from .admin import admin as admin_blueprint
         from .auth import auth as auth_blueprint
         from .corpora import corpora as corpora_blueprint
diff --git a/app/corpora/events.py b/app/corpora/events.py
index 323bab74..8b4dd2ef 100644
--- a/app/corpora/events.py
+++ b/app/corpora/events.py
@@ -4,7 +4,7 @@ from flask_login import current_user
 from socket import gaierror
 from .. import db, socketio
 from ..decorators import socketio_login_required
-from ..events import socketio_sessions
+from ..socketio_events import socketio_sessions
 from ..models import Corpus
 import cqi
 import math
diff --git a/app/jobs/tasks.py b/app/jobs/tasks.py
index fb4223b8..08c2aa22 100644
--- a/app/jobs/tasks.py
+++ b/app/jobs/tasks.py
@@ -1,4 +1,4 @@
-from .. import db, socketio
+from .. import db
 from ..decorators import background
 from ..models import Job

@@ -9,12 +9,8 @@ def delete_job(job_id, *args, **kwargs):
     job = Job.query.get(job_id)
     if job is None:
         raise Exception('Job {} not found'.format(job_id))
-    event = 'user_{}_patch'.format(job.user_id)
-    jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
-    room = 'user_{}'.format(job.user_id)
     job.delete()
     db.session.commit()
-    socketio.emit(event, jsonpatch, room=room)


 @background
@@ -29,8 +25,3 @@ def restart_job(job_id, *args, **kwargs):
         pass
     else:
         db.session.commit()
-        event = 'user_{}_patch'.format(job.user_id)
-        jsonpatch = [{'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()},  # noqa
-                     {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}]  # noqa
-        room = 'user_{}'.format(job.user_id)
-        socketio.emit(event, jsonpatch, room=room)
diff --git a/app/services/views.py b/app/services/views.py
index 62af71f9..75712b34 100644
--- a/app/services/views.py
+++ b/app/services/views.py
@@ -4,7 +4,7 @@ from flask_login import current_user, login_required
 from werkzeug.utils import secure_filename
 from . import services
 from . import SERVICES
-from .. import db, socketio
+from .. import db
 from .forms import AddJobForms
 from ..models import Job, JobInput
 import json
@@ -69,10 +69,6 @@ def service(service):
             job.status = 'submitted'
             db.session.commit()
             flash('Job "{}" added'.format(job.title), 'job')
-            event = 'user_{}_patch'.format(job.user_id)
-            jsonpatch = [{'op': 'add', 'path': '/jobs/{}'.format(job.id), 'value': job.to_dict()}]  # noqa
-            room = 'user_{}'.format(job.user_id)
-            socketio.emit(event, jsonpatch, room=room)
             return make_response(
                 {'redirect_url': url_for('jobs.job', job_id=job.id)}, 201)
     return render_template('services/{}.html.j2'.format(service.replace('-', '_')),
diff --git a/app/events.py b/app/socketio_events.py
similarity index 100%
rename from app/events.py
rename to app/socketio_events.py
index 72cdef66..f03e194c 100644
--- a/app/events.py
+++ b/app/socketio_events.py
@@ -6,9 +6,6 @@ from .decorators import socketio_login_required
 from .models import User


-###############################################################################
-# Socket.IO event handlers                                                    #
-###############################################################################
 '''
 ' A list containing session ids of connected Socket.IO sessions, to keep track
 ' of all connected sessions, which can be used to determine the runtimes of
@@ -17,6 +14,9 @@ from .models import User
 socketio_sessions = []


+###############################################################################
+# Socket.IO event handlers                                                    #
+###############################################################################
 @socketio.on('connect')
 @socketio_login_required
 def socketio_connect():
diff --git a/app/sqlalchemy_events.py b/app/sqlalchemy_events.py
new file mode 100644
index 00000000..ea5e6003
--- /dev/null
+++ b/app/sqlalchemy_events.py
@@ -0,0 +1,161 @@
+from . import db, socketio
+from .models import Job, JobInput, JobResult
+import logging
+
+
+###############################################################################
+# SQLAlchemy event handlers                                                   #
+###############################################################################
+
+###############################################################################
+## Job events                                                                 #
+###############################################################################
+@db.event.listens_for(Job, 'after_update')
+def after_job_update(mapper, connection, job):
+    jsonpatch = []
+    for attr in db.inspect(job).attrs:
+        # We don't want to emit changes about relationship fields
+        if attr.key in ['inputs', 'results']:
+            continue
+        history = attr.load_history()
+        if not history.has_changes():
+            continue
+        new_value = history.added[0]
+        # DateTime attributes must be converted to a timestamp
+        if attr.key in ['creation_date', 'end_date']:
+            new_value = None if new_value is None else new_value.timestamp()
+        jsonpatch.append(
+            {
+                'op': 'replace',
+                'path': '/jobs/{}/{}'.format(job.id, attr.key),
+                'value': new_value
+            }
+        )
+    if jsonpatch:
+        event = 'user_{}_patch'.format(job.user_id)
+        room = 'user_{}'.format(job.user_id)
+        socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(Job, 'after_insert')
+def after_job_insert(mapper, connection, job):
+    event = 'user_{}_patch'.format(job.user_id)
+    jsonpatch = [
+        {
+            'op': 'add',
+            'path': '/jobs/{}'.format(job.id),
+            'value': job.to_dict(include_relationships=False)
+        }
+    ]
+    room = 'user_{}'.format(job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(Job, 'after_delete')
+def after_job_delete(mapper, connection, job):
+    event = 'user_{}_patch'.format(job.user_id)
+    jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
+    room = 'user_{}'.format(job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+###############################################################################
+## JobInput events                                                            #
+###############################################################################
+@db.event.listens_for(JobInput, 'after_update')
+def after_job_input_update(mapper, connection, job_input):
+    jsonpatch = []
+    for attr in db.inspect(job_input).attrs:
+        history = attr.load_history()
+        if not history.has_changes():
+            continue
+        new_value = history.added[0]
+        jsonpatch.append(
+            {
+                'op': 'replace',
+                'path': '/jobs/{}/inputs/{}/{}'.format(job_input.job_id,
+                                                       job_input.id,
+                                                       attr.key),
+                'value': new_value
+            }
+        )
+    if jsonpatch:
+        event = 'user_{}_patch'.format(job_input.job.user_id)
+        room = 'user_{}'.format(job_input.job.user_id)
+        socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobInput, 'after_insert')
+def after_job_input_insert(mapper, connection, job_input):
+    event = 'user_{}_patch'.format(job_input.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'add',
+            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
+                                                job_input.id),
+            'value': job_input.to_dict(include_relationships=False)
+        }
+    ]
+    room = 'user_{}'.format(job_input.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobInput, 'after_delete')
+def after_job_input_delete(mapper, connection, job_input):
+    event = 'user_{}_patch'.format(job_input.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'remove',
+            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
+                                                job_input.id)
+        }
+    ]
+    room = 'user_{}'.format(job_input.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+###############################################################################
+## JobResult events                                                           #
+###############################################################################
+@db.event.listens_for(JobResult, 'after_update')
+def after_job_result_update(mapper, connection, job_result):
+    jsonpatch = []
+    for attr in db.inspect(job_result).attrs:
+        history = attr.load_history()
+        if not history.has_changes():
+            continue
+        new_value = history.added[0]
+        jsonpatch.append(
+            {
+                'op': 'replace',
+                'path': '/jobs/{}/results/{}/{}'.format(job_result.job_id,
+                                                        job_result.id,
+                                                        attr.key),
+                'value': new_value
+            }
+        )
+    if jsonpatch:
+        event = 'user_{}_patch'.format(job_result.job.user_id)
+        room = 'user_{}'.format(job_result.job.user_id)
+        socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobResult, 'after_insert')
+def after_job_result_insert(mapper, connection, job_result):
+    event = 'user_{}_patch'.format(job_result.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'add',
+            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
+                                                 job_result.id),
+            'value': job_result.to_dict(include_relationships=False)
+        }
+    ]
+    room = 'user_{}'.format(job_result.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(JobResult, 'after_delete')
+def after_job_result_delete(mapper, connection, job_result):
+    event = 'user_{}_patch'.format(job_result.job.user_id)
+    jsonpatch = [
+        {
+            'op': 'remove',
+            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
+                                                 job_result.id)
+        }
+    ]
+    room = 'user_{}'.format(job_result.job.user_id)
+    socketio.emit(event, jsonpatch, room=room)
diff --git a/app/tasks/job_utils.py b/app/tasks/job_utils.py
index ee804bbc..f1a2b6a8 100644
--- a/app/tasks/job_utils.py
+++ b/app/tasks/job_utils.py
@@ -105,12 +105,6 @@ class CheckJobsMixin:
                 return
             else:
                 job.status = 'queued'
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/status'.format(job.id),
-                    'value': job.status
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
             finally:
                 self.send_job_notification(job)

@@ -125,12 +119,6 @@ class CheckJobsMixin:
                 + '(job.status: {} -> failed)'.format(job.status)
             )
             job.status = 'failed'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" service raised '.format(service_name)
@@ -152,12 +140,6 @@ class CheckJobsMixin:
         task_state = service_tasks[0].get('Status').get('State')
         if job.status == 'queued' and task_state != 'pending':
             job.status = 'running'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         elif job.status == 'running' and task_state in ['complete', 'failed']:  # noqa
             try:
                 service.remove()
@@ -178,26 +160,8 @@ class CheckJobsMixin:
                         db.session.add(job_result)
                         db.session.flush()
                         db.session.refresh(job_result)
-                        patch_operation = {
-                            'op': 'add',
-                            'path': '/jobs/{}/results/{}'.format(job.id, job_result.id),  # noqa
-                            'value': job_result.to_dict()
-                        }
-                        self.buffer_user_patch_operation(job, patch_operation)  # noqa
                 job.end_date = datetime.utcnow()
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/end_date'.format(job.id),
-                    'value': job.end_date.timestamp()
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
                 job.status = task_state
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/status'.format(job.id),
-                    'value': job.status
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
             finally:
                 self.send_job_notification(job)

@@ -207,12 +171,6 @@ class CheckJobsMixin:
             service = self.docker.services.get(service_name)
         except docker.errors.NotFound:
             job.status = 'canceled'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" service raised '.format(service_name)

From 1a59b19124cefd8401525e98d188f6638c669898 Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Fri, 20 Aug 2021 14:41:40 +0200
Subject: [PATCH 3/3] Handle all resource events with unified sqlalchemy event handlers.

---
 app/corpora/tasks.py      |  20 +---
 app/corpora/views.py      |  19 +---
 app/sqlalchemy_events.py  | 225 +++++++++++++++----------------------
 app/tasks/__init__.py     |  40 -------
 app/tasks/corpus_utils.py |  54 ---------
 5 files changed, 90 insertions(+), 268 deletions(-)

diff --git a/app/corpora/tasks.py b/app/corpora/tasks.py
index 3ad43740..f65a00de 100644
--- a/app/corpora/tasks.py
+++ b/app/corpora/tasks.py
@@ -1,4 +1,4 @@
-from .. import db, socketio
+from .. import db
 from ..decorators import background
 from ..models import Corpus, CorpusFile, QueryResult

@@ -12,11 +12,6 @@ def build_corpus(corpus_id, *args, **kwargs):
         raise Exception('Corpus {} not found'.format(corpus_id))
     corpus.build()
     db.session.commit()
-    event = 'user_{}_patch'.format(corpus.user_id)
-    jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/last_edited_date'.format(corpus.id), 'value': corpus.last_edited_date.timestamp()},  # noqa
-                 {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}]  # noqa
-    room = 'user_{}'.format(corpus.user_id)
-    socketio.emit(event, jsonpatch, room=room)


 @background
@@ -25,12 +20,8 @@ def delete_corpus(corpus_id, *args, **kwargs):
     corpus = Corpus.query.get(corpus_id)
     if corpus is None:
         raise Exception('Corpus {} not found'.format(corpus_id))
-    event = 'user_{}_patch'.format(corpus.user_id)
-    jsonpatch = [{'op': 'remove', 'path': '/corpora/{}'.format(corpus.id)}]
-    room = 'user_{}'.format(corpus.user_id)
     corpus.delete()
     db.session.commit()
-    socketio.emit(event, jsonpatch, room=room)


 @background
@@ -39,13 +30,8 @@ def delete_corpus_file(corpus_file_id, *args, **kwargs):
     corpus_file = CorpusFile.query.get(corpus_file_id)
     if corpus_file is None:
         raise Exception('Corpus file {} not found'.format(corpus_file_id))
-    event = 'user_{}_patch'.format(corpus_file.corpus.user_id)
-    jsonpatch = [{'op': 'remove', 'path': '/corpora/{}/files/{}'.format(corpus_file.corpus_id, corpus_file.id)},  # noqa
-                 {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus_file.corpus_id), 'value': corpus_file.corpus.status}]  # noqa
-    room = 'user_{}'.format(corpus_file.corpus.user_id)
     corpus_file.delete()
     db.session.commit()
-    socketio.emit(event, jsonpatch, room=room)


 @background
@@ -54,9 +40,5 @@ def delete_query_result(query_result_id, *args, **kwargs):
     query_result = QueryResult.query.get(query_result_id)
     if query_result is None:
         raise Exception('QueryResult {} not found'.format(query_result_id))
-    event = 'user_{}_patch'.format(query_result.user_id)
-    jsonpatch = [{'op': 'remove', 'path': '/query_results/{}'.format(query_result.id)}]  # noqa
-    room = 'user_{}'.format(query_result.user_id)
     query_result.delete()
     db.session.commit()
-    socketio.emit(event, jsonpatch, room=room)
diff --git a/app/corpora/views.py b/app/corpora/views.py
index 2e740cd6..32b20702 100644
--- a/app/corpora/views.py
+++ b/app/corpora/views.py
@@ -8,7 +8,7 @@ from .forms import (AddCorpusFileForm, AddCorpusForm, AddQueryResultForm,
                     DisplayOptionsForm, InspectDisplayOptionsForm,
                     ImportCorpusForm)
 from jsonschema import validate
-from .. import db, socketio
+from .. import db
 from ..models import Corpus, CorpusFile, QueryResult
 import json
 import logging
@@ -40,10 +40,6 @@ def add_corpus():
     else:
         db.session.commit()
         flash('Corpus "{}" added!'.format(corpus.title), 'corpus')
-        event = 'user_{}_patch'.format(corpus.user_id)
-        jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}]  # noqa
-        room = 'user_{}'.format(corpus.user_id)
-        socketio.emit(event, jsonpatch, room=room)
         return redirect(url_for('.corpus', corpus_id=corpus.id))
     return render_template('corpora/add_corpus.html.j2', form=form,
                            title='Add corpus')
@@ -106,10 +102,6 @@ def import_corpus():
             db.session.commit()
             os.remove(archive_file)
             flash('Corpus "{}" imported!'.format(corpus.title), 'corpus')
-            event = 'user_{}_patch'.format(corpus.user_id)
-            jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}]  # noqa
-            room = 'user_{}'.format(corpus.user_id)
-            socketio.emit(event, jsonpatch, room=room)
             return make_response(
                 {'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201)
         else:
@@ -212,11 +204,6 @@ def add_corpus_file(corpus_id):
         corpus.status = 'unprepared'
         db.session.commit()
         flash('Corpus file "{}" added!'.format(corpus_file.filename), 'corpus')
-        event = 'user_{}_patch'.format(corpus.user_id)
-        jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status},  # noqa
-                     {'op': 'add', 'path': '/corpora/{}/files/{}'.format(corpus.id, corpus_file.id), 'value': corpus_file.to_dict()}]  # noqa
-        room = 'user_{}'.format(corpus.user_id)
-        socketio.emit(event, jsonpatch, room=room)
         return make_response({'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201)  # noqa
     return render_template('corpora/add_corpus_file.html.j2', corpus=corpus,
                            form=form, title='Add corpus file')
@@ -356,10 +343,6 @@ def add_query_result():
         query_result_file_content.pop('cpos_lookup')
         query_result.query_metadata = query_result_file_content
         db.session.commit()
-        event = 'user_{}_patch'.format(query_result.user_id)
-        jsonpatch = [{'op': 'add', 'path': '/query_results/{}'.format(query_result.id), 'value': query_result.to_dict()}]  # noqa
-        room = 'user_{}'.format(query_result.user_id)
-        socketio.emit(event, jsonpatch, room=room)
         flash('Query result added!', 'result')
         return make_response({'redirect_url': url_for('.query_result', query_result_id=query_result.id)}, 201)  # noqa
     return render_template('corpora/query_results/add_query_result.html.j2',
diff --git a/app/sqlalchemy_events.py b/app/sqlalchemy_events.py
index ea5e6003..54737a52 100644
--- a/app/sqlalchemy_events.py
+++ b/app/sqlalchemy_events.py
@@ -1,161 +1,112 @@
+from datetime import datetime
 from . import db, socketio
-from .models import Job, JobInput, JobResult
-import logging
+from .models import Corpus, CorpusFile, Job, JobInput, JobResult


 ###############################################################################
 # SQLAlchemy event handlers                                                   #
 ###############################################################################
+@db.event.listens_for(Corpus, 'after_delete')
+@db.event.listens_for(CorpusFile, 'after_delete')
+@db.event.listens_for(Job, 'after_delete')
+@db.event.listens_for(JobInput, 'after_delete')
+@db.event.listens_for(JobResult, 'after_delete')
+def ressource_after_delete(mapper, connection, ressource):
+    if isinstance(ressource, Corpus):
+        user_id = ressource.creator.id
+        path = '/corpora/{}'.format(ressource.id)
+    elif isinstance(ressource, CorpusFile):
+        user_id = ressource.corpus.creator.id
+        path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
+    elif isinstance(ressource, Job):
+        user_id = ressource.creator.id
+        path = '/jobs/{}'.format(ressource.id)
+    elif isinstance(ressource, JobInput):
+        user_id = ressource.job.creator.id
+        path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
+    elif isinstance(ressource, JobResult):
+        user_id = ressource.job.creator.id
+        path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
+    event = 'user_{}_patch'.format(user_id)
+    jsonpatch = [{'op': 'remove', 'path': path}]
+    room = 'user_{}'.format(user_id)
+    socketio.emit(event, jsonpatch, room=room)

-###############################################################################
-## Job events                                                                 #
-###############################################################################
+@db.event.listens_for(Corpus, 'after_insert')
+@db.event.listens_for(CorpusFile, 'after_insert')
+@db.event.listens_for(Job, 'after_insert')
+@db.event.listens_for(JobInput, 'after_insert')
+@db.event.listens_for(JobResult, 'after_insert')
+def ressource_after_insert_handler(mapper, connection, ressource):
+    if isinstance(ressource, Corpus):
+        user_id = ressource.creator.id
+        path = '/corpora/{}'.format(ressource.id)
+    elif isinstance(ressource, CorpusFile):
+        user_id = ressource.corpus.creator.id
+        path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
+    elif isinstance(ressource, Job):
+        user_id = ressource.creator.id
+        path = '/jobs/{}'.format(ressource.id)
+    elif isinstance(ressource, JobInput):
+        user_id = ressource.job.creator.id
+        path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
+    elif isinstance(ressource, JobResult):
+        user_id = ressource.job.creator.id
+        path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
+    event = 'user_{}_patch'.format(user_id)
+    jsonpatch = [
+        {
+            'op': 'add',
+            'path': path,
+            'value': ressource.to_dict(include_relationships=False)
+        }
+    ]
+    room = 'user_{}'.format(user_id)
+    socketio.emit(event, jsonpatch, room=room)
+
+@db.event.listens_for(Corpus, 'after_update')
+@db.event.listens_for(CorpusFile, 'after_update')
 @db.event.listens_for(Job, 'after_update')
-def after_job_update(mapper, connection, job):
+@db.event.listens_for(JobInput, 'after_update')
+@db.event.listens_for(JobResult, 'after_update')
+def ressource_after_update_handler(mapper, connection, ressource):
+    if isinstance(ressource, Corpus):
+        user_id = ressource.creator.id
+        base_path = '/corpora/{}'.format(ressource.id)
+    elif isinstance(ressource, CorpusFile):
+        user_id = ressource.corpus.creator.id
+        base_path = '/corpora/{}/files/{}'.format(ressource.corpus.id,
+                                                  ressource.id)
+    elif isinstance(ressource, Job):
+        user_id = ressource.creator.id
+        base_path = '/jobs/{}'.format(ressource.id)
+    elif isinstance(ressource, JobInput):
+        user_id = ressource.job.creator.id
+        base_path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
+    elif isinstance(ressource, JobResult):
+        user_id = ressource.job.creator.id
+        base_path = '/jobs/{}/results/{}'.format(ressource.job.id,
+                                                 ressource.id)
     jsonpatch = []
-    for attr in db.inspect(job).attrs:
+    for attr in db.inspect(ressource).attrs:
         # We don't want to emit changes about relationship fields
-        if attr.key in ['inputs', 'results']:
+        if attr.key in ['files', 'inputs', 'results']:
             continue
         history = attr.load_history()
         if not history.has_changes():
             continue
         new_value = history.added[0]
         # DateTime attributes must be converted to a timestamp
-        if attr.key in ['creation_date', 'end_date']:
-            new_value = None if new_value is None else new_value.timestamp()
+        if isinstance(new_value, datetime):
+            new_value = new_value.timestamp()
         jsonpatch.append(
             {
                 'op': 'replace',
-                'path': '/jobs/{}/{}'.format(job.id, attr.key),
+                'path': '{}/{}'.format(base_path, attr.key),
                 'value': new_value
             }
         )
     if jsonpatch:
-        event = 'user_{}_patch'.format(job.user_id)
-        room = 'user_{}'.format(job.user_id)
+        event = 'user_{}_patch'.format(user_id)
+        room = 'user_{}'.format(user_id)
         socketio.emit(event, jsonpatch, room=room)
-
-@db.event.listens_for(Job, 'after_insert')
-def after_job_insert(mapper, connection, job):
-    event = 'user_{}_patch'.format(job.user_id)
-    jsonpatch = [
-        {
-            'op': 'add',
-            'path': '/jobs/{}'.format(job.id),
-            'value': job.to_dict(include_relationships=False)
-        }
-    ]
-    room = 'user_{}'.format(job.user_id)
-    socketio.emit(event, jsonpatch, room=room)
-
-@db.event.listens_for(Job, 'after_delete')
-def after_job_delete(mapper, connection, job):
-    event = 'user_{}_patch'.format(job.user_id)
-    jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
-    room = 'user_{}'.format(job.user_id)
-    socketio.emit(event, jsonpatch, room=room)
-
-###############################################################################
-## JobInput events                                                            #
-###############################################################################
-@db.event.listens_for(JobInput, 'after_update')
-def after_job_input_update(mapper, connection, job_input):
-    jsonpatch = []
-    for attr in db.inspect(job_input).attrs:
-        history = attr.load_history()
-        if not history.has_changes():
-            continue
-        new_value = history.added[0]
-        jsonpatch.append(
-            {
-                'op': 'replace',
-                'path': '/jobs/{}/inputs/{}/{}'.format(job_input.job_id,
-                                                       job_input.id,
-                                                       attr.key),
-                'value': new_value
-            }
-        )
-    if jsonpatch:
-        event = 'user_{}_patch'.format(job_input.job.user_id)
-        room = 'user_{}'.format(job_input.job.user_id)
-        socketio.emit(event, jsonpatch, room=room)
-
-@db.event.listens_for(JobInput, 'after_insert')
-def after_job_input_insert(mapper, connection, job_input):
-    event = 'user_{}_patch'.format(job_input.job.user_id)
-    jsonpatch = [
-        {
-            'op': 'add',
-            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
-                                                job_input.id),
-            'value': job_input.to_dict(include_relationships=False)
-        }
-    ]
-    room = 'user_{}'.format(job_input.job.user_id)
-    socketio.emit(event, jsonpatch, room=room)
-
-@db.event.listens_for(JobInput, 'after_delete')
-def after_job_input_delete(mapper, connection, job_input):
-    event = 'user_{}_patch'.format(job_input.job.user_id)
-    jsonpatch = [
-        {
-            'op': 'remove',
-            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
-                                                job_input.id)
-        }
-    ]
-    room = 'user_{}'.format(job_input.job.user_id)
-    socketio.emit(event, jsonpatch, room=room)
-
-###############################################################################
-## JobResult events                                                           #
-###############################################################################
-@db.event.listens_for(JobResult, 'after_update')
-def after_job_result_update(mapper, connection, job_result):
-    jsonpatch = []
-    for attr in db.inspect(job_result).attrs:
-        history = attr.load_history()
-        if not history.has_changes():
-            continue
-        new_value = history.added[0]
-        jsonpatch.append(
-            {
-                'op': 'replace',
-                'path': '/jobs/{}/results/{}/{}'.format(job_result.job_id,
-                                                        job_result.id,
-                                                        attr.key),
-                'value': new_value
-            }
-        )
-    if jsonpatch:
-        event = 'user_{}_patch'.format(job_result.job.user_id)
-        room = 'user_{}'.format(job_result.job.user_id)
-        socketio.emit(event, jsonpatch, room=room)
-
-@db.event.listens_for(JobResult, 'after_insert')
-def after_job_result_insert(mapper, connection, job_result):
-    event = 'user_{}_patch'.format(job_result.job.user_id)
-    jsonpatch = [
-        {
-            'op': 'add',
-            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
-                                                 job_result.id),
-            'value': job_result.to_dict(include_relationships=False)
-        }
-    ]
-    room = 'user_{}'.format(job_result.job.user_id)
-    socketio.emit(event, jsonpatch, room=room)
-
-@db.event.listens_for(JobResult, 'after_delete')
-def after_job_result_delete(mapper, connection, job_result):
-    event = 'user_{}_patch'.format(job_result.job.user_id)
-    jsonpatch = [
-        {
-            'op': 'remove',
-            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
-                                                 job_result.id)
-        }
-    ]
-    room = 'user_{}'.format(job_result.job.user_id)
-    socketio.emit(event, jsonpatch, room=room)
diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py
index 3e6f7354..69096a7b 100644
--- a/app/tasks/__init__.py
+++ b/app/tasks/__init__.py
@@ -7,48 +7,8 @@ import docker
 class TaskRunner(CheckCorporaMixin, CheckJobsMixin):
     def __init__(self):
         self.docker = docker.from_env()
-        self._socketio_message_buffer = {}

     def run(self):
         self.check_corpora()
         self.check_jobs()
         db.session.commit()
-        self.flush_socketio_messages()
-
-    def buffer_socketio_message(self, event, payload, room,
-                                msg_id=None, override_policy='replace'):
-        if room not in self._socketio_message_buffer:
-            self._socketio_message_buffer[room] = {}
-        if event not in self._socketio_message_buffer[room]:
-            self._socketio_message_buffer[room][event] = {}
-        if msg_id is None:
-            msg_id = len(self._socketio_message_buffer[room][event].keys())
-        if override_policy == 'append':
-            if msg_id in self._socketio_message_buffer[room][event]:
-                # If the current message value isn't a list, convert it!
-                if not isinstance(self._socketio_message_buffer[room][event][msg_id], list):  # noqa
-                    self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]]  # noqa
-            else:
-                self._socketio_message_buffer[room][event][msg_id] = []
-            self._socketio_message_buffer[room][event][msg_id].append(payload)
-        elif override_policy == 'replace':
-            self._socketio_message_buffer[room][event][msg_id] = payload
-        else:
-            raise Exception('Unknown override policy: {}'.format(override_policy))  # noqa
-        return msg_id
-
-    def buffer_user_patch_operation(self, ressource, patch_operation):
-        self.buffer_socketio_message('user_{}_patch'.format(ressource.user_id),
-                                     patch_operation,
-                                     'user_{}'.format(ressource.user_id),
-                                     msg_id='_', override_policy='append')
-
-    def clear_socketio_message_buffer(self):
-        self._socketio_message_buffer = {}
-
-    def flush_socketio_messages(self):
-        for room in self._socketio_message_buffer:
-            for event in self._socketio_message_buffer[room]:
-                for message in self._socketio_message_buffer[room][event]:
-                    socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room)  # noqa
-        self.clear_socketio_message_buffer()
diff --git a/app/tasks/corpus_utils.py b/app/tasks/corpus_utils.py
index 9e8dbb04..3c7aeaa7 100644
--- a/app/tasks/corpus_utils.py
+++ b/app/tasks/corpus_utils.py
@@ -85,12 +85,6 @@ class CheckCorporaMixin:
             )
         else:
             corpus.status = 'queued'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/corpora/{}/status'.format(corpus.id),
-                'value': corpus.status
-            }
-            self.buffer_user_patch_operation(corpus, patch_operation)

     def checkout_build_corpus_service(self, corpus):
         service_name = 'build-corpus_{}'.format(corpus.id)
@@ -103,12 +97,6 @@ class CheckCorporaMixin:
                 + '(corpus.status: {} -> failed)'.format(corpus.status)
             )
             corpus.status = 'failed'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/corpora/{}/status'.format(corpus.id),
-                'value': corpus.status
-            }
-            self.buffer_user_patch_operation(corpus, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" service raised '.format(service_name)
@@ -128,12 +116,6 @@ class CheckCorporaMixin:
         task_state = service_tasks[0].get('Status').get('State')
         if corpus.status == 'queued' and task_state != 'pending':
             corpus.status = 'running'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/corpora/{}/status'.format(corpus.id),
-                'value': corpus.status
-            }
-            self.buffer_user_patch_operation(corpus, patch_operation)
         elif (corpus.status == 'running'
               and task_state in ['complete', 'failed']):
             try:
@@ -148,12 +130,6 @@ class CheckCorporaMixin:
             else:
                 corpus.status = \
                     'prepared' if task_state == 'complete' else 'failed'
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/corpora/{}/status'.format(corpus.id),
-                    'value': corpus.status
-                }
-                self.buffer_user_patch_operation(corpus, patch_operation)

     def create_cqpserver_container(self, corpus):
         ''' # Docker container settings # '''
@@ -214,12 +190,6 @@ class CheckCorporaMixin:
                 + 'non-zero exit code and detach is False.'
            )
            corpus.status = 'failed'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/corpora/{}/status'.format(corpus.id),
-                'value': corpus.status
-            }
-            self.buffer_user_patch_operation(corpus, patch_operation)
         except docker.errors.ImageNotFound:
             logging.error(
                 'Run "{}" container raised '.format(name)
                 + 'a docker.errors.ImageNotFound error, because the image does not'
                 + 'exist.'
             )
             corpus.status = 'failed'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/corpora/{}/status'.format(corpus.id),
-                'value': corpus.status
-            }
-            self.buffer_user_patch_operation(corpus, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Run "{}" container raised '.format(name)
             )
         else:
             corpus.status = 'analysing'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/corpora/{}/status'.format(corpus.id),
-                'value': corpus.status
-            }
-            self.buffer_user_patch_operation(corpus, patch_operation)

     def checkout_analysing_corpus_container(self, corpus):
         container_name = 'cqpserver_{}'.format(corpus.id)
         try:
             self.docker.containers.get(container_name)
         except docker.errors.NotFound:
             logging.error('Could not find "{}" but the corpus state is "analysing".')  # noqa
             corpus.status = 'prepared'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/corpora/{}/status'.format(corpus.id),
-                'value': corpus.status
-            }
-            self.buffer_user_patch_operation(corpus, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" container raised '.format(container_name)
@@ -293,9 +245,3 @@ class CheckCorporaMixin:
             )
             return
         corpus.status = 'prepared'
-        patch_operation = {
-            'op': 'replace',
-            'path': '/corpora/{}/status'.format(corpus.id),
-            'value': corpus.status
-        }
-        self.buffer_user_patch_operation(corpus, patch_operation)
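
The handlers introduced in these patches rely on SQLAlchemy's attribute history: inside an 'after_update' mapper event, db.inspect(obj).attrs exposes each mapped attribute, and load_history() reports what changed during the flush, which maps one-to-one onto JSON Patch 'replace' operations. A minimal, self-contained sketch of that pattern follows; the toy Job model, the in-memory SQLite engine and print() in place of socketio.emit() are assumptions for illustration, not part of the patches.

from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, create_engine, event, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Job(Base):  # toy stand-in for app.models.Job
    __tablename__ = 'jobs'
    id = Column(Integer, primary_key=True)
    status = Column(String)
    end_date = Column(DateTime)


@event.listens_for(Job, 'after_update')
def job_after_update(mapper, connection, job):
    jsonpatch = []
    for attr in inspect(job).attrs:
        history = attr.load_history()
        if not history.has_changes():
            continue
        new_value = history.added[0]
        # DateTime values are not JSON serializable, so send a timestamp
        if isinstance(new_value, datetime):
            new_value = new_value.timestamp()
        jsonpatch.append({'op': 'replace',
                          'path': '/jobs/{}/{}'.format(job.id, attr.key),
                          'value': new_value})
    if jsonpatch:
        # The real handlers do: socketio.emit(event, jsonpatch, room=room)
        print(jsonpatch)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(Job(status='submitted'))
    session.commit()
    job = session.query(Job).first()
    job.status = 'running'
    job.end_date = datetime.utcnow()
    session.commit()  # the flush fires the listener; two 'replace' ops are printed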
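
On the receiving end, each emitted payload is an RFC 6902 JSON Patch that can be applied directly to the dictionary produced by User.to_dict(). A sketch of such a consumer in Python using the python-socketio client and the jsonpatch package; the real consumer is the browser-side JavaScript, and the user id, server URL and omitted authentication below are placeholders rather than anything the patches define.

import jsonpatch
import socketio

# Local mirror of the structure returned by User.to_dict()
user = {'jobs': {}, 'corpora': {}, 'query_results': {}}

sio = socketio.Client()


@sio.on('user_1_patch')
def on_user_patch(patch_ops):
    # Apply the emitted JSON Patch operations to the local mirror.
    global user
    user = jsonpatch.apply_patch(user, patch_ops)
    print(user)


# The real app only accepts authenticated Socket.IO connections
# (socketio_login_required); session handling is omitted here.
sio.connect('http://localhost:5000')
sio.wait()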