mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2025-01-18 05:50:34 +00:00
456 lines
18 KiB
Python
456 lines
18 KiB
Python
from notify.notification import Notification
|
|
from notify.service import NotificationService
|
|
from sqlalchemy import create_engine, asc
|
|
from sqlalchemy.orm import Session, relationship
|
|
from sqlalchemy.ext.automap import automap_base
|
|
from datetime import datetime
|
|
from time import sleep
|
|
import docker
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
|
|
|
|
# Global constants
# Root directory of the nopaque storage, read from the environment
# (None when the variable is not set).
NOPAQUE_STORAGE = os.getenv('NOPAQUE_STORAGE')

# Global variables
# Both are placeholders at import time; nopaqued() assigns the real
# Docker client and database session before the polling loop starts.
docker_client = None
session = None
|
|
|
|
|
|
# Classes for database models
# Automap base class; nopaqued() calls Base.prepare(engine, reflect=True)
# on it once the database engine exists.
Base = automap_base()
|
|
|
|
|
|
class Corpus(Base):
    '''
    Database model for the 'corpora' table.
    '''
    __tablename__ = 'corpora'
    # Related CorpusFile objects, collected as a set
    files = relationship('CorpusFile', collection_class=set)
|
|
|
|
|
|
class CorpusFile(Base):
    '''
    Database model for the 'corpus_files' table.
    '''
    __tablename__ = 'corpus_files'
|
|
|
|
|
|
class Job(Base):
    '''
    Database model for the 'jobs' table.
    '''
    __tablename__ = 'jobs'
    # Related JobInput / JobResult objects, collected as sets
    inputs = relationship('JobInput', collection_class=set)
    results = relationship('JobResult', collection_class=set)
    # Notification bookkeeping rows, collected as lists so that they can be
    # accessed by index (see __add_notification_data)
    notification_data = relationship('NotificationData',
                                     collection_class=list)
    notification_email_data = relationship('NotificationEmailData',
                                           collection_class=list)
|
|
|
|
|
|
class NotificationData(Base):
    '''
    Database model for the 'notification_data' table.
    '''
    __tablename__ = 'notification_data'
    # Relationship to the Job model.
    # NOTE(review): other code in this file treats the job side as a single
    # object - confirm collection_class=set is intended here.
    job = relationship('Job', collection_class=set)
|
|
|
|
|
|
class NotificationEmailData(Base):
    '''
    Database model for the 'notification_email_data' table.
    '''
    __tablename__ = 'notification_email_data'
    # Relationship to the Job model; create_mail_notifications() reads it as
    # a single object (data.job.user, data.job.title).
    # NOTE(review): confirm collection_class=set is intended here.
    job = relationship('Job', collection_class=set)
|
|
|
|
|
|
class JobInput(Base):
    '''
    Database model for the 'job_inputs' table.

    The table name used to be 'job_results', which duplicated the table of
    the JobResult model; following the naming pattern of the other models
    (CorpusFile -> 'corpus_files', JobResult -> 'job_results') the inputs
    live in 'job_inputs'.
    '''
    __tablename__ = 'job_inputs'
|
|
|
|
|
|
class JobResult(Base):
    '''
    Database model for the 'job_results' table.
    '''
    __tablename__ = 'job_results'
|
|
|
|
|
|
class User(Base):
    '''
    Database model for the 'users' table.
    '''
    __tablename__ = 'users'
    # Related Job and Corpus objects, each collected as a set
    jobs = relationship('Job', collection_class=set)
    corpora = relationship('Corpus', collection_class=set)
|
|
|
|
|
|
def check_corpora():
    '''
    Load all corpora and run the handler that matches each corpus status.

    The passes run one after another over the same list and the status is
    read at the moment a corpus is visited, so a corpus whose status was
    changed by an earlier pass is picked up by a later pass of the same
    call.
    '''
    all_corpora = session.query(Corpus).all()
    passes = (
        (('submitted',), __create_build_corpus_service),
        (('queued', 'running'), __checkout_build_corpus_service),
        (('start analysis',), __create_cqpserver_container),
        (('stop analysis',), __remove_cqpserver_container),
    )
    for statuses, handler in passes:
        for corpus in all_corpora:
            if corpus.status in statuses:
                handler(corpus)
