diff --git a/web/app/corpora/events.py b/web/app/corpora/events.py index 7e085814..22804fbe 100644 --- a/web/app/corpora/events.py +++ b/web/app/corpora/events.py @@ -4,12 +4,13 @@ from flask_login import current_user from socket import gaierror from .. import db, socketio from ..decorators import socketio_login_required -from ..events import connected_sessions +from ..events import socketio_sessions from ..models import Corpus, User import cqi import math import os import shutil +import logging ''' @@ -282,7 +283,7 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id): response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload} socketio.emit('corpus_analysis_init', response, room=session_id) ''' Observe analysis session ''' - while session_id in connected_sessions: + while session_id in socketio_sessions: socketio.sleep(3) ''' Teardown analysis session ''' if client.status == 'running': diff --git a/web/app/corpora/tasks.py b/web/app/corpora/tasks.py index f65a00de..3ad43740 100644 --- a/web/app/corpora/tasks.py +++ b/web/app/corpora/tasks.py @@ -1,4 +1,4 @@ -from .. import db +from .. import db, socketio from ..decorators import background from ..models import Corpus, CorpusFile, QueryResult @@ -12,6 +12,11 @@ def build_corpus(corpus_id, *args, **kwargs): raise Exception('Corpus {} not found'.format(corpus_id)) corpus.build() db.session.commit() + event = 'user_{}_patch'.format(corpus.user_id) + jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/last_edited_date'.format(corpus.id), 'value': corpus.last_edited_date.timestamp()}, # noqa + {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa + room = 'user_{}'.format(corpus.user_id) + socketio.emit(event, jsonpatch, room=room) @background @@ -20,8 +25,12 @@ def delete_corpus(corpus_id, *args, **kwargs): corpus = Corpus.query.get(corpus_id) if corpus is None: raise Exception('Corpus {} not found'.format(corpus_id)) + event = 'user_{}_patch'.format(corpus.user_id) + jsonpatch = [{'op': 'remove', 'path': '/corpora/{}'.format(corpus.id)}] + room = 'user_{}'.format(corpus.user_id) corpus.delete() db.session.commit() + socketio.emit(event, jsonpatch, room=room) @background @@ -30,8 +39,13 @@ def delete_corpus_file(corpus_file_id, *args, **kwargs): corpus_file = CorpusFile.query.get(corpus_file_id) if corpus_file is None: raise Exception('Corpus file {} not found'.format(corpus_file_id)) + event = 'user_{}_patch'.format(corpus_file.corpus.user_id) + jsonpatch = [{'op': 'remove', 'path': '/corpora/{}/files/{}'.format(corpus_file.corpus_id, corpus_file.id)}, # noqa + {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus_file.corpus_id), 'value': corpus_file.corpus.status}] # noqa + room = 'user_{}'.format(corpus_file.corpus.user_id) corpus_file.delete() db.session.commit() + socketio.emit(event, jsonpatch, room=room) @background @@ -40,5 +54,9 @@ def delete_query_result(query_result_id, *args, **kwargs): query_result = QueryResult.query.get(query_result_id) if query_result is None: raise Exception('QueryResult {} not found'.format(query_result_id)) + event = 'user_{}_patch'.format(query_result.user_id) + jsonpatch = [{'op': 'remove', 'path': '/query_results/{}'.format(query_result.id)}] # noqa + room = 'user_{}'.format(query_result.user_id) query_result.delete() db.session.commit() + socketio.emit(event, jsonpatch, room=room) diff --git a/web/app/corpora/views.py b/web/app/corpora/views.py index d4382211..cef864c9 100644 --- a/web/app/corpora/views.py +++ b/web/app/corpora/views.py @@ -8,7 +8,7 @@ from .forms import (AddCorpusFileForm, AddCorpusForm, AddQueryResultForm, DisplayOptionsForm, InspectDisplayOptionsForm, ImportCorpusForm) from jsonschema import validate -from .. import db +from .. import db, socketio from ..models import Corpus, CorpusFile, QueryResult import json import logging @@ -29,16 +29,22 @@ def add_corpus(): description=form.description.data, title=form.title.data) db.session.add(corpus) - db.session.commit() + db.session.flush() + db.session.refresh(corpus) try: os.makedirs(corpus.path) except OSError: logging.error('Make dir {} led to an OSError!'.format(corpus.path)) - db.session.delete(corpus) - db.session.commit() + db.session.rollback() abort(500) - flash('Corpus "{}" added!'.format(corpus.title), 'corpus') - return redirect(url_for('.corpus', corpus_id=corpus.id)) + else: + db.session.commit() + flash('Corpus "{}" added!'.format(corpus.title), 'corpus') + event = 'user_{}_patch'.format(corpus.user_id) + jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa + room = 'user_{}'.format(corpus.user_id) + socketio.emit(event, jsonpatch, room=room) + return redirect(url_for('.corpus', corpus_id=corpus.id)) return render_template('corpora/add_corpus.html.j2', form=form, title='Add corpus') @@ -54,13 +60,13 @@ def import_corpus(): description=form.description.data, title=form.title.data) db.session.add(corpus) - db.session.commit() + db.session.flush() + db.session.refresh(corpus) try: os.makedirs(corpus.path) except OSError: logging.error('Make dir {} led to an OSError!'.format(corpus.path)) - db.session.delete(corpus) - db.session.commit() + db.session.rollback() flash('Internal Server Error', 'error') return make_response( {'redirect_url': url_for('.import_corpus')}, 500) @@ -100,6 +106,10 @@ def import_corpus(): db.session.commit() os.remove(archive_file) flash('Corpus "{}" imported!'.format(corpus.title), 'corpus') + event = 'user_{}_patch'.format(corpus.user_id) + jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa + room = 'user_{}'.format(corpus.user_id) + socketio.emit(event, jsonpatch, room=room) return make_response( {'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201) else: @@ -205,6 +215,11 @@ def add_corpus_file(corpus_id): corpus.status = 'unprepared' db.session.commit() flash('Corpus file "{}" added!'.format(corpus_file.filename), 'corpus') + event = 'user_{}_patch'.format(corpus.user_id) + jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}, # noqa + {'op': 'add', 'path': '/corpora/{}/files/{}'.format(corpus.id, corpus_file.id), 'value': corpus_file.to_dict()}] # noqa + room = 'user_{}'.format(corpus.user_id) + socketio.emit(event, jsonpatch, room=room) return make_response({'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201) # noqa return render_template('corpora/add_corpus_file.html.j2', corpus=corpus, form=form, title='Add corpus file') diff --git a/web/app/events.py b/web/app/events.py index a0f76b3b..72cdef66 100644 --- a/web/app/events.py +++ b/web/app/events.py @@ -1,73 +1,72 @@ -from flask import current_app, request +from flask import request from flask_login import current_user -from . import db, socketio -from .decorators import socketio_admin_required, socketio_login_required +from flask_socketio import join_room, leave_room +from . import socketio +from .decorators import socketio_login_required from .models import User -import json -import jsonpatch +############################################################################### +# Socket.IO event handlers # +############################################################################### ''' ' A list containing session ids of connected Socket.IO sessions, to keep track -' of all connected sessions, which is used to determine the runtimes of +' of all connected sessions, which can be used to determine the runtimes of ' associated background tasks. ''' -connected_sessions = [] +socketio_sessions = [] @socketio.on('connect') -def connect(): +@socketio_login_required +def socketio_connect(): ''' ' The Socket.IO module creates a session id (sid) for each request. ' On connect the sid is saved in the connected sessions list. ''' - connected_sessions.append(request.sid) + socketio_sessions.append(request.sid) @socketio.on('disconnect') -def disconnect(): +def socketio_disconnect(): ''' ' On disconnect the session id gets removed from the connected sessions ' list. ''' - connected_sessions.remove(request.sid) + socketio_sessions.remove(request.sid) @socketio.on('start_user_session') @socketio_login_required -def start_user_session(user_id): - if not (current_user.id == user_id or current_user.is_administrator): - return - socketio.start_background_task(user_session, - current_app._get_current_object(), - user_id, request.sid) +def socketio_start_user_session(user_id): + user = User.query.get(user_id) + if user is None: + response = {'code': 404, 'msg': 'Not found'} + socketio.emit('start_user_session', response, room=request.sid) + elif not (user == current_user or current_user.is_administrator): + response = {'code': 403, 'msg': 'Forbidden'} + socketio.emit('start_user_session', response, room=request.sid) + else: + response = {'code': 200, 'msg': 'OK'} + socketio.emit('start_user_session', response, room=request.sid) + socketio.emit('user_{}_init'.format(user.id), user.to_dict(), + room=request.sid) + room = 'user_{}'.format(user.id) + join_room(room) -def user_session(app, user_id, session_id): - ''' - ' Sends initial user data to the client. Afterwards it checks every 3s if - ' changes to the initial values appeared. If changes are detected, a - ' RFC 6902 compliant JSON patch gets send. - ''' - init_event = 'user_{}_init'.format(user_id) - patch_event = 'user_{}_patch'.format(user_id) - with app.app_context(): - # Gather current values from database. - user = User.query.get(user_id) - user_dict = user.to_dict() - # Send initial values to the client. - socketio.emit(init_event, json.dumps(user_dict), room=session_id) - while session_id in connected_sessions: - # Get new values from the database - db.session.refresh(user) - new_user_dict = user.to_dict() - # Compute JSON patches. - user_patch = jsonpatch.JsonPatch.from_diff(user_dict, - new_user_dict) - # In case there are patches, send them to the client. - if user_patch: - socketio.emit(patch_event, user_patch.to_string(), - room=session_id) - # Set new values as references for the next iteration. - user_dict = new_user_dict - socketio.sleep(3) +@socketio.on('stop_user_session') +@socketio_login_required +def socketio_stop_user_session(user_id): + user = User.query.get(user_id) + if user is None: + response = {'code': 404, 'msg': 'Not found'} + socketio.emit('stop_user_session', response, room=request.sid) + elif not (user == current_user or current_user.is_administrator): + response = {'code': 403, 'msg': 'Forbidden'} + socketio.emit('stop_user_session', response, room=request.sid) + else: + response = {'code': 200, 'msg': 'OK'} + socketio.emit('stop_user_session', response, room=request.sid) + room = 'user_{}'.format(user.id) + leave_room(room) diff --git a/web/app/jobs/tasks.py b/web/app/jobs/tasks.py index 8ab823dc..fb4223b8 100644 --- a/web/app/jobs/tasks.py +++ b/web/app/jobs/tasks.py @@ -1,4 +1,4 @@ -from .. import db +from .. import db, socketio from ..decorators import background from ..models import Job @@ -9,8 +9,12 @@ def delete_job(job_id, *args, **kwargs): job = Job.query.get(job_id) if job is None: raise Exception('Job {} not found'.format(job_id)) + event = 'user_{}_patch'.format(job.user_id) + jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}] + room = 'user_{}'.format(job.user_id) job.delete() db.session.commit() + socketio.emit(event, jsonpatch, room=room) @background @@ -19,5 +23,14 @@ def restart_job(job_id, *args, **kwargs): job = Job.query.get(job_id) if job is None: raise Exception('Job {} not found'.format(job_id)) - job.restart() - db.session.commit() + try: + job.restart() + except Exception: + pass + else: + db.session.commit() + event = 'user_{}_patch'.format(job.user_id) + jsonpatch = [{'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()}, # noqa + {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}] # noqa + room = 'user_{}'.format(job.user_id) + socketio.emit(event, jsonpatch, room=room) diff --git a/web/app/services/views.py b/web/app/services/views.py index bd4bf3fa..1d81e6a8 100644 --- a/web/app/services/views.py +++ b/web/app/services/views.py @@ -2,7 +2,7 @@ from flask import abort, flash, make_response, render_template, url_for from flask_login import current_user, login_required from werkzeug.utils import secure_filename from . import services -from .. import db +from .. import db, socketio from ..jobs.forms import AddFileSetupJobForm, AddNLPJobForm, AddOCRJobForm from ..models import Job, JobInput import json @@ -70,6 +70,10 @@ def service(service): job.status = 'submitted' db.session.commit() flash('Job "{}" added'.format(job.title), 'job') + event = 'user_{}_patch'.format(job.user_id) + jsonpatch = [{'op': 'add', 'path': '/jobs/{}'.format(job.id), 'value': job.to_dict()}] # noqa + room = 'user_{}'.format(job.user_id) + socketio.emit(event, jsonpatch, room=room) return make_response( {'redirect_url': url_for('jobs.job', job_id=job.id)}, 201) return render_template('services/{}.html.j2'.format(service), diff --git a/web/app/static/js/nopaque/main.js b/web/app/static/js/nopaque/main.js index c8f886be..6435186f 100644 --- a/web/app/static/js/nopaque/main.js +++ b/web/app/static/js/nopaque/main.js @@ -9,8 +9,8 @@ class AppClient { if (userId in this.users) {return this.users[userId];} let user = new User(); this.users[userId] = user; - this.socket.on(`user_${userId}_init`, msg => user.init(JSON.parse(msg))); - this.socket.on(`user_${userId}_patch`, msg => user.patch(JSON.parse(msg))); + this.socket.on(`user_${userId}_init`, msg => user.init(msg)); + this.socket.on(`user_${userId}_patch`, msg => user.patch(msg)); this.socket.emit('start_user_session', userId); return user; } @@ -40,40 +40,44 @@ class User { } init(data) { + console.log('### User.init ###'); + console.log(data); this.data = data; for (let [corpusId, eventListeners] of Object.entries(this.eventListeners.corpus)) { if (corpusId === '*') { - for (let eventListener of eventListeners) {eventListener('init', this.data.corpora);} + for (let eventListener of eventListeners) {eventListener('init');} } else { if (corpusId in this.data.corpora) { - for (let eventListener of eventListeners) {eventListener('init', this.data.corpora[corpusId]);} + for (let eventListener of eventListeners) {eventListener('init');} } } } for (let [jobId, eventListeners] of Object.entries(this.eventListeners.job)) { if (jobId === '*') { - for (let eventListener of eventListeners) {eventListener('init', this.data.jobs);} + for (let eventListener of eventListeners) {eventListener('init');} } else { if (jobId in this.data.jobs) { - for (let eventListener of eventListeners) {eventListener('init', this.data.jobs[jobId]);} + for (let eventListener of eventListeners) {eventListener('init');} } } } for (let [queryResultId, eventListeners] of Object.entries(this.eventListeners.queryResult)) { if (queryResultId === '*') { - for (let eventListener of eventListeners) {eventListener('init', this.data.query_results);} + for (let eventListener of eventListeners) {eventListener('init');} } else { if (queryResultId in this.data.query_results) { - for (let eventListener of eventListeners) {eventListener('init', this.data.query_results[queryResultId]);} + for (let eventListener of eventListeners) {eventListener('init');} } } } } patch(patch) { + console.log('### User.patch ###'); + console.log(patch); this.data = jsonpatch.apply_patch(this.data, patch); let corporaPatch = patch.filter(operation => operation.path.startsWith("/corpora")); diff --git a/web/app/tasks/__init__.py b/web/app/tasks/__init__.py index 154841b7..d19a4b8f 100644 --- a/web/app/tasks/__init__.py +++ b/web/app/tasks/__init__.py @@ -1,31 +1,57 @@ -from .. import db -from ..models import Corpus, Job +from .corpus_utils import CheckCorporaMixin +from .job_utils import CheckJobsMixin +from ..import db, socketio import docker -docker_client = docker.from_env() -from . import corpus_utils, job_utils # noqa +class TaskRunner(CheckCorporaMixin, CheckJobsMixin): + def __init__(self): + self.docker = docker.from_env() + self._socketio_message_buffer = {} + + def run(self): + self.check_corpora() + self.check_jobs() + db.session.commit() + self.flush_socketio_messages() + + def buffer_socketio_message(self, event, payload, room, + msg_id=None, override_policy='replace'): + if room not in self._socketio_message_buffer: + self._socketio_message_buffer[room] = {} + if event not in self._socketio_message_buffer[room]: + self._socketio_message_buffer[room][event] = {} + if msg_id is None: + msg_id = len(self._socketio_message_buffer[room][event].keys()) + if override_policy == 'append': + if msg_id in self._socketio_message_buffer[room][event]: + # If the current message value isn't a list, convert it! + if not isinstance(self._socketio_message_buffer[room][event][msg_id], list): # noqa + self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]] # noqa + else: + self._socketio_message_buffer[room][event][msg_id] = [] + self._socketio_message_buffer[room][event][msg_id].append(payload) + elif override_policy == 'replace': + self._socketio_message_buffer[room][event][msg_id] = payload + else: + raise Exception('Unknown override policy: {}'.format(override_policy)) # noqa + return msg_id + + def buffer_user_patch_operation(self, ressource, patch_operation): + self.buffer_socketio_message('user_{}_patch'.format(ressource.user_id), + patch_operation, + 'user_{}'.format(ressource.user_id), + msg_id='_', override_policy='append') + + def clear_socketio_message_buffer(self): + self._socketio_message_buffer = {} + + def flush_socketio_messages(self): + for room in self._socketio_message_buffer: + for event in self._socketio_message_buffer[room]: + for message in self._socketio_message_buffer[room][event]: + socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room) # noqa + self.clear_socketio_message_buffer() -def check_corpora(): - corpora = Corpus.query.all() - for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora): - corpus_utils.create_build_corpus_service(corpus) - for corpus in filter(lambda corpus: corpus.status in ['queued', 'running'], corpora): # noqa - corpus_utils.checkout_build_corpus_service(corpus) - for corpus in filter(lambda corpus: corpus.status == 'start analysis', corpora): # noqa - corpus_utils.create_cqpserver_container(corpus) - for corpus in filter(lambda corpus: corpus.status == 'stop analysis', corpora): # noqa - corpus_utils.remove_cqpserver_container(corpus) - db.session.commit() - - -def check_jobs(): - jobs = Job.query.all() - for job in filter(lambda job: job.status == 'submitted', jobs): - job_utils.create_job_service(job) - for job in filter(lambda job: job.status in ['queued', 'running'], jobs): - job_utils.checkout_job_service(job) - for job in filter(lambda job: job.status == 'canceling', jobs): - job_utils.remove_job_service(job) - db.session.commit() +task_runner = TaskRunner() diff --git a/web/app/tasks/corpus_utils.py b/web/app/tasks/corpus_utils.py index 2167da46..5d51f342 100644 --- a/web/app/tasks/corpus_utils.py +++ b/web/app/tasks/corpus_utils.py @@ -1,174 +1,204 @@ -from . import docker_client +from ..models import Corpus import docker import logging import os import shutil -def create_build_corpus_service(corpus): - corpus_data_dir = os.path.join(corpus.path, 'data') - shutil.rmtree(corpus_data_dir, ignore_errors=True) - os.mkdir(corpus_data_dir) - corpus_registry_dir = os.path.join(corpus.path, 'registry') - shutil.rmtree(corpus_registry_dir, ignore_errors=True) - os.mkdir(corpus_registry_dir) - corpus_file = os.path.join(corpus.path, 'merged', 'corpus.vrt') - service_kwargs = { - 'command': 'docker-entrypoint.sh build-corpus', - 'constraints': ['node.role==worker'], - 'labels': {'origin': 'nopaque', - 'type': 'corpus.build', - 'corpus_id': str(corpus.id)}, - 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro', - corpus_data_dir + ':/corpora/data:rw', - corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], - 'name': 'build-corpus_{}'.format(corpus.id), - 'restart_policy': docker.types.RestartPolicy() - } - service_image = \ - 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' - try: - docker_client.services.create(service_image, **service_kwargs) - except docker.errors.APIError as e: - logging.error( - 'Create "{}" service raised '.format(service_kwargs['name']) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - else: - corpus.status = 'queued' +class CheckCorporaMixin: + def check_corpora(self): + corpora = Corpus.query.all() + queued_corpora = list(filter(lambda corpus: corpus.status == 'queued', corpora)) + running_corpora = list(filter(lambda corpus: corpus.status == 'running', corpora)) + start_analysis_corpora = list(filter(lambda corpus: corpus.status == 'start analysis', corpora)) + stop_analysis_corpora = list(filter(lambda corpus: corpus.status == 'stop analysis', corpora)) + submitted_corpora = list(filter(lambda corpus: corpus.status == 'submitted', corpora)) + for corpus in submitted_corpora: + self.create_build_corpus_service(corpus) + for corpus in queued_corpora + running_corpora: + self.checkout_build_corpus_service(corpus) + for corpus in start_analysis_corpora: + self.create_cqpserver_container(corpus) + for corpus in stop_analysis_corpora: + self.remove_cqpserver_container(corpus) + def create_build_corpus_service(self, corpus): + corpus_data_dir = os.path.join(corpus.path, 'data') + shutil.rmtree(corpus_data_dir, ignore_errors=True) + os.mkdir(corpus_data_dir) + corpus_registry_dir = os.path.join(corpus.path, 'registry') + shutil.rmtree(corpus_registry_dir, ignore_errors=True) + os.mkdir(corpus_registry_dir) + corpus_file = os.path.join(corpus.path, 'merged', 'corpus.vrt') + service_kwargs = { + 'command': 'docker-entrypoint.sh build-corpus', + 'constraints': ['node.role==worker'], + 'labels': {'origin': 'nopaque', + 'type': 'corpus.build', + 'corpus_id': str(corpus.id)}, + 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro', + corpus_data_dir + ':/corpora/data:rw', + corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], + 'name': 'build-corpus_{}'.format(corpus.id), + 'restart_policy': docker.types.RestartPolicy() + } + service_image = \ + 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + try: + self.docker.services.create(service_image, **service_kwargs) + except docker.errors.APIError as e: + logging.error( + 'Create "{}" service raised '.format(service_kwargs['name']) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + else: + corpus.status = 'queued' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) -def checkout_build_corpus_service(corpus): - service_name = 'build-corpus_{}'.format(corpus.id) - try: - service = docker_client.services.get(service_name) - except docker.errors.NotFound: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.NotFound" The service does not exist. ' - + '(corpus.status: {} -> failed)'.format(corpus.status) - ) - corpus.status = 'failed' - except docker.errors.APIError as e: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - except docker.errors.InvalidVersion: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.InvalidVersion" One of the arguments is ' - + 'not supported with the current API version.' - ) - else: - service_tasks = service.tasks() - if not service_tasks: + def checkout_build_corpus_service(self, corpus): + service_name = 'build-corpus_{}'.format(corpus.id) + try: + service = self.docker.services.get(service_name) + except docker.errors.NotFound: + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.NotFound" The service does not exist. ' + + '(corpus.status: {} -> failed)'.format(corpus.status) + ) + corpus.status = 'failed' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) + except docker.errors.APIError as e: + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + except docker.errors.InvalidVersion: + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.InvalidVersion" One of the arguments is ' + + 'not supported with the current API version.' + ) + else: + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if corpus.status == 'queued' and task_state != 'pending': + corpus.status = 'running' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) + elif (corpus.status == 'running' + and task_state in ['complete', 'failed']): + try: + service.remove() + except docker.errors.APIError as e: + logging.error( + 'Remove "{}" service raised '.format(service_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + return + else: + corpus.status = 'prepared' if task_state == 'complete' \ + else 'failed' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) + + def create_cqpserver_container(self, corpus): + corpus_data_dir = os.path.join(corpus.path, 'data') + corpus_registry_dir = os.path.join(corpus.path, 'registry') + container_kwargs = { + 'command': 'cqpserver', + 'detach': True, + 'volumes': [corpus_data_dir + ':/corpora/data:rw', + corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], + 'name': 'cqpserver_{}'.format(corpus.id), + 'network': 'nopaque_default' + } + container_image = \ + 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + # Check if a cqpserver container already exists. If this is the case, + # remove it and create a new one + try: + container = self.docker.containers.get(container_kwargs['name']) + except docker.errors.NotFound: + pass + except docker.errors.APIError as e: + logging.error( + 'Get "{}" container raised '.format(container_kwargs['name']) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) return - task_state = service_tasks[0].get('Status').get('State') - if corpus.status == 'queued' and task_state != 'pending': - corpus.status = 'running' - elif (corpus.status == 'running' - and task_state in ['complete', 'failed']): + else: try: - service.remove() + container.remove(force=True) except docker.errors.APIError as e: logging.error( - 'Remove "{}" service raised '.format(service_name) + 'Remove "{}" container raised '.format(container_kwargs['name']) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return - else: - corpus.status = 'prepared' if task_state == 'complete' \ - else 'failed' - - -def create_cqpserver_container(corpus): - corpus_data_dir = os.path.join(corpus.path, 'data') - corpus_registry_dir = os.path.join(corpus.path, 'registry') - container_kwargs = { - 'command': 'cqpserver', - 'detach': True, - 'volumes': [corpus_data_dir + ':/corpora/data:rw', - corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], - 'name': 'cqpserver_{}'.format(corpus.id), - 'network': 'nopaque_default' - } - container_image = \ - 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' - # Check if a cqpserver container already exists. If this is the case, - # remove it and create a new one - try: - container = docker_client.containers.get(container_kwargs['name']) - except docker.errors.NotFound: - pass - except docker.errors.APIError as e: - logging.error( - 'Get "{}" container raised '.format(container_kwargs['name']) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - else: try: - container.remove(force=True) + self.docker.containers.run(container_image, **container_kwargs) + except docker.errors.ContainerError: + # This case should not occur, because detach is True. + logging.error( + 'Run "{}" container raised '.format(container_kwargs['name']) + + '"docker.errors.ContainerError" The container exits with a ' + + 'non-zero exit code and detach is False.' + ) + corpus.status = 'failed' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) + except docker.errors.ImageNotFound: + logging.error( + 'Run "{}" container raised '.format(container_kwargs['name']) + + '"docker.errors.ImageNotFound" The specified image does not ' + + 'exist.' + ) + corpus.status = 'failed' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( - 'Remove "{}" container raised '.format(container_kwargs['name']) # noqa + 'Run "{}" container raised '.format(container_kwargs['name']) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + else: + corpus.status = 'analysing' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) + + def remove_cqpserver_container(self, corpus): + container_name = 'cqpserver_{}'.format(corpus.id) + try: + container = self.docker.containers.get(container_name) + except docker.errors.NotFound: + pass + except docker.errors.APIError as e: + logging.error( + 'Get "{}" container raised '.format(container_name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return - try: - docker_client.containers.run(container_image, **container_kwargs) - except docker.errors.ContainerError: - # This case should not occur, because detach is True. - logging.error( - 'Run "{}" container raised '.format(container_kwargs['name']) - + '"docker.errors.ContainerError" The container exits with a ' - + 'non-zero exit code and detach is False.' - ) - corpus.status = 'failed' - except docker.errors.ImageNotFound: - logging.error( - 'Run "{}" container raised '.format(container_kwargs['name']) - + '"docker.errors.ImageNotFound" The specified image does not ' - + 'exist.' - ) - corpus.status = 'failed' - except docker.errors.APIError as e: - logging.error( - 'Run "{}" container raised '.format(container_kwargs['name']) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - else: - corpus.status = 'analysing' - - -def remove_cqpserver_container(corpus): - container_name = 'cqpserver_{}'.format(corpus.id) - try: - container = docker_client.containers.get(container_name) - except docker.errors.NotFound: - pass - except docker.errors.APIError as e: - logging.error( - 'Get "{}" container raised '.format(container_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - else: - try: - container.remove(force=True) - except docker.errors.APIError as e: - logging.error( - 'Remove "{}" container raised '.format(container_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - corpus.status = 'prepared' + else: + try: + container.remove(force=True) + except docker.errors.APIError as e: + logging.error( + 'Remove "{}" container raised '.format(container_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + return + corpus.status = 'prepared' + patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + self.buffer_user_patch_operation(corpus, patch_operation) diff --git a/web/app/tasks/job_utils.py b/web/app/tasks/job_utils.py index 19a75b81..e2635dda 100644 --- a/web/app/tasks/job_utils.py +++ b/web/app/tasks/job_utils.py @@ -1,82 +1,161 @@ from datetime import datetime from werkzeug.utils import secure_filename -from . import docker_client from .. import db, mail from ..email import create_message -from ..models import JobResult +from ..models import Job, JobResult import docker import logging import json import os -def create_job_service(job): - cmd = '{} -i /files -o /files/output'.format(job.service) - if job.service == 'file-setup': - cmd += ' -f {}'.format(secure_filename(job.title)) - cmd += ' --log-dir /files' - cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title)) - cmd += ' ' + ' '.join(json.loads(job.service_args)) - service_kwargs = {'command': cmd, - 'constraints': ['node.role==worker'], - 'labels': {'origin': 'nopaque', - 'type': 'service.{}'.format(job.service), - 'job_id': str(job.id)}, - 'mounts': [job.path + ':/files:rw'], - 'name': 'job_{}'.format(job.id), - 'resources': docker.types.Resources( - cpu_reservation=job.n_cores * (10 ** 9), - mem_reservation=job.mem_mb * (10 ** 6) - ), - 'restart_policy': docker.types.RestartPolicy()} - service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format( - job.service, job.service_version) - try: - docker_client.services.create(service_image, **service_kwargs) - except docker.errors.APIError as e: - logging.error( - 'Create "{}" service raised '.format(service_kwargs['name']) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - else: - job.status = 'queued' - finally: - send_notification(job) +class CheckJobsMixin: + def check_jobs(self): + jobs = Job.query.all() + canceling_jobs = list(filter(lambda job: job.status == 'canceling', jobs)) + queued_jobs = list(filter(lambda job: job.status == 'queued', jobs)) + running_jobs = list(filter(lambda job: job.status == 'running', jobs)) + submitted_jobs = list(filter(lambda job: job.status == 'submitted', jobs)) + for job in submitted_jobs: + self.create_job_service(job) + for job in queued_jobs + running_jobs: + self.checkout_job_service(job) + for job in canceling_jobs: + self.remove_job_service(job) - -def checkout_job_service(job): - service_name = 'job_{}'.format(job.id) - try: - service = docker_client.services.get(service_name) - except docker.errors.NotFound: - logging.error('Get "{}" service raised '.format(service_name) - + '"docker.errors.NotFound" The service does not exist. ' - + '(job.status: {} -> failed)'.format(job.status)) - job.status = 'failed' - except docker.errors.APIError as e: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - except docker.errors.InvalidVersion: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.InvalidVersion" One of the arguments is ' - + 'not supported with the current API version.' - ) - return - else: - service_tasks = service.tasks() - if not service_tasks: + def create_job_service(self, job): + cmd = '{} -i /files -o /files/output'.format(job.service) + if job.service == 'file-setup': + cmd += ' -f {}'.format(secure_filename(job.title)) + cmd += ' --log-dir /files' + cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title)) + cmd += ' ' + ' '.join(json.loads(job.service_args)) + service_kwargs = {'command': cmd, + 'constraints': ['node.role==worker'], + 'labels': {'origin': 'nopaque', + 'type': 'service.{}'.format(job.service), + 'job_id': str(job.id)}, + 'mounts': [job.path + ':/files:rw'], + 'name': 'job_{}'.format(job.id), + 'resources': docker.types.Resources( + cpu_reservation=job.n_cores * (10 ** 9), + mem_reservation=job.mem_mb * (10 ** 6) + ), + 'restart_policy': docker.types.RestartPolicy()} + service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(job.service, job.service_version) + try: + self.docker.services.create(service_image, **service_kwargs) + except docker.errors.APIError as e: + logging.error( + 'Create "{}" service raised '.format(service_kwargs['name']) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) return - task_state = service_tasks[0].get('Status').get('State') - if job.status == 'queued' and task_state != 'pending': - job.status = 'running' - elif job.status == 'running' and task_state in ['complete', 'failed']: + else: + job.status = 'queued' + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + self.buffer_user_patch_operation(job, patch_operation) + finally: + self.send_job_notification(job) + + def checkout_job_service(self, job): + service_name = 'job_{}'.format(job.id) + try: + service = self.docker.services.get(service_name) + except docker.errors.NotFound: + logging.error('Get "{}" service raised '.format(service_name) + + '"docker.errors.NotFound" The service does not exist. ' + + '(job.status: {} -> failed)'.format(job.status)) + job.status = 'failed' + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + self.buffer_user_patch_operation(job, patch_operation) + except docker.errors.APIError as e: + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + return + except docker.errors.InvalidVersion: + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.InvalidVersion" One of the arguments is ' + + 'not supported with the current API version.' + ) + return + else: + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if job.status == 'queued' and task_state != 'pending': + job.status = 'running' + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + self.buffer_user_patch_operation(job, patch_operation) + elif job.status == 'running' and task_state in ['complete', 'failed']: + try: + service.remove() + except docker.errors.APIError as e: + logging.error( + 'Remove "{}" service raised '.format(service_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + return + else: + if task_state == 'complete': + results_dir = os.path.join(job.path, 'output') + result_files = filter(lambda x: x.endswith('.zip'), + os.listdir(results_dir)) + for result_file in result_files: + job_result = JobResult(filename=result_file, job=job) + db.session.add(job_result) + db.session.flush() + db.session.refresh(job_result) + patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()} + self.buffer_user_patch_operation(job, patch_operation) + job.end_date = datetime.utcnow() + patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()} + self.buffer_user_patch_operation(job, patch_operation) + job.status = task_state + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + self.buffer_user_patch_operation(job, patch_operation) + finally: + self.send_job_notification(job) + + def remove_job_service(self, job): + service_name = 'job_{}'.format(job.id) + try: + service = self.docker.services.get(service_name) + except docker.errors.NotFound: + job.status = 'canceled' + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + self.buffer_user_patch_operation(job, patch_operation) + except docker.errors.APIError as e: + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + return + except docker.errors.InvalidVersion: + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.InvalidVersion" One of the arguments is ' + + 'not supported with the current API version.' + ) + return + else: + try: + service.update(mounts=None) + except docker.errors.APIError as e: + logging.error( + 'Update "{}" service raised '.format(service_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + return try: service.remove() except docker.errors.APIError as e: @@ -85,68 +164,14 @@ def checkout_job_service(job): + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) - return - else: - if task_state == 'complete': - job_results_dir = os.path.join(job.path, 'output') - job_results = filter(lambda x: x.endswith('.zip'), - os.listdir(job_results_dir)) - for job_result in job_results: - job_result = JobResult(filename=job_result, job=job) - db.session.add(job_result) - job.end_date = datetime.utcnow() - job.status = task_state - finally: - send_notification(job) - -def remove_job_service(job): - service_name = 'job_{}'.format(job.id) - try: - service = docker_client.services.get(service_name) - except docker.errors.NotFound: - job.status = 'canceled' - except docker.errors.APIError as e: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - except docker.errors.InvalidVersion: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.InvalidVersion" One of the arguments is ' - + 'not supported with the current API version.' - ) - return - else: - try: - service.update(mounts=None) - except docker.errors.APIError as e: - logging.error( - 'Update "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) + def send_job_notification(self, job): + if job.creator.setting_job_status_mail_notifications == 'none': return - try: - service.remove() - except docker.errors.APIError as e: - logging.error( - 'Remove "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - - -def send_notification(job): - if job.creator.setting_job_status_mail_notifications == 'none': - return - if (job.creator.setting_job_status_mail_notifications == 'end' - and job.status not in ['complete', 'failed']): - return - msg = create_message(job.creator.email, - 'Status update for your Job "{}"'.format(job.title), - 'tasks/email/notification', job=job) - mail.send(msg) + if (job.creator.setting_job_status_mail_notifications == 'end' + and job.status not in ['complete', 'failed']): + return + msg = create_message(job.creator.email, + 'Status update for your Job "{}"'.format(job.title), + 'tasks/email/notification', job=job) + mail.send(msg) diff --git a/web/nopaque.py b/web/nopaque.py index 5c5c5af5..16662a23 100644 --- a/web/nopaque.py +++ b/web/nopaque.py @@ -50,9 +50,8 @@ def deploy(): @app.cli.command() def tasks(): - from app.tasks import check_corpora, check_jobs - check_corpora() - check_jobs() + from app.tasks import task_runner + task_runner.run() @app.cli.command()