Use SQLAlchemy events to emit JSON Patches to the client.

This commit is contained in:
Patrick Jentsch 2021-08-18 15:11:11 +02:00
parent 5662140a8d
commit 98a43ec86f
7 changed files with 169 additions and 62 deletions
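
The payloads emitted throughout this commit are RFC 6902 JSON Patch documents, sent to the per-user Socket.IO room 'user_<id>' under the event name 'user_<id>_patch'. The client side is not part of this diff; as a rough illustration of what a receiver does with such a payload, here is a sketch using the Python jsonpatch package, with an assumed client-side state shape:

# Illustration only, not part of this commit: applying one emitted patch
# array to a cached state document with the jsonpatch package.
import jsonpatch

state = {'jobs': {'42': {'status': 'queued', 'end_date': None}}}  # assumed shape
patch = [{'op': 'replace', 'path': '/jobs/42/status', 'value': 'running'}]
state = jsonpatch.apply_patch(state, patch)
print(state['jobs']['42']['status'])  # -> 'running'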


@@ -32,7 +32,8 @@ def create_app(config_name):
         app, message_queue=app.config['NOPAQUE_SOCKETIO_MESSAGE_QUEUE_URI'])
     with app.app_context():
-        from . import events
+        from . import socketio_events
+        from . import sqlalchemy_events
         from .admin import admin as admin_blueprint
         from .auth import auth as auth_blueprint
         from .corpora import corpora as corpora_blueprint


@@ -4,7 +4,7 @@ from flask_login import current_user
 from socket import gaierror
 from .. import db, socketio
 from ..decorators import socketio_login_required
-from ..events import socketio_sessions
+from ..socketio_events import socketio_sessions
 from ..models import Corpus
 import cqi
 import math


@@ -1,4 +1,4 @@
-from .. import db, socketio
+from .. import db
 from ..decorators import background
 from ..models import Job
@@ -9,12 +9,8 @@ def delete_job(job_id, *args, **kwargs):
     job = Job.query.get(job_id)
     if job is None:
         raise Exception('Job {} not found'.format(job_id))
-    event = 'user_{}_patch'.format(job.user_id)
-    jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
-    room = 'user_{}'.format(job.user_id)
     job.delete()
     db.session.commit()
-    socketio.emit(event, jsonpatch, room=room)


 @background
@@ -29,8 +25,3 @@ def restart_job(job_id, *args, **kwargs):
         pass
     else:
         db.session.commit()
-        event = 'user_{}_patch'.format(job.user_id)
-        jsonpatch = [{'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()},  # noqa
-                     {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}]  # noqa
-        room = 'user_{}'.format(job.user_id)
-        socketio.emit(event, jsonpatch, room=room)


@@ -4,7 +4,7 @@ from flask_login import current_user, login_required
 from werkzeug.utils import secure_filename
 from . import services
 from . import SERVICES
-from .. import db, socketio
+from .. import db
 from .forms import AddJobForms
 from ..models import Job, JobInput
 import json
@@ -69,10 +69,6 @@ def service(service):
         job.status = 'submitted'
         db.session.commit()
         flash('Job "{}" added'.format(job.title), 'job')
-        event = 'user_{}_patch'.format(job.user_id)
-        jsonpatch = [{'op': 'add', 'path': '/jobs/{}'.format(job.id), 'value': job.to_dict()}]  # noqa
-        room = 'user_{}'.format(job.user_id)
-        socketio.emit(event, jsonpatch, room=room)
         return make_response(
             {'redirect_url': url_for('jobs.job', job_id=job.id)}, 201)
     return render_template('services/{}.html.j2'.format(service.replace('-', '_')),


@@ -6,9 +6,6 @@ from .decorators import socketio_login_required
 from .models import User


-###############################################################################
-# Socket.IO event handlers                                                    #
-###############################################################################
 '''
 ' A list containing session ids of connected Socket.IO sessions, to keep track
 ' of all connected sessions, which can be used to determine the runtimes of
@@ -17,6 +14,9 @@ from .models import User
 socketio_sessions = []


+###############################################################################
+# Socket.IO event handlers                                                    #
+###############################################################################
 @socketio.on('connect')
 @socketio_login_required
 def socketio_connect():

app/sqlalchemy_events.py (new file, 161 lines)

@@ -0,0 +1,161 @@
from . import db, socketio
from .models import Job, JobInput, JobResult
import logging


###############################################################################
# SQLAlchemy event handlers                                                   #
###############################################################################
###############################################################################
## Job events                                                                 #
###############################################################################
@db.event.listens_for(Job, 'after_update')
def after_job_update(mapper, connection, job):
    jsonpatch = []
    for attr in db.inspect(job).attrs:
        # We don't want to emit changes about relationship fields
        if attr.key in ['inputs', 'results']:
            continue
        history = attr.load_history()
        if not history.has_changes():
            continue
        new_value = history.added[0]
        # DateTime attributes must be converted to a timestamp
        if attr.key in ['creation_date', 'end_date']:
            new_value = None if new_value is None else new_value.timestamp()
        jsonpatch.append(
            {
                'op': 'replace',
                'path': '/jobs/{}/{}'.format(job.id, attr.key),
                'value': new_value
            }
        )
    if jsonpatch:
        event = 'user_{}_patch'.format(job.user_id)
        room = 'user_{}'.format(job.user_id)
        socketio.emit(event, jsonpatch, room=room)

@db.event.listens_for(Job, 'after_insert')
def after_job_insert(mapper, connection, job):
    event = 'user_{}_patch'.format(job.user_id)
    jsonpatch = [
        {
            'op': 'add',
            'path': '/jobs/{}'.format(job.id),
            'value': job.to_dict(include_relationships=False)
        }
    ]
    room = 'user_{}'.format(job.user_id)
    socketio.emit(event, jsonpatch, room=room)

@db.event.listens_for(Job, 'after_delete')
def after_job_delete(mapper, connection, job):
    event = 'user_{}_patch'.format(job.user_id)
    jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
    room = 'user_{}'.format(job.user_id)
    socketio.emit(event, jsonpatch, room=room)

###############################################################################
## JobInput events                                                            #
###############################################################################
@db.event.listens_for(JobInput, 'after_update')
def after_job_input_update(mapper, connection, job_input):
    jsonpatch = []
    for attr in db.inspect(job_input).attrs:
        history = attr.load_history()
        if not history.has_changes():
            continue
        new_value = history.added[0]
        jsonpatch.append(
            {
                'op': 'replace',
                'path': '/jobs/{}/inputs/{}/{}'.format(job_input.job_id,
                                                       job_input.id,
                                                       attr.key),
                'value': new_value
            }
        )
    if jsonpatch:
        event = 'user_{}_patch'.format(job_input.job.user_id)
        room = 'user_{}'.format(job_input.job.user_id)
        socketio.emit(event, jsonpatch, room=room)

@db.event.listens_for(JobInput, 'after_insert')
def after_job_input_insert(mapper, connection, job_input):
    event = 'user_{}_patch'.format(job_input.job.user_id)
    jsonpatch = [
        {
            'op': 'add',
            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
                                                job_input.id),
            'value': job_input.to_dict(include_relationships=False)
        }
    ]
    room = 'user_{}'.format(job_input.job.user_id)
    socketio.emit(event, jsonpatch, room=room)

@db.event.listens_for(JobInput, 'after_delete')
def after_job_input_delete(mapper, connection, job_input):
    event = 'user_{}_patch'.format(job_input.job.user_id)
    jsonpatch = [
        {
            'op': 'remove',
            'path': '/jobs/{}/inputs/{}'.format(job_input.job_id,
                                                job_input.id)
        }
    ]
    room = 'user_{}'.format(job_input.job.user_id)
    socketio.emit(event, jsonpatch, room=room)

###############################################################################
## JobResult events                                                           #
###############################################################################
@db.event.listens_for(JobResult, 'after_update')
def after_job_result_update(mapper, connection, job_result):
    jsonpatch = []
    for attr in db.inspect(job_result).attrs:
        history = attr.load_history()
        if not history.has_changes():
            continue
        new_value = history.added[0]
        jsonpatch.append(
            {
                'op': 'replace',
                'path': '/jobs/{}/results/{}/{}'.format(job_result.job_id,
                                                        job_result.id,
                                                        attr.key),
                'value': new_value
            }
        )
    if jsonpatch:
        event = 'user_{}_patch'.format(job_result.job.user_id)
        room = 'user_{}'.format(job_result.job.user_id)
        socketio.emit(event, jsonpatch, room=room)

@db.event.listens_for(JobResult, 'after_insert')
def after_job_result_insert(mapper, connection, job_result):
    event = 'user_{}_patch'.format(job_result.job.user_id)
    jsonpatch = [
        {
            'op': 'add',
            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
                                                 job_result.id),
            'value': job_result.to_dict(include_relationships=False)
        }
    ]
    room = 'user_{}'.format(job_result.job.user_id)
    socketio.emit(event, jsonpatch, room=room)

@db.event.listens_for(JobResult, 'after_delete')
def after_job_result_delete(mapper, connection, job_result):
    event = 'user_{}_patch'.format(job_result.job.user_id)
    jsonpatch = [
        {
            'op': 'remove',
            'path': '/jobs/{}/results/{}'.format(job_result.job_id,
                                                 job_result.id)
        }
    ]
    room = 'user_{}'.format(job_result.job.user_id)
    socketio.emit(event, jsonpatch, room=room)
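
With the listeners above registered when create_app() imports this module (first hunk of this commit), emitting patches becomes a side effect of ordinary ORM writes. A hypothetical usage sketch, assuming a job with id 42 exists:

# Hypothetical sketch (job id and values assumed): no explicit
# socketio.emit() call is needed in views or background tasks anymore.
job = Job.query.get(42)
job.status = 'running'
db.session.commit()  # the flush fires after_job_update, which emits
                     # [{'op': 'replace', 'path': '/jobs/42/status',
                     #   'value': 'running'}] to room 'user_<job.user_id>'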


@@ -105,12 +105,6 @@ class CheckJobsMixin:
                 return
             else:
                 job.status = 'queued'
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/status'.format(job.id),
-                    'value': job.status
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
             finally:
                 self.send_job_notification(job)
@@ -125,12 +119,6 @@ class CheckJobsMixin:
                     + '(job.status: {} -> failed)'.format(job.status)
                 )
                 job.status = 'failed'
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/status'.format(job.id),
-                    'value': job.status
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" service raised '.format(service_name)
@@ -152,12 +140,6 @@ class CheckJobsMixin:
         task_state = service_tasks[0].get('Status').get('State')
         if job.status == 'queued' and task_state != 'pending':
             job.status = 'running'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         elif job.status == 'running' and task_state in ['complete', 'failed']:  # noqa
             try:
                 service.remove()
@@ -178,26 +160,8 @@ class CheckJobsMixin:
                 db.session.add(job_result)
                 db.session.flush()
                 db.session.refresh(job_result)
-                patch_operation = {
-                    'op': 'add',
-                    'path': '/jobs/{}/results/{}'.format(job.id, job_result.id),  # noqa
-                    'value': job_result.to_dict()
-                }
-                self.buffer_user_patch_operation(job, patch_operation)  # noqa
                 job.end_date = datetime.utcnow()
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/end_date'.format(job.id),
-                    'value': job.end_date.timestamp()
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
                 job.status = task_state
-                patch_operation = {
-                    'op': 'replace',
-                    'path': '/jobs/{}/status'.format(job.id),
-                    'value': job.status
-                }
-                self.buffer_user_patch_operation(job, patch_operation)
             finally:
                 self.send_job_notification(job)
@@ -207,12 +171,6 @@ class CheckJobsMixin:
             service = self.docker.services.get(service_name)
         except docker.errors.NotFound:
             job.status = 'canceled'
-            patch_operation = {
-                'op': 'replace',
-                'path': '/jobs/{}/status'.format(job.id),
-                'value': job.status
-            }
-            self.buffer_user_patch_operation(job, patch_operation)
         except docker.errors.APIError as e:
             logging.error(
                 'Get "{}" service raised '.format(service_name)