|
|
|
|
|
|
def __create_build_corpus_service(corpus):
    '''
    Create the Docker service that builds the given corpus.

    The corpus' data and registry directories are emptied and recreated, an
    already existing service with the same name is removed and a new one is
    created. The corpus status becomes 'queued' on success and 'failed' if
    creating the service raises a DockerException. If looking up the old
    service fails with a DockerException other than NotFound, the function
    returns without touching the status.
    '''
    base_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id), 'corpora',
                            str(corpus.id))
    vrt_file = os.path.join(base_dir, 'merged', 'corpus.vrt')
    data_dir = os.path.join(base_dir, 'data')
    registry_dir = os.path.join(base_dir, 'registry')

    # Start from empty output directories: first drop leftovers, then
    # recreate both directories.
    for directory in (data_dir, registry_dir):
        if os.path.exists(directory):
            shutil.rmtree(directory)
    for directory in (data_dir, registry_dir):
        os.mkdir(directory)

    service_name = 'build-corpus_{}'.format(corpus.id)
    image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
    service_kwargs = {
        'command': 'docker-entrypoint.sh build-corpus',
        'constraints': ['node.role==worker'],
        'labels': {
            'origin': 'nopaque',
            'type': 'corpus.prepare',
            'corpus_id': str(corpus.id),
        },
        'mounts': [
            '{}:/root/files/corpus.vrt:ro'.format(vrt_file),
            '{}:/corpora/data:rw'.format(data_dir),
            '{}:/usr/local/share/cwb/registry:rw'.format(registry_dir),
        ],
        'name': service_name,
        'restart_policy': docker.types.RestartPolicy(),
    }

    # Remove a service left over under the same name
    try:
        stale_service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        stale_service = None
    except docker.errors.DockerException:
        return
    if stale_service is not None:
        stale_service.remove()

    try:
        docker_client.services.create(image, **service_kwargs)
    except docker.errors.DockerException:
        corpus.status = 'failed'
        return
    corpus.status = 'queued'
|
|
|
|
|
|
def __checkout_build_corpus_service(corpus):
    '''
    Update the corpus status from the state of its build service.

    A missing service is logged and marks the corpus as 'failed'. Otherwise
    the state of the first task reported by the service decides:
    'queued' becomes 'running' as soon as the task is no longer 'pending';
    a 'running' corpus becomes 'prepared' (task 'complete') or 'failed'
    (task 'failed') and the service is removed.
    '''
    try:
        service = docker_client.services.get(
            'build-corpus_{}'.format(corpus.id))
    except docker.errors.NotFound:
        logger.error(
            '__checkout_build_corpus_service({}): The service does not exist.'
            ' (stauts: {} -> failed)'.format(corpus.id, corpus.status))
        corpus.status = 'failed'
        return
    except docker.errors.DockerException:
        return

    tasks = service.tasks()
    if not tasks:
        return
    task_state = tasks[0].get('Status').get('State')

    if corpus.status == 'queued':
        if task_state != 'pending':
            corpus.status = 'running'
    elif corpus.status == 'running' and task_state in ('complete', 'failed'):
        service.remove()
        corpus.status = 'prepared' if task_state == 'complete' else 'failed'
|
|
|
|
|
|
def __create_cqpserver_container(corpus):
    '''
    Start a detached cqpserver container for the given corpus.

    An existing container with the same name is force-removed first. The
    corpus status is set to 'analysing' only if the container was started;
    on a DockerException the status is left unchanged.
    '''
    base_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id), 'corpora',
                            str(corpus.id))
    data_dir = os.path.join(base_dir, 'data')
    registry_dir = os.path.join(base_dir, 'registry')

    container_name = 'cqpserver_{}'.format(corpus.id)
    image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
    run_kwargs = {
        'command': 'cqpserver',
        'detach': True,
        'volumes': [
            '{}:/corpora/data:rw'.format(data_dir),
            '{}:/usr/local/share/cwb/registry:rw'.format(registry_dir),
        ],
        'name': container_name,
        'network': 'opaque_default',
    }

    # Remove a container left over under the same name
    try:
        stale_container = docker_client.containers.get(container_name)
    except docker.errors.NotFound:
        stale_container = None
    except docker.errors.DockerException:
        return
    if stale_container is not None:
        stale_container.remove(force=True)

    try:
        docker_client.containers.run(image, **run_kwargs)
    except docker.errors.DockerException:
        return
    corpus.status = 'analysing'
