diff --git a/docker-compose.development.yml b/docker-compose.development.yml index 0a3248db..d1ac7719 100644 --- a/docker-compose.development.yml +++ b/docker-compose.development.yml @@ -13,11 +13,3 @@ services: - "./web/nopaque.py:/home/nopaque/nopaque.py" - "./web/requirements.txt:/home/nopaque/requirements.txt" - "./web/tests:/home/nopaque/tests" - nopaqued: - volumes: - # Mount code as volumes - - "./daemon/app:/home/nopaqued/app" - - "./daemon/boot.sh:/home/nopaqued/boot.sh" - - "./daemon/config.py:/home/nopaqued/config.py" - - "./daemon/nopaqued.py:/home/nopaqued/nopaqued.py" - - "./daemon/requirements.txt:/home/nopaqued/requirements.txt" diff --git a/docker-compose.yml b/docker-compose.yml index fe90fada..57f8b5bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,23 @@ version: "3.5" services: + db: + env_file: db.env + image: postgres:11 + restart: unless-stopped + volumes: + - "${HOST_DB_DIR:-./db}:/var/lib/postgresql/data" + + mq: + image: redis:6 + restart: unless-stopped + volumes: + - "${HOST_MQ_DIR:-./mq}:/data" + nopaque: build: args: + DOCKER_GID: ${HOST_DOCKER_GID} GID: ${HOST_GID} UID: ${HOST_UID} context: ./web @@ -16,31 +30,3 @@ services: volumes: - "${NOPAQUE_DATA_DIR:-/mnt/nopaque}:${NOPAQUE_DATA_DIR:-/mnt/nopaque}" - "${HOST_NOPAQUE_LOG_FILE-./nopaque.log}:${NOPAQUE_LOG_FILE:-/home/nopaque/nopaque.log}" - nopaqued: - build: - args: - DOCKER_GID: ${HOST_DOCKER_GID} - GID: ${HOST_GID} - UID: ${HOST_UID} - context: ./daemon - depends_on: - - db - - nopaque - env_file: .env - image: nopaqued:development - restart: unless-stopped - volumes: - - "/var/run/docker.sock:/var/run/docker.sock" - - "${NOPAQUE_DATA_DIR:-/mnt/nopaque}:${NOPAQUE_DATA_DIR:-/mnt/nopaque}" - - "${HOST_NOPAQUE_DAEMON_LOG_FILE-./nopaqued.log}:${NOPAQUE_DAEMON_LOG_FILE:-/home/nopaqued/nopaqued.log}" - db: - env_file: db.env - image: postgres:11 - restart: unless-stopped - volumes: - - "${HOST_DB_DIR:-./db}:/var/lib/postgresql/data" - mq: - image: redis:6 - restart: unless-stopped - volumes: - - "${HOST_MQ_DIR:-./mq}:/data" diff --git a/web/.flaskenv b/web/.flaskenv new file mode 100644 index 00000000..1fd672d3 --- /dev/null +++ b/web/.flaskenv @@ -0,0 +1 @@ +FLASK_APP=nopaque.py diff --git a/web/Dockerfile b/web/Dockerfile index 1b41b48a..4d8037e1 100644 --- a/web/Dockerfile +++ b/web/Dockerfile @@ -4,9 +4,9 @@ FROM python:3.9.0-slim-buster LABEL authors="Patrick Jentsch , Stephan Porada " +ARG DOCKER_GID ARG UID ARG GID -ENV FLASK_APP=nopaque.py ENV LANG=C.UTF-8 diff --git a/web/app/__init__.py b/web/app/__init__.py index a39a51a5..75108bd3 100644 --- a/web/app/__init__.py +++ b/web/app/__init__.py @@ -38,6 +38,7 @@ def create_app(config_name): from .main import main as main_blueprint from .services import services as services_blueprint from .settings import settings as settings_blueprint + app.register_blueprint(admin_blueprint, url_prefix='/admin') app.register_blueprint(auth_blueprint, url_prefix='/auth') app.register_blueprint(corpora_blueprint, url_prefix='/corpora') diff --git a/web/app/daemon.py b/web/app/daemon.py deleted file mode 100644 index 9e9d27c9..00000000 --- a/web/app/daemon.py +++ /dev/null @@ -1,43 +0,0 @@ -from app import create_app -from time import sleep -from ..decorators import background -import docker - - -app = create_app() -docker_client = docker.from_env() - -app.app_context().push() -from . import check_corpora, check_jobs, notify # noqa - - -def run(): - check_corpora_thread = check_corpora() - check_jobs_thread = check_jobs() - notify_thread = notify() - - while True: - if not check_corpora_thread.is_alive(): - check_corpora_thread = check_corpora() - if not check_jobs_thread.is_alive(): - check_jobs_thread = check_jobs() - if not notify_thread.is_alive(): - notify_thread = notify() - sleep(3) - - -@background -def check_corpora(): - corpora = Corpus.query.all() - for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora): - __create_build_corpus_service(corpus) - for corpus in filter(lambda corpus: (corpus.status == 'queued' - or corpus.status == 'running'), - corpora): - __checkout_build_corpus_service(corpus) - for corpus in filter(lambda corpus: corpus.status == 'start analysis', - corpora): - __create_cqpserver_container(corpus) - for corpus in filter(lambda corpus: corpus.status == 'stop analysis', - corpora): - __remove_cqpserver_container(corpus) diff --git a/web/app/daemon/check_corpora.py b/web/app/daemon/check_corpora.py deleted file mode 100644 index 1150ea6c..00000000 --- a/web/app/daemon/check_corpora.py +++ /dev/null @@ -1,139 +0,0 @@ -from . import docker_client -from .. import db -from ..decorators import background -from ..models import Corpus -import docker -import logging -import os -import shutil - - -@background -def check_corpora(): - corpora = Corpus.query.all() - for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora): - __create_build_corpus_service(corpus) - for corpus in filter(lambda corpus: (corpus.status == 'queued' - or corpus.status == 'running'), - corpora): - __checkout_build_corpus_service(corpus) - for corpus in filter(lambda corpus: corpus.status == 'start analysis', - corpora): - __create_cqpserver_container(corpus) - for corpus in filter(lambda corpus: corpus.status == 'stop analysis', - corpora): - __remove_cqpserver_container(corpus) - db.session.commit() - Session.remove() - - -def __create_build_corpus_service(corpus): - corpus_dir = os.path.join(config.DATA_DIR, - str(corpus.user_id), - 'corpora', - str(corpus.id)) - corpus_data_dir = os.path.join(corpus_dir, 'data') - corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt') - corpus_registry_dir = os.path.join(corpus_dir, 'registry') - if os.path.exists(corpus_data_dir): - shutil.rmtree(corpus_data_dir) - if os.path.exists(corpus_registry_dir): - shutil.rmtree(corpus_registry_dir) - os.mkdir(corpus_data_dir) - os.mkdir(corpus_registry_dir) - service_args = {'command': 'docker-entrypoint.sh build-corpus', - 'constraints': ['node.role==worker'], - 'labels': {'origin': 'nopaque', - 'type': 'corpus.prepare', - 'corpus_id': str(corpus.id)}, - 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro', - corpus_data_dir + ':/corpora/data:rw', - corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], - 'name': 'build-corpus_{}'.format(corpus.id), - 'restart_policy': docker.types.RestartPolicy()} - service_image = \ - 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' - try: - service = docker_client.services.get(service_args['name']) - except docker.errors.NotFound: - pass - except docker.errors.DockerException: - return - else: - service.remove() - try: - docker_client.services.create(service_image, **service_args) - except docker.errors.DockerException: - corpus.status = 'failed' - else: - corpus.status = 'queued' - - -def __checkout_build_corpus_service(corpus): - service_name = 'build-corpus_{}'.format(corpus.id) - try: - service = docker_client.services.get(service_name) - except docker.errors.NotFound: - logging.error('__checkout_build_corpus_service({}):'.format(corpus.id) - + ' The service does not exist.' - + ' (stauts: {} -> failed)'.format(corpus.status)) - corpus.status = 'failed' - return - except docker.errors.DockerException: - return - service_tasks = service.tasks() - if not service_tasks: - return - task_state = service_tasks[0].get('Status').get('State') - if corpus.status == 'queued' and task_state != 'pending': - corpus.status = 'running' - elif corpus.status == 'running' and task_state == 'complete': - service.remove() - corpus.status = 'prepared' - elif corpus.status == 'running' and task_state == 'failed': - service.remove() - corpus.status = task_state - - -def __create_cqpserver_container(corpus): - corpus_dir = os.path.join(config.DATA_DIR, - str(corpus.user_id), - 'corpora', - str(corpus.id)) - corpus_data_dir = os.path.join(corpus_dir, 'data') - corpus_registry_dir = os.path.join(corpus_dir, 'registry') - container_args = {'command': 'cqpserver', - 'detach': True, - 'volumes': [corpus_data_dir + ':/corpora/data:rw', - corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], - 'name': 'cqpserver_{}'.format(corpus.id), - 'network': 'nopaque_default'} - container_image = \ - 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' - try: - container = docker_client.containers.get(container_args['name']) - except docker.errors.NotFound: - pass - except docker.errors.DockerException: - return - else: - container.remove(force=True) - try: - docker_client.containers.run(container_image, **container_args) - except docker.errors.DockerException: - return - else: - corpus.status = 'analysing' - - -def __remove_cqpserver_container(corpus): - container_name = 'cqpserver_{}'.format(corpus.id) - try: - container = docker_client.containers.get(container_name) - except docker.errors.NotFound: - pass - except docker.errors.DockerException: - return - else: - container.remove(force=True) - corpus.status = 'prepared' diff --git a/web/app/daemon/check_jobs.py b/web/app/daemon/check_jobs.py deleted file mode 100644 index f5530e1e..00000000 --- a/web/app/daemon/check_jobs.py +++ /dev/null @@ -1,147 +0,0 @@ -from datetime import datetime -from .. import configuration as config -from .. import docker_client, Session -from ..decorators import background -from ..models import Job, JobResult, NotificationData, NotificationEmailData -import docker -import logging -import json -import os - - -@background -def check_jobs(): - session = Session() - jobs = session.query(Job).all() - for job in filter(lambda job: job.status == 'submitted', jobs): - __create_job_service(job) - for job in filter(lambda job: job.status == 'queued', jobs): - __checkout_job_service(job, session) - __add_notification_data(job, 'queued', session) - for job in filter(lambda job: job.status == 'running', jobs): - __checkout_job_service(job, session) - __add_notification_data(job, 'running', session) - for job in filter(lambda job: job.status == 'complete', jobs): - __add_notification_data(job, 'complete', session) - for job in filter(lambda job: job.status == 'failed', jobs): - __add_notification_data(job, 'failed', session) - for job in filter(lambda job: job.status == 'canceling', jobs): - __remove_job_service(job) - session.commit() - Session.remove() - - -def __add_notification_data(job, notified_on_status, session): - # checks if user wants any notifications at all - if (job.user.setting_job_status_mail_notifications == 'none'): - return - # checks if user wants only notification on completed jobs - elif (job.user.setting_job_status_mail_notifications == 'end' - and notified_on_status != 'complete'): - return - else: - # check if a job already has associated NotificationData - notification_exists = len(job.notification_data) - # create notification_data for current job if there is none - if (notification_exists == 0): - notification_data = NotificationData(job_id=job.id) - session.add(notification_data) - # If no commit job will have no NotificationData - session.commit() - if (job.notification_data[0].notified_on != notified_on_status): - notification_email_data = NotificationEmailData(job_id=job.id) - notification_email_data.notify_status = notified_on_status - notification_email_data.creation_date = datetime.utcnow() - job.notification_data[0].notified_on = notified_on_status - session.add(notification_email_data) - - -def __create_job_service(job): - job_dir = os.path.join(config.DATA_DIR, - str(job.user_id), - 'jobs', - str(job.id)) - cmd = '{} -i /files -o /files/output'.format(job.service) - if job.service == 'file-setup': - cmd += ' -f {}'.format(job.secure_filename) - cmd += ' --log-dir /files' - cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename) - cmd += ' ' + ' '.join(json.loads(job.service_args)) - service_args = {'command': cmd, - 'constraints': ['node.role==worker'], - 'labels': {'origin': 'nopaque', - 'type': 'service.{}'.format(job.service), - 'job_id': str(job.id)}, - 'mounts': [job_dir + ':/files:rw'], - 'name': 'job_{}'.format(job.id), - 'resources': docker.types.Resources( - cpu_reservation=job.n_cores * (10 ** 9), - mem_reservation=job.mem_mb * (10 ** 6)), - 'restart_policy': docker.types.RestartPolicy()} - service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/' - + job.service + ':' + job.service_version) - try: - service = docker_client.services.get(service_args['name']) - except docker.errors.NotFound: - pass - except docker.errors.DockerException: - return - else: - service.remove() - try: - docker_client.services.create(service_image, **service_args) - except docker.errors.DockerException: - job.status = 'failed' - else: - job.status = 'queued' - - -def __checkout_job_service(job, session): - service_name = 'job_{}'.format(job.id) - try: - service = docker_client.services.get(service_name) - except docker.errors.NotFound: - logging.error('__checkout_job_service({}): '.format(job.id) - + 'The service does not exist. ' - + '(status: {} -> failed)'.format(job.status)) - job.status = 'failed' - return - except docker.errors.DockerException: - return - service_tasks = service.tasks() - if not service_tasks: - return - task_state = service_tasks[0].get('Status').get('State') - if job.status == 'queued' and task_state != 'pending': - job.status = 'running' - elif (job.status == 'running' - and (task_state == 'complete' or task_state == 'failed')): - service.remove() - job.end_date = datetime.utcnow() - job.status = task_state - if task_state == 'complete': - results_dir = os.path.join(config.DATA_DIR, - str(job.user_id), - 'jobs', - str(job.id), - 'output') - results = filter(lambda x: x.endswith('.zip'), - os.listdir(results_dir)) - for result in results: - job_result = JobResult(dir=results_dir, - filename=result, - job_id=job.id) - session.add(job_result) - - -def __remove_job_service(job): - service_name = 'job_{}'.format(job.id) - try: - service = docker_client.services.get(service_name) - except docker.errors.NotFound: - job.status = 'canceled' - except docker.errors.DockerException: - return - else: - service.update(mounts=None) - service.remove() diff --git a/web/app/daemon/libnotify/__init__.py b/web/app/daemon/libnotify/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/web/app/daemon/libnotify/notification.py b/web/app/daemon/libnotify/notification.py deleted file mode 100644 index 488471c3..00000000 --- a/web/app/daemon/libnotify/notification.py +++ /dev/null @@ -1,28 +0,0 @@ -from email.message import EmailMessage - - -class Notification(EmailMessage): - """docstring for Email.""" - - def set_notification_content(self, - subject_template, - subject_template_values_dict, - body_txt_template_path, - body_html_template_path, - body_template_values_dict): - # Create subject with subject_template_values_dict - self['subject'] = subject_template.format( - **subject_template_values_dict) - # Open template files and insert values from body_template_values_dict - with open(body_txt_template_path) as nfile: - self.body = nfile.read().format(**body_template_values_dict) - with open(body_html_template_path) as nfile: - self.html = nfile.read().format(**body_template_values_dict) - # Set txt of email - self.set_content(self.body) - # Set html alternative - self.add_alternative(self.html, subtype='html') - - def set_addresses(self, sender, recipient): - self['From'] = sender - self['to'] = recipient diff --git a/web/app/daemon/libnotify/service.py b/web/app/daemon/libnotify/service.py deleted file mode 100644 index 633fb386..00000000 --- a/web/app/daemon/libnotify/service.py +++ /dev/null @@ -1,16 +0,0 @@ -class NotificationService: - """This is a nopaque notifcation service object.""" - - def __init__(self, smtp): - # Bool to show if the mail server stoped sending mails due to exceeding - # its sending limit - self.mail_limit_exceeded = False - # Holds due to an error unsent email notifications - self.not_sent = {} - self.smtp = smtp - - def send(self, email): - self.smtp.send_message(email) - - def quit(self): - self.smtp.quit() diff --git a/web/app/daemon/libnotify/templates/notification.html b/web/app/daemon/libnotify/templates/notification.html deleted file mode 100644 index e2edfe75..00000000 --- a/web/app/daemon/libnotify/templates/notification.html +++ /dev/null @@ -1,15 +0,0 @@ - - -

