Remove user session loop. Instead send resource updates directly on change

This commit is contained in:
Patrick Jentsch 2021-02-01 12:51:07 +01:00
parent ee9fdd1017
commit 996ed1c790
11 changed files with 513 additions and 379 deletions
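In short: the per-session background loop that re-read the user from the database every 3 s and diffed the serialized dict is gone. Clients now join a per-user Socket.IO room on 'start_user_session', and every view or task that mutates a resource emits an RFC 6902 JSON Patch to that room immediately. A minimal sketch of the pattern repeated throughout the files below (assuming `socketio` is the app's Flask-SocketIO instance, as imported via `from .. import socketio` in the diff; `emit_status_patch` is an illustrative helper, not part of the commit):

# Illustrative sketch of the push-on-change pattern used in this commit.
# Assumes `socketio` is the application's Flask-SocketIO instance and
# `corpus` is a model instance with `id`, `user_id` and `status`.
def emit_status_patch(socketio, corpus):
    event = 'user_{}_patch'.format(corpus.user_id)   # per-user patch event
    room = 'user_{}'.format(corpus.user_id)          # room joined on 'start_user_session'
    jsonpatch = [{'op': 'replace',
                  'path': '/corpora/{}/status'.format(corpus.id),
                  'value': corpus.status}]
    socketio.emit(event, jsonpatch, room=room)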

View File

@ -4,12 +4,13 @@ from flask_login import current_user
from socket import gaierror
from .. import db, socketio
from ..decorators import socketio_login_required
from ..events import connected_sessions
from ..events import socketio_sessions
from ..models import Corpus, User
import cqi
import math
import os
import shutil
import logging
'''
@ -282,7 +283,7 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload}
socketio.emit('corpus_analysis_init', response, room=session_id)
''' Observe analysis session '''
while session_id in connected_sessions:
while session_id in socketio_sessions:
socketio.sleep(3)
''' Teardown analysis session '''
if client.status == 'running':

View File

@ -1,4 +1,4 @@
from .. import db
from .. import db, socketio
from ..decorators import background
from ..models import Corpus, CorpusFile, QueryResult
@ -12,6 +12,11 @@ def build_corpus(corpus_id, *args, **kwargs):
raise Exception('Corpus {} not found'.format(corpus_id))
corpus.build()
db.session.commit()
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/last_edited_date'.format(corpus.id), 'value': corpus.last_edited_date.timestamp()}, # noqa
{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
@background
@ -20,8 +25,12 @@ def delete_corpus(corpus_id, *args, **kwargs):
corpus = Corpus.query.get(corpus_id)
if corpus is None:
raise Exception('Corpus {} not found'.format(corpus_id))
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'remove', 'path': '/corpora/{}'.format(corpus.id)}]
room = 'user_{}'.format(corpus.user_id)
corpus.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
@ -30,8 +39,13 @@ def delete_corpus_file(corpus_file_id, *args, **kwargs):
corpus_file = CorpusFile.query.get(corpus_file_id)
if corpus_file is None:
raise Exception('Corpus file {} not found'.format(corpus_file_id))
event = 'user_{}_patch'.format(corpus_file.corpus.user_id)
jsonpatch = [{'op': 'remove', 'path': '/corpora/{}/files/{}'.format(corpus_file.corpus_id, corpus_file.id)}, # noqa
{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus_file.corpus_id), 'value': corpus_file.corpus.status}] # noqa
room = 'user_{}'.format(corpus_file.corpus.user_id)
corpus_file.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
@ -40,5 +54,9 @@ def delete_query_result(query_result_id, *args, **kwargs):
query_result = QueryResult.query.get(query_result_id)
if query_result is None:
raise Exception('QueryResult {} not found'.format(query_result_id))
event = 'user_{}_patch'.format(query_result.user_id)
jsonpatch = [{'op': 'remove', 'path': '/query_results/{}'.format(query_result.id)}] # noqa
room = 'user_{}'.format(query_result.user_id)
query_result.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)

View File

@ -8,7 +8,7 @@ from .forms import (AddCorpusFileForm, AddCorpusForm, AddQueryResultForm,
DisplayOptionsForm, InspectDisplayOptionsForm,
ImportCorpusForm)
from jsonschema import validate
from .. import db
from .. import db, socketio
from ..models import Corpus, CorpusFile, QueryResult
import json
import logging
@ -29,15 +29,21 @@ def add_corpus():
description=form.description.data,
title=form.title.data)
db.session.add(corpus)
db.session.commit()
db.session.flush()
db.session.refresh(corpus)
try:
os.makedirs(corpus.path)
except OSError:
logging.error('Make dir {} led to an OSError!'.format(corpus.path))
db.session.delete(corpus)
db.session.commit()
db.session.rollback()
abort(500)
else:
db.session.commit()
flash('Corpus "{}" added!'.format(corpus.title), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return redirect(url_for('.corpus', corpus_id=corpus.id))
return render_template('corpora/add_corpus.html.j2', form=form,
title='Add corpus')
@ -54,13 +60,13 @@ def import_corpus():
description=form.description.data,
title=form.title.data)
db.session.add(corpus)
db.session.commit()
db.session.flush()
db.session.refresh(corpus)
try:
os.makedirs(corpus.path)
except OSError:
logging.error('Make dir {} led to an OSError!'.format(corpus.path))
db.session.delete(corpus)
db.session.commit()
db.session.rollback()
flash('Internal Server Error', 'error')
return make_response(
{'redirect_url': url_for('.import_corpus')}, 500)
@ -100,6 +106,10 @@ def import_corpus():
db.session.commit()
os.remove(archive_file)
flash('Corpus "{}" imported!'.format(corpus.title), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response(
{'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201)
else:
@ -205,6 +215,11 @@ def add_corpus_file(corpus_id):
corpus.status = 'unprepared'
db.session.commit()
flash('Corpus file "{}" added!'.format(corpus_file.filename), 'corpus')
event = 'user_{}_patch'.format(corpus.user_id)
jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}, # noqa
{'op': 'add', 'path': '/corpora/{}/files/{}'.format(corpus.id, corpus_file.id), 'value': corpus_file.to_dict()}] # noqa
room = 'user_{}'.format(corpus.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response({'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201) # noqa
return render_template('corpora/add_corpus_file.html.j2', corpus=corpus,
form=form, title='Add corpus file')

View File

@ -1,73 +1,72 @@
from flask import current_app, request
from flask import request
from flask_login import current_user
from . import db, socketio
from .decorators import socketio_admin_required, socketio_login_required
from flask_socketio import join_room, leave_room
from . import socketio
from .decorators import socketio_login_required
from .models import User
import json
import jsonpatch
###############################################################################
# Socket.IO event handlers #
###############################################################################
'''
' A list containing session ids of connected Socket.IO sessions, to keep track
' of all connected sessions, which is used to determine the runtimes of
' of all connected sessions, which can be used to determine the runtimes of
' associated background tasks.
'''
connected_sessions = []
socketio_sessions = []
@socketio.on('connect')
def connect():
@socketio_login_required
def socketio_connect():
'''
' The Socket.IO module creates a session id (sid) for each request.
' On connect the sid is saved in the connected sessions list.
'''
connected_sessions.append(request.sid)
socketio_sessions.append(request.sid)
@socketio.on('disconnect')
def disconnect():
def socketio_disconnect():
'''
' On disconnect the session id gets removed from the connected sessions
' list.
'''
connected_sessions.remove(request.sid)
socketio_sessions.remove(request.sid)
@socketio.on('start_user_session')
@socketio_login_required
def start_user_session(user_id):
if not (current_user.id == user_id or current_user.is_administrator):
return
socketio.start_background_task(user_session,
current_app._get_current_object(),
user_id, request.sid)
def user_session(app, user_id, session_id):
'''
' Sends initial user data to the client. Afterwards it checks every 3s
' whether the values have changed. If so, an RFC 6902 compliant JSON
' patch gets sent.
'''
init_event = 'user_{}_init'.format(user_id)
patch_event = 'user_{}_patch'.format(user_id)
with app.app_context():
# Gather current values from database.
def socketio_start_user_session(user_id):
user = User.query.get(user_id)
user_dict = user.to_dict()
# Send initial values to the client.
socketio.emit(init_event, json.dumps(user_dict), room=session_id)
while session_id in connected_sessions:
# Get new values from the database
db.session.refresh(user)
new_user_dict = user.to_dict()
# Compute JSON patches.
user_patch = jsonpatch.JsonPatch.from_diff(user_dict,
new_user_dict)
# In case there are patches, send them to the client.
if user_patch:
socketio.emit(patch_event, user_patch.to_string(),
room=session_id)
# Set new values as references for the next iteration.
user_dict = new_user_dict
socketio.sleep(3)
if user is None:
response = {'code': 404, 'msg': 'Not found'}
socketio.emit('start_user_session', response, room=request.sid)
elif not (user == current_user or current_user.is_administrator):
response = {'code': 403, 'msg': 'Forbidden'}
socketio.emit('start_user_session', response, room=request.sid)
else:
response = {'code': 200, 'msg': 'OK'}
socketio.emit('start_user_session', response, room=request.sid)
socketio.emit('user_{}_init'.format(user.id), user.to_dict(),
room=request.sid)
room = 'user_{}'.format(user.id)
join_room(room)
@socketio.on('stop_user_session')
@socketio_login_required
def socketio_stop_user_session(user_id):
user = User.query.get(user_id)
if user is None:
response = {'code': 404, 'msg': 'Not found'}
socketio.emit('stop_user_session', response, room=request.sid)
elif not (user == current_user or current_user.is_administrator):
response = {'code': 403, 'msg': 'Forbidden'}
socketio.emit('stop_user_session', response, room=request.sid)
else:
response = {'code': 200, 'msg': 'OK'}
socketio.emit('stop_user_session', response, room=request.sid)
room = 'user_{}'.format(user.id)
leave_room(room)

View File

@ -1,4 +1,4 @@
from .. import db
from .. import db, socketio
from ..decorators import background
from ..models import Job
@ -9,8 +9,12 @@ def delete_job(job_id, *args, **kwargs):
job = Job.query.get(job_id)
if job is None:
raise Exception('Job {} not found'.format(job_id))
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
room = 'user_{}'.format(job.user_id)
job.delete()
db.session.commit()
socketio.emit(event, jsonpatch, room=room)
@background
@ -19,5 +23,14 @@ def restart_job(job_id, *args, **kwargs):
job = Job.query.get(job_id)
if job is None:
raise Exception('Job {} not found'.format(job_id))
try:
job.restart()
except Exception:
pass
else:
db.session.commit()
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [{'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()}, # noqa
{'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}] # noqa
room = 'user_{}'.format(job.user_id)
socketio.emit(event, jsonpatch, room=room)

View File

@ -2,7 +2,7 @@ from flask import abort, flash, make_response, render_template, url_for
from flask_login import current_user, login_required
from werkzeug.utils import secure_filename
from . import services
from .. import db
from .. import db, socketio
from ..jobs.forms import AddFileSetupJobForm, AddNLPJobForm, AddOCRJobForm
from ..models import Job, JobInput
import json
@ -70,6 +70,10 @@ def service(service):
job.status = 'submitted'
db.session.commit()
flash('Job "{}" added'.format(job.title), 'job')
event = 'user_{}_patch'.format(job.user_id)
jsonpatch = [{'op': 'add', 'path': '/jobs/{}'.format(job.id), 'value': job.to_dict()}] # noqa
room = 'user_{}'.format(job.user_id)
socketio.emit(event, jsonpatch, room=room)
return make_response(
{'redirect_url': url_for('jobs.job', job_id=job.id)}, 201)
return render_template('services/{}.html.j2'.format(service),

View File

@ -9,8 +9,8 @@ class AppClient {
if (userId in this.users) {return this.users[userId];}
let user = new User();
this.users[userId] = user;
this.socket.on(`user_${userId}_init`, msg => user.init(JSON.parse(msg)));
this.socket.on(`user_${userId}_patch`, msg => user.patch(JSON.parse(msg)));
this.socket.on(`user_${userId}_init`, msg => user.init(msg));
this.socket.on(`user_${userId}_patch`, msg => user.patch(msg));
this.socket.emit('start_user_session', userId);
return user;
}
@ -40,40 +40,44 @@ class User {
}
init(data) {
console.log('### User.init ###');
console.log(data);
this.data = data;
for (let [corpusId, eventListeners] of Object.entries(this.eventListeners.corpus)) {
if (corpusId === '*') {
for (let eventListener of eventListeners) {eventListener('init', this.data.corpora);}
for (let eventListener of eventListeners) {eventListener('init');}
} else {
if (corpusId in this.data.corpora) {
for (let eventListener of eventListeners) {eventListener('init', this.data.corpora[corpusId]);}
for (let eventListener of eventListeners) {eventListener('init');}
}
}
}
for (let [jobId, eventListeners] of Object.entries(this.eventListeners.job)) {
if (jobId === '*') {
for (let eventListener of eventListeners) {eventListener('init', this.data.jobs);}
for (let eventListener of eventListeners) {eventListener('init');}
} else {
if (jobId in this.data.jobs) {
for (let eventListener of eventListeners) {eventListener('init', this.data.jobs[jobId]);}
for (let eventListener of eventListeners) {eventListener('init');}
}
}
}
for (let [queryResultId, eventListeners] of Object.entries(this.eventListeners.queryResult)) {
if (queryResultId === '*') {
for (let eventListener of eventListeners) {eventListener('init', this.data.query_results);}
for (let eventListener of eventListeners) {eventListener('init');}
} else {
if (queryResultId in this.data.query_results) {
for (let eventListener of eventListeners) {eventListener('init', this.data.query_results[queryResultId]);}
for (let eventListener of eventListeners) {eventListener('init');}
}
}
}
}
patch(patch) {
console.log('### User.patch ###');
console.log(patch);
this.data = jsonpatch.apply_patch(this.data, patch);
let corporaPatch = patch.filter(operation => operation.path.startsWith("/corpora"));

View File

@ -1,31 +1,57 @@
from .. import db
from ..models import Corpus, Job
from .corpus_utils import CheckCorporaMixin
from .job_utils import CheckJobsMixin
from .. import db, socketio
import docker
docker_client = docker.from_env()
from . import corpus_utils, job_utils # noqa
class TaskRunner(CheckCorporaMixin, CheckJobsMixin):
def __init__(self):
self.docker = docker.from_env()
self._socketio_message_buffer = {}
def check_corpora():
corpora = Corpus.query.all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
corpus_utils.create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status in ['queued', 'running'], corpora): # noqa
corpus_utils.checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis', corpora): # noqa
corpus_utils.create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis', corpora): # noqa
corpus_utils.remove_cqpserver_container(corpus)
def run(self):
self.check_corpora()
self.check_jobs()
db.session.commit()
self.flush_socketio_messages()
def buffer_socketio_message(self, event, payload, room,
msg_id=None, override_policy='replace'):
if room not in self._socketio_message_buffer:
self._socketio_message_buffer[room] = {}
if event not in self._socketio_message_buffer[room]:
self._socketio_message_buffer[room][event] = {}
if msg_id is None:
msg_id = len(self._socketio_message_buffer[room][event].keys())
if override_policy == 'append':
if msg_id in self._socketio_message_buffer[room][event]:
# If the current message value isn't a list, convert it!
if not isinstance(self._socketio_message_buffer[room][event][msg_id], list): # noqa
self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]] # noqa
else:
self._socketio_message_buffer[room][event][msg_id] = []
self._socketio_message_buffer[room][event][msg_id].append(payload)
elif override_policy == 'replace':
self._socketio_message_buffer[room][event][msg_id] = payload
else:
raise Exception('Unknown override policy: {}'.format(override_policy)) # noqa
return msg_id
def buffer_user_patch_operation(self, resource, patch_operation):
self.buffer_socketio_message('user_{}_patch'.format(resource.user_id),
patch_operation,
'user_{}'.format(resource.user_id),
msg_id='_', override_policy='append')
def clear_socketio_message_buffer(self):
self._socketio_message_buffer = {}
def flush_socketio_messages(self):
for room in self._socketio_message_buffer:
for event in self._socketio_message_buffer[room]:
for message in self._socketio_message_buffer[room][event]:
socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room) # noqa
self.clear_socketio_message_buffer()
def check_jobs():
jobs = Job.query.all()
for job in filter(lambda job: job.status == 'submitted', jobs):
job_utils.create_job_service(job)
for job in filter(lambda job: job.status in ['queued', 'running'], jobs):
job_utils.checkout_job_service(job)
for job in filter(lambda job: job.status == 'canceling', jobs):
job_utils.remove_job_service(job)
db.session.commit()
task_runner = TaskRunner()
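Unlike the views and background tasks above, the periodic TaskRunner does not emit each patch operation on its own: the check_* mixins buffer operations per user room (override_policy='append' collects them into one list under the shared msg_id '_'), and flush_socketio_messages() sends a single user_<id>_patch event per room after the database commit. A hedged usage sketch; the `corpus` variable and its new status are stand-ins, not part of the commit:

# Illustrative only; mirrors how the check_* mixins report changes.
runner = TaskRunner()
# After a check_* method updates a resource (here: a hypothetical `corpus`):
corpus.status = 'queued'
runner.buffer_user_patch_operation(
    corpus,
    {'op': 'replace',
     'path': '/corpora/{}/status'.format(corpus.id),
     'value': corpus.status})
# All operations buffered for this user during one pass go out as one
# 'user_<id>_patch' event when the runner flushes:
runner.flush_socketio_messages()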

View File

@ -1,11 +1,28 @@
from . import docker_client
from ..models import Corpus
import docker
import logging
import os
import shutil
def create_build_corpus_service(corpus):
class CheckCorporaMixin:
def check_corpora(self):
corpora = Corpus.query.all()
queued_corpora = list(filter(lambda corpus: corpus.status == 'queued', corpora))
running_corpora = list(filter(lambda corpus: corpus.status == 'running', corpora))
start_analysis_corpora = list(filter(lambda corpus: corpus.status == 'start analysis', corpora))
stop_analysis_corpora = list(filter(lambda corpus: corpus.status == 'stop analysis', corpora))
submitted_corpora = list(filter(lambda corpus: corpus.status == 'submitted', corpora))
for corpus in submitted_corpora:
self.create_build_corpus_service(corpus)
for corpus in queued_corpora + running_corpora:
self.checkout_build_corpus_service(corpus)
for corpus in start_analysis_corpora:
self.create_cqpserver_container(corpus)
for corpus in stop_analysis_corpora:
self.remove_cqpserver_container(corpus)
def create_build_corpus_service(self, corpus):
corpus_data_dir = os.path.join(corpus.path, 'data')
shutil.rmtree(corpus_data_dir, ignore_errors=True)
os.mkdir(corpus_data_dir)
@ -28,7 +45,7 @@ def create_build_corpus_service(corpus):
service_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
docker_client.services.create(service_image, **service_kwargs)
self.docker.services.create(service_image, **service_kwargs)
except docker.errors.APIError as e:
logging.error(
'Create "{}" service raised '.format(service_kwargs['name'])
@ -37,12 +54,13 @@ def create_build_corpus_service(corpus):
)
else:
corpus.status = 'queued'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)
def checkout_build_corpus_service(corpus):
def checkout_build_corpus_service(self, corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
service = self.docker.services.get(service_name)
except docker.errors.NotFound:
logging.error(
'Get "{}" service raised '.format(service_name)
@ -50,6 +68,8 @@ def checkout_build_corpus_service(corpus):
+ '(corpus.status: {} -> failed)'.format(corpus.status)
)
corpus.status = 'failed'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" service raised '.format(service_name)
@ -69,6 +89,8 @@ def checkout_build_corpus_service(corpus):
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)
elif (corpus.status == 'running'
and task_state in ['complete', 'failed']):
try:
@ -83,9 +105,10 @@ def checkout_build_corpus_service(corpus):
else:
corpus.status = 'prepared' if task_state == 'complete' \
else 'failed'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)
def create_cqpserver_container(corpus):
def create_cqpserver_container(self, corpus):
corpus_data_dir = os.path.join(corpus.path, 'data')
corpus_registry_dir = os.path.join(corpus.path, 'registry')
container_kwargs = {
@ -101,7 +124,7 @@ def create_cqpserver_container(corpus):
# Check if a cqpserver container already exists. If this is the case,
# remove it and create a new one
try:
container = docker_client.containers.get(container_kwargs['name'])
container = self.docker.containers.get(container_kwargs['name'])
except docker.errors.NotFound:
pass
except docker.errors.APIError as e:
@ -116,13 +139,13 @@ def create_cqpserver_container(corpus):
container.remove(force=True)
except docker.errors.APIError as e:
logging.error(
'Remove "{}" container raised '.format(container_kwargs['name']) # noqa
'Remove "{}" container raised '.format(container_kwargs['name'])
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
return
try:
docker_client.containers.run(container_image, **container_kwargs)
self.docker.containers.run(container_image, **container_kwargs)
except docker.errors.ContainerError:
# This case should not occur, because detach is True.
logging.error(
@ -131,6 +154,8 @@ def create_cqpserver_container(corpus):
+ 'non-zero exit code and detach is False.'
)
corpus.status = 'failed'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.ImageNotFound:
logging.error(
'Run "{}" container raised '.format(container_kwargs['name'])
@ -138,6 +163,8 @@ def create_cqpserver_container(corpus):
+ 'exist.'
)
corpus.status = 'failed'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Run "{}" container raised '.format(container_kwargs['name'])
@ -146,12 +173,13 @@ def create_cqpserver_container(corpus):
)
else:
corpus.status = 'analysing'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)
def remove_cqpserver_container(corpus):
def remove_cqpserver_container(self, corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
try:
container = docker_client.containers.get(container_name)
container = self.docker.containers.get(container_name)
except docker.errors.NotFound:
pass
except docker.errors.APIError as e:
@ -172,3 +200,5 @@ def remove_cqpserver_container(corpus):
)
return
corpus.status = 'prepared'
patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
self.buffer_user_patch_operation(corpus, patch_operation)

View File

@ -1,16 +1,29 @@
from datetime import datetime
from werkzeug.utils import secure_filename
from . import docker_client
from .. import db, mail
from ..email import create_message
from ..models import JobResult
from ..models import Job, JobResult
import docker
import logging
import json
import os
def create_job_service(job):
class CheckJobsMixin:
def check_jobs(self):
jobs = Job.query.all()
canceling_jobs = list(filter(lambda job: job.status == 'canceling', jobs))
queued_jobs = list(filter(lambda job: job.status == 'queued', jobs))
running_jobs = list(filter(lambda job: job.status == 'running', jobs))
submitted_jobs = list(filter(lambda job: job.status == 'submitted', jobs))
for job in submitted_jobs:
self.create_job_service(job)
for job in queued_jobs + running_jobs:
self.checkout_job_service(job)
for job in canceling_jobs:
self.remove_job_service(job)
def create_job_service(self, job):
cmd = '{} -i /files -o /files/output'.format(job.service)
if job.service == 'file-setup':
cmd += ' -f {}'.format(secure_filename(job.title))
@ -29,10 +42,9 @@ def create_job_service(job):
mem_reservation=job.mem_mb * (10 ** 6)
),
'restart_policy': docker.types.RestartPolicy()}
service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
job.service, job.service_version)
service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(job.service, job.service_version)
try:
docker_client.services.create(service_image, **service_kwargs)
self.docker.services.create(service_image, **service_kwargs)
except docker.errors.APIError as e:
logging.error(
'Create "{}" service raised '.format(service_kwargs['name'])
@ -42,19 +54,22 @@ def create_job_service(job):
return
else:
job.status = 'queued'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
self.buffer_user_patch_operation(job, patch_operation)
finally:
send_notification(job)
self.send_job_notification(job)
def checkout_job_service(job):
def checkout_job_service(self, job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
service = self.docker.services.get(service_name)
except docker.errors.NotFound:
logging.error('Get "{}" service raised '.format(service_name)
+ '"docker.errors.NotFound" The service does not exist. '
+ '(job.status: {} -> failed)'.format(job.status))
job.status = 'failed'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
self.buffer_user_patch_operation(job, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" service raised '.format(service_name)
@ -76,6 +91,8 @@ def checkout_job_service(job):
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
self.buffer_user_patch_operation(job, patch_operation)
elif job.status == 'running' and task_state in ['complete', 'failed']:
try:
service.remove()
@ -88,24 +105,33 @@ def checkout_job_service(job):
return
else:
if task_state == 'complete':
job_results_dir = os.path.join(job.path, 'output')
job_results = filter(lambda x: x.endswith('.zip'),
os.listdir(job_results_dir))
for job_result in job_results:
job_result = JobResult(filename=job_result, job=job)
results_dir = os.path.join(job.path, 'output')
result_files = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result_file in result_files:
job_result = JobResult(filename=result_file, job=job)
db.session.add(job_result)
db.session.flush()
db.session.refresh(job_result)
patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()}
self.buffer_user_patch_operation(job, patch_operation)
job.end_date = datetime.utcnow()
patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()}
self.buffer_user_patch_operation(job, patch_operation)
job.status = task_state
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
self.buffer_user_patch_operation(job, patch_operation)
finally:
send_notification(job)
self.send_job_notification(job)
def remove_job_service(job):
def remove_job_service(self, job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
service = self.docker.services.get(service_name)
except docker.errors.NotFound:
job.status = 'canceled'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
self.buffer_user_patch_operation(job, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" service raised '.format(service_name)
@ -139,8 +165,7 @@ def remove_job_service(job):
+ 'Details: {}'.format(e)
)
def send_notification(job):
def send_job_notification(self, job):
if job.creator.setting_job_status_mail_notifications == 'none':
return
if (job.creator.setting_job_status_mail_notifications == 'end'

View File

@ -50,9 +50,8 @@ def deploy():
@app.cli.command()
def tasks():
from app.tasks import check_corpora, check_jobs
check_corpora()
check_jobs()
from app.tasks import task_runner
task_runner.run()
@app.cli.command()