|
|
|
|
|
|
def __remove_cqpserver_container(corpus):
    '''
    Force-remove the cqpserver container of the given corpus.

    The corpus status is set to 'prepared' when the container was removed
    or did not exist. On any other DockerException during the lookup the
    status is left unchanged.
    '''
    try:
        container = docker_client.containers.get(
            'cqpserver_{}'.format(corpus.id))
    except docker.errors.NotFound:
        container = None
    except docker.errors.DockerException:
        return
    if container is not None:
        container.remove(force=True)
    corpus.status = 'prepared'
|
|
|
|
|
|
def check_jobs():
    '''
    Load all jobs and run the handler that matches each job status.

    The passes run one after another over the same list and the status is
    read at the moment a job is visited, so a job that an earlier pass moved
    to a new status is picked up by a later pass of the same call.
    '''
    all_jobs = session.query(Job).all()
    # The __add_notification_data(job, <status>) hooks for 'queued',
    # 'running', 'complete' and 'failed' jobs are currently disabled.
    passes = (
        ('submitted', __create_job_service),
        ('queued', __checkout_job_service),
        ('running', __checkout_job_service),
        ('canceling', __remove_job_service),
    )
    for status, handler in passes:
        for job in all_jobs:
            if job.status == status:
                handler(job)
|
|
|
|
|
|
def __add_notification_data(job, notified_on_status):
    '''
    Record that the job's owner should be mailed about notified_on_status.

    Nothing happens if the user's mail notification setting is 'none', or
    if it is 'end' and the status is not 'complete'. Otherwise a
    NotificationData row is created (and committed) for the job if it has
    none, and a NotificationEmailData row is added unless the job was
    already notified on this status.
    '''
    preference = job.user.setting_job_status_mail_notifications
    # User does not want any notifications
    if preference == 'none':
        return
    # User only wants a notification for completed jobs
    if preference == 'end' and notified_on_status != 'complete':
        return

    # Make sure the job has an associated NotificationData row
    if len(job.notification_data) == 0:
        session.add(NotificationData(job_id=job.id))
        # Without the commit the job would have no NotificationData
        session.commit()

    tracker = job.notification_data[0]
    if tracker.notified_on == notified_on_status:
        # NotificationEmailData has already been created for this status
        return
    email_data = NotificationEmailData(job_id=job.id)
    email_data.notify_status = notified_on_status
    email_data.creation_date = datetime.utcnow()
    job.notification_data[0].notified_on = notified_on_status
    session.add(email_data)
|
|
|
|
|
|
def __create_job_service(job):
    '''
    Create the Docker service that executes the given job.

    An already existing service with the same name is removed first. The
    job status becomes 'queued' on success and 'failed' if creating the
    service raises a DockerException. If looking up the old service fails
    with a DockerException other than NotFound, the function returns
    without touching the status.
    '''
    job_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs',
                           str(job.id))
    # Only the file-setup service gets the file name as positional argument
    positional_file = job.secure_filename if job.service == 'file-setup' else ''
    extra_args = ' '.join(json.loads(job.service_args))
    command = ('{service} /files /files/output {positional}'
               ' --log-dir /files'
               ' --zip [{service}]_{filename}'
               ' {extra}').format(service=job.service,
                                  positional=positional_file,
                                  filename=job.secure_filename,
                                  extra=extra_args)
    service_name = 'job_{}'.format(job.id)
    service_kwargs = {
        'command': command,
        'constraints': ['node.role==worker'],
        'labels': {
            'origin': 'nopaque',
            'type': 'service.{}'.format(job.service),
            'job_id': str(job.id),
        },
        'mounts': [job_dir + ':/files:rw'],
        'name': service_name,
        'resources': docker.types.Resources(
            cpu_reservation=job.n_cores * (10 ** 9),
            mem_reservation=job.mem_mb * (10 ** 6)),
        'restart_policy': docker.types.RestartPolicy(),
    }
    image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
             + job.service + ':' + job.service_version)

    # Remove a service left over under the same name
    try:
        stale_service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        stale_service = None
    except docker.errors.DockerException:
        return
    if stale_service is not None:
        stale_service.remove()

    try:
        docker_client.services.create(image, **service_kwargs)
    except docker.errors.DockerException:
        job.status = 'failed'
        return
    job.status = 'queued'
