handle email job status updates with the sqlalchemy event handlers

This commit is contained in:
Patrick Jentsch 2021-09-10 16:25:32 +02:00
parent 281e568edf
commit bc5c8ef074
3 changed files with 73 additions and 80 deletions

View File

@ -1,6 +1,7 @@
from datetime import datetime from datetime import datetime
from .. import db, socketio from .. import db, mail, socketio
from ..models import Corpus, CorpusFile, Job, JobInput, JobResult from ..email import create_message
from ..models import Corpus, CorpusFile, Job, JobInput, JobResult, QueryResult
############################################################################### ###############################################################################
@ -11,102 +12,77 @@ from ..models import Corpus, CorpusFile, Job, JobInput, JobResult
@db.event.listens_for(Job, 'after_delete') @db.event.listens_for(Job, 'after_delete')
@db.event.listens_for(JobInput, 'after_delete') @db.event.listens_for(JobInput, 'after_delete')
@db.event.listens_for(JobResult, 'after_delete') @db.event.listens_for(JobResult, 'after_delete')
@db.event.listens_for(QueryResult, 'after_delete')
def ressource_after_delete(mapper, connection, ressource): def ressource_after_delete(mapper, connection, ressource):
if isinstance(ressource, Corpus): event = 'user_{}_patch'.format(ressource.user_id)
user_id = ressource.creator.id jsonpatch = [{'op': 'remove', 'path': ressource.jsonpatch_path}]
path = '/corpora/{}'.format(ressource.id) room = 'user_{}'.format(ressource.user_id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
event = 'user_{}_patch'.format(user_id)
jsonpatch = [{'op': 'remove', 'path': path}]
room = 'user_{}'.format(user_id)
socketio.emit(event, jsonpatch, room=room) socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(Corpus, 'after_insert') @db.event.listens_for(Corpus, 'after_insert')
@db.event.listens_for(CorpusFile, 'after_insert') @db.event.listens_for(CorpusFile, 'after_insert')
@db.event.listens_for(Job, 'after_insert') @db.event.listens_for(Job, 'after_insert')
@db.event.listens_for(JobInput, 'after_insert') @db.event.listens_for(JobInput, 'after_insert')
@db.event.listens_for(JobResult, 'after_insert') @db.event.listens_for(JobResult, 'after_insert')
@db.event.listens_for(QueryResult, 'after_insert')
def ressource_after_insert_handler(mapper, connection, ressource): def ressource_after_insert_handler(mapper, connection, ressource):
if isinstance(ressource, Corpus): event = 'user_{}_patch'.format(ressource.user_id)
user_id = ressource.creator.id
path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
path = '/corpora/{}/files/{}'.format(ressource.corpus.id, ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
path = '/jobs/{}/results/{}'.format(ressource.job.id, ressource.id)
event = 'user_{}_patch'.format(user_id)
jsonpatch = [ jsonpatch = [
{ {
'op': 'add', 'op': 'add',
'path': path, 'path': ressource.jsonpatch_path,
'value': ressource.to_dict(include_relationships=False) 'value': ressource.to_dict(include_relationships=False)
} }
] ]
room = 'user_{}'.format(user_id) room = 'user_{}'.format(ressource.user_id)
socketio.emit(event, jsonpatch, room=room) socketio.emit(event, jsonpatch, room=room)
@db.event.listens_for(Corpus, 'after_update') @db.event.listens_for(Corpus, 'after_update')
@db.event.listens_for(CorpusFile, 'after_update') @db.event.listens_for(CorpusFile, 'after_update')
@db.event.listens_for(Job, 'after_update') @db.event.listens_for(Job, 'after_update')
@db.event.listens_for(JobInput, 'after_update') @db.event.listens_for(JobInput, 'after_update')
@db.event.listens_for(JobResult, 'after_update') @db.event.listens_for(JobResult, 'after_update')
@db.event.listens_for(QueryResult, 'after_update')
def ressource_after_update_handler(mapper, connection, ressource): def ressource_after_update_handler(mapper, connection, ressource):
if isinstance(ressource, Corpus):
user_id = ressource.creator.id
base_path = '/corpora/{}'.format(ressource.id)
elif isinstance(ressource, CorpusFile):
user_id = ressource.corpus.creator.id
base_path = '/corpora/{}/files/{}'.format(ressource.corpus.id,
ressource.id)
elif isinstance(ressource, Job):
user_id = ressource.creator.id
base_path = '/jobs/{}'.format(ressource.id)
elif isinstance(ressource, JobInput):
user_id = ressource.job.creator.id
base_path = '/jobs/{}/inputs/{}'.format(ressource.job.id, ressource.id)
elif isinstance(ressource, JobResult):
user_id = ressource.job.creator.id
base_path = '/jobs/{}/results/{}'.format(ressource.job.id,
ressource.id)
jsonpatch = [] jsonpatch = []
for attr in db.inspect(ressource).attrs: for attr in db.inspect(ressource).attrs:
# We don't want to emit changes about relationship fields # We don't want to handle changes in relationship fields
# TODO: Find a way to handle this without a hardcoded list
if attr.key in ['files', 'inputs', 'results']: if attr.key in ['files', 'inputs', 'results']:
continue continue
history = attr.load_history() history = attr.load_history()
if not history.has_changes(): if not history.has_changes():
continue continue
new_value = history.added[0] new_value = history.added[0]
# DateTime attributes must be converted to a string # In order to be JSON serializable, DateTime attributes must be
# converted to a string
if isinstance(new_value, datetime): if isinstance(new_value, datetime):
new_value = new_value.isoformat() new_value = new_value.isoformat()
jsonpatch.append( jsonpatch.append(
{ {
'op': 'replace', 'op': 'replace',
'path': '{}/{}'.format(base_path, attr.key), 'path': '{}/{}'.format(ressource.jsonpatch_path, attr.key),
'value': new_value 'value': new_value
} }
) )
# Job status update notification if it changed and wanted by the user
if isinstance(ressource, Job) and attr.key == 'status':
if ressource.creator.setting_job_status_mail_notifications == 'none': # noqa
pass
elif (ressource.creator.setting_job_status_mail_notifications == 'end' # noqa
and ressource.status not in ['complete', 'failed']):
pass
else:
msg = create_message(
ressource.creator.email,
'Status update for your Job "{}"'.format(ressource.title),
'tasks/email/notification',
job=ressource
)
mail.send(msg)
if jsonpatch: if jsonpatch:
event = 'user_{}_patch'.format(user_id) event = 'user_{}_patch'.format(ressource.user_id)
room = 'user_{}'.format(user_id) room = 'user_{}'.format(ressource.user_id)
socketio.emit(event, jsonpatch, room=room) socketio.emit(event, jsonpatch, room=room)

View File

@ -292,6 +292,10 @@ class JobInput(db.Model):
return url_for('jobs.download_job_input', job_id=self.job_id, return url_for('jobs.download_job_input', job_id=self.job_id,
job_input_id=self.id) job_input_id=self.id)
@property
def jsonpatch_path(self):
return '/jobs/{}/inputs/{}'.format(self.job_id, self.id)
@property @property
def path(self): def path(self):
return os.path.join(self.job.path, self.filename) return os.path.join(self.job.path, self.filename)
@ -301,6 +305,10 @@ class JobInput(db.Model):
return url_for('jobs.job', job_id=self.job_id, return url_for('jobs.job', job_id=self.job_id,
_anchor='job-{}-input-{}'.format(self.job_id, self.id)) _anchor='job-{}-input-{}'.format(self.job_id, self.id))
@property
def user_id(self):
return self.job.user_id
def __repr__(self): def __repr__(self):
''' '''
String representation of the JobInput. For human readability. String representation of the JobInput. For human readability.
@ -332,6 +340,10 @@ class JobResult(db.Model):
return url_for('jobs.download_job_result', job_id=self.job_id, return url_for('jobs.download_job_result', job_id=self.job_id,
job_result_id=self.id) job_result_id=self.id)
@property
def jsonpatch_path(self):
return '/jobs/{}/results/{}'.format(self.job_id, self.id)
@property @property
def path(self): def path(self):
return os.path.join(self.job.path, 'output', self.filename) return os.path.join(self.job.path, 'output', self.filename)
@ -341,6 +353,10 @@ class JobResult(db.Model):
return url_for('jobs.job', job_id=self.job_id, return url_for('jobs.job', job_id=self.job_id,
_anchor='job-{}-result-{}'.format(self.job_id, self.id)) _anchor='job-{}-result-{}'.format(self.job_id, self.id))
@property
def user_id(self):
return self.job.user_id
def __repr__(self): def __repr__(self):
''' '''
String representation of the JobResult. For human readability. String representation of the JobResult. For human readability.
@ -383,6 +399,10 @@ class Job(db.Model):
results = db.relationship('JobResult', backref='job', lazy='dynamic', results = db.relationship('JobResult', backref='job', lazy='dynamic',
cascade='save-update, merge, delete') cascade='save-update, merge, delete')
@property
def jsonpatch_path(self):
return '/jobs/{}'.format(self.id)
@property @property
def path(self): def path(self):
return os.path.join(self.creator.path, 'jobs', str(self.id)) return os.path.join(self.creator.path, 'jobs', str(self.id))
@ -479,6 +499,10 @@ class CorpusFile(db.Model):
return url_for('corpora.download_corpus_file', return url_for('corpora.download_corpus_file',
corpus_id=self.corpus_id, corpus_file_id=self.id) corpus_id=self.corpus_id, corpus_file_id=self.id)
@property
def jsonpatch_path(self):
return '/corpora/{}/files/{}'.format(self.corpus_id, self.id)
@property @property
def path(self): def path(self):
return os.path.join(self.corpus.path, self.filename) return os.path.join(self.corpus.path, self.filename)
@ -488,6 +512,10 @@ class CorpusFile(db.Model):
return url_for('corpora.corpus_file', corpus_id=self.corpus_id, return url_for('corpora.corpus_file', corpus_id=self.corpus_id,
corpus_file_id=self.id) corpus_file_id=self.id)
@property
def user_id(self):
return self.corpus.user_id
def delete(self): def delete(self):
try: try:
os.remove(self.path) os.remove(self.path)
@ -543,6 +571,10 @@ class Corpus(db.Model):
def analysis_url(self): def analysis_url(self):
return url_for('corpora.analyse_corpus', corpus_id=self.id) return url_for('corpora.analyse_corpus', corpus_id=self.id)
@property
def jsonpatch_path(self):
return '/corpora/{}'.format(self.id)
@property @property
def path(self): def path(self):
return os.path.join(self.creator.path, 'corpora', str(self.id)) return os.path.join(self.creator.path, 'corpora', str(self.id))
@ -632,6 +664,10 @@ class QueryResult(db.Model):
return url_for('corpora.download_query_result', return url_for('corpora.download_query_result',
query_result_id=self.id) query_result_id=self.id)
@property
def jsonpatch_path(self):
return '/query_results/{}'.format(self.id)
@property @property
def path(self): def path(self):
return os.path.join( return os.path.join(

View File

@ -1,8 +1,7 @@
from datetime import datetime from datetime import datetime
from flask import current_app from flask import current_app
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from .. import db, mail from .. import db
from ..email import create_message
from ..models import Job, JobResult from ..models import Job, JobResult
import docker import docker
import json import json
@ -105,8 +104,6 @@ class CheckJobsMixin:
return return
else: else:
job.status = 'queued' job.status = 'queued'
finally:
self.send_job_notification(job)
def checkout_job_service(self, job): def checkout_job_service(self, job):
service_name = 'job_{}'.format(job.id) service_name = 'job_{}'.format(job.id)
@ -162,8 +159,6 @@ class CheckJobsMixin:
db.session.refresh(job_result) db.session.refresh(job_result)
job.end_date = datetime.utcnow() job.end_date = datetime.utcnow()
job.status = task_state job.status = task_state
finally:
self.send_job_notification(job)
def remove_job_service(self, job): def remove_job_service(self, job):
service_name = 'job_{}'.format(job.id) service_name = 'job_{}'.format(job.id)
@ -203,17 +198,3 @@ class CheckJobsMixin:
+ '"docker.errors.APIError" The server returned an error. ' + '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e) + 'Details: {}'.format(e)
) )
def send_job_notification(self, job):
if job.creator.setting_job_status_mail_notifications == 'none':
return
if (job.creator.setting_job_status_mail_notifications == 'end'
and job.status not in ['complete', 'failed']):
return
msg = create_message(
job.creator.email,
'Status update for your Job "{}"'.format(job.title),
'tasks/email/notification',
job=job
)
mail.send(msg)