From 996ed1c7900f99d81a0c5579604f8891ef870776 Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Mon, 1 Feb 2021 12:51:07 +0100
Subject: [PATCH] Remove user session loop. Instead send resource updates
directly on change
---
web/app/corpora/events.py | 5 +-
web/app/corpora/tasks.py | 20 +-
web/app/corpora/views.py | 33 ++-
web/app/events.py | 89 ++++----
web/app/jobs/tasks.py | 19 +-
web/app/services/views.py | 6 +-
web/app/static/js/nopaque/main.js | 20 +-
web/app/tasks/__init__.py | 78 ++++---
web/app/tasks/corpus_utils.py | 332 ++++++++++++++++--------------
web/app/tasks/job_utils.py | 285 +++++++++++++------------
web/nopaque.py | 5 +-
11 files changed, 513 insertions(+), 379 deletions(-)
diff --git a/web/app/corpora/events.py b/web/app/corpora/events.py
index 7e085814..22804fbe 100644
--- a/web/app/corpora/events.py
+++ b/web/app/corpora/events.py
@@ -4,12 +4,13 @@ from flask_login import current_user
from socket import gaierror
from .. import db, socketio
from ..decorators import socketio_login_required
-from ..events import connected_sessions
+from ..events import socketio_sessions
from ..models import Corpus, User
import cqi
import math
import os
import shutil
+import logging
'''
@@ -282,7 +283,7 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload}
socketio.emit('corpus_analysis_init', response, room=session_id)
''' Observe analysis session '''
- while session_id in connected_sessions:
+ while session_id in socketio_sessions:
socketio.sleep(3)
''' Teardown analysis session '''
if client.status == 'running':
diff --git a/web/app/corpora/tasks.py b/web/app/corpora/tasks.py
index f65a00de..3ad43740 100644
--- a/web/app/corpora/tasks.py
+++ b/web/app/corpora/tasks.py
@@ -1,4 +1,4 @@
-from .. import db
+from .. import db, socketio
from ..decorators import background
from ..models import Corpus, CorpusFile, QueryResult
@@ -12,6 +12,11 @@ def build_corpus(corpus_id, *args, **kwargs):
raise Exception('Corpus {} not found'.format(corpus_id))
corpus.build()
db.session.commit()
+ event = 'user_{}_patch'.format(corpus.user_id)
+ jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/last_edited_date'.format(corpus.id), 'value': corpus.last_edited_date.timestamp()}, # noqa
+ {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa
+ room = 'user_{}'.format(corpus.user_id)
+ socketio.emit(event, jsonpatch, room=room)
@background
@@ -20,8 +25,12 @@ def delete_corpus(corpus_id, *args, **kwargs):
corpus = Corpus.query.get(corpus_id)
if corpus is None:
raise Exception('Corpus {} not found'.format(corpus_id))
+ event = 'user_{}_patch'.format(corpus.user_id)
+ jsonpatch = [{'op': 'remove', 'path': '/corpora/{}'.format(corpus.id)}]
+ room = 'user_{}'.format(corpus.user_id)
corpus.delete()
db.session.commit()
+ socketio.emit(event, jsonpatch, room=room)
@background
@@ -30,8 +39,13 @@ def delete_corpus_file(corpus_file_id, *args, **kwargs):
corpus_file = CorpusFile.query.get(corpus_file_id)
if corpus_file is None:
raise Exception('Corpus file {} not found'.format(corpus_file_id))
+ event = 'user_{}_patch'.format(corpus_file.corpus.user_id)
+ jsonpatch = [{'op': 'remove', 'path': '/corpora/{}/files/{}'.format(corpus_file.corpus_id, corpus_file.id)}, # noqa
+ {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus_file.corpus_id), 'value': corpus_file.corpus.status}] # noqa
+ room = 'user_{}'.format(corpus_file.corpus.user_id)
corpus_file.delete()
db.session.commit()
+ socketio.emit(event, jsonpatch, room=room)
@background
@@ -40,5 +54,9 @@ def delete_query_result(query_result_id, *args, **kwargs):
query_result = QueryResult.query.get(query_result_id)
if query_result is None:
raise Exception('QueryResult {} not found'.format(query_result_id))
+ event = 'user_{}_patch'.format(query_result.user_id)
+ jsonpatch = [{'op': 'remove', 'path': '/query_results/{}'.format(query_result.id)}] # noqa
+ room = 'user_{}'.format(query_result.user_id)
query_result.delete()
db.session.commit()
+ socketio.emit(event, jsonpatch, room=room)
diff --git a/web/app/corpora/views.py b/web/app/corpora/views.py
index d4382211..cef864c9 100644
--- a/web/app/corpora/views.py
+++ b/web/app/corpora/views.py
@@ -8,7 +8,7 @@ from .forms import (AddCorpusFileForm, AddCorpusForm, AddQueryResultForm,
DisplayOptionsForm, InspectDisplayOptionsForm,
ImportCorpusForm)
from jsonschema import validate
-from .. import db
+from .. import db, socketio
from ..models import Corpus, CorpusFile, QueryResult
import json
import logging
@@ -29,16 +29,22 @@ def add_corpus():
description=form.description.data,
title=form.title.data)
db.session.add(corpus)
- db.session.commit()
+ db.session.flush()
+ db.session.refresh(corpus)
try:
os.makedirs(corpus.path)
except OSError:
logging.error('Make dir {} led to an OSError!'.format(corpus.path))
- db.session.delete(corpus)
- db.session.commit()
+ db.session.rollback()
abort(500)
- flash('Corpus "{}" added!'.format(corpus.title), 'corpus')
- return redirect(url_for('.corpus', corpus_id=corpus.id))
+ else:
+ db.session.commit()
+ flash('Corpus "{}" added!'.format(corpus.title), 'corpus')
+ event = 'user_{}_patch'.format(corpus.user_id)
+ jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
+ room = 'user_{}'.format(corpus.user_id)
+ socketio.emit(event, jsonpatch, room=room)
+ return redirect(url_for('.corpus', corpus_id=corpus.id))
return render_template('corpora/add_corpus.html.j2', form=form,
title='Add corpus')
@@ -54,13 +60,13 @@ def import_corpus():
description=form.description.data,
title=form.title.data)
db.session.add(corpus)
- db.session.commit()
+ db.session.flush()
+ db.session.refresh(corpus)
try:
os.makedirs(corpus.path)
except OSError:
logging.error('Make dir {} led to an OSError!'.format(corpus.path))
- db.session.delete(corpus)
- db.session.commit()
+ db.session.rollback()
flash('Internal Server Error', 'error')
return make_response(
{'redirect_url': url_for('.import_corpus')}, 500)
@@ -100,6 +106,10 @@ def import_corpus():
db.session.commit()
os.remove(archive_file)
flash('Corpus "{}" imported!'.format(corpus.title), 'corpus')
+ event = 'user_{}_patch'.format(corpus.user_id)
+ jsonpatch = [{'op': 'add', 'path': '/corpora/{}'.format(corpus.id), 'value': corpus.to_dict()}] # noqa
+ room = 'user_{}'.format(corpus.user_id)
+ socketio.emit(event, jsonpatch, room=room)
return make_response(
{'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201)
else:
@@ -205,6 +215,11 @@ def add_corpus_file(corpus_id):
corpus.status = 'unprepared'
db.session.commit()
flash('Corpus file "{}" added!'.format(corpus_file.filename), 'corpus')
+ event = 'user_{}_patch'.format(corpus.user_id)
+ jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}, # noqa
+ {'op': 'add', 'path': '/corpora/{}/files/{}'.format(corpus.id, corpus_file.id), 'value': corpus_file.to_dict()}] # noqa
+ room = 'user_{}'.format(corpus.user_id)
+ socketio.emit(event, jsonpatch, room=room)
return make_response({'redirect_url': url_for('.corpus', corpus_id=corpus.id)}, 201) # noqa
return render_template('corpora/add_corpus_file.html.j2', corpus=corpus,
form=form, title='Add corpus file')
diff --git a/web/app/events.py b/web/app/events.py
index a0f76b3b..72cdef66 100644
--- a/web/app/events.py
+++ b/web/app/events.py
@@ -1,73 +1,72 @@
-from flask import current_app, request
+from flask import request
from flask_login import current_user
-from . import db, socketio
-from .decorators import socketio_admin_required, socketio_login_required
+from flask_socketio import join_room, leave_room
+from . import socketio
+from .decorators import socketio_login_required
from .models import User
-import json
-import jsonpatch
+###############################################################################
+# Socket.IO event handlers #
+###############################################################################
'''
' A list containing session ids of connected Socket.IO sessions, to keep track
-' of all connected sessions, which is used to determine the runtimes of
+' of all connected sessions, which can be used to determine the runtimes of
' associated background tasks.
'''
-connected_sessions = []
+socketio_sessions = []
@socketio.on('connect')
-def connect():
+@socketio_login_required
+def socketio_connect():
'''
' The Socket.IO module creates a session id (sid) for each request.
' On connect the sid is saved in the connected sessions list.
'''
- connected_sessions.append(request.sid)
+ socketio_sessions.append(request.sid)
@socketio.on('disconnect')
-def disconnect():
+def socketio_disconnect():
'''
' On disconnect the session id gets removed from the connected sessions
' list.
'''
- connected_sessions.remove(request.sid)
+ socketio_sessions.remove(request.sid)
@socketio.on('start_user_session')
@socketio_login_required
-def start_user_session(user_id):
- if not (current_user.id == user_id or current_user.is_administrator):
- return
- socketio.start_background_task(user_session,
- current_app._get_current_object(),
- user_id, request.sid)
+def socketio_start_user_session(user_id):
+ user = User.query.get(user_id)
+ if user is None:
+ response = {'code': 404, 'msg': 'Not found'}
+ socketio.emit('start_user_session', response, room=request.sid)
+ elif not (user == current_user or current_user.is_administrator):
+ response = {'code': 403, 'msg': 'Forbidden'}
+ socketio.emit('start_user_session', response, room=request.sid)
+ else:
+ response = {'code': 200, 'msg': 'OK'}
+ socketio.emit('start_user_session', response, room=request.sid)
+ socketio.emit('user_{}_init'.format(user.id), user.to_dict(),
+ room=request.sid)
+ room = 'user_{}'.format(user.id)
+ join_room(room)
-def user_session(app, user_id, session_id):
- '''
- ' Sends initial user data to the client. Afterwards it checks every 3s if
- ' changes to the initial values appeared. If changes are detected, a
- ' RFC 6902 compliant JSON patch gets send.
- '''
- init_event = 'user_{}_init'.format(user_id)
- patch_event = 'user_{}_patch'.format(user_id)
- with app.app_context():
- # Gather current values from database.
- user = User.query.get(user_id)
- user_dict = user.to_dict()
- # Send initial values to the client.
- socketio.emit(init_event, json.dumps(user_dict), room=session_id)
- while session_id in connected_sessions:
- # Get new values from the database
- db.session.refresh(user)
- new_user_dict = user.to_dict()
- # Compute JSON patches.
- user_patch = jsonpatch.JsonPatch.from_diff(user_dict,
- new_user_dict)
- # In case there are patches, send them to the client.
- if user_patch:
- socketio.emit(patch_event, user_patch.to_string(),
- room=session_id)
- # Set new values as references for the next iteration.
- user_dict = new_user_dict
- socketio.sleep(3)
+@socketio.on('stop_user_session')
+@socketio_login_required
+def socketio_stop_user_session(user_id):
+ user = User.query.get(user_id)
+ if user is None:
+ response = {'code': 404, 'msg': 'Not found'}
+ socketio.emit('stop_user_session', response, room=request.sid)
+ elif not (user == current_user or current_user.is_administrator):
+ response = {'code': 403, 'msg': 'Forbidden'}
+ socketio.emit('stop_user_session', response, room=request.sid)
+ else:
+ response = {'code': 200, 'msg': 'OK'}
+ socketio.emit('stop_user_session', response, room=request.sid)
+ room = 'user_{}'.format(user.id)
+ leave_room(room)
diff --git a/web/app/jobs/tasks.py b/web/app/jobs/tasks.py
index 8ab823dc..fb4223b8 100644
--- a/web/app/jobs/tasks.py
+++ b/web/app/jobs/tasks.py
@@ -1,4 +1,4 @@
-from .. import db
+from .. import db, socketio
from ..decorators import background
from ..models import Job
@@ -9,8 +9,12 @@ def delete_job(job_id, *args, **kwargs):
job = Job.query.get(job_id)
if job is None:
raise Exception('Job {} not found'.format(job_id))
+ event = 'user_{}_patch'.format(job.user_id)
+ jsonpatch = [{'op': 'remove', 'path': '/jobs/{}'.format(job.id)}]
+ room = 'user_{}'.format(job.user_id)
job.delete()
db.session.commit()
+ socketio.emit(event, jsonpatch, room=room)
@background
@@ -19,5 +23,14 @@ def restart_job(job_id, *args, **kwargs):
job = Job.query.get(job_id)
if job is None:
raise Exception('Job {} not found'.format(job_id))
- job.restart()
- db.session.commit()
+ try:
+ job.restart()
+ except Exception:
+ pass
+ else:
+ db.session.commit()
+ event = 'user_{}_patch'.format(job.user_id)
+ jsonpatch = [{'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()}, # noqa
+ {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}] # noqa
+ room = 'user_{}'.format(job.user_id)
+ socketio.emit(event, jsonpatch, room=room)
diff --git a/web/app/services/views.py b/web/app/services/views.py
index bd4bf3fa..1d81e6a8 100644
--- a/web/app/services/views.py
+++ b/web/app/services/views.py
@@ -2,7 +2,7 @@ from flask import abort, flash, make_response, render_template, url_for
from flask_login import current_user, login_required
from werkzeug.utils import secure_filename
from . import services
-from .. import db
+from .. import db, socketio
from ..jobs.forms import AddFileSetupJobForm, AddNLPJobForm, AddOCRJobForm
from ..models import Job, JobInput
import json
@@ -70,6 +70,10 @@ def service(service):
job.status = 'submitted'
db.session.commit()
flash('Job "{}" added'.format(job.title), 'job')
+ event = 'user_{}_patch'.format(job.user_id)
+ jsonpatch = [{'op': 'add', 'path': '/jobs/{}'.format(job.id), 'value': job.to_dict()}] # noqa
+ room = 'user_{}'.format(job.user_id)
+ socketio.emit(event, jsonpatch, room=room)
return make_response(
{'redirect_url': url_for('jobs.job', job_id=job.id)}, 201)
return render_template('services/{}.html.j2'.format(service),
diff --git a/web/app/static/js/nopaque/main.js b/web/app/static/js/nopaque/main.js
index c8f886be..6435186f 100644
--- a/web/app/static/js/nopaque/main.js
+++ b/web/app/static/js/nopaque/main.js
@@ -9,8 +9,8 @@ class AppClient {
if (userId in this.users) {return this.users[userId];}
let user = new User();
this.users[userId] = user;
- this.socket.on(`user_${userId}_init`, msg => user.init(JSON.parse(msg)));
- this.socket.on(`user_${userId}_patch`, msg => user.patch(JSON.parse(msg)));
+ this.socket.on(`user_${userId}_init`, msg => user.init(msg));
+ this.socket.on(`user_${userId}_patch`, msg => user.patch(msg));
this.socket.emit('start_user_session', userId);
return user;
}
@@ -40,40 +40,44 @@ class User {
}
init(data) {
+ console.log('### User.init ###');
+ console.log(data);
this.data = data;
for (let [corpusId, eventListeners] of Object.entries(this.eventListeners.corpus)) {
if (corpusId === '*') {
- for (let eventListener of eventListeners) {eventListener('init', this.data.corpora);}
+ for (let eventListener of eventListeners) {eventListener('init');}
} else {
if (corpusId in this.data.corpora) {
- for (let eventListener of eventListeners) {eventListener('init', this.data.corpora[corpusId]);}
+ for (let eventListener of eventListeners) {eventListener('init');}
}
}
}
for (let [jobId, eventListeners] of Object.entries(this.eventListeners.job)) {
if (jobId === '*') {
- for (let eventListener of eventListeners) {eventListener('init', this.data.jobs);}
+ for (let eventListener of eventListeners) {eventListener('init');}
} else {
if (jobId in this.data.jobs) {
- for (let eventListener of eventListeners) {eventListener('init', this.data.jobs[jobId]);}
+ for (let eventListener of eventListeners) {eventListener('init');}
}
}
}
for (let [queryResultId, eventListeners] of Object.entries(this.eventListeners.queryResult)) {
if (queryResultId === '*') {
- for (let eventListener of eventListeners) {eventListener('init', this.data.query_results);}
+ for (let eventListener of eventListeners) {eventListener('init');}
} else {
if (queryResultId in this.data.query_results) {
- for (let eventListener of eventListeners) {eventListener('init', this.data.query_results[queryResultId]);}
+ for (let eventListener of eventListeners) {eventListener('init');}
}
}
}
}
patch(patch) {
+ console.log('### User.patch ###');
+ console.log(patch);
this.data = jsonpatch.apply_patch(this.data, patch);
let corporaPatch = patch.filter(operation => operation.path.startsWith("/corpora"));
diff --git a/web/app/tasks/__init__.py b/web/app/tasks/__init__.py
index 154841b7..d19a4b8f 100644
--- a/web/app/tasks/__init__.py
+++ b/web/app/tasks/__init__.py
@@ -1,31 +1,57 @@
-from .. import db
-from ..models import Corpus, Job
+from .corpus_utils import CheckCorporaMixin
+from .job_utils import CheckJobsMixin
+from ..import db, socketio
import docker
-docker_client = docker.from_env()
-from . import corpus_utils, job_utils # noqa
+class TaskRunner(CheckCorporaMixin, CheckJobsMixin):
+ def __init__(self):
+ self.docker = docker.from_env()
+ self._socketio_message_buffer = {}
+
+ def run(self):
+ self.check_corpora()
+ self.check_jobs()
+ db.session.commit()
+ self.flush_socketio_messages()
+
+ def buffer_socketio_message(self, event, payload, room,
+ msg_id=None, override_policy='replace'):
+ if room not in self._socketio_message_buffer:
+ self._socketio_message_buffer[room] = {}
+ if event not in self._socketio_message_buffer[room]:
+ self._socketio_message_buffer[room][event] = {}
+ if msg_id is None:
+ msg_id = len(self._socketio_message_buffer[room][event].keys())
+ if override_policy == 'append':
+ if msg_id in self._socketio_message_buffer[room][event]:
+ # If the current message value isn't a list, convert it!
+ if not isinstance(self._socketio_message_buffer[room][event][msg_id], list): # noqa
+ self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]] # noqa
+ else:
+ self._socketio_message_buffer[room][event][msg_id] = []
+ self._socketio_message_buffer[room][event][msg_id].append(payload)
+ elif override_policy == 'replace':
+ self._socketio_message_buffer[room][event][msg_id] = payload
+ else:
+ raise Exception('Unknown override policy: {}'.format(override_policy)) # noqa
+ return msg_id
+
+ def buffer_user_patch_operation(self, ressource, patch_operation):
+ self.buffer_socketio_message('user_{}_patch'.format(ressource.user_id),
+ patch_operation,
+ 'user_{}'.format(ressource.user_id),
+ msg_id='_', override_policy='append')
+
+ def clear_socketio_message_buffer(self):
+ self._socketio_message_buffer = {}
+
+ def flush_socketio_messages(self):
+ for room in self._socketio_message_buffer:
+ for event in self._socketio_message_buffer[room]:
+ for message in self._socketio_message_buffer[room][event]:
+ socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room) # noqa
+ self.clear_socketio_message_buffer()
-def check_corpora():
- corpora = Corpus.query.all()
- for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
- corpus_utils.create_build_corpus_service(corpus)
- for corpus in filter(lambda corpus: corpus.status in ['queued', 'running'], corpora): # noqa
- corpus_utils.checkout_build_corpus_service(corpus)
- for corpus in filter(lambda corpus: corpus.status == 'start analysis', corpora): # noqa
- corpus_utils.create_cqpserver_container(corpus)
- for corpus in filter(lambda corpus: corpus.status == 'stop analysis', corpora): # noqa
- corpus_utils.remove_cqpserver_container(corpus)
- db.session.commit()
-
-
-def check_jobs():
- jobs = Job.query.all()
- for job in filter(lambda job: job.status == 'submitted', jobs):
- job_utils.create_job_service(job)
- for job in filter(lambda job: job.status in ['queued', 'running'], jobs):
- job_utils.checkout_job_service(job)
- for job in filter(lambda job: job.status == 'canceling', jobs):
- job_utils.remove_job_service(job)
- db.session.commit()
+task_runner = TaskRunner()
diff --git a/web/app/tasks/corpus_utils.py b/web/app/tasks/corpus_utils.py
index 2167da46..5d51f342 100644
--- a/web/app/tasks/corpus_utils.py
+++ b/web/app/tasks/corpus_utils.py
@@ -1,174 +1,204 @@
-from . import docker_client
+from ..models import Corpus
import docker
import logging
import os
import shutil
-def create_build_corpus_service(corpus):
- corpus_data_dir = os.path.join(corpus.path, 'data')
- shutil.rmtree(corpus_data_dir, ignore_errors=True)
- os.mkdir(corpus_data_dir)
- corpus_registry_dir = os.path.join(corpus.path, 'registry')
- shutil.rmtree(corpus_registry_dir, ignore_errors=True)
- os.mkdir(corpus_registry_dir)
- corpus_file = os.path.join(corpus.path, 'merged', 'corpus.vrt')
- service_kwargs = {
- 'command': 'docker-entrypoint.sh build-corpus',
- 'constraints': ['node.role==worker'],
- 'labels': {'origin': 'nopaque',
- 'type': 'corpus.build',
- 'corpus_id': str(corpus.id)},
- 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
- corpus_data_dir + ':/corpora/data:rw',
- corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
- 'name': 'build-corpus_{}'.format(corpus.id),
- 'restart_policy': docker.types.RestartPolicy()
- }
- service_image = \
- 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
- try:
- docker_client.services.create(service_image, **service_kwargs)
- except docker.errors.APIError as e:
- logging.error(
- 'Create "{}" service raised '.format(service_kwargs['name'])
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- else:
- corpus.status = 'queued'
+class CheckCorporaMixin:
+ def check_corpora(self):
+ corpora = Corpus.query.all()
+ queued_corpora = list(filter(lambda corpus: corpus.status == 'queued', corpora))
+ running_corpora = list(filter(lambda corpus: corpus.status == 'running', corpora))
+ start_analysis_corpora = list(filter(lambda corpus: corpus.status == 'start analysis', corpora))
+ stop_analysis_corpora = list(filter(lambda corpus: corpus.status == 'stop analysis', corpora))
+ submitted_corpora = list(filter(lambda corpus: corpus.status == 'submitted', corpora))
+ for corpus in submitted_corpora:
+ self.create_build_corpus_service(corpus)
+ for corpus in queued_corpora + running_corpora:
+ self.checkout_build_corpus_service(corpus)
+ for corpus in start_analysis_corpora:
+ self.create_cqpserver_container(corpus)
+ for corpus in stop_analysis_corpora:
+ self.remove_cqpserver_container(corpus)
+ def create_build_corpus_service(self, corpus):
+ corpus_data_dir = os.path.join(corpus.path, 'data')
+ shutil.rmtree(corpus_data_dir, ignore_errors=True)
+ os.mkdir(corpus_data_dir)
+ corpus_registry_dir = os.path.join(corpus.path, 'registry')
+ shutil.rmtree(corpus_registry_dir, ignore_errors=True)
+ os.mkdir(corpus_registry_dir)
+ corpus_file = os.path.join(corpus.path, 'merged', 'corpus.vrt')
+ service_kwargs = {
+ 'command': 'docker-entrypoint.sh build-corpus',
+ 'constraints': ['node.role==worker'],
+ 'labels': {'origin': 'nopaque',
+ 'type': 'corpus.build',
+ 'corpus_id': str(corpus.id)},
+ 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
+ corpus_data_dir + ':/corpora/data:rw',
+ corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
+ 'name': 'build-corpus_{}'.format(corpus.id),
+ 'restart_policy': docker.types.RestartPolicy()
+ }
+ service_image = \
+ 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
+ try:
+ self.docker.services.create(service_image, **service_kwargs)
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Create "{}" service raised '.format(service_kwargs['name'])
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ else:
+ corpus.status = 'queued'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
-def checkout_build_corpus_service(corpus):
- service_name = 'build-corpus_{}'.format(corpus.id)
- try:
- service = docker_client.services.get(service_name)
- except docker.errors.NotFound:
- logging.error(
- 'Get "{}" service raised '.format(service_name)
- + '"docker.errors.NotFound" The service does not exist. '
- + '(corpus.status: {} -> failed)'.format(corpus.status)
- )
- corpus.status = 'failed'
- except docker.errors.APIError as e:
- logging.error(
- 'Get "{}" service raised '.format(service_name)
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- except docker.errors.InvalidVersion:
- logging.error(
- 'Get "{}" service raised '.format(service_name)
- + '"docker.errors.InvalidVersion" One of the arguments is '
- + 'not supported with the current API version.'
- )
- else:
- service_tasks = service.tasks()
- if not service_tasks:
+ def checkout_build_corpus_service(self, corpus):
+ service_name = 'build-corpus_{}'.format(corpus.id)
+ try:
+ service = self.docker.services.get(service_name)
+ except docker.errors.NotFound:
+ logging.error(
+ 'Get "{}" service raised '.format(service_name)
+ + '"docker.errors.NotFound" The service does not exist. '
+ + '(corpus.status: {} -> failed)'.format(corpus.status)
+ )
+ corpus.status = 'failed'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Get "{}" service raised '.format(service_name)
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ except docker.errors.InvalidVersion:
+ logging.error(
+ 'Get "{}" service raised '.format(service_name)
+ + '"docker.errors.InvalidVersion" One of the arguments is '
+ + 'not supported with the current API version.'
+ )
+ else:
+ service_tasks = service.tasks()
+ if not service_tasks:
+ return
+ task_state = service_tasks[0].get('Status').get('State')
+ if corpus.status == 'queued' and task_state != 'pending':
+ corpus.status = 'running'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
+ elif (corpus.status == 'running'
+ and task_state in ['complete', 'failed']):
+ try:
+ service.remove()
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Remove "{}" service raised '.format(service_name)
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ return
+ else:
+ corpus.status = 'prepared' if task_state == 'complete' \
+ else 'failed'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
+
+ def create_cqpserver_container(self, corpus):
+ corpus_data_dir = os.path.join(corpus.path, 'data')
+ corpus_registry_dir = os.path.join(corpus.path, 'registry')
+ container_kwargs = {
+ 'command': 'cqpserver',
+ 'detach': True,
+ 'volumes': [corpus_data_dir + ':/corpora/data:rw',
+ corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
+ 'name': 'cqpserver_{}'.format(corpus.id),
+ 'network': 'nopaque_default'
+ }
+ container_image = \
+ 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
+ # Check if a cqpserver container already exists. If this is the case,
+ # remove it and create a new one
+ try:
+ container = self.docker.containers.get(container_kwargs['name'])
+ except docker.errors.NotFound:
+ pass
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Get "{}" container raised '.format(container_kwargs['name'])
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
return
- task_state = service_tasks[0].get('Status').get('State')
- if corpus.status == 'queued' and task_state != 'pending':
- corpus.status = 'running'
- elif (corpus.status == 'running'
- and task_state in ['complete', 'failed']):
+ else:
try:
- service.remove()
+ container.remove(force=True)
except docker.errors.APIError as e:
logging.error(
- 'Remove "{}" service raised '.format(service_name)
+ 'Remove "{}" container raised '.format(container_kwargs['name'])
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
return
- else:
- corpus.status = 'prepared' if task_state == 'complete' \
- else 'failed'
-
-
-def create_cqpserver_container(corpus):
- corpus_data_dir = os.path.join(corpus.path, 'data')
- corpus_registry_dir = os.path.join(corpus.path, 'registry')
- container_kwargs = {
- 'command': 'cqpserver',
- 'detach': True,
- 'volumes': [corpus_data_dir + ':/corpora/data:rw',
- corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
- 'name': 'cqpserver_{}'.format(corpus.id),
- 'network': 'nopaque_default'
- }
- container_image = \
- 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
- # Check if a cqpserver container already exists. If this is the case,
- # remove it and create a new one
- try:
- container = docker_client.containers.get(container_kwargs['name'])
- except docker.errors.NotFound:
- pass
- except docker.errors.APIError as e:
- logging.error(
- 'Get "{}" container raised '.format(container_kwargs['name'])
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- return
- else:
try:
- container.remove(force=True)
+ self.docker.containers.run(container_image, **container_kwargs)
+ except docker.errors.ContainerError:
+ # This case should not occur, because detach is True.
+ logging.error(
+ 'Run "{}" container raised '.format(container_kwargs['name'])
+ + '"docker.errors.ContainerError" The container exits with a '
+ + 'non-zero exit code and detach is False.'
+ )
+ corpus.status = 'failed'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
+ except docker.errors.ImageNotFound:
+ logging.error(
+ 'Run "{}" container raised '.format(container_kwargs['name'])
+ + '"docker.errors.ImageNotFound" The specified image does not '
+ + 'exist.'
+ )
+ corpus.status = 'failed'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
except docker.errors.APIError as e:
logging.error(
- 'Remove "{}" container raised '.format(container_kwargs['name']) # noqa
+ 'Run "{}" container raised '.format(container_kwargs['name'])
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ else:
+ corpus.status = 'analysing'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
+
+ def remove_cqpserver_container(self, corpus):
+ container_name = 'cqpserver_{}'.format(corpus.id)
+ try:
+ container = self.docker.containers.get(container_name)
+ except docker.errors.NotFound:
+ pass
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Get "{}" container raised '.format(container_name)
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
return
- try:
- docker_client.containers.run(container_image, **container_kwargs)
- except docker.errors.ContainerError:
- # This case should not occur, because detach is True.
- logging.error(
- 'Run "{}" container raised '.format(container_kwargs['name'])
- + '"docker.errors.ContainerError" The container exits with a '
- + 'non-zero exit code and detach is False.'
- )
- corpus.status = 'failed'
- except docker.errors.ImageNotFound:
- logging.error(
- 'Run "{}" container raised '.format(container_kwargs['name'])
- + '"docker.errors.ImageNotFound" The specified image does not '
- + 'exist.'
- )
- corpus.status = 'failed'
- except docker.errors.APIError as e:
- logging.error(
- 'Run "{}" container raised '.format(container_kwargs['name'])
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- else:
- corpus.status = 'analysing'
-
-
-def remove_cqpserver_container(corpus):
- container_name = 'cqpserver_{}'.format(corpus.id)
- try:
- container = docker_client.containers.get(container_name)
- except docker.errors.NotFound:
- pass
- except docker.errors.APIError as e:
- logging.error(
- 'Get "{}" container raised '.format(container_name)
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- return
- else:
- try:
- container.remove(force=True)
- except docker.errors.APIError as e:
- logging.error(
- 'Remove "{}" container raised '.format(container_name)
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- return
- corpus.status = 'prepared'
+ else:
+ try:
+ container.remove(force=True)
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Remove "{}" container raised '.format(container_name)
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ return
+ corpus.status = 'prepared'
+ patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}
+ self.buffer_user_patch_operation(corpus, patch_operation)
diff --git a/web/app/tasks/job_utils.py b/web/app/tasks/job_utils.py
index 19a75b81..e2635dda 100644
--- a/web/app/tasks/job_utils.py
+++ b/web/app/tasks/job_utils.py
@@ -1,82 +1,161 @@
from datetime import datetime
from werkzeug.utils import secure_filename
-from . import docker_client
from .. import db, mail
from ..email import create_message
-from ..models import JobResult
+from ..models import Job, JobResult
import docker
import logging
import json
import os
-def create_job_service(job):
- cmd = '{} -i /files -o /files/output'.format(job.service)
- if job.service == 'file-setup':
- cmd += ' -f {}'.format(secure_filename(job.title))
- cmd += ' --log-dir /files'
- cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title))
- cmd += ' ' + ' '.join(json.loads(job.service_args))
- service_kwargs = {'command': cmd,
- 'constraints': ['node.role==worker'],
- 'labels': {'origin': 'nopaque',
- 'type': 'service.{}'.format(job.service),
- 'job_id': str(job.id)},
- 'mounts': [job.path + ':/files:rw'],
- 'name': 'job_{}'.format(job.id),
- 'resources': docker.types.Resources(
- cpu_reservation=job.n_cores * (10 ** 9),
- mem_reservation=job.mem_mb * (10 ** 6)
- ),
- 'restart_policy': docker.types.RestartPolicy()}
- service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
- job.service, job.service_version)
- try:
- docker_client.services.create(service_image, **service_kwargs)
- except docker.errors.APIError as e:
- logging.error(
- 'Create "{}" service raised '.format(service_kwargs['name'])
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- return
- else:
- job.status = 'queued'
- finally:
- send_notification(job)
+class CheckJobsMixin:
+ def check_jobs(self):
+ jobs = Job.query.all()
+ canceling_jobs = list(filter(lambda job: job.status == 'canceling', jobs))
+ queued_jobs = list(filter(lambda job: job.status == 'queued', jobs))
+ running_jobs = list(filter(lambda job: job.status == 'running', jobs))
+ submitted_jobs = list(filter(lambda job: job.status == 'submitted', jobs))
+ for job in submitted_jobs:
+ self.create_job_service(job)
+ for job in queued_jobs + running_jobs:
+ self.checkout_job_service(job)
+ for job in canceling_jobs:
+ self.remove_job_service(job)
-
-def checkout_job_service(job):
- service_name = 'job_{}'.format(job.id)
- try:
- service = docker_client.services.get(service_name)
- except docker.errors.NotFound:
- logging.error('Get "{}" service raised '.format(service_name)
- + '"docker.errors.NotFound" The service does not exist. '
- + '(job.status: {} -> failed)'.format(job.status))
- job.status = 'failed'
- except docker.errors.APIError as e:
- logging.error(
- 'Get "{}" service raised '.format(service_name)
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- return
- except docker.errors.InvalidVersion:
- logging.error(
- 'Get "{}" service raised '.format(service_name)
- + '"docker.errors.InvalidVersion" One of the arguments is '
- + 'not supported with the current API version.'
- )
- return
- else:
- service_tasks = service.tasks()
- if not service_tasks:
+ def create_job_service(self, job):
+ cmd = '{} -i /files -o /files/output'.format(job.service)
+ if job.service == 'file-setup':
+ cmd += ' -f {}'.format(secure_filename(job.title))
+ cmd += ' --log-dir /files'
+ cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title))
+ cmd += ' ' + ' '.join(json.loads(job.service_args))
+ service_kwargs = {'command': cmd,
+ 'constraints': ['node.role==worker'],
+ 'labels': {'origin': 'nopaque',
+ 'type': 'service.{}'.format(job.service),
+ 'job_id': str(job.id)},
+ 'mounts': [job.path + ':/files:rw'],
+ 'name': 'job_{}'.format(job.id),
+ 'resources': docker.types.Resources(
+ cpu_reservation=job.n_cores * (10 ** 9),
+ mem_reservation=job.mem_mb * (10 ** 6)
+ ),
+ 'restart_policy': docker.types.RestartPolicy()}
+ service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(job.service, job.service_version)
+ try:
+ self.docker.services.create(service_image, **service_kwargs)
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Create "{}" service raised '.format(service_kwargs['name'])
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
return
- task_state = service_tasks[0].get('Status').get('State')
- if job.status == 'queued' and task_state != 'pending':
- job.status = 'running'
- elif job.status == 'running' and task_state in ['complete', 'failed']:
+ else:
+ job.status = 'queued'
+ patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
+ self.buffer_user_patch_operation(job, patch_operation)
+ finally:
+ self.send_job_notification(job)
+
+ def checkout_job_service(self, job):
+ service_name = 'job_{}'.format(job.id)
+ try:
+ service = self.docker.services.get(service_name)
+ except docker.errors.NotFound:
+ logging.error('Get "{}" service raised '.format(service_name)
+ + '"docker.errors.NotFound" The service does not exist. '
+ + '(job.status: {} -> failed)'.format(job.status))
+ job.status = 'failed'
+ patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
+ self.buffer_user_patch_operation(job, patch_operation)
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Get "{}" service raised '.format(service_name)
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ return
+ except docker.errors.InvalidVersion:
+ logging.error(
+ 'Get "{}" service raised '.format(service_name)
+ + '"docker.errors.InvalidVersion" One of the arguments is '
+ + 'not supported with the current API version.'
+ )
+ return
+ else:
+ service_tasks = service.tasks()
+ if not service_tasks:
+ return
+ task_state = service_tasks[0].get('Status').get('State')
+ if job.status == 'queued' and task_state != 'pending':
+ job.status = 'running'
+ patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
+ self.buffer_user_patch_operation(job, patch_operation)
+ elif job.status == 'running' and task_state in ['complete', 'failed']:
+ try:
+ service.remove()
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Remove "{}" service raised '.format(service_name)
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ return
+ else:
+ if task_state == 'complete':
+ results_dir = os.path.join(job.path, 'output')
+ result_files = filter(lambda x: x.endswith('.zip'),
+ os.listdir(results_dir))
+ for result_file in result_files:
+ job_result = JobResult(filename=result_file, job=job)
+ db.session.add(job_result)
+ db.session.flush()
+ db.session.refresh(job_result)
+ patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()}
+ self.buffer_user_patch_operation(job, patch_operation)
+ job.end_date = datetime.utcnow()
+ patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()}
+ self.buffer_user_patch_operation(job, patch_operation)
+ job.status = task_state
+ patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
+ self.buffer_user_patch_operation(job, patch_operation)
+ finally:
+ self.send_job_notification(job)
+
+ def remove_job_service(self, job):
+ service_name = 'job_{}'.format(job.id)
+ try:
+ service = self.docker.services.get(service_name)
+ except docker.errors.NotFound:
+ job.status = 'canceled'
+ patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status}
+ self.buffer_user_patch_operation(job, patch_operation)
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Get "{}" service raised '.format(service_name)
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ return
+ except docker.errors.InvalidVersion:
+ logging.error(
+ 'Get "{}" service raised '.format(service_name)
+ + '"docker.errors.InvalidVersion" One of the arguments is '
+ + 'not supported with the current API version.'
+ )
+ return
+ else:
+ try:
+ service.update(mounts=None)
+ except docker.errors.APIError as e:
+ logging.error(
+ 'Update "{}" service raised '.format(service_name)
+ + '"docker.errors.APIError" The server returned an error. '
+ + 'Details: {}'.format(e)
+ )
+ return
try:
service.remove()
except docker.errors.APIError as e:
@@ -85,68 +164,14 @@ def checkout_job_service(job):
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
- return
- else:
- if task_state == 'complete':
- job_results_dir = os.path.join(job.path, 'output')
- job_results = filter(lambda x: x.endswith('.zip'),
- os.listdir(job_results_dir))
- for job_result in job_results:
- job_result = JobResult(filename=job_result, job=job)
- db.session.add(job_result)
- job.end_date = datetime.utcnow()
- job.status = task_state
- finally:
- send_notification(job)
-
-def remove_job_service(job):
- service_name = 'job_{}'.format(job.id)
- try:
- service = docker_client.services.get(service_name)
- except docker.errors.NotFound:
- job.status = 'canceled'
- except docker.errors.APIError as e:
- logging.error(
- 'Get "{}" service raised '.format(service_name)
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
- return
- except docker.errors.InvalidVersion:
- logging.error(
- 'Get "{}" service raised '.format(service_name)
- + '"docker.errors.InvalidVersion" One of the arguments is '
- + 'not supported with the current API version.'
- )
- return
- else:
- try:
- service.update(mounts=None)
- except docker.errors.APIError as e:
- logging.error(
- 'Update "{}" service raised '.format(service_name)
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
+ def send_job_notification(self, job):
+ if job.creator.setting_job_status_mail_notifications == 'none':
return
- try:
- service.remove()
- except docker.errors.APIError as e:
- logging.error(
- 'Remove "{}" service raised '.format(service_name)
- + '"docker.errors.APIError" The server returned an error. '
- + 'Details: {}'.format(e)
- )
-
-
-def send_notification(job):
- if job.creator.setting_job_status_mail_notifications == 'none':
- return
- if (job.creator.setting_job_status_mail_notifications == 'end'
- and job.status not in ['complete', 'failed']):
- return
- msg = create_message(job.creator.email,
- 'Status update for your Job "{}"'.format(job.title),
- 'tasks/email/notification', job=job)
- mail.send(msg)
+ if (job.creator.setting_job_status_mail_notifications == 'end'
+ and job.status not in ['complete', 'failed']):
+ return
+ msg = create_message(job.creator.email,
+ 'Status update for your Job "{}"'.format(job.title),
+ 'tasks/email/notification', job=job)
+ mail.send(msg)
diff --git a/web/nopaque.py b/web/nopaque.py
index 5c5c5af5..16662a23 100644
--- a/web/nopaque.py
+++ b/web/nopaque.py
@@ -50,9 +50,8 @@ def deploy():
@app.cli.command()
def tasks():
- from app.tasks import check_corpora, check_jobs
- check_corpora()
- check_jobs()
+ from app.tasks import task_runner
+ task_runner.run()
@app.cli.command()