|
|
|
|
|
|
def __checkout_job_service(job):
    '''
    Update the job status from the state of its Docker service.

    A missing service is logged and marks the job as 'failed'. Otherwise
    the state of the first task reported by the service decides:
    'queued' becomes 'running' as soon as the task is no longer 'pending';
    a 'running' job whose task is 'complete' or 'failed' takes over that
    state, gets an end date and its service is removed. For completed jobs
    every '.zip' file in the job's output directory is registered as a
    JobResult.
    '''
    try:
        service = docker_client.services.get('job_{}'.format(job.id))
    except docker.errors.NotFound:
        logger.error(
            '__checkout_job_service({}): The service does not exist.'
            ' (stauts: {} -> failed)'.format(job.id, job.status))
        job.status = 'failed'
        return
    except docker.errors.DockerException:
        return

    tasks = service.tasks()
    if not tasks:
        return
    task_state = tasks[0].get('Status').get('State')

    if job.status == 'queued':
        if task_state != 'pending':
            job.status = 'running'
        return
    if job.status != 'running' or task_state not in ('complete', 'failed'):
        return

    # The job has ended, one way or the other
    service.remove()
    job.end_date = datetime.utcnow()
    job.status = task_state
    if task_state != 'complete':
        return

    output_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs',
                              str(job.id), 'output')
    for filename in os.listdir(output_dir):
        if filename.endswith('.zip'):
            session.add(JobResult(dir=output_dir, filename=filename,
                                  job_id=job.id))
|
|
|
|
|
|
def __remove_job_service(job):
    '''
    Remove the Docker service of a job that is being canceled.

    The job is marked 'canceled' only once its service can no longer be
    found. While the service still exists its mounts are reset and it is
    removed, leaving the status unchanged. On any other DockerException
    during the lookup nothing is done.
    '''
    try:
        service = docker_client.services.get('job_{}'.format(job.id))
    except docker.errors.NotFound:
        job.status = 'canceled'
        return
    except docker.errors.DockerException:
        return
    service.update(mounts=None)
    service.remove()
|
|
|
|
|
|
def handle_jobs():
    '''
    Run one job handling pass (delegates to check_jobs).
    '''
    check_jobs()
|
|
|
|
|
|
def handle_corpora():
    '''
    Run one corpus handling pass (delegates to check_corpora).
    '''
    check_corpora()
|
|
|
|
|
|
# Email notification functions
|
|
def create_mail_notifications(notification_service):
    '''
    Turn all stored NotificationEmailData rows into Notification objects.

    The rows are processed in ascending order of their creation date and
    each processed row is deleted from the session. Returns a dictionary
    that maps job ids to notifications.
    '''
    pending_rows = (session.query(NotificationEmailData)
                    .order_by(asc(NotificationEmailData.creation_date))
                    .all())
    subject_template = '[nopaque] Status update for your Job/Corpora: {title}!'
    notifications = {}
    for row in pending_rows:
        notification = Notification()
        notification.set_addresses(notification_service.email_address,
                                   row.job.user.email)
        subject_values = {'title': row.job.title}
        job_url = '{protocol}://{domain}/{jobs}/{id}'.format(
            protocol=os.environ.get('NOPAQUE_PROTOCOL'),
            domain=os.environ.get('NOPAQUE_DOMAIN'),
            jobs='jobs',
            id=row.job.id)
        body_values = {
            'username': row.job.user.username,
            'id': row.job.id,
            'title': row.job.title,
            'status': row.notify_status,
            'time': row.creation_date,
            'url': job_url,
        }
        notification.set_notification_content(
            subject_template,
            subject_values,
            'templates/notification_messages/notification.txt',
            'templates/notification_messages/notification.html',
            body_values)
        # Keying by job id means a later row of the same job overwrites an
        # earlier one. If a job runs through queued, running and complete
        # within a few seconds, the user gets a single mail about the last
        # update instead of being swamped with one mail per status.
        notifications[row.job.id] = notification
        session.delete(row)
    return notifications
