diff --git a/web/app/daemon.py b/web/app/daemon.py new file mode 100644 index 00000000..9e9d27c9 --- /dev/null +++ b/web/app/daemon.py @@ -0,0 +1,43 @@ +from app import create_app +from time import sleep +from ..decorators import background +import docker + + +app = create_app() +docker_client = docker.from_env() + +app.app_context().push() +from . import check_corpora, check_jobs, notify # noqa + + +def run(): + check_corpora_thread = check_corpora() + check_jobs_thread = check_jobs() + notify_thread = notify() + + while True: + if not check_corpora_thread.is_alive(): + check_corpora_thread = check_corpora() + if not check_jobs_thread.is_alive(): + check_jobs_thread = check_jobs() + if not notify_thread.is_alive(): + notify_thread = notify() + sleep(3) + + +@background +def check_corpora(): + corpora = Corpus.query.all() + for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora): + __create_build_corpus_service(corpus) + for corpus in filter(lambda corpus: (corpus.status == 'queued' + or corpus.status == 'running'), + corpora): + __checkout_build_corpus_service(corpus) + for corpus in filter(lambda corpus: corpus.status == 'start analysis', + corpora): + __create_cqpserver_container(corpus) + for corpus in filter(lambda corpus: corpus.status == 'stop analysis', + corpora): + __remove_cqpserver_container(corpus) diff --git a/web/app/daemon/check_corpora.py b/web/app/daemon/check_corpora.py new file mode 100644 index 00000000..1150ea6c --- /dev/null +++ b/web/app/daemon/check_corpora.py @@ -0,0 +1,139 @@ +from . import docker_client +from .. import db +from ..decorators import background +from ..models import Corpus +import docker +import logging +import os +import shutil + + +@background +def check_corpora(): + corpora = Corpus.query.all() + for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora): + __create_build_corpus_service(corpus) + for corpus in filter(lambda corpus: (corpus.status == 'queued' + or corpus.status == 'running'), + corpora): + __checkout_build_corpus_service(corpus) + for corpus in filter(lambda corpus: corpus.status == 'start analysis', + corpora): + __create_cqpserver_container(corpus) + for corpus in filter(lambda corpus: corpus.status == 'stop analysis', + corpora): + __remove_cqpserver_container(corpus) + db.session.commit() + Session.remove() + + +def __create_build_corpus_service(corpus): + corpus_dir = os.path.join(config.DATA_DIR, + str(corpus.user_id), + 'corpora', + str(corpus.id)) + corpus_data_dir = os.path.join(corpus_dir, 'data') + corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt') + corpus_registry_dir = os.path.join(corpus_dir, 'registry') + if os.path.exists(corpus_data_dir): + shutil.rmtree(corpus_data_dir) + if os.path.exists(corpus_registry_dir): + shutil.rmtree(corpus_registry_dir) + os.mkdir(corpus_data_dir) + os.mkdir(corpus_registry_dir) + service_args = {'command': 'docker-entrypoint.sh build-corpus', + 'constraints': ['node.role==worker'], + 'labels': {'origin': 'nopaque', + 'type': 'corpus.prepare', + 'corpus_id': str(corpus.id)}, + 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro', + corpus_data_dir + ':/corpora/data:rw', + corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], + 'name': 'build-corpus_{}'.format(corpus.id), + 'restart_policy': docker.types.RestartPolicy()} + service_image = \ + 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + try: + service = docker_client.services.get(service_args['name']) + except docker.errors.NotFound: + pass + except docker.errors.DockerException: + return + else: + service.remove() + try: + docker_client.services.create(service_image, **service_args) + except docker.errors.DockerException: + corpus.status = 'failed' + else: + corpus.status = 'queued' + + +def __checkout_build_corpus_service(corpus): + service_name = 'build-corpus_{}'.format(corpus.id) + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound: + logging.error('__checkout_build_corpus_service({}):'.format(corpus.id) + + ' The service does not exist.' + + ' (stauts: {} -> failed)'.format(corpus.status)) + corpus.status = 'failed' + return + except docker.errors.DockerException: + return + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if corpus.status == 'queued' and task_state != 'pending': + corpus.status = 'running' + elif corpus.status == 'running' and task_state == 'complete': + service.remove() + corpus.status = 'prepared' + elif corpus.status == 'running' and task_state == 'failed': + service.remove() + corpus.status = task_state + + +def __create_cqpserver_container(corpus): + corpus_dir = os.path.join(config.DATA_DIR, + str(corpus.user_id), + 'corpora', + str(corpus.id)) + corpus_data_dir = os.path.join(corpus_dir, 'data') + corpus_registry_dir = os.path.join(corpus_dir, 'registry') + container_args = {'command': 'cqpserver', + 'detach': True, + 'volumes': [corpus_data_dir + ':/corpora/data:rw', + corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], + 'name': 'cqpserver_{}'.format(corpus.id), + 'network': 'nopaque_default'} + container_image = \ + 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + try: + container = docker_client.containers.get(container_args['name']) + except docker.errors.NotFound: + pass + except docker.errors.DockerException: + return + else: + container.remove(force=True) + try: + docker_client.containers.run(container_image, **container_args) + except docker.errors.DockerException: + return + else: + corpus.status = 'analysing' + + +def __remove_cqpserver_container(corpus): + container_name = 'cqpserver_{}'.format(corpus.id) + try: + container = docker_client.containers.get(container_name) + except docker.errors.NotFound: + pass + except docker.errors.DockerException: + return + else: + container.remove(force=True) + corpus.status = 'prepared' diff --git a/web/app/daemon/check_jobs.py b/web/app/daemon/check_jobs.py new file mode 100644 index 00000000..f5530e1e --- /dev/null +++ b/web/app/daemon/check_jobs.py @@ -0,0 +1,147 @@ +from datetime import datetime +from .. import configuration as config +from .. import docker_client, Session +from ..decorators import background +from ..models import Job, JobResult, NotificationData, NotificationEmailData +import docker +import logging +import json +import os + + +@background +def check_jobs(): + session = Session() + jobs = session.query(Job).all() + for job in filter(lambda job: job.status == 'submitted', jobs): + __create_job_service(job) + for job in filter(lambda job: job.status == 'queued', jobs): + __checkout_job_service(job, session) + __add_notification_data(job, 'queued', session) + for job in filter(lambda job: job.status == 'running', jobs): + __checkout_job_service(job, session) + __add_notification_data(job, 'running', session) + for job in filter(lambda job: job.status == 'complete', jobs): + __add_notification_data(job, 'complete', session) + for job in filter(lambda job: job.status == 'failed', jobs): + __add_notification_data(job, 'failed', session) + for job in filter(lambda job: job.status == 'canceling', jobs): + __remove_job_service(job) + session.commit() + Session.remove() + + +def __add_notification_data(job, notified_on_status, session): + # checks if user wants any notifications at all + if (job.user.setting_job_status_mail_notifications == 'none'): + return + # checks if user wants only notification on completed jobs + elif (job.user.setting_job_status_mail_notifications == 'end' + and notified_on_status != 'complete'): + return + else: + # check if a job already has associated NotificationData + notification_exists = len(job.notification_data) + # create notification_data for current job if there is none + if (notification_exists == 0): + notification_data = NotificationData(job_id=job.id) + session.add(notification_data) + # If no commit job will have no NotificationData + session.commit() + if (job.notification_data[0].notified_on != notified_on_status): + notification_email_data = NotificationEmailData(job_id=job.id) + notification_email_data.notify_status = notified_on_status + notification_email_data.creation_date = datetime.utcnow() + job.notification_data[0].notified_on = notified_on_status + session.add(notification_email_data) + + +def __create_job_service(job): + job_dir = os.path.join(config.DATA_DIR, + str(job.user_id), + 'jobs', + str(job.id)) + cmd = '{} -i /files -o /files/output'.format(job.service) + if job.service == 'file-setup': + cmd += ' -f {}'.format(job.secure_filename) + cmd += ' --log-dir /files' + cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename) + cmd += ' ' + ' '.join(json.loads(job.service_args)) + service_args = {'command': cmd, + 'constraints': ['node.role==worker'], + 'labels': {'origin': 'nopaque', + 'type': 'service.{}'.format(job.service), + 'job_id': str(job.id)}, + 'mounts': [job_dir + ':/files:rw'], + 'name': 'job_{}'.format(job.id), + 'resources': docker.types.Resources( + cpu_reservation=job.n_cores * (10 ** 9), + mem_reservation=job.mem_mb * (10 ** 6)), + 'restart_policy': docker.types.RestartPolicy()} + service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/' + + job.service + ':' + job.service_version) + try: + service = docker_client.services.get(service_args['name']) + except docker.errors.NotFound: + pass + except docker.errors.DockerException: + return + else: + service.remove() + try: + docker_client.services.create(service_image, **service_args) + except docker.errors.DockerException: + job.status = 'failed' + else: + job.status = 'queued' + + +def __checkout_job_service(job, session): + service_name = 'job_{}'.format(job.id) + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound: + logging.error('__checkout_job_service({}): '.format(job.id) + + 'The service does not exist. ' + + '(status: {} -> failed)'.format(job.status)) + job.status = 'failed' + return + except docker.errors.DockerException: + return + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if job.status == 'queued' and task_state != 'pending': + job.status = 'running' + elif (job.status == 'running' + and (task_state == 'complete' or task_state == 'failed')): + service.remove() + job.end_date = datetime.utcnow() + job.status = task_state + if task_state == 'complete': + results_dir = os.path.join(config.DATA_DIR, + str(job.user_id), + 'jobs', + str(job.id), + 'output') + results = filter(lambda x: x.endswith('.zip'), + os.listdir(results_dir)) + for result in results: + job_result = JobResult(dir=results_dir, + filename=result, + job_id=job.id) + session.add(job_result) + + +def __remove_job_service(job): + service_name = 'job_{}'.format(job.id) + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound: + job.status = 'canceled' + except docker.errors.DockerException: + return + else: + service.update(mounts=None) + service.remove() diff --git a/web/app/daemon/libnotify/__init__.py b/web/app/daemon/libnotify/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/web/app/daemon/libnotify/notification.py b/web/app/daemon/libnotify/notification.py new file mode 100644 index 00000000..488471c3 --- /dev/null +++ b/web/app/daemon/libnotify/notification.py @@ -0,0 +1,28 @@ +from email.message import EmailMessage + + +class Notification(EmailMessage): + """docstring for Email.""" + + def set_notification_content(self, + subject_template, + subject_template_values_dict, + body_txt_template_path, + body_html_template_path, + body_template_values_dict): + # Create subject with subject_template_values_dict + self['subject'] = subject_template.format( + **subject_template_values_dict) + # Open template files and insert values from body_template_values_dict + with open(body_txt_template_path) as nfile: + self.body = nfile.read().format(**body_template_values_dict) + with open(body_html_template_path) as nfile: + self.html = nfile.read().format(**body_template_values_dict) + # Set txt of email + self.set_content(self.body) + # Set html alternative + self.add_alternative(self.html, subtype='html') + + def set_addresses(self, sender, recipient): + self['From'] = sender + self['to'] = recipient diff --git a/web/app/daemon/libnotify/service.py b/web/app/daemon/libnotify/service.py new file mode 100644 index 00000000..633fb386 --- /dev/null +++ b/web/app/daemon/libnotify/service.py @@ -0,0 +1,16 @@ +class NotificationService: + """This is a nopaque notifcation service object.""" + + def __init__(self, smtp): + # Bool to show if the mail server stoped sending mails due to exceeding + # its sending limit + self.mail_limit_exceeded = False + # Holds due to an error unsent email notifications + self.not_sent = {} + self.smtp = smtp + + def send(self, email): + self.smtp.send_message(email) + + def quit(self): + self.smtp.quit() diff --git a/web/app/daemon/libnotify/templates/notification.html b/web/app/daemon/libnotify/templates/notification.html new file mode 100644 index 00000000..e2edfe75 --- /dev/null +++ b/web/app/daemon/libnotify/templates/notification.html @@ -0,0 +1,15 @@ + + +

