diff --git a/daemon/Dockerfile b/daemon/Dockerfile index 52b8844e..1a0f28ee 100644 --- a/daemon/Dockerfile +++ b/daemon/Dockerfile @@ -28,7 +28,7 @@ WORKDIR /home/nopaqued COPY ["logger", "logger"] COPY ["notify", "notify"] COPY ["tasks", "tasks"] -COPY ["decorators.py", "nopaqued.py", "requirements.txt", "./"] +COPY ["nopaqued.py", "requirements.txt", "./"] RUN python -m venv venv \ && venv/bin/pip install --requirement requirements.txt \ && mkdir logs diff --git a/daemon/decorators.py b/daemon/decorators.py deleted file mode 100644 index 040250a8..00000000 --- a/daemon/decorators.py +++ /dev/null @@ -1,14 +0,0 @@ -from functools import wraps -from threading import Thread - - -def background(f): - ''' - ' This decorator executes a function in a Thread. - ''' - @wraps(f) - def wrapped(*args, **kwargs): - thread = Thread(target=f, args=args, kwargs=kwargs) - thread.start() - return thread - return wrapped diff --git a/daemon/logger/logger.py b/daemon/logger/logger.py index de602eb6..f347bc41 100644 --- a/daemon/logger/logger.py +++ b/daemon/logger/logger.py @@ -23,4 +23,4 @@ def init_logger(): if __name__ == '__main__': - init_logger() \ No newline at end of file + init_logger() diff --git a/daemon/nopaqued.py b/daemon/nopaqued.py index 2498f75e..18e6b273 100644 --- a/daemon/nopaqued.py +++ b/daemon/nopaqued.py @@ -1,34 +1,23 @@ -from tasks.check_jobs import check_jobs +from concurrent.futures import ThreadPoolExecutor from tasks.check_corpora import check_corpora +from tasks.check_jobs import check_jobs from tasks.notify import notify from time import sleep # TODO: Check if thread is still alive and execute next thread after that -# TODO: Remove unnecessary commits # TODO: Check line length -# check_jobs_thread = None -# check_corpora_thread = None -# notify_thread = None -# -# -# def nopaqued(): -# # executing background functions -# while True: -# check_jobs_thread = check_jobs() -# check_corpora_thread = check_corpora() -# notify_thread = notify(True) # If True mails are sent. If False no mails are sent. -# # But notification status will be set nonetheless. -# sleep(3) def nopaqued(): # executing background functions while True: - check_jobs() - check_corpora() - notify(True) # If True mails are sent. If False no mails are sent. - # But notification status will be set nonetheless. + with ThreadPoolExecutor(max_workers=3) as executor: + executor.submit(check_jobs) + executor.submit(check_corpora) + executor.submit(notify, True) # If True mails are sent. + # If False no mails are sent. + # But notification status will be set nonetheless. sleep(3) diff --git a/daemon/notify/notification.py b/daemon/notify/notification.py index ed88b6be..f6063386 100644 --- a/daemon/notify/notification.py +++ b/daemon/notify/notification.py @@ -24,4 +24,4 @@ class Notification(EmailMessage): def set_addresses(self, sender, recipient): self['From'] = sender - self['to'] = recipient \ No newline at end of file + self['to'] = recipient diff --git a/daemon/notify/service.py b/daemon/notify/service.py index 5687ac42..0b08037d 100644 --- a/daemon/notify/service.py +++ b/daemon/notify/service.py @@ -8,10 +8,10 @@ class NotificationService(object): def __init__(self, execute_flag): super(NotificationService, self).__init__() self.execute_flag = execute_flag # If True mails are sent normaly - # If False mails are not sent. Used to avoid sending mails for jobs that - # have been completed a long time ago. Use this if you implement notify - # into an already existing nopaque instance. Change it to True after the - # daemon has run one time with the flag set to False + # If False mails are not sent. Used to avoid sending mails for jobs + # that have been completed a long time ago. Use this if you implement + # notify into an already existing nopaque instance. Change it to True + # after the daemon has run one time with the flag set to False self.not_sent = {} # Holds due to an error unsent email notifications self.mail_limit_exceeded = False # Bool to show if the mail server # stoped sending mails due to exceeding its sending limit @@ -38,4 +38,4 @@ class NotificationService(object): return def quit(self): - self.smtp_server.quit() \ No newline at end of file + self.smtp_server.quit() diff --git a/daemon/tasks/check_corpora.py b/daemon/tasks/check_corpora.py index 9ade3db5..fb8e13fc 100644 --- a/daemon/tasks/check_corpora.py +++ b/daemon/tasks/check_corpora.py @@ -1,4 +1,3 @@ -from decorators import background from logger.logger import init_logger from tasks import Session, docker_client, NOPAQUE_STORAGE from tasks.Models import Corpus @@ -7,7 +6,6 @@ import os import shutil -@background def check_corpora(): c_session = Session() corpora = c_session.query(Corpus).all() @@ -131,4 +129,4 @@ def __remove_cqpserver_container(corpus): return else: container.remove(force=True) - corpus.status = 'prepared' \ No newline at end of file + corpus.status = 'prepared' diff --git a/daemon/tasks/check_jobs.py b/daemon/tasks/check_jobs.py index 2b43f0b2..09dd24ac 100644 --- a/daemon/tasks/check_jobs.py +++ b/daemon/tasks/check_jobs.py @@ -1,5 +1,4 @@ from datetime import datetime -from decorators import background from logger.logger import init_logger from tasks import Session, docker_client, NOPAQUE_STORAGE from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult @@ -8,7 +7,6 @@ import json import os -@background def check_jobs(): # logger = init_logger() cj_session = Session() @@ -49,7 +47,8 @@ def __add_notification_data(job, notified_on_status, scoped_session): if (notification_exists == 0): notification_data = NotificationData(job_id=job.id) scoped_session.add(notification_data) - scoped_session.commit() # If no commit job will have no NotificationData + scoped_session.commit() + # If no commit job will have no NotificationData # logger.warning('Created NotificationData for current Job.')) else: pass @@ -70,10 +69,14 @@ def __create_job_service(job): job_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs', str(job.id)) service_args = {'command': ('{} /files /files/output'.format(job.service) - + ' {}'.format(job.secure_filename if job.service == 'file-setup' else '') + + ' {}'.format(job.secure_filename + if job.service == 'file-setup' + else '') + ' --log-dir /files' - + ' --zip [{}]_{}'.format(job.service, job.secure_filename) - + ' ' + ' '.join(json.loads(job.service_args))), + + ' --zip [{}]_{}'.format(job.service, + job.secure_filename) + + ' ' + ' '.join(json.loads(job.service_args)) + ), 'constraints': ['node.role==worker'], 'labels': {'origin': 'nopaque', 'type': 'service.{}'.format(job.service), @@ -147,4 +150,4 @@ def __remove_job_service(job): return else: service.update(mounts=None) - service.remove() \ No newline at end of file + service.remove() diff --git a/daemon/tasks/notify.py b/daemon/tasks/notify.py index e28b66a3..a24dd19b 100644 --- a/daemon/tasks/notify.py +++ b/daemon/tasks/notify.py @@ -1,4 +1,3 @@ -from decorators import background from notify.notification import Notification from notify.service import NotificationService from sqlalchemy import asc @@ -7,57 +6,6 @@ from tasks.Models import NotificationEmailData import os -# Email notification functions -def __create_mail_notifications(notification_service): - mn_session = Session() - notification_email_data = mn_session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() - notifications = {} - for data in notification_email_data: - notification = Notification() - notification.set_addresses(notification_service.email_address, - data.job.user.email) - subject_template = '[nopaque] Status update for your Job/Corpora: {title}!' - subject_template_values_dict = {'title': data.job.title} - domain = os.environ.get('NOPAQUE_DOMAIN') - url = '{domain}/{jobs}/{id}'.format(domain=domain, - jobs='jobs', - id=data.job.id) - body_template_values_dict = {'username': data.job.user.username, - 'id': data.job.id, - 'title': data.job.title, - 'status': data.notify_status, - 'time': data.creation_date, - 'url': url} - notification.set_notification_content(subject_template, - subject_template_values_dict, - 'notify/templates/notification_messages/notification.txt', - 'notify/templates/notification_messages/notification.html', - body_template_values_dict) - notifications[data.job.id] = notification - # Using a dictionary for notifications avoids sending multiple mails - # if the status of a job changes in a few seconds. The user will not get - # swamped with mails for queued, running and complete if those happen in - # in a few seconds. Only the last update will be sent. This depends on - # the sleep time interval though. - mn_session.delete(data) - mn_session.commit() - Session.remove() - return notifications - - -def __send_mail_notifications(notifications, notification_service): - for key, notification in notifications.items(): - try: - notification_service.send(notification) - notification_service.mail_limit_exceeded = False - except Exception as e: - # Adds notifications to unsent if mail server exceded limit for - # consecutive mail sending - notification_service.not_sent[key] = notification - notification_service.mail_limit_exceeded = True - - -@background def notify(execute_flag): # If True mails are sent normaly # If False mails are not sent. Used to avoid sending mails for jobs that @@ -84,4 +32,57 @@ def notify(execute_flag): notification_service.not_sent = {} notification_service.quit() except Exception as e: - notification_service.not_sent.update(notifications) \ No newline at end of file + notification_service.not_sent.update(notifications) + notification_service.quit() + + +# Email notification functions +def __create_mail_notifications(notification_service): + mn_session = Session() + notification_email_data = mn_session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() + notifications = {} + for data in notification_email_data: + notification = Notification() + notification.set_addresses(notification_service.email_address, + data.job.user.email) + subject_template = '[nopaque] Status update for your Job/Corpora: {title}!' + subject_template_values_dict = {'title': data.job.title} + domain = os.environ.get('NOPAQUE_DOMAIN') + url = '{domain}/{jobs}/{id}'.format(domain=domain, + jobs='jobs', + id=data.job.id) + body_template_values_dict = {'username': data.job.user.username, + 'id': data.job.id, + 'title': data.job.title, + 'status': data.notify_status, + 'time': data.creation_date, + 'url': url} + txt_tmplt = 'notify/templates/notification_messages/notification.txt' + html_tmplt = 'notify/templates/notification_messages/notification.html' + notification.set_notification_content(subject_template, + subject_template_values_dict, + txt_tmplt, + html_tmplt, + body_template_values_dict) + notifications[data.job.id] = notification + # Using a dictionary for notifications avoids sending multiple mails + # if the status of a job changes in a few seconds. The user will not + # get swamped with mails for queued, running and complete if those + # happen in in a few seconds. Only the last update will be sent. + # This depends on the sleep time interval though. + mn_session.delete(data) + mn_session.commit() + Session.remove() + return notifications + + +def __send_mail_notifications(notifications, notification_service): + for key, notification in notifications.items(): + try: + notification_service.send(notification) + notification_service.mail_limit_exceeded = False + except Exception as e: + # Adds notifications to unsent if mail server exceded limit for + # consecutive mail sending + notification_service.not_sent[key] = notification + notification_service.mail_limit_exceeded = True