integrate nopaque repo

Patrick Jentsch
2020-06-05 14:42:04 +02:00
parent 450ddf69fc
commit cb2b64fa9d
164 changed files with 1212 additions and 168 deletions

40
daemon/Dockerfile Normal file

@@ -0,0 +1,40 @@
FROM python:3.6-slim-stretch
LABEL maintainer="inf_sfb1288@lists.uni-bielefeld.de"
ARG docker_gid=998
ARG gid=1000
ARG uid=1000
ENV LANG=C.UTF-8
RUN apt-get update \
&& apt-get install --no-install-recommends --yes \
build-essential \
libpq-dev \
wait-for-it \
&& rm -rf /var/lib/apt/lists/*
RUN groupadd --gid ${docker_gid} --system docker \
&& groupadd --gid ${gid} --system nopaqued \
&& useradd --create-home --gid ${gid} --groups ${docker_gid} --no-log-init --system --uid ${uid} nopaqued
USER nopaqued
WORKDIR /home/nopaqued
COPY ["notify", "notify"]
COPY ["tasks", "tasks"]
COPY ["logger", "logger"]
COPY ["decorators.py", "nopaqued.py", "requirements.txt", "./"]
RUN python -m venv venv \
&& venv/bin/pip install --requirement requirements.txt \
&& mkdir logs
COPY docker-entrypoint.sh /usr/local/bin/
ENTRYPOINT ["docker-entrypoint.sh"]

14
daemon/decorators.py Normal file

@@ -0,0 +1,14 @@
from functools import wraps
from threading import Thread
def background(f):
'''
' This decorator executes a function in a Thread.
'''
@wraps(f)
def wrapped(*args, **kwargs):
thread = Thread(target=f, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapped
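Because the wrapped function returns its Thread object, callers can fire and forget or explicitly wait for the background work to finish. A minimal usage sketch, assuming the daemon's decorators module is importable (poll_resource is a hypothetical worker, not part of this commit):

from decorators import background
from time import sleep

@background
def poll_resource(name):
    # Hypothetical worker: simulate one slow polling pass.
    sleep(1)
    print('{}: done'.format(name))

thread = poll_resource('jobs')  # returns immediately
thread.join()                   # optional: block until the worker finishes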

8
daemon/docker-entrypoint.sh Normal file

@@ -0,0 +1,8 @@
#!/bin/bash
echo "Waiting for db..."
wait-for-it db:5432 --strict --timeout=0
echo "Waiting for web..."
wait-for-it web:5000 --strict --timeout=0
venv/bin/python nopaqued.py

26
daemon/logger/logger.py Normal file

@@ -0,0 +1,26 @@
import os
import logging
def init_logger():
'''
Initializes and returns a logger instance.
'''
if not os.path.isfile('logs/nopaqued.log'):
file_path = os.path.join(os.getcwd(), 'logs/nopaqued.log')
log = open(file_path, 'w+')
log.close()
logging.basicConfig(datefmt='%Y-%m-%d %H:%M:%S',
filemode='w', filename='logs/nopaqued.log',
format='%(asctime)s - %(levelname)s - %(name)s - '
'%(filename)s - %(lineno)d - %(message)s')
logger = logging.getLogger(__name__)
if os.environ.get('FLASK_CONFIG') == 'development':
logger.setLevel(logging.DEBUG)
if os.environ.get('FLASK_CONFIG') == 'production':
logger.setLevel(logging.WARNING)
return logger
if __name__ == '__main__':
init_logger()
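A short usage sketch: the level chosen by init_logger depends on FLASK_CONFIG, so a development setup also records debug output while production only records warnings. This assumes a logs/ directory exists in the working directory, as created in the Dockerfile above; the environment value is set here only for illustration.

import os
from logger.logger import init_logger

os.environ['FLASK_CONFIG'] = 'development'  # illustrative
logger = init_logger()
logger.debug('recorded with FLASK_CONFIG=development')
logger.warning('recorded in development and production')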

455
daemon/nopaqued.bak.py Normal file

@@ -0,0 +1,455 @@
from notify.notification import Notification
from notify.service import NotificationService
from sqlalchemy import create_engine, asc
from sqlalchemy.orm import Session, relationship
from sqlalchemy.ext.automap import automap_base
from datetime import datetime
from time import sleep
import docker
import json
import logging
import os
import shutil
''' Global constants '''
NOPAQUE_STORAGE = os.environ.get('NOPAQUE_STORAGE')
''' Global variables '''
docker_client = None
session = None
# Classes for database models
Base = automap_base()
class Corpus(Base):
__tablename__ = 'corpora'
files = relationship('CorpusFile', collection_class=set)
class CorpusFile(Base):
__tablename__ = 'corpus_files'
class Job(Base):
__tablename__ = 'jobs'
inputs = relationship('JobInput', collection_class=set)
results = relationship('JobResult', collection_class=set)
notification_data = relationship('NotificationData', collection_class=list)
notification_email_data = relationship('NotificationEmailData', collection_class=list)
class NotificationData(Base):
__tablename__ = 'notification_data'
job = relationship('Job', collection_class=set)
class NotificationEmailData(Base):
__tablename__ = 'notification_email_data'
job = relationship('Job', collection_class=set)
class JobInput(Base):
__tablename__ = 'job_inputs'
class JobResult(Base):
__tablename__ = 'job_results'
class User(Base):
__tablename__ = 'users'
jobs = relationship('Job', collection_class=set)
corpora = relationship('Corpus', collection_class=set)
def check_corpora():
corpora = session.query(Corpus).all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
__checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
__create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)
def __create_build_corpus_service(corpus):
corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
'corpora', str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
if os.path.exists(corpus_data_dir):
shutil.rmtree(corpus_data_dir)
if os.path.exists(corpus_registry_dir):
shutil.rmtree(corpus_registry_dir)
os.mkdir(corpus_data_dir)
os.mkdir(corpus_registry_dir)
service_args = {'command': 'docker-entrypoint.sh build-corpus',
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'corpus.prepare',
'corpus_id': str(corpus.id)},
'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'build-corpus_{}'.format(corpus.id),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest')
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
corpus.status = 'failed'
else:
corpus.status = 'queued'
def __checkout_build_corpus_service(corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logger.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ ' The service does not exist.'
+ ' (status: {} -> failed)'.format(corpus.status))
corpus.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
elif corpus.status == 'running' and task_state == 'complete':
service.remove()
corpus.status = 'prepared'
elif corpus.status == 'running' and task_state == 'failed':
service.remove()
corpus.status = task_state
def __create_cqpserver_container(corpus):
corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
'corpora', str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
container_args = {'command': 'cqpserver',
'detach': True,
'volumes': [corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id),
'network': 'opaque_default'}
container_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest')
try:
container = docker_client.containers.get(container_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
try:
docker_client.containers.run(container_image, **container_args)
except docker.errors.DockerException:
return
else:
corpus.status = 'analysing'
def __remove_cqpserver_container(corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
try:
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
corpus.status = 'prepared'
def check_jobs():
jobs = session.query(Job).all()
for job in filter(lambda job: job.status == 'submitted', jobs):
__create_job_service(job)
for job in filter(lambda job: (job.status == 'queued'), jobs):
__checkout_job_service(job)
# __add_notification_data(job, 'queued')
for job in filter(lambda job: (job.status == 'running'), jobs):
__checkout_job_service(job)
# __add_notification_data(job, 'running')
# for job in filter(lambda job: job.status == 'complete', jobs):
# __add_notification_data(job, 'complete')
# for job in filter(lambda job: job.status == 'failed', jobs):
#__add_notification_data(job, 'failed')
for job in filter(lambda job: job.status == 'canceling', jobs):
__remove_job_service(job)
def __add_notification_data(job, notified_on_status):
# checks if user wants any notifications at all
if (job.user.setting_job_status_mail_notifications == 'none'):
# logger.warning('User does not want any notifications!')
return
# checks if user wants only notification on completed jobs
elif (job.user.setting_job_status_mail_notifications == 'end'
and notified_on_status != 'complete'):
# logger.warning('User only wants notifications on job completed!')
return
else:
# check if a job already has associated NotificationData
notification_exists = len(job.notification_data)
# create notification_data for current job if there is none
if (notification_exists == 0):
notification_data = NotificationData(job_id=job.id)
session.add(notification_data)
session.commit() # Without a commit, the job would have no NotificationData
# logger.warning('Created NotificationData for current Job.')
else:
pass
# logger.warning('Job already had notification: {}'.format(notification_exists))
if (job.notification_data[0].notified_on != notified_on_status):
notification_email_data = NotificationEmailData(job_id=job.id)
notification_email_data.notify_status = notified_on_status
notification_email_data.creation_date = datetime.utcnow()
job.notification_data[0].notified_on = notified_on_status
session.add(notification_email_data)
# logger.warning('Created NotificationEmailData for current Job.')
else:
# logger.warning('NotificationEmailData has already been created for current Job!')
pass
def __create_job_service(job):
job_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs',
str(job.id))
service_args = {'command': ('{} /files /files/output'.format(job.service)
+ ' {}'.format(job.secure_filename if job.service == 'file-setup' else '')
+ ' --log-dir /files'
+ ' --zip [{}]_{}'.format(job.service, job.secure_filename)
+ ' ' + ' '.join(json.loads(job.service_args))),
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'service.{}'.format(job.service),
'job_id': str(job.id)},
'mounts': [job_dir + ':/files:rw'],
'name': 'job_{}'.format(job.id),
'resources': docker.types.Resources(
cpu_reservation=job.n_cores * (10 ** 9),
mem_reservation=job.mem_mb * (10 ** 6)),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
+ job.service + ':' + job.service_version)
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
job.status = 'failed'
else:
job.status = 'queued'
def __checkout_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logger.error('__checkout_job_service({}):'.format(job.id)
+ ' The service does not exist.'
+ ' (status: {} -> failed)'.format(job.status))
job.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
elif (job.status == 'running'
and (task_state == 'complete' or task_state == 'failed')):
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
results_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id),
'jobs', str(job.id), 'output')
results = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result in results:
job_result = JobResult(dir=results_dir, filename=result,
job_id=job.id)
session.add(job_result)
def __remove_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
job.status = 'canceled'
except docker.errors.DockerException:
return
else:
service.update(mounts=None)
service.remove()
def handle_jobs():
check_jobs()
def handle_corpora():
check_corpora()
# Email notification functions
def create_mail_notifications(notification_service):
notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all()
notifications = {}
for data in notification_email_data:
notification = Notification()
notification.set_addresses(notification_service.email_address,
data.job.user.email)
subject_template = '[nopaque] Status update for your Job/Corpus: {title}!'
subject_template_values_dict = {'title': data.job.title}
domain = os.environ.get('NOPAQUE_DOMAIN')
url = '{domain}/{jobs}/{id}'.format(domain=domain,
jobs='jobs',
id=data.job.id)
body_template_values_dict = {'username': data.job.user.username,
'id': data.job.id,
'title': data.job.title,
'status': data.notify_status,
'time': data.creation_date,
'url': url}
notification.set_notification_content(subject_template,
subject_template_values_dict,
'templates/notification_messages/notification.txt',
'templates/notification_messages/notification.html',
body_template_values_dict)
notifications[data.job.id] = notification
# Using a dictionary for notifications avoids sending multiple mails
# if the status of a job changes within a few seconds. The user will not
# get swamped with mails for queued, running and complete if those happen
# within a few seconds; only the last update will be sent.
session.delete(data)
return notifications
def send_mail_notifications(notifications, notification_service):
for key, notification in notifications.items():
try:
notification_service.send(notification)
notification_service.mail_limit_exceeded = False
except Exception as e:
# Add notifications to not_sent if the mail server exceeded its limit
# for consecutive mails
notification_service.not_sent[key] = notification
notification_service.mail_limit_exceeded = True
def notify():
# Initialize notification service
notification_service = NotificationService()
notification_service.get_smtp_configs()
notification_service.set_server()
# create notifications (content, recipient etc.)
notifications = create_mail_notifications(notification_service)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
try:
notification_service.login()
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
notification_service.quit()
except Exception as e:
notification_service.not_sent.update(notifications)
# Logger functions #
def init_logger():
'''
Initializes the global logger instance.
'''
global logger
if not os.path.isfile('logs/nopaqued.log'):
file_path = os.path.join(os.getcwd(), 'logs/nopaqued.log')
log = open(file_path, 'w+')
log.close()
logging.basicConfig(datefmt='%Y-%m-%d %H:%M:%S',
filemode='w', filename='logs/nopaqued.log',
format='%(asctime)s - %(levelname)s - %(name)s - '
'%(filename)s - %(lineno)d - %(message)s')
logger = logging.getLogger(__name__)
if os.environ.get('FLASK_CONFIG') == 'development':
logger.setLevel(logging.DEBUG)
if os.environ.get('FLASK_CONFIG') == 'production':
logger.setLevel(logging.WARNING)
def nopaqued():
global Base
global docker_client
global session
engine = create_engine(
'postgresql://{}:{}@db/{}'.format(
os.environ.get('POSTGRES_USER'),
os.environ.get('POSTGRES_PASSWORD'),
os.environ.get('POSTGRES_DB_NAME')))
Base.prepare(engine, reflect=True)
session = Session(engine)
session.commit()
docker_client = docker.from_env()
docker_client.login(password=os.environ.get('GITLAB_PASSWORD'),
registry="gitlab.ub.uni-bielefeld.de:4567",
username=os.environ.get('GITLAB_USERNAME'))
# executing background functions
while True:
handle_jobs()
handle_corpora()
# notify()
session.commit()
sleep(3)
if __name__ == '__main__':
init_logger()
nopaqued()

18
daemon/nopaqued.py Normal file

@@ -0,0 +1,18 @@
from tasks.check_jobs import check_jobs
from tasks.check_corpora import check_corpora
from tasks.notify import notify
from time import sleep
def nopaqued():
# executing background functions
while True:
check_jobs()
check_corpora()
notify(True) # If True, mails are sent. If False, no mails are sent,
# but the notification status will still be recorded.
sleep(3)
if __name__ == '__main__':
nopaqued()
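check_jobs, check_corpora and notify are wrapped with the @background decorator from decorators.py, so each call returns a Thread immediately and the loop only paces how often polling passes start. A hypothetical variant that waits for each pass to finish before sleeping could join the returned threads:

while True:
    threads = [check_jobs(), check_corpora(), notify(True)]
    for thread in threads:
        # Wait for this polling pass to complete before pacing the next one.
        thread.join()
    sleep(3)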

27
daemon/notify/notification.py Normal file

@@ -0,0 +1,27 @@
from email.message import EmailMessage
class Notification(EmailMessage):
"""docstring for Email."""
def set_notification_content(self,
subject_template,
subject_template_values_dict,
body_txt_template_path,
body_html_template_path,
body_template_values_dict):
# Create subject with subject_template_values_dict
self['Subject'] = subject_template.format(**subject_template_values_dict)
# Open template files and insert values from body_template_values_dict
with open(body_txt_template_path) as nfile:
self.body_txt = nfile.read().format(**body_template_values_dict)
with open(body_html_template_path) as nfile:
self.body_html = nfile.read().format(**body_template_values_dict)
# Set txt of email
self.set_content(self.body_txt)
# Set html alternative
self.add_alternative(self.body_html, subtype='html')
def set_addresses(self, sender, recipient):
self['From'] = sender
self['To'] = recipient
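A minimal sketch of assembling a Notification; the template files are written inline purely for illustration, whereas the daemon reads the real ones from notify/templates/notification_messages/:

from notify.notification import Notification

# Illustrative throwaway templates using the same placeholder syntax.
with open('/tmp/notification.txt', 'w') as f:
    f.write('Job {id} is now {status}.')
with open('/tmp/notification.html', 'w') as f:
    f.write('<p>Job <b>{id}</b> is now <b>{status}</b>.</p>')

notification = Notification()
notification.set_addresses('daemon@example.com', 'user@example.com')
notification.set_notification_content('[nopaque] Job {id}', {'id': 42},
                                      '/tmp/notification.txt',
                                      '/tmp/notification.html',
                                      {'id': 42, 'status': 'complete'})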

41
daemon/notify/service.py Normal file

@@ -0,0 +1,41 @@
import os
import smtplib
class NotificationService(object):
"""This is a nopaque notifcation service object."""
def __init__(self, execute_flag):
super(NotificationService, self).__init__()
self.execute_flag = execute_flag # If True, mails are sent normally.
# If False, mails are not sent. Used to avoid sending mails for jobs that
# were completed a long time ago, e.g. when adding notify to an already
# existing nopaque instance. Change it to True after the daemon has run
# once with the flag set to False.
self.not_sent = {} # Holds email notifications that could not be sent
# due to an error
self.mail_limit_exceeded = False # True if the mail server stopped
# sending mails because its sending limit was exceeded
def get_smtp_configs(self):
self.password = os.environ.get('MAIL_PASSWORD')
self.port = os.environ.get('MAIL_PORT')
self.server_str = os.environ.get('MAIL_SERVER')
self.tls = os.environ.get('MAIL_USE_TLS')
self.username = os.environ.get('MAIL_USERNAME').split("@")[0]
self.email_address = os.environ.get('MAIL_USERNAME')
def set_server(self):
self.smtp_server = smtplib.SMTP(host=self.server_str, port=self.port)
def login(self):
self.smtp_server.starttls()
self.smtp_server.login(self.username, self.password)
def send(self, email):
if self.execute_flag:
self.smtp_server.send_message(email)
else:
return
def quit(self):
self.smtp_server.quit()
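Usage follows the order seen later in tasks/notify.py: read the configuration, connect, log in, send, quit. A sketch with execute_flag set to False, which walks through the whole flow without delivering anything; it assumes the MAIL_* environment variables are set and point at a reachable SMTP server, and the addresses are placeholders:

from notify.notification import Notification
from notify.service import NotificationService

service = NotificationService(False)  # dry run: send() skips delivery
service.get_smtp_configs()            # reads the MAIL_* environment variables
service.set_server()
service.login()

note = Notification()
note.set_addresses(service.email_address, 'user@example.com')
note['Subject'] = 'Illustrative subject'
note.set_content('Illustrative body.')
service.send(note)                    # no-op because execute_flag is False
service.quit()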

15
daemon/notify/templates/notification_messages/notification.html Normal file

@@ -0,0 +1,15 @@
<html>
<body>
<p>Dear <b>{username}</b>,</p>
<p>The status of your Job/Corpus({id}) with the title <b>"{title}"</b> has changed!</p>
<p>It is now <b>{status}</b>!</p>
<p>Time of this status update was: <b>{time} UTC</b></p>
<p>You can access your Job/Corpus here: <a href="{url}">{url}</a>
</p>
<p>Kind regards!<br>
Your nopaque team</p>
</body>
</html>

10
daemon/notify/templates/notification_messages/notification.txt Normal file

@@ -0,0 +1,10 @@
Dear {username},
The status of your Job/Corpus({id}) with the title "{title}" has changed!
It is now {status}!
Time of this status update was: {time} UTC
You can access your Job/Corpus here: {url}
Kind regards!
Your nopaque team

3
daemon/requirements.txt Normal file

@@ -0,0 +1,3 @@
docker
psycopg2
SQLAlchemy

52
daemon/tasks/Models.py Normal file

@@ -0,0 +1,52 @@
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import relationship
from tasks import engine
Base = automap_base()
# Classes for database models
class Corpus(Base):
__tablename__ = 'corpora'
files = relationship('CorpusFile', collection_class=set)
class CorpusFile(Base):
__tablename__ = 'corpus_files'
class Job(Base):
__tablename__ = 'jobs'
inputs = relationship('JobInput', collection_class=set)
results = relationship('JobResult', collection_class=set)
notification_data = relationship('NotificationData', collection_class=list)
notification_email_data = relationship('NotificationEmailData',
collection_class=list)
class JobInput(Base):
__tablename__ = 'job_inputs'
class JobResult(Base):
__tablename__ = 'job_results'
class NotificationData(Base):
__tablename__ = 'notification_data'
job = relationship('Job', collection_class=set)
class NotificationEmailData(Base):
__tablename__ = 'notification_email_data'
job = relationship('Job', collection_class=set)
class User(Base):
__tablename__ = 'users'
jobs = relationship('Job', collection_class=set)
corpora = relationship('Corpus', collection_class=set)
Base.prepare(engine, reflect=True)
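The column attributes of these classes come from reflection: Base.prepare(engine, reflect=True) reads the live database schema and fills in the mapped columns, so only the relationships are spelled out by hand. A query sketch, assuming the daemon environment is available (importing tasks connects to the database and logs in to the Docker registry):

from tasks import Session
from tasks.Models import Job

session = Session()
# status and id are reflected columns, not declared in the class body.
for job in session.query(Job).filter_by(status='submitted'):
    print(job.id, job.status)
Session.remove()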

22
daemon/tasks/__init__.py Normal file

@@ -0,0 +1,22 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import os
import docker
''' Global constants '''
NOPAQUE_STORAGE = os.environ.get('NOPAQUE_STORAGE')
''' Docker client '''
docker_client = docker.from_env()
docker_client.login(password=os.environ.get('GITLAB_PASSWORD'),
registry="gitlab.ub.uni-bielefeld.de:4567",
username=os.environ.get('GITLAB_USERNAME'))
''' Scoped session '''
engine = create_engine(
'postgresql://{}:{}@db/{}'.format(
os.environ.get('POSTGRES_USER'),
os.environ.get('POSTGRES_PASSWORD'),
os.environ.get('POSTGRES_DB_NAME')))
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
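Session is a scoped_session factory: each @background task runs in its own thread, calls Session() to get the session bound to that thread, and drops it with Session.remove() when done, which is the pattern check_corpora and check_jobs follow below. A condensed sketch of that pattern (some_task is hypothetical):

from tasks import Session

def some_task():
    session = Session()   # thread-local session from the scoped factory
    try:
        # ... query and mutate models here ...
        session.commit()
    finally:
        Session.remove()  # discard this thread's session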

134
daemon/tasks/check_corpora.py Normal file

@@ -0,0 +1,134 @@
from decorators import background
from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.Models import Corpus
import docker
import os
import shutil
@background
def check_corpora():
c_session = Session()
corpora = c_session.query(Corpus).all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
__checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
__create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)
c_session.commit()
Session.remove()
def __create_build_corpus_service(corpus):
corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
'corpora', str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
if os.path.exists(corpus_data_dir):
shutil.rmtree(corpus_data_dir)
if os.path.exists(corpus_registry_dir):
shutil.rmtree(corpus_registry_dir)
os.mkdir(corpus_data_dir)
os.mkdir(corpus_registry_dir)
service_args = {'command': 'docker-entrypoint.sh build-corpus',
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'corpus.prepare',
'corpus_id': str(corpus.id)},
'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'build-corpus_{}'.format(corpus.id),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest')
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
corpus.status = 'failed'
else:
corpus.status = 'queued'
def __checkout_build_corpus_service(corpus):
logger = init_logger()
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logger.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ ' The service does not exist.'
+ ' (status: {} -> failed)'.format(corpus.status))
corpus.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
elif corpus.status == 'running' and task_state == 'complete':
service.remove()
corpus.status = 'prepared'
elif corpus.status == 'running' and task_state == 'failed':
service.remove()
corpus.status = task_state
def __create_cqpserver_container(corpus):
corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
'corpora', str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
container_args = {'command': 'cqpserver',
'detach': True,
'volumes': [corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id),
'network': 'opaque_default'}
container_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest')
try:
container = docker_client.containers.get(container_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
try:
docker_client.containers.run(container_image, **container_args)
except docker.errors.DockerException:
return
else:
corpus.status = 'analysing'
def __remove_cqpserver_container(corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
try:
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
corpus.status = 'prepared'

152
daemon/tasks/check_jobs.py Normal file

@@ -0,0 +1,152 @@
from datetime import datetime
from decorators import background
from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult
import docker
import json
import os
@background
def check_jobs():
# logger = init_logger()
cj_session = Session()
jobs = cj_session.query(Job).all()
for job in filter(lambda job: job.status == 'submitted', jobs):
__create_job_service(job)
for job in filter(lambda job: (job.status == 'queued'), jobs):
__checkout_job_service(job, cj_session)
__add_notification_data(job, 'queued', cj_session)
for job in filter(lambda job: (job.status == 'running'), jobs):
__checkout_job_service(job, cj_session)
__add_notification_data(job, 'running', cj_session)
for job in filter(lambda job: job.status == 'complete', jobs):
__add_notification_data(job, 'complete', cj_session)
for job in filter(lambda job: job.status == 'failed', jobs):
__add_notification_data(job, 'failed', cj_session)
for job in filter(lambda job: job.status == 'canceling', jobs):
__remove_job_service(job)
cj_session.commit()
Session.remove()
def __add_notification_data(job, notified_on_status, scoped_session):
logger = init_logger()
# checks if user wants any notifications at all
if (job.user.setting_job_status_mail_notifications == 'none'):
# logger.warning('User does not want any notifications!')
return
# checks if user wants only notification on completed jobs
elif (job.user.setting_job_status_mail_notifications == 'end'
and notified_on_status != 'complete'):
# logger.warning('User only wants notifications on job completed!')
return
else:
# check if a job already has associated NotificationData
notification_exists = len(job.notification_data)
# create notification_data for current job if there is none
if (notification_exists == 0):
notification_data = NotificationData(job_id=job.id)
scoped_session.add(notification_data)
scoped_session.commit() # Without a commit, the job would have no NotificationData
# logger.warning('Created NotificationData for current Job.')
else:
pass
# logger.warning('Job already had notification: {}'.format(notification_exists))
if (job.notification_data[0].notified_on != notified_on_status):
notification_email_data = NotificationEmailData(job_id=job.id)
notification_email_data.notify_status = notified_on_status
notification_email_data.creation_date = datetime.utcnow()
job.notification_data[0].notified_on = notified_on_status
scoped_session.add(notification_email_data)
logger.warning('Created NotificationEmailData for current Job.')
else:
# logger.warning('NotificationEmailData has already been created for current Job!')
pass
def __create_job_service(job):
job_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs',
str(job.id))
service_args = {'command': ('{} /files /files/output'.format(job.service)
+ ' {}'.format(job.secure_filename if job.service == 'file-setup' else '')
+ ' --log-dir /files'
+ ' --zip [{}]_{}'.format(job.service, job.secure_filename)
+ ' ' + ' '.join(json.loads(job.service_args))),
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'service.{}'.format(job.service),
'job_id': str(job.id)},
'mounts': [job_dir + ':/files:rw'],
'name': 'job_{}'.format(job.id),
'resources': docker.types.Resources(
cpu_reservation=job.n_cores * (10 ** 9),
mem_reservation=job.mem_mb * (10 ** 6)),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
+ job.service + ':' + job.service_version)
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
job.status = 'failed'
else:
job.status = 'queued'
def __checkout_job_service(job, scoped_session):
logger = init_logger()
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logger.error('__checkout_job_service({}):'.format(job.id)
+ ' The service does not exist.'
+ ' (status: {} -> failed)'.format(job.status))
job.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
elif (job.status == 'running'
and (task_state == 'complete' or task_state == 'failed')):
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
results_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id),
'jobs', str(job.id), 'output')
results = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result in results:
job_result = JobResult(dir=results_dir, filename=result,
job_id=job.id)
scoped_session.add(job_result)
scoped_session.commit()
def __remove_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
job.status = 'canceled'
except docker.errors.DockerException:
return
else:
service.update(mounts=None)
service.remove()
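docker.types.Resources expects nano-CPUs and bytes, which is why __create_job_service multiplies n_cores by 10 ** 9 and mem_mb by 10 ** 6. A sketch with illustrative numbers:

import docker

# Reserve 2 CPU cores and 2048 MB for a service task, expressed in
# Docker's units (nano-CPUs and bytes).
resources = docker.types.Resources(cpu_reservation=2 * (10 ** 9),
                                   mem_reservation=2048 * (10 ** 6))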

87
daemon/tasks/notify.py Normal file

@@ -0,0 +1,87 @@
from decorators import background
from notify.notification import Notification
from notify.service import NotificationService
from sqlalchemy import asc
from tasks import Session
from tasks.Models import NotificationEmailData
import os
# Email notification functions
def __create_mail_notifications(notification_service):
mn_session = Session()
notification_email_data = mn_session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all()
notifications = {}
for data in notification_email_data:
notification = Notification()
notification.set_addresses(notification_service.email_address,
data.job.user.email)
subject_template = '[nopaque] Status update for your Job/Corpus: {title}!'
subject_template_values_dict = {'title': data.job.title}
domain = os.environ.get('NOPAQUE_DOMAIN')
url = '{domain}/{jobs}/{id}'.format(domain=domain,
jobs='jobs',
id=data.job.id)
body_template_values_dict = {'username': data.job.user.username,
'id': data.job.id,
'title': data.job.title,
'status': data.notify_status,
'time': data.creation_date,
'url': url}
notification.set_notification_content(subject_template,
subject_template_values_dict,
'notify/templates/notification_messages/notification.txt',
'notify/templates/notification_messages/notification.html',
body_template_values_dict)
notifications[data.job.id] = notification
# Using a dictionary for notifications avoids sending multiple mails
# if the status of a job changes within a few seconds. The user will not
# get swamped with mails for queued, running and complete if those happen
# within a few seconds; only the last update will be sent. This depends
# on the sleep interval of the daemon loop, though.
mn_session.delete(data)
mn_session.commit()
Session.remove()
return notifications
def __send_mail_notifications(notifications, notification_service):
for key, notification in notifications.items():
try:
notification_service.send(notification)
notification_service.mail_limit_exceeded = False
except Exception as e:
# Add notifications to not_sent if the mail server exceeded its limit
# for consecutive mails
notification_service.not_sent[key] = notification
notification_service.mail_limit_exceeded = True
@background
def notify(execute_flag):
# If True, mails are sent normally.
# If False, mails are not sent. Used to avoid sending mails for jobs that
# were completed a long time ago, e.g. when adding notify to an already
# existing nopaque instance. Change it to True after the daemon has run
# once with the flag set to False.
# Initialize notification service
notification_service = NotificationService(execute_flag)
notification_service.get_smtp_configs()
notification_service.set_server()
# create notifications (content, recipient etc.)
notifications = __create_mail_notifications(notification_service)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
try:
notification_service.login()
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
notification_service.quit()
except Exception as e:
notification_service.not_sent.update(notifications)