Dear {username},

- -

The status of your Job/Corpus({id}) with the title "{title}" has changed!

-

It is now {status}!

-

Time of this status update was: {time} UTC

- -

You can access your Job/Corpus here: {url} -

- -

Kind regards!
- Your nopaque team

- - diff --git a/web/app/daemon/libnotify/templates/notification.txt b/web/app/daemon/libnotify/templates/notification.txt deleted file mode 100644 index 0e221c54..00000000 --- a/web/app/daemon/libnotify/templates/notification.txt +++ /dev/null @@ -1,10 +0,0 @@ -Dear {username}, - -The status of your Job/Corpus({id}) with the title "{title}" has changed! -It is now {status}! -Time of this status update was: {time} UTC - -You can access your Job/Corpus here: {url} - -Kind regards! -Your nopaque team \ No newline at end of file diff --git a/web/app/daemon/notify.py b/web/app/daemon/notify.py deleted file mode 100644 index 5d3d23f3..00000000 --- a/web/app/daemon/notify.py +++ /dev/null @@ -1,111 +0,0 @@ -from sqlalchemy import asc -from .libnotify.notification import Notification -from .libnotify.service import NotificationService -from .. import configuration as config -from .. import Session -from ..decorators import background -from ..models import NotificationEmailData -import logging -import os -import smtplib - - -ROOT_DIR = os.path.abspath(os.path.dirname(__file__)) - - -@background -def notify(): - session = Session() - if config.SMTP_USE_SSL: - smtp = smtplib.SMTP_SSL(host=config.SMTP_SERVER, port=config.SMTP_PORT) - else: - smtp = smtplib.SMTP(host=config.SMTP_SERVER, port=config.SMTP_PORT) - if config.SMTP_USE_TLS: - smtp.starttls() - try: - smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD) - except smtplib.SMTPHeloError: - logging.warning('The server didn’t reply properly to the HELO ' - 'greeting.') - return - except smtplib.SMTPAuthenticationError as e: - logging.warning('The server didn’t accept the username/password ' - 'combination.') - logging.warning(e) - return - except smtplib.SMTPNotSupportedError: - logging.warning('The AUTH command is not supported by the server.') - return - except smtplib.SMTPException: - logging.warning('No suitable authentication method was found.') - return - notification_service = NotificationService(smtp) - # create notifications (content, recipient etc.) - notifications = __create_mail_notifications(notification_service, session) - # only login and send mails if there are any notifications - if (len(notifications) > 0): - # combine new and unsent notifications - notifications.update(notification_service.not_sent) - # send all notifications - __send_mail_notifications(notifications, notification_service) - # remove unsent notifications because they have been sent now - # but only if mail limit has not been exceeded - if (notification_service.mail_limit_exceeded is not True): - notification_service.not_sent = {} - smtp.quit() - Session.remove() - - -# Email notification functions -def __create_mail_notifications(notification_service, session): - notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() # noqa - notifications = {} - for data in notification_email_data: - notification = Notification() - notification.set_addresses(config.SMTP_DEFAULT_SENDER, - data.job.user.email) - subject_template = ('[nopaque] Status update for your Job/Corpora: ' - '{title}!') - subject_template_values_dict = {'title': data.job.title} - url = '{}://{}/{}/{}'.format(config.PROTOCOL, - config.DOMAIN, - 'jobs', - data.job.id) - body_template_values_dict = {'username': data.job.user.username, - 'id': data.job.id, - 'title': data.job.title, - 'status': data.notify_status, - 'time': data.creation_date, - 'url': url} - txt_tmplt = os.path.join(ROOT_DIR, - 'libnotify/templates/notification.txt') - html_tmplt = os.path.join(ROOT_DIR, - 'libnotify/templates/notification.html') - notification.set_notification_content(subject_template, - subject_template_values_dict, - txt_tmplt, - html_tmplt, - body_template_values_dict) - notifications[data.job.id] = notification - # Using a dictionary for notifications avoids sending multiple mails - # if the status of a job changes in a few seconds. The user will not - # get swamped with mails for queued, running and complete if those - # happen in in a few seconds. Only the last update will be sent. - # This depends on the sleep time interval though. - session.delete(data) - session.commit() - return notifications - - -def __send_mail_notifications(notifications, notification_service): - for key, notification in notifications.items(): - try: - notification_service.send(notification) - notification_service.mail_limit_exceeded = False - except Exception: - # Adds notifications to unsent if mail server exceded limit for - # consecutive mail sending - logging.warning('limit') - notification_service.not_sent[key] = notification - notification_service.mail_limit_exceeded = True - notification_service.not_sent.update(notifications) diff --git a/web/app/tasks/__init__.py b/web/app/tasks/__init__.py new file mode 100644 index 00000000..ba33a1fe --- /dev/null +++ b/web/app/tasks/__init__.py @@ -0,0 +1,35 @@ +from .. import db +from ..models import Corpus, Job +import docker + + +docker_client = docker.from_env() +from . import corpus_utils, job_utils # noqa + + +def check_corpora(): + corpora = Corpus.query.all() + for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora): + corpus_utils.create_build_corpus_service(corpus) + for corpus in filter(lambda corpus: (corpus.status == 'queued' + or corpus.status == 'running'), + corpora): + corpus_utils.checkout_build_corpus_service(corpus) + for corpus in filter(lambda corpus: corpus.status == 'start analysis', + corpora): + corpus_utils.create_cqpserver_container(corpus) + for corpus in filter(lambda corpus: corpus.status == 'stop analysis', + corpora): + corpus_utils.remove_cqpserver_container(corpus) + db.session.commit() + + +def check_jobs(): + jobs = Job.query.all() + for job in filter(lambda job: job.status == 'submitted', jobs): + job_utils.create_job_service(job) + for job in filter(lambda job: job.status == 'queued', jobs): + job_utils.checkout_job_service(job) + for job in filter(lambda job: job.status == 'running', jobs): + job_utils.checkout_job_service(job) + db.session.commit() diff --git a/web/app/tasks/corpus_utils.py b/web/app/tasks/corpus_utils.py new file mode 100644 index 00000000..c06b19ac --- /dev/null +++ b/web/app/tasks/corpus_utils.py @@ -0,0 +1,120 @@ +from flask import current_app +from . import docker_client +import docker +import logging +import os +import shutil + + +def create_build_corpus_service(corpus): + corpus_dir = os.path.join(current_app.config['DATA_DIR'], + str(corpus.user_id), + 'corpora', + str(corpus.id)) + corpus_data_dir = os.path.join(corpus_dir, 'data') + corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt') + corpus_registry_dir = os.path.join(corpus_dir, 'registry') + if os.path.exists(corpus_data_dir): + shutil.rmtree(corpus_data_dir) + if os.path.exists(corpus_registry_dir): + shutil.rmtree(corpus_registry_dir) + os.mkdir(corpus_data_dir) + os.mkdir(corpus_registry_dir) + service_args = { + 'command': 'docker-entrypoint.sh build-corpus', + 'constraints': ['node.role==worker'], + 'labels': {'origin': 'nopaque', + 'type': 'corpus.prepare', + 'corpus_id': str(corpus.id)}, + 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro', + corpus_data_dir + ':/corpora/data:rw', + corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], + 'name': 'build-corpus_{}'.format(corpus.id), + 'restart_policy': docker.types.RestartPolicy() + } + service_image = \ + 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + try: + docker_client.services.create(service_image, **service_args) + except docker.errors.APIError as e: + logging.error('create_build_corpus_service({}): '.format(corpus.id) + + '{} (status: {} -> failed)'.format(e, corpus.status)) + corpus.status = 'failed' + else: + corpus.status = 'queued' + finally: + # TODO: send email + pass + + +def checkout_build_corpus_service(corpus): + service_name = 'build-corpus_{}'.format(corpus.id) + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound as e: + logging.error('checkout_build_corpus_service({}):'.format(corpus.id) + + ' {} (stauts: {} -> failed)'.format(e, corpus.status)) + corpus.status = 'failed' + # TODO: handle docker.errors.APIError and docker.errors.InvalidVersion + else: + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if corpus.status == 'queued' and task_state != 'pending': + corpus.status = 'running' + elif corpus.status == 'running' and task_state == 'complete': + service.remove() + corpus.status = 'prepared' + elif corpus.status == 'running' and task_state == 'failed': + service.remove() + corpus.status = task_state + finally: + # TODO: send email + pass + + +def create_cqpserver_container(corpus): + corpus_dir = os.path.join(current_app.config['DATA_DIR'], + str(corpus.user_id), + 'corpora', + str(corpus.id)) + corpus_data_dir = os.path.join(corpus_dir, 'data') + corpus_registry_dir = os.path.join(corpus_dir, 'registry') + container_args = { + 'command': 'cqpserver', + 'detach': True, + 'volumes': [corpus_data_dir + ':/corpora/data:rw', + corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], + 'name': 'cqpserver_{}'.format(corpus.id), + 'network': 'nopaque_default' + } + container_image = \ + 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + try: + container = docker_client.containers.get(container_args['name']) + except docker.errors.NotFound: + pass + except docker.errors.DockerException: + return + else: + container.remove(force=True) + try: + docker_client.containers.run(container_image, **container_args) + except docker.errors.DockerException: + return + else: + corpus.status = 'analysing' + + +def remove_cqpserver_container(corpus): + container_name = 'cqpserver_{}'.format(corpus.id) + try: + container = docker_client.containers.get(container_name) + except docker.errors.NotFound: + pass + except docker.errors.DockerException: + return + else: + container.remove(force=True) + corpus.status = 'prepared' diff --git a/web/app/tasks/job_utils.py b/web/app/tasks/job_utils.py new file mode 100644 index 00000000..2094e7cd --- /dev/null +++ b/web/app/tasks/job_utils.py @@ -0,0 +1,101 @@ +from datetime import datetime +from flask import current_app +from . import docker_client +from .. import db +from ..models import JobResult +import docker +import logging +import json +import os + + +def create_job_service(job): + job_dir = os.path.join(current_app.config['DATA_DIR'], + str(job.user_id), + 'jobs', + str(job.id)) + cmd = '{} -i /files -o /files/output'.format(job.service) + if job.service == 'file-setup': + cmd += ' -f {}'.format(job.secure_filename) + cmd += ' --log-dir /files' + cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename) + cmd += ' ' + ' '.join(json.loads(job.service_args)) + service_args = {'command': cmd, + 'constraints': ['node.role==worker'], + 'labels': {'origin': 'nopaque', + 'type': 'service.{}'.format(job.service), + 'job_id': str(job.id)}, + 'mounts': [job_dir + ':/files:rw'], + 'name': 'job_{}'.format(job.id), + 'resources': docker.types.Resources( + cpu_reservation=job.n_cores * (10 ** 9), + mem_reservation=job.mem_mb * (10 ** 6)), + 'restart_policy': docker.types.RestartPolicy()} + service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/' + + job.service + ':' + job.service_version) + try: + docker_client.services.create(service_image, **service_args) + except docker.errors.APIError as e: + logging.error('create_job_service({}): {} '.format(job.id, e) + + '(status: {} -> failed)'.format(job.status)) + job.status = 'failed' + else: + job.status = 'queued' + finally: + # TODO: send email + pass + + +def checkout_job_service(job): + service_name = 'job_{}'.format(job.id) + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound as e: + logging.error('checkout_job_service({}): {} '.format(job.id, e) + + '(status: {} -> submitted)'.format(job.status)) + job.status = 'submitted' + # TODO: handle docker.errors.APIError and docker.errors.InvalidVersion + else: + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if job.status == 'queued' and task_state != 'pending': + job.status = 'running' + elif job.status == 'queued' and task_state == 'complete': + service.remove() + job.end_date = datetime.utcnow() + job.status = task_state + if task_state == 'complete': + results_dir = os.path.join(current_app.config['DATA_DIR'], + str(job.user_id), + 'jobs', + str(job.id), + 'output') + results = filter(lambda x: x.endswith('.zip'), + os.listdir(results_dir)) + for result in results: + job_result = JobResult(dir=results_dir, + filename=result, + job_id=job.id) + db.session.add(job_result) + elif job.status == 'running' and task_state == 'failed': + service.remove() + job.end_date = datetime.utcnow() + job.status = task_state + finally: + # TODO: send email + pass + + +def remove_job_service(job): + service_name = 'job_{}'.format(job.id) + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound: + # TODO: send email + job.status = 'canceled' + # TODO: handle docker.errors.APIError and docker.errors.InvalidVersion + else: + service.update(mounts=None) + service.remove() diff --git a/web/app/templates/tasks/email/notification.html.j2 b/web/app/templates/tasks/email/notification.html.j2 new file mode 100644 index 00000000..79f0e2dd --- /dev/null +++ b/web/app/templates/tasks/email/notification.html.j2 @@ -0,0 +1,9 @@ +