|
|
|
|
|
|
def send_mail_notifications(notifications, notification_service):
    '''
    Send every notification via the notification service.

    notifications is a dictionary of notifications, notification_service
    the service used to send them. A notification whose sending raises is
    stored in notification_service.not_sent under its key, e.g. because
    the mail server's limit for consecutive mails was exceeded.

    notification_service.mail_limit_exceeded is True afterwards if at least
    one notification could not be sent and False if all were sent. The flag
    used to be reset by every later successful send, which hid earlier
    failures from the caller. It is left untouched if notifications is
    empty.
    '''
    any_failed = False
    for key, notification in notifications.items():
        try:
            notification_service.send(notification)
        except Exception as e:
            # Keep the notification so that it can be sent again later
            notification_service.not_sent[key] = notification
            any_failed = True
            logging.getLogger(__name__).warning(
                'Sending notification %s failed: %s', key, e)
        # Sticky: once a send has failed the flag stays True for this call
        notification_service.mail_limit_exceeded = any_failed
|
|
|
|
|
|
def notify():
    '''
    Create the pending mail notifications and send them.

    A fresh NotificationService is set up on every call. The service only
    logs in and sends if at least one notification was created. If anything
    in that step raises, the notifications are stored in the service's
    not_sent dictionary.
    '''
    # Initialize notification service
    service = NotificationService()
    service.get_smtp_configs()
    service.set_server()
    # Create notifications (content, recipient etc.)
    notifications = create_mail_notifications(service)
    # Nothing to send, so there is no need to log in
    if not notifications:
        return
    try:
        service.login()
        # Combine new and unsent notifications and send all of them
        notifications.update(service.not_sent)
        send_mail_notifications(notifications, service)
        # The unsent notifications have been sent now, so forget them, but
        # only if the mail limit has not been exceeded
        if service.mail_limit_exceeded is not True:
            service.not_sent = {}
        service.quit()
    except Exception:
        service.not_sent.update(notifications)
|
|
|
|
|
|
# Logger functions #
|
|
def init_logger():
    '''
    Initialize the module-wide logger instance.

    Log records are written to 'logs/nopaqued.log' relative to the current
    working directory. The 'logs' directory and the log file are created if
    they do not exist yet; a missing directory used to make this function
    fail with a FileNotFoundError. The log level is DEBUG if the environment
    variable FLASK_CONFIG is 'development' and WARNING if it is
    'production'; for any other value no level is set on the logger.
    '''
    global logger

    log_file = 'logs/nopaqued.log'
    os.makedirs(os.path.dirname(log_file), exist_ok=True)
    if not os.path.isfile(log_file):
        # Create an empty log file; the context manager closes the handle
        with open(os.path.join(os.getcwd(), log_file), 'w+'):
            pass
    logging.basicConfig(datefmt='%Y-%m-%d %H:%M:%S',
                        filemode='w', filename=log_file,
                        format='%(asctime)s - %(levelname)s - %(name)s - '
                               '%(filename)s - %(lineno)d - %(message)s')
    logger = logging.getLogger(__name__)
    if os.environ.get('FLASK_CONFIG') == 'development':
        logger.setLevel(logging.DEBUG)
    if os.environ.get('FLASK_CONFIG') == 'production':
        logger.setLevel(logging.WARNING)
|
|
|
|
|
|
def nopaqued():
    '''
    Run the daemon: connect to the database and to Docker, then poll.

    Sets the module-wide session and docker_client, logs in to the GitLab
    container registry and afterwards handles jobs and corpora in an
    endless loop, committing the session and sleeping three seconds after
    each round. This function does not return.
    '''
    global docker_client
    global session

    database_url = 'postgresql://{}:{}@db/{}'.format(
        os.environ.get('POSTGRES_USER'),
        os.environ.get('POSTGRES_PASSWORD'),
        os.environ.get('POSTGRES_DB_NAME'))
    engine = create_engine(database_url)
    # Reflect the database schema into the model classes
    Base.prepare(engine, reflect=True)
    session = Session(engine)
    session.commit()

    docker_client = docker.from_env()
    docker_client.login(username=os.environ.get('GITLAB_USERNAME'),
                        password=os.environ.get('GITLAB_PASSWORD'),
                        registry="gitlab.ub.uni-bielefeld.de:4567")

    # Executing background functions (notify() is currently disabled)
    while True:
        handle_jobs()
        handle_corpora()
        session.commit()
        sleep(3)
|
|
|
|
|
|
if __name__ == '__main__':
    # The logger has to exist before the daemon starts, because the
    # service checks log through the module-wide logger.
    init_logger()
    nopaqued()
|