From 0f30e518afbe0a212063a0958ea2ec13bdae16cc Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Fri, 6 Nov 2020 15:07:58 +0100
Subject: [PATCH] First steps
---
web/app/daemon.py | 43 +++++
web/app/daemon/check_corpora.py | 139 +++++++++++++++++
web/app/daemon/check_jobs.py | 147 ++++++++++++++++++
web/app/daemon/libnotify/__init__.py | 0
web/app/daemon/libnotify/notification.py | 28 ++++
web/app/daemon/libnotify/service.py | 16 ++
.../libnotify/templates/notification.html | 15 ++
.../libnotify/templates/notification.txt | 10 ++
web/app/daemon/notify.py | 111 +++++++++++++
web/requirements.txt | 1 +
10 files changed, 510 insertions(+)
create mode 100644 web/app/daemon.py
create mode 100644 web/app/daemon/check_corpora.py
create mode 100644 web/app/daemon/check_jobs.py
create mode 100644 web/app/daemon/libnotify/__init__.py
create mode 100644 web/app/daemon/libnotify/notification.py
create mode 100644 web/app/daemon/libnotify/service.py
create mode 100644 web/app/daemon/libnotify/templates/notification.html
create mode 100644 web/app/daemon/libnotify/templates/notification.txt
create mode 100644 web/app/daemon/notify.py
diff --git a/web/app/daemon.py b/web/app/daemon.py
new file mode 100644
index 00000000..9e9d27c9
--- /dev/null
+++ b/web/app/daemon.py
@@ -0,0 +1,43 @@
+from app import create_app
+from time import sleep
+import docker
+
+
+app = create_app()
+docker_client = docker.from_env()
+
+app.app_context().push()
+# These imports are deferred on purpose: the submodules expect the objects
+# defined above (e.g. docker_client) to be available when they are imported.
+from .check_corpora import check_corpora  # noqa
+from .check_jobs import check_jobs  # noqa
+from .notify import notify  # noqa
+
+
+def run():
+ check_corpora_thread = check_corpora()
+ check_jobs_thread = check_jobs()
+ notify_thread = notify()
+
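+    # Keep the worker threads alive: restart any thread that has died and
+    # check again every three seconds.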
+ while True:
+ if not check_corpora_thread.is_alive():
+ check_corpora_thread = check_corpora()
+ if not check_jobs_thread.is_alive():
+ check_jobs_thread = check_jobs()
+ if not notify_thread.is_alive():
+ notify_thread = notify()
+ sleep(3)
diff --git a/web/app/daemon/check_corpora.py b/web/app/daemon/check_corpora.py
new file mode 100644
index 00000000..1150ea6c
--- /dev/null
+++ b/web/app/daemon/check_corpora.py
@@ -0,0 +1,139 @@
+from . import docker_client
+from .. import configuration as config
+from .. import Session
+from ..decorators import background
+from ..models import Corpus
+import docker
+import logging
+import os
+import shutil
+
+
+@background
+def check_corpora():
+    session = Session()
+    corpora = session.query(Corpus).all()
+ for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
+ __create_build_corpus_service(corpus)
+ for corpus in filter(lambda corpus: (corpus.status == 'queued'
+ or corpus.status == 'running'),
+ corpora):
+ __checkout_build_corpus_service(corpus)
+ for corpus in filter(lambda corpus: corpus.status == 'start analysis',
+ corpora):
+ __create_cqpserver_container(corpus)
+ for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
+ corpora):
+ __remove_cqpserver_container(corpus)
+    session.commit()
+    Session.remove()
+
+
+def __create_build_corpus_service(corpus):
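+    # Start a Docker Swarm service that builds a CWB corpus from the merged
+    # corpus.vrt file.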
+ corpus_dir = os.path.join(config.DATA_DIR,
+ str(corpus.user_id),
+ 'corpora',
+ str(corpus.id))
+ corpus_data_dir = os.path.join(corpus_dir, 'data')
+ corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
+ corpus_registry_dir = os.path.join(corpus_dir, 'registry')
+ if os.path.exists(corpus_data_dir):
+ shutil.rmtree(corpus_data_dir)
+ if os.path.exists(corpus_registry_dir):
+ shutil.rmtree(corpus_registry_dir)
+ os.mkdir(corpus_data_dir)
+ os.mkdir(corpus_registry_dir)
+ service_args = {'command': 'docker-entrypoint.sh build-corpus',
+ 'constraints': ['node.role==worker'],
+ 'labels': {'origin': 'nopaque',
+ 'type': 'corpus.prepare',
+ 'corpus_id': str(corpus.id)},
+ 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
+ corpus_data_dir + ':/corpora/data:rw',
+ corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
+ 'name': 'build-corpus_{}'.format(corpus.id),
+ 'restart_policy': docker.types.RestartPolicy()}
+ service_image = \
+ 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
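+    # Remove a possibly existing service with the same name before creating
+    # the new one; any other Docker error aborts the handling of this corpus.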
+ try:
+ service = docker_client.services.get(service_args['name'])
+ except docker.errors.NotFound:
+ pass
+ except docker.errors.DockerException:
+ return
+ else:
+ service.remove()
+ try:
+ docker_client.services.create(service_image, **service_args)
+ except docker.errors.DockerException:
+ corpus.status = 'failed'
+ else:
+ corpus.status = 'queued'
+
+
+def __checkout_build_corpus_service(corpus):
+ service_name = 'build-corpus_{}'.format(corpus.id)
+ try:
+ service = docker_client.services.get(service_name)
+ except docker.errors.NotFound:
+ logging.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ + ' The service does not exist.'
+                      + ' (status: {} -> failed)'.format(corpus.status))
+ corpus.status = 'failed'
+ return
+ except docker.errors.DockerException:
+ return
+ service_tasks = service.tasks()
+ if not service_tasks:
+ return
+ task_state = service_tasks[0].get('Status').get('State')
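+    # Advance the corpus status according to the state of the service's task.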
+ if corpus.status == 'queued' and task_state != 'pending':
+ corpus.status = 'running'
+ elif corpus.status == 'running' and task_state == 'complete':
+ service.remove()
+ corpus.status = 'prepared'
+ elif corpus.status == 'running' and task_state == 'failed':
+ service.remove()
+ corpus.status = task_state
+
+
+def __create_cqpserver_container(corpus):
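+    # Run a CQPserver container on the prepared corpus data so that analysis
+    # sessions can connect to it.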
+ corpus_dir = os.path.join(config.DATA_DIR,
+ str(corpus.user_id),
+ 'corpora',
+ str(corpus.id))
+ corpus_data_dir = os.path.join(corpus_dir, 'data')
+ corpus_registry_dir = os.path.join(corpus_dir, 'registry')
+ container_args = {'command': 'cqpserver',
+ 'detach': True,
+ 'volumes': [corpus_data_dir + ':/corpora/data:rw',
+ corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
+ 'name': 'cqpserver_{}'.format(corpus.id),
+ 'network': 'nopaque_default'}
+ container_image = \
+ 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
+ try:
+ container = docker_client.containers.get(container_args['name'])
+ except docker.errors.NotFound:
+ pass
+ except docker.errors.DockerException:
+ return
+ else:
+ container.remove(force=True)
+ try:
+ docker_client.containers.run(container_image, **container_args)
+ except docker.errors.DockerException:
+ return
+ else:
+ corpus.status = 'analysing'
+
+
+def __remove_cqpserver_container(corpus):
+ container_name = 'cqpserver_{}'.format(corpus.id)
+ try:
+ container = docker_client.containers.get(container_name)
+ except docker.errors.NotFound:
+ pass
+ except docker.errors.DockerException:
+ return
+ else:
+ container.remove(force=True)
+ corpus.status = 'prepared'
diff --git a/web/app/daemon/check_jobs.py b/web/app/daemon/check_jobs.py
new file mode 100644
index 00000000..f5530e1e
--- /dev/null
+++ b/web/app/daemon/check_jobs.py
@@ -0,0 +1,147 @@
+from datetime import datetime
+from .. import configuration as config
+from .. import docker_client, Session
+from ..decorators import background
+from ..models import Job, JobResult, NotificationData, NotificationEmailData
+import docker
+import logging
+import json
+import os
+
+
+@background
+def check_jobs():
+ session = Session()
+ jobs = session.query(Job).all()
+ for job in filter(lambda job: job.status == 'submitted', jobs):
+ __create_job_service(job)
+ for job in filter(lambda job: job.status == 'queued', jobs):
+ __checkout_job_service(job, session)
+ __add_notification_data(job, 'queued', session)
+ for job in filter(lambda job: job.status == 'running', jobs):
+ __checkout_job_service(job, session)
+ __add_notification_data(job, 'running', session)
+ for job in filter(lambda job: job.status == 'complete', jobs):
+ __add_notification_data(job, 'complete', session)
+ for job in filter(lambda job: job.status == 'failed', jobs):
+ __add_notification_data(job, 'failed', session)
+ for job in filter(lambda job: job.status == 'canceling', jobs):
+ __remove_job_service(job)
+ session.commit()
+ Session.remove()
+
+
+def __add_notification_data(job, notified_on_status, session):
+ # checks if user wants any notifications at all
+ if (job.user.setting_job_status_mail_notifications == 'none'):
+ return
+    # checks if the user only wants notifications for completed jobs
+ elif (job.user.setting_job_status_mail_notifications == 'end'
+ and notified_on_status != 'complete'):
+ return
+ else:
+ # check if a job already has associated NotificationData
+ notification_exists = len(job.notification_data)
+ # create notification_data for current job if there is none
+ if (notification_exists == 0):
+ notification_data = NotificationData(job_id=job.id)
+ session.add(notification_data)
+            # Commit here, otherwise the job would still have no
+            # NotificationData when it is accessed below
+ session.commit()
+ if (job.notification_data[0].notified_on != notified_on_status):
+ notification_email_data = NotificationEmailData(job_id=job.id)
+ notification_email_data.notify_status = notified_on_status
+ notification_email_data.creation_date = datetime.utcnow()
+ job.notification_data[0].notified_on = notified_on_status
+ session.add(notification_email_data)
+
+
+def __create_job_service(job):
+ job_dir = os.path.join(config.DATA_DIR,
+ str(job.user_id),
+ 'jobs',
+ str(job.id))
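+    # Assemble the service command: input and output directory, the original
+    # filename for file-setup jobs, the log directory, the name of the result
+    # zip and the job's service arguments.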
+ cmd = '{} -i /files -o /files/output'.format(job.service)
+ if job.service == 'file-setup':
+ cmd += ' -f {}'.format(job.secure_filename)
+ cmd += ' --log-dir /files'
+ cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename)
+ cmd += ' ' + ' '.join(json.loads(job.service_args))
+ service_args = {'command': cmd,
+ 'constraints': ['node.role==worker'],
+ 'labels': {'origin': 'nopaque',
+ 'type': 'service.{}'.format(job.service),
+ 'job_id': str(job.id)},
+ 'mounts': [job_dir + ':/files:rw'],
+ 'name': 'job_{}'.format(job.id),
+ 'resources': docker.types.Resources(
+ cpu_reservation=job.n_cores * (10 ** 9),
+ mem_reservation=job.mem_mb * (10 ** 6)),
+ 'restart_policy': docker.types.RestartPolicy()}
+ service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
+ + job.service + ':' + job.service_version)
+ try:
+ service = docker_client.services.get(service_args['name'])
+ except docker.errors.NotFound:
+ pass
+ except docker.errors.DockerException:
+ return
+ else:
+ service.remove()
+ try:
+ docker_client.services.create(service_image, **service_args)
+ except docker.errors.DockerException:
+ job.status = 'failed'
+ else:
+ job.status = 'queued'
+
+
+def __checkout_job_service(job, session):
+ service_name = 'job_{}'.format(job.id)
+ try:
+ service = docker_client.services.get(service_name)
+ except docker.errors.NotFound:
+ logging.error('__checkout_job_service({}): '.format(job.id)
+ + 'The service does not exist. '
+ + '(status: {} -> failed)'.format(job.status))
+ job.status = 'failed'
+ return
+ except docker.errors.DockerException:
+ return
+ service_tasks = service.tasks()
+ if not service_tasks:
+ return
+ task_state = service_tasks[0].get('Status').get('State')
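+    # Advance the job status according to the task state; when the job has
+    # completed, register every zip archive in the output directory as a
+    # JobResult.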
+ if job.status == 'queued' and task_state != 'pending':
+ job.status = 'running'
+ elif (job.status == 'running'
+ and (task_state == 'complete' or task_state == 'failed')):
+ service.remove()
+ job.end_date = datetime.utcnow()
+ job.status = task_state
+ if task_state == 'complete':
+ results_dir = os.path.join(config.DATA_DIR,
+ str(job.user_id),
+ 'jobs',
+ str(job.id),
+ 'output')
+ results = filter(lambda x: x.endswith('.zip'),
+ os.listdir(results_dir))
+ for result in results:
+ job_result = JobResult(dir=results_dir,
+ filename=result,
+ job_id=job.id)
+ session.add(job_result)
+
+
+def __remove_job_service(job):
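+    # Cancel the job by removing its service; if the service no longer
+    # exists, the job counts as canceled.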
+ service_name = 'job_{}'.format(job.id)
+ try:
+ service = docker_client.services.get(service_name)
+ except docker.errors.NotFound:
+ job.status = 'canceled'
+ except docker.errors.DockerException:
+ return
+ else:
+ service.update(mounts=None)
+ service.remove()
diff --git a/web/app/daemon/libnotify/__init__.py b/web/app/daemon/libnotify/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/web/app/daemon/libnotify/notification.py b/web/app/daemon/libnotify/notification.py
new file mode 100644
index 00000000..488471c3
--- /dev/null
+++ b/web/app/daemon/libnotify/notification.py
@@ -0,0 +1,28 @@
+from email.message import EmailMessage
+
+
+class Notification(EmailMessage):
+    """An email message that notifies a user about a status change."""
+
+ def set_notification_content(self,
+ subject_template,
+ subject_template_values_dict,
+ body_txt_template_path,
+ body_html_template_path,
+ body_template_values_dict):
+ # Create subject with subject_template_values_dict
+        self['Subject'] = subject_template.format(
+ **subject_template_values_dict)
+ # Open template files and insert values from body_template_values_dict
+ with open(body_txt_template_path) as nfile:
+ self.body = nfile.read().format(**body_template_values_dict)
+ with open(body_html_template_path) as nfile:
+ self.html = nfile.read().format(**body_template_values_dict)
+ # Set txt of email
+ self.set_content(self.body)
+ # Set html alternative
+ self.add_alternative(self.html, subtype='html')
+
+ def set_addresses(self, sender, recipient):
+ self['From'] = sender
+        self['To'] = recipient
diff --git a/web/app/daemon/libnotify/service.py b/web/app/daemon/libnotify/service.py
new file mode 100644
index 00000000..633fb386
--- /dev/null
+++ b/web/app/daemon/libnotify/service.py
@@ -0,0 +1,16 @@
+class NotificationService:
+    """This is a nopaque notification service object."""
+
+ def __init__(self, smtp):
+        # Indicates whether the mail server stopped sending mails because it
+        # exceeded its sending limit
+        self.mail_limit_exceeded = False
+        # Holds email notifications that could not be sent due to an error
+ self.not_sent = {}
+ self.smtp = smtp
+
+ def send(self, email):
+ self.smtp.send_message(email)
+
+ def quit(self):
+ self.smtp.quit()
diff --git a/web/app/daemon/libnotify/templates/notification.html b/web/app/daemon/libnotify/templates/notification.html
new file mode 100644
index 00000000..e2edfe75
--- /dev/null
+++ b/web/app/daemon/libnotify/templates/notification.html
@@ -0,0 +1,15 @@
+<html>
+  <body>
+    <p>Dear {username},</p>
+    <p>
+      The status of your Job/Corpus({id}) with the title "{title}" has changed!<br>
+      It is now {status}!<br>
+      Time of this status update was: {time} UTC
+    </p>
+    <p>You can access your Job/Corpus here: <a href="{url}">{url}</a></p>
+    <p>
+      Kind regards!<br>
+      Your nopaque team
+    </p>
+  </body>
+</html>
diff --git a/web/app/daemon/libnotify/templates/notification.txt b/web/app/daemon/libnotify/templates/notification.txt
new file mode 100644
index 00000000..0e221c54
--- /dev/null
+++ b/web/app/daemon/libnotify/templates/notification.txt
@@ -0,0 +1,10 @@
+Dear {username},
+
+The status of your Job/Corpus({id}) with the title "{title}" has changed!
+It is now {status}!
+Time of this status update was: {time} UTC
+
+You can access your Job/Corpus here: {url}
+
+Kind regards!
+Your nopaque team
\ No newline at end of file
diff --git a/web/app/daemon/notify.py b/web/app/daemon/notify.py
new file mode 100644
index 00000000..5d3d23f3
--- /dev/null
+++ b/web/app/daemon/notify.py
@@ -0,0 +1,111 @@
+from sqlalchemy import asc
+from .libnotify.notification import Notification
+from .libnotify.service import NotificationService
+from .. import configuration as config
+from .. import Session
+from ..decorators import background
+from ..models import NotificationEmailData
+import logging
+import os
+import smtplib
+
+
+ROOT_DIR = os.path.abspath(os.path.dirname(__file__))
+
+
+@background
+def notify():
+ session = Session()
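+    # Connect to the SMTP server as configured (SSL, STARTTLS or unencrypted).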
+ if config.SMTP_USE_SSL:
+ smtp = smtplib.SMTP_SSL(host=config.SMTP_SERVER, port=config.SMTP_PORT)
+ else:
+ smtp = smtplib.SMTP(host=config.SMTP_SERVER, port=config.SMTP_PORT)
+ if config.SMTP_USE_TLS:
+ smtp.starttls()
+ try:
+ smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
+ except smtplib.SMTPHeloError:
+ logging.warning('The server didn’t reply properly to the HELO '
+ 'greeting.')
+ return
+ except smtplib.SMTPAuthenticationError as e:
+ logging.warning('The server didn’t accept the username/password '
+ 'combination.')
+ logging.warning(e)
+ return
+ except smtplib.SMTPNotSupportedError:
+ logging.warning('The AUTH command is not supported by the server.')
+ return
+ except smtplib.SMTPException:
+ logging.warning('No suitable authentication method was found.')
+ return
+ notification_service = NotificationService(smtp)
+ # create notifications (content, recipient etc.)
+ notifications = __create_mail_notifications(notification_service, session)
+    # only send mails if there are any notifications
+ if (len(notifications) > 0):
+ # combine new and unsent notifications
+ notifications.update(notification_service.not_sent)
+ # send all notifications
+ __send_mail_notifications(notifications, notification_service)
+ # remove unsent notifications because they have been sent now
+ # but only if mail limit has not been exceeded
+ if (notification_service.mail_limit_exceeded is not True):
+ notification_service.not_sent = {}
+ smtp.quit()
+ Session.remove()
+
+
+# Email notification functions
+def __create_mail_notifications(notification_service, session):
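+    # Build the notification emails from the queued NotificationEmailData
+    # records, oldest first.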
+ notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() # noqa
+ notifications = {}
+ for data in notification_email_data:
+ notification = Notification()
+ notification.set_addresses(config.SMTP_DEFAULT_SENDER,
+ data.job.user.email)
+        subject_template = ('[nopaque] Status update for your Job/Corpus: '
+                            '{title}!')
+ subject_template_values_dict = {'title': data.job.title}
+ url = '{}://{}/{}/{}'.format(config.PROTOCOL,
+ config.DOMAIN,
+ 'jobs',
+ data.job.id)
+ body_template_values_dict = {'username': data.job.user.username,
+ 'id': data.job.id,
+ 'title': data.job.title,
+ 'status': data.notify_status,
+ 'time': data.creation_date,
+ 'url': url}
+ txt_tmplt = os.path.join(ROOT_DIR,
+ 'libnotify/templates/notification.txt')
+ html_tmplt = os.path.join(ROOT_DIR,
+ 'libnotify/templates/notification.html')
+ notification.set_notification_content(subject_template,
+ subject_template_values_dict,
+ txt_tmplt,
+ html_tmplt,
+ body_template_values_dict)
+ notifications[data.job.id] = notification
+ # Using a dictionary for notifications avoids sending multiple mails
+ # if the status of a job changes in a few seconds. The user will not
+ # get swamped with mails for queued, running and complete if those
+        # happen within a few seconds. Only the last update will be sent.
+ # This depends on the sleep time interval though.
+ session.delete(data)
+ session.commit()
+ return notifications
+
+
+def __send_mail_notifications(notifications, notification_service):
+ for key, notification in notifications.items():
+ try:
+ notification_service.send(notification)
+ notification_service.mail_limit_exceeded = False
+        except Exception:
+            # Store the notification as unsent, e.g. because the mail server
+            # exceeded its limit for consecutive mails; it will be sent again
+            # on a later run.
+            logging.warning('Could not send notification email for job {}. '
+                            'It will be sent again later.'.format(key))
+            notification_service.not_sent[key] = notification
+            notification_service.mail_limit_exceeded = True
+            notification_service.not_sent.update(notifications)
diff --git a/web/requirements.txt b/web/requirements.txt
index 0d7f6e68..a47ebf8c 100644
--- a/web/requirements.txt
+++ b/web/requirements.txt
@@ -1,5 +1,6 @@
cqi
dnspython==1.16.0
+docker
eventlet
Flask
Flask-Login