Dear {{ user.username }},

+ +

The status of your Job/Corpus({{ job.id }}) with the title "{{ job.title }}" has changed!

+

It is now {{ job.status }}!

+

Time of this status update was: {time} UTC

+ +

You can access your Job/Corpus here: {{ url_for('jobs.job', job_id=job.id) }}

+ +

Kind regards!
Your nopaque team

diff --git a/web/app/templates/tasks/email/notification.txt.j2 b/web/app/templates/tasks/email/notification.txt.j2 new file mode 100644 index 00000000..25d797c8 --- /dev/null +++ b/web/app/templates/tasks/email/notification.txt.j2 @@ -0,0 +1,10 @@ +Dear {{ user.username }}, + +The status of your Job/Corpus({{ job.id }}) with the title "{{ job.title }}" has changed! +It is now {{ job.status }}! +Time of this status update was: {time} UTC + +You can access your Job/Corpus here: {{ url_for('jobs.job', job_id=job.id) }} + +Kind regards! +Your nopaque team diff --git a/web/boot.sh b/web/boot.sh index 0d088ac2..f39bb4c8 100755 --- a/web/boot.sh +++ b/web/boot.sh @@ -1,15 +1,15 @@ #!/bin/bash source venv/bin/activate -export FLASK_APP=nopaque.py +while true; do + flask deploy + if [[ "$?" == "0" ]]; then + break + fi + echo Deploy command failed, retrying in 5 secs... + sleep 5 +done + if [[ "$#" -eq 0 ]]; then - while true; do - flask deploy - if [[ "$?" == "0" ]]; then - break - fi - echo Deploy command failed, retrying in 5 secs... - sleep 5 - done python nopaque.py elif [[ "$1" == "flask" ]]; then exec ${@:1} diff --git a/web/nopaque.py b/web/nopaque.py index d636cfd5..43d69c38 100644 --- a/web/nopaque.py +++ b/web/nopaque.py @@ -51,6 +51,13 @@ def deploy(): Role.insert_roles() +@app.cli.command() +def tasks(): + from app.tasks import process_corpora, process_jobs + process_corpora() + process_jobs() + + @app.cli.command() def test(): """Run the unit tests."""