Dear {username},

+ +

The status of your Job/Corpus({id}) with the title "{title}" has changed!

+

It is now {status}!

+

Time of this status update was: {time} UTC

+ +

You can access your Job/Corpus here: {url} +

+ +

Kind regards!
+ Your nopaque team

+ + diff --git a/web/app/daemon/libnotify/templates/notification.txt b/web/app/daemon/libnotify/templates/notification.txt new file mode 100644 index 00000000..0e221c54 --- /dev/null +++ b/web/app/daemon/libnotify/templates/notification.txt @@ -0,0 +1,10 @@ +Dear {username}, + +The status of your Job/Corpus({id}) with the title "{title}" has changed! +It is now {status}! +Time of this status update was: {time} UTC + +You can access your Job/Corpus here: {url} + +Kind regards! +Your nopaque team \ No newline at end of file diff --git a/web/app/daemon/notify.py b/web/app/daemon/notify.py new file mode 100644 index 00000000..5d3d23f3 --- /dev/null +++ b/web/app/daemon/notify.py @@ -0,0 +1,111 @@ +from sqlalchemy import asc +from .libnotify.notification import Notification +from .libnotify.service import NotificationService +from .. import configuration as config +from .. import Session +from ..decorators import background +from ..models import NotificationEmailData +import logging +import os +import smtplib + + +ROOT_DIR = os.path.abspath(os.path.dirname(__file__)) + + +@background +def notify(): + session = Session() + if config.SMTP_USE_SSL: + smtp = smtplib.SMTP_SSL(host=config.SMTP_SERVER, port=config.SMTP_PORT) + else: + smtp = smtplib.SMTP(host=config.SMTP_SERVER, port=config.SMTP_PORT) + if config.SMTP_USE_TLS: + smtp.starttls() + try: + smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD) + except smtplib.SMTPHeloError: + logging.warning('The server didn’t reply properly to the HELO ' + 'greeting.') + return + except smtplib.SMTPAuthenticationError as e: + logging.warning('The server didn’t accept the username/password ' + 'combination.') + logging.warning(e) + return + except smtplib.SMTPNotSupportedError: + logging.warning('The AUTH command is not supported by the server.') + return + except smtplib.SMTPException: + logging.warning('No suitable authentication method was found.') + return + notification_service = NotificationService(smtp) + # create notifications (content, recipient etc.) + notifications = __create_mail_notifications(notification_service, session) + # only login and send mails if there are any notifications + if (len(notifications) > 0): + # combine new and unsent notifications + notifications.update(notification_service.not_sent) + # send all notifications + __send_mail_notifications(notifications, notification_service) + # remove unsent notifications because they have been sent now + # but only if mail limit has not been exceeded + if (notification_service.mail_limit_exceeded is not True): + notification_service.not_sent = {} + smtp.quit() + Session.remove() + + +# Email notification functions +def __create_mail_notifications(notification_service, session): + notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() # noqa + notifications = {} + for data in notification_email_data: + notification = Notification() + notification.set_addresses(config.SMTP_DEFAULT_SENDER, + data.job.user.email) + subject_template = ('[nopaque] Status update for your Job/Corpora: ' + '{title}!') + subject_template_values_dict = {'title': data.job.title} + url = '{}://{}/{}/{}'.format(config.PROTOCOL, + config.DOMAIN, + 'jobs', + data.job.id) + body_template_values_dict = {'username': data.job.user.username, + 'id': data.job.id, + 'title': data.job.title, + 'status': data.notify_status, + 'time': data.creation_date, + 'url': url} + txt_tmplt = os.path.join(ROOT_DIR, + 'libnotify/templates/notification.txt') + html_tmplt = os.path.join(ROOT_DIR, + 'libnotify/templates/notification.html') + notification.set_notification_content(subject_template, + subject_template_values_dict, + txt_tmplt, + html_tmplt, + body_template_values_dict) + notifications[data.job.id] = notification + # Using a dictionary for notifications avoids sending multiple mails + # if the status of a job changes in a few seconds. The user will not + # get swamped with mails for queued, running and complete if those + # happen in in a few seconds. Only the last update will be sent. + # This depends on the sleep time interval though. + session.delete(data) + session.commit() + return notifications + + +def __send_mail_notifications(notifications, notification_service): + for key, notification in notifications.items(): + try: + notification_service.send(notification) + notification_service.mail_limit_exceeded = False + except Exception: + # Adds notifications to unsent if mail server exceded limit for + # consecutive mail sending + logging.warning('limit') + notification_service.not_sent[key] = notification + notification_service.mail_limit_exceeded = True + notification_service.not_sent.update(notifications) diff --git a/web/requirements.txt b/web/requirements.txt index 0d7f6e68..a47ebf8c 100644 --- a/web/requirements.txt +++ b/web/requirements.txt @@ -1,5 +1,6 @@ cqi dnspython==1.16.0 +docker eventlet Flask Flask-Login