First steps

This commit is contained in:
Patrick Jentsch 2020-11-06 15:07:58 +01:00
parent 9e2cc6486c
commit 0f30e518af
10 changed files with 510 additions and 0 deletions

43
web/app/daemon.py Normal file
View File

@ -0,0 +1,43 @@
from app import create_app
from time import sleep
from ..decorators import background
import docker
app = create_app()
docker_client = docker.from_env()
app.app_context().push()
from . import check_corpora, check_jobs, notify # noqa
def run():
check_corpora_thread = check_corpora()
check_jobs_thread = check_jobs()
notify_thread = notify()
while True:
if not check_corpora_thread.is_alive():
check_corpora_thread = check_corpora()
if not check_jobs_thread.is_alive():
check_jobs_thread = check_jobs()
if not notify_thread.is_alive():
notify_thread = notify()
sleep(3)
@background
def check_corpora():
corpora = Corpus.query.all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
__checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
__create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)

View File

@ -0,0 +1,139 @@
from . import docker_client
from .. import db
from ..decorators import background
from ..models import Corpus
import docker
import logging
import os
import shutil
@background
def check_corpora():
corpora = Corpus.query.all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
__checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
__create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)
db.session.commit()
Session.remove()
def __create_build_corpus_service(corpus):
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
if os.path.exists(corpus_data_dir):
shutil.rmtree(corpus_data_dir)
if os.path.exists(corpus_registry_dir):
shutil.rmtree(corpus_registry_dir)
os.mkdir(corpus_data_dir)
os.mkdir(corpus_registry_dir)
service_args = {'command': 'docker-entrypoint.sh build-corpus',
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'corpus.prepare',
'corpus_id': str(corpus.id)},
'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'build-corpus_{}'.format(corpus.id),
'restart_policy': docker.types.RestartPolicy()}
service_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
corpus.status = 'failed'
else:
corpus.status = 'queued'
def __checkout_build_corpus_service(corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logging.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ ' The service does not exist.'
+ ' (stauts: {} -> failed)'.format(corpus.status))
corpus.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
elif corpus.status == 'running' and task_state == 'complete':
service.remove()
corpus.status = 'prepared'
elif corpus.status == 'running' and task_state == 'failed':
service.remove()
corpus.status = task_state
def __create_cqpserver_container(corpus):
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
container_args = {'command': 'cqpserver',
'detach': True,
'volumes': [corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id),
'network': 'nopaque_default'}
container_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
container = docker_client.containers.get(container_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
try:
docker_client.containers.run(container_image, **container_args)
except docker.errors.DockerException:
return
else:
corpus.status = 'analysing'
def __remove_cqpserver_container(corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
try:
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
corpus.status = 'prepared'

View File

@ -0,0 +1,147 @@
from datetime import datetime
from .. import configuration as config
from .. import docker_client, Session
from ..decorators import background
from ..models import Job, JobResult, NotificationData, NotificationEmailData
import docker
import logging
import json
import os
@background
def check_jobs():
session = Session()
jobs = session.query(Job).all()
for job in filter(lambda job: job.status == 'submitted', jobs):
__create_job_service(job)
for job in filter(lambda job: job.status == 'queued', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'queued', session)
for job in filter(lambda job: job.status == 'running', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'running', session)
for job in filter(lambda job: job.status == 'complete', jobs):
__add_notification_data(job, 'complete', session)
for job in filter(lambda job: job.status == 'failed', jobs):
__add_notification_data(job, 'failed', session)
for job in filter(lambda job: job.status == 'canceling', jobs):
__remove_job_service(job)
session.commit()
Session.remove()
def __add_notification_data(job, notified_on_status, session):
# checks if user wants any notifications at all
if (job.user.setting_job_status_mail_notifications == 'none'):
return
# checks if user wants only notification on completed jobs
elif (job.user.setting_job_status_mail_notifications == 'end'
and notified_on_status != 'complete'):
return
else:
# check if a job already has associated NotificationData
notification_exists = len(job.notification_data)
# create notification_data for current job if there is none
if (notification_exists == 0):
notification_data = NotificationData(job_id=job.id)
session.add(notification_data)
# If no commit job will have no NotificationData
session.commit()
if (job.notification_data[0].notified_on != notified_on_status):
notification_email_data = NotificationEmailData(job_id=job.id)
notification_email_data.notify_status = notified_on_status
notification_email_data.creation_date = datetime.utcnow()
job.notification_data[0].notified_on = notified_on_status
session.add(notification_email_data)
def __create_job_service(job):
job_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id))
cmd = '{} -i /files -o /files/output'.format(job.service)
if job.service == 'file-setup':
cmd += ' -f {}'.format(job.secure_filename)
cmd += ' --log-dir /files'
cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename)
cmd += ' ' + ' '.join(json.loads(job.service_args))
service_args = {'command': cmd,
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'service.{}'.format(job.service),
'job_id': str(job.id)},
'mounts': [job_dir + ':/files:rw'],
'name': 'job_{}'.format(job.id),
'resources': docker.types.Resources(
cpu_reservation=job.n_cores * (10 ** 9),
mem_reservation=job.mem_mb * (10 ** 6)),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
+ job.service + ':' + job.service_version)
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
job.status = 'failed'
else:
job.status = 'queued'
def __checkout_job_service(job, session):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logging.error('__checkout_job_service({}): '.format(job.id)
+ 'The service does not exist. '
+ '(status: {} -> failed)'.format(job.status))
job.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
elif (job.status == 'running'
and (task_state == 'complete' or task_state == 'failed')):
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
results_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id),
'output')
results = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result in results:
job_result = JobResult(dir=results_dir,
filename=result,
job_id=job.id)
session.add(job_result)
def __remove_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
job.status = 'canceled'
except docker.errors.DockerException:
return
else:
service.update(mounts=None)
service.remove()

View File

View File

@ -0,0 +1,28 @@
from email.message import EmailMessage
class Notification(EmailMessage):
"""docstring for Email."""
def set_notification_content(self,
subject_template,
subject_template_values_dict,
body_txt_template_path,
body_html_template_path,
body_template_values_dict):
# Create subject with subject_template_values_dict
self['subject'] = subject_template.format(
**subject_template_values_dict)
# Open template files and insert values from body_template_values_dict
with open(body_txt_template_path) as nfile:
self.body = nfile.read().format(**body_template_values_dict)
with open(body_html_template_path) as nfile:
self.html = nfile.read().format(**body_template_values_dict)
# Set txt of email
self.set_content(self.body)
# Set html alternative
self.add_alternative(self.html, subtype='html')
def set_addresses(self, sender, recipient):
self['From'] = sender
self['to'] = recipient

View File

@ -0,0 +1,16 @@
class NotificationService:
"""This is a nopaque notifcation service object."""
def __init__(self, smtp):
# Bool to show if the mail server stoped sending mails due to exceeding
# its sending limit
self.mail_limit_exceeded = False
# Holds due to an error unsent email notifications
self.not_sent = {}
self.smtp = smtp
def send(self, email):
self.smtp.send_message(email)
def quit(self):
self.smtp.quit()

View File

@ -0,0 +1,15 @@
<html>
<body>
<p>Dear <b>{username}</b>,</p>
<p>The status of your Job/Corpus({id}) with the title <b>"{title}"</b> has changed!</p>
<p>It is now <b>{status}</b>!</p>
<p>Time of this status update was: <b>{time} UTC</b></p>
<p>You can access your Job/Corpus here: <a href="{url}">{url}</a>
</p>
<p>Kind regards!<br>
Your nopaque team</p>
</body>
</html>

View File

@ -0,0 +1,10 @@
Dear {username},
The status of your Job/Corpus({id}) with the title "{title}" has changed!
It is now {status}!
Time of this status update was: {time} UTC
You can access your Job/Corpus here: {url}
Kind regards!
Your nopaque team

111
web/app/daemon/notify.py Normal file
View File

@ -0,0 +1,111 @@
from sqlalchemy import asc
from .libnotify.notification import Notification
from .libnotify.service import NotificationService
from .. import configuration as config
from .. import Session
from ..decorators import background
from ..models import NotificationEmailData
import logging
import os
import smtplib
ROOT_DIR = os.path.abspath(os.path.dirname(__file__))
@background
def notify():
session = Session()
if config.SMTP_USE_SSL:
smtp = smtplib.SMTP_SSL(host=config.SMTP_SERVER, port=config.SMTP_PORT)
else:
smtp = smtplib.SMTP(host=config.SMTP_SERVER, port=config.SMTP_PORT)
if config.SMTP_USE_TLS:
smtp.starttls()
try:
smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
except smtplib.SMTPHeloError:
logging.warning('The server didnt reply properly to the HELO '
'greeting.')
return
except smtplib.SMTPAuthenticationError as e:
logging.warning('The server didnt accept the username/password '
'combination.')
logging.warning(e)
return
except smtplib.SMTPNotSupportedError:
logging.warning('The AUTH command is not supported by the server.')
return
except smtplib.SMTPException:
logging.warning('No suitable authentication method was found.')
return
notification_service = NotificationService(smtp)
# create notifications (content, recipient etc.)
notifications = __create_mail_notifications(notification_service, session)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
smtp.quit()
Session.remove()
# Email notification functions
def __create_mail_notifications(notification_service, session):
notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() # noqa
notifications = {}
for data in notification_email_data:
notification = Notification()
notification.set_addresses(config.SMTP_DEFAULT_SENDER,
data.job.user.email)
subject_template = ('[nopaque] Status update for your Job/Corpora: '
'{title}!')
subject_template_values_dict = {'title': data.job.title}
url = '{}://{}/{}/{}'.format(config.PROTOCOL,
config.DOMAIN,
'jobs',
data.job.id)
body_template_values_dict = {'username': data.job.user.username,
'id': data.job.id,
'title': data.job.title,
'status': data.notify_status,
'time': data.creation_date,
'url': url}
txt_tmplt = os.path.join(ROOT_DIR,
'libnotify/templates/notification.txt')
html_tmplt = os.path.join(ROOT_DIR,
'libnotify/templates/notification.html')
notification.set_notification_content(subject_template,
subject_template_values_dict,
txt_tmplt,
html_tmplt,
body_template_values_dict)
notifications[data.job.id] = notification
# Using a dictionary for notifications avoids sending multiple mails
# if the status of a job changes in a few seconds. The user will not
# get swamped with mails for queued, running and complete if those
# happen in in a few seconds. Only the last update will be sent.
# This depends on the sleep time interval though.
session.delete(data)
session.commit()
return notifications
def __send_mail_notifications(notifications, notification_service):
for key, notification in notifications.items():
try:
notification_service.send(notification)
notification_service.mail_limit_exceeded = False
except Exception:
# Adds notifications to unsent if mail server exceded limit for
# consecutive mail sending
logging.warning('limit')
notification_service.not_sent[key] = notification
notification_service.mail_limit_exceeded = True
notification_service.not_sent.update(notifications)

View File

@ -1,5 +1,6 @@
cqi
dnspython==1.16.0
docker
eventlet
Flask
Flask-Login