Simplify daemon logic

Patrick Jentsch 2020-11-09 16:14:19 +01:00
parent 0f30e518af
commit cb9da5c7dd
21 changed files with 308 additions and 555 deletions


@@ -13,11 +13,3 @@ services:
- "./web/nopaque.py:/home/nopaque/nopaque.py"
- "./web/requirements.txt:/home/nopaque/requirements.txt"
- "./web/tests:/home/nopaque/tests"
nopaqued:
volumes:
# Mount code as volumes
- "./daemon/app:/home/nopaqued/app"
- "./daemon/boot.sh:/home/nopaqued/boot.sh"
- "./daemon/config.py:/home/nopaqued/config.py"
- "./daemon/nopaqued.py:/home/nopaqued/nopaqued.py"
- "./daemon/requirements.txt:/home/nopaqued/requirements.txt"


@@ -1,9 +1,23 @@
version: "3.5"
services:
db:
env_file: db.env
image: postgres:11
restart: unless-stopped
volumes:
- "${HOST_DB_DIR:-./db}:/var/lib/postgresql/data"
mq:
image: redis:6
restart: unless-stopped
volumes:
- "${HOST_MQ_DIR:-./mq}:/data"
nopaque:
build:
args:
DOCKER_GID: ${HOST_DOCKER_GID}
GID: ${HOST_GID}
UID: ${HOST_UID}
context: ./web
@@ -16,31 +30,3 @@ services:
volumes:
- "${NOPAQUE_DATA_DIR:-/mnt/nopaque}:${NOPAQUE_DATA_DIR:-/mnt/nopaque}"
- "${HOST_NOPAQUE_LOG_FILE-./nopaque.log}:${NOPAQUE_LOG_FILE:-/home/nopaque/nopaque.log}"
nopaqued:
build:
args:
DOCKER_GID: ${HOST_DOCKER_GID}
GID: ${HOST_GID}
UID: ${HOST_UID}
context: ./daemon
depends_on:
- db
- nopaque
env_file: .env
image: nopaqued:development
restart: unless-stopped
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
- "${NOPAQUE_DATA_DIR:-/mnt/nopaque}:${NOPAQUE_DATA_DIR:-/mnt/nopaque}"
- "${HOST_NOPAQUE_DAEMON_LOG_FILE-./nopaqued.log}:${NOPAQUE_DAEMON_LOG_FILE:-/home/nopaqued/nopaqued.log}"
db:
env_file: db.env
image: postgres:11
restart: unless-stopped
volumes:
- "${HOST_DB_DIR:-./db}:/var/lib/postgresql/data"
mq:
image: redis:6
restart: unless-stopped
volumes:
- "${HOST_MQ_DIR:-./mq}:/data"

web/.flaskenv (new file)

@@ -0,0 +1 @@
FLASK_APP=nopaque.py


@@ -4,9 +4,9 @@ FROM python:3.9.0-slim-buster
LABEL authors="Patrick Jentsch <p.jentsch@uni-bielefeld.de>, Stephan Porada <sporada@uni-bielefeld.de>"
ARG DOCKER_GID
ARG UID
ARG GID
ENV FLASK_APP=nopaque.py
ENV LANG=C.UTF-8


@@ -38,6 +38,7 @@ def create_app(config_name):
from .main import main as main_blueprint
from .services import services as services_blueprint
from .settings import settings as settings_blueprint
app.register_blueprint(admin_blueprint, url_prefix='/admin')
app.register_blueprint(auth_blueprint, url_prefix='/auth')
app.register_blueprint(corpora_blueprint, url_prefix='/corpora')


@@ -1,43 +0,0 @@
from app import create_app
from time import sleep
from ..decorators import background
import docker
app = create_app()
docker_client = docker.from_env()
app.app_context().push()
from . import check_corpora, check_jobs, notify # noqa
def run():
check_corpora_thread = check_corpora()
check_jobs_thread = check_jobs()
notify_thread = notify()
while True:
if not check_corpora_thread.is_alive():
check_corpora_thread = check_corpora()
if not check_jobs_thread.is_alive():
check_jobs_thread = check_jobs()
if not notify_thread.is_alive():
notify_thread = notify()
sleep(3)
@background
def check_corpora():
corpora = Corpus.query.all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
__checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
__create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)
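The run() loop above restarts the check_corpora, check_jobs and notify workers whenever they die; each of them is wrapped with the @background decorator imported from ..decorators, which is not shown in this commit. A minimal sketch of how such a decorator is typically implemented, assuming it simply starts the wrapped function in a daemon thread and returns the Thread object (the real decorator in the nopaque code base may differ):

from functools import wraps
from threading import Thread

def background(f):
    # Hypothetical sketch: run the decorated function in a daemon thread
    # and hand the Thread back to the caller, which is why run() can call
    # is_alive() on the return values of check_corpora() etc.
    @wraps(f)
    def wrapper(*args, **kwargs):
        thread = Thread(target=f, args=args, kwargs=kwargs, daemon=True)
        thread.start()
        return thread
    return wrapper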


@@ -1,139 +0,0 @@
from . import docker_client
from .. import db
from ..decorators import background
from ..models import Corpus
import docker
import logging
import os
import shutil
@background
def check_corpora():
corpora = Corpus.query.all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
__checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
__create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)
db.session.commit()
Session.remove()
def __create_build_corpus_service(corpus):
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
if os.path.exists(corpus_data_dir):
shutil.rmtree(corpus_data_dir)
if os.path.exists(corpus_registry_dir):
shutil.rmtree(corpus_registry_dir)
os.mkdir(corpus_data_dir)
os.mkdir(corpus_registry_dir)
service_args = {'command': 'docker-entrypoint.sh build-corpus',
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'corpus.prepare',
'corpus_id': str(corpus.id)},
'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'build-corpus_{}'.format(corpus.id),
'restart_policy': docker.types.RestartPolicy()}
service_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
corpus.status = 'failed'
else:
corpus.status = 'queued'
def __checkout_build_corpus_service(corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logging.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ ' The service does not exist.'
+ ' (status: {} -> failed)'.format(corpus.status))
corpus.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
elif corpus.status == 'running' and task_state == 'complete':
service.remove()
corpus.status = 'prepared'
elif corpus.status == 'running' and task_state == 'failed':
service.remove()
corpus.status = task_state
def __create_cqpserver_container(corpus):
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
container_args = {'command': 'cqpserver',
'detach': True,
'volumes': [corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id),
'network': 'nopaque_default'}
container_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
container = docker_client.containers.get(container_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
try:
docker_client.containers.run(container_image, **container_args)
except docker.errors.DockerException:
return
else:
corpus.status = 'analysing'
def __remove_cqpserver_container(corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
try:
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
corpus.status = 'prepared'


@@ -1,147 +0,0 @@
from datetime import datetime
from .. import configuration as config
from .. import docker_client, Session
from ..decorators import background
from ..models import Job, JobResult, NotificationData, NotificationEmailData
import docker
import logging
import json
import os
@background
def check_jobs():
session = Session()
jobs = session.query(Job).all()
for job in filter(lambda job: job.status == 'submitted', jobs):
__create_job_service(job)
for job in filter(lambda job: job.status == 'queued', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'queued', session)
for job in filter(lambda job: job.status == 'running', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'running', session)
for job in filter(lambda job: job.status == 'complete', jobs):
__add_notification_data(job, 'complete', session)
for job in filter(lambda job: job.status == 'failed', jobs):
__add_notification_data(job, 'failed', session)
for job in filter(lambda job: job.status == 'canceling', jobs):
__remove_job_service(job)
session.commit()
Session.remove()
def __add_notification_data(job, notified_on_status, session):
# checks if user wants any notifications at all
if (job.user.setting_job_status_mail_notifications == 'none'):
return
# checks if user wants only notification on completed jobs
elif (job.user.setting_job_status_mail_notifications == 'end'
and notified_on_status != 'complete'):
return
else:
# check if a job already has associated NotificationData
notification_exists = len(job.notification_data)
# create notification_data for current job if there is none
if (notification_exists == 0):
notification_data = NotificationData(job_id=job.id)
session.add(notification_data)
# Without a commit the job would have no NotificationData
session.commit()
if (job.notification_data[0].notified_on != notified_on_status):
notification_email_data = NotificationEmailData(job_id=job.id)
notification_email_data.notify_status = notified_on_status
notification_email_data.creation_date = datetime.utcnow()
job.notification_data[0].notified_on = notified_on_status
session.add(notification_email_data)
def __create_job_service(job):
job_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id))
cmd = '{} -i /files -o /files/output'.format(job.service)
if job.service == 'file-setup':
cmd += ' -f {}'.format(job.secure_filename)
cmd += ' --log-dir /files'
cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename)
cmd += ' ' + ' '.join(json.loads(job.service_args))
service_args = {'command': cmd,
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'service.{}'.format(job.service),
'job_id': str(job.id)},
'mounts': [job_dir + ':/files:rw'],
'name': 'job_{}'.format(job.id),
'resources': docker.types.Resources(
cpu_reservation=job.n_cores * (10 ** 9),
mem_reservation=job.mem_mb * (10 ** 6)),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
+ job.service + ':' + job.service_version)
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
job.status = 'failed'
else:
job.status = 'queued'
def __checkout_job_service(job, session):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logging.error('__checkout_job_service({}): '.format(job.id)
+ 'The service does not exist. '
+ '(status: {} -> failed)'.format(job.status))
job.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
elif (job.status == 'running'
and (task_state == 'complete' or task_state == 'failed')):
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
results_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id),
'output')
results = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result in results:
job_result = JobResult(dir=results_dir,
filename=result,
job_id=job.id)
session.add(job_result)
def __remove_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
job.status = 'canceled'
except docker.errors.DockerException:
return
else:
service.update(mounts=None)
service.remove()


@ -1,28 +0,0 @@
from email.message import EmailMessage
class Notification(EmailMessage):
"""docstring for Email."""
def set_notification_content(self,
subject_template,
subject_template_values_dict,
body_txt_template_path,
body_html_template_path,
body_template_values_dict):
# Create subject with subject_template_values_dict
self['subject'] = subject_template.format(
**subject_template_values_dict)
# Open template files and insert values from body_template_values_dict
with open(body_txt_template_path) as nfile:
self.body = nfile.read().format(**body_template_values_dict)
with open(body_html_template_path) as nfile:
self.html = nfile.read().format(**body_template_values_dict)
# Set txt of email
self.set_content(self.body)
# Set html alternative
self.add_alternative(self.html, subtype='html')
def set_addresses(self, sender, recipient):
self['From'] = sender
self['to'] = recipient


@ -1,16 +0,0 @@
class NotificationService:
"""This is a nopaque notifcation service object."""
def __init__(self, smtp):
# Flag showing whether the mail server stopped sending mails because it
# exceeded its sending limit
self.mail_limit_exceeded = False
# Holds email notifications that could not be sent due to an error
self.not_sent = {}
self.smtp = smtp
def send(self, email):
self.smtp.send_message(email)
def quit(self):
self.smtp.quit()


@ -1,15 +0,0 @@
<html>
<body>
<p>Dear <b>{username}</b>,</p>
<p>The status of your Job/Corpus({id}) with the title <b>"{title}"</b> has changed!</p>
<p>It is now <b>{status}</b>!</p>
<p>Time of this status update was: <b>{time} UTC</b></p>
<p>You can access your Job/Corpus here: <a href="{url}">{url}</a>
</p>
<p>Kind regards!<br>
Your nopaque team</p>
</body>
</html>


@ -1,10 +0,0 @@
Dear {username},
The status of your Job/Corpus({id}) with the title "{title}" has changed!
It is now {status}!
Time of this status update was: {time} UTC
You can access your Job/Corpus here: {url}
Kind regards!
Your nopaque team


@ -1,111 +0,0 @@
from sqlalchemy import asc
from .libnotify.notification import Notification
from .libnotify.service import NotificationService
from .. import configuration as config
from .. import Session
from ..decorators import background
from ..models import NotificationEmailData
import logging
import os
import smtplib
ROOT_DIR = os.path.abspath(os.path.dirname(__file__))
@background
def notify():
session = Session()
if config.SMTP_USE_SSL:
smtp = smtplib.SMTP_SSL(host=config.SMTP_SERVER, port=config.SMTP_PORT)
else:
smtp = smtplib.SMTP(host=config.SMTP_SERVER, port=config.SMTP_PORT)
if config.SMTP_USE_TLS:
smtp.starttls()
try:
smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
except smtplib.SMTPHeloError:
logging.warning('The server did not reply properly to the HELO '
'greeting.')
return
except smtplib.SMTPAuthenticationError as e:
logging.warning('The server did not accept the username/password '
'combination.')
logging.warning(e)
return
except smtplib.SMTPNotSupportedError:
logging.warning('The AUTH command is not supported by the server.')
return
except smtplib.SMTPException:
logging.warning('No suitable authentication method was found.')
return
notification_service = NotificationService(smtp)
# create notifications (content, recipient etc.)
notifications = __create_mail_notifications(notification_service, session)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
smtp.quit()
Session.remove()
# Email notification functions
def __create_mail_notifications(notification_service, session):
notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() # noqa
notifications = {}
for data in notification_email_data:
notification = Notification()
notification.set_addresses(config.SMTP_DEFAULT_SENDER,
data.job.user.email)
subject_template = ('[nopaque] Status update for your Job/Corpora: '
'{title}!')
subject_template_values_dict = {'title': data.job.title}
url = '{}://{}/{}/{}'.format(config.PROTOCOL,
config.DOMAIN,
'jobs',
data.job.id)
body_template_values_dict = {'username': data.job.user.username,
'id': data.job.id,
'title': data.job.title,
'status': data.notify_status,
'time': data.creation_date,
'url': url}
txt_tmplt = os.path.join(ROOT_DIR,
'libnotify/templates/notification.txt')
html_tmplt = os.path.join(ROOT_DIR,
'libnotify/templates/notification.html')
notification.set_notification_content(subject_template,
subject_template_values_dict,
txt_tmplt,
html_tmplt,
body_template_values_dict)
notifications[data.job.id] = notification
# Using a dictionary for notifications avoids sending multiple mails
# if the status of a job changes in a few seconds. The user will not
# get swamped with mails for queued, running and complete if those
# happen within a few seconds. Only the last update will be sent.
# This depends on the sleep time interval though.
session.delete(data)
session.commit()
return notifications
def __send_mail_notifications(notifications, notification_service):
for key, notification in notifications.items():
try:
notification_service.send(notification)
notification_service.mail_limit_exceeded = False
except Exception:
# Add the notification to not_sent because the mail server exceeded its
# limit for consecutive mail sending
logging.warning('limit')
notification_service.not_sent[key] = notification
notification_service.mail_limit_exceeded = True
notification_service.not_sent.update(notifications)

web/app/tasks/__init__.py (new file)

@@ -0,0 +1,35 @@
from .. import db
from ..models import Corpus, Job
import docker
docker_client = docker.from_env()
from . import corpus_utils, job_utils # noqa
def check_corpora():
corpora = Corpus.query.all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
corpus_utils.create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
corpus_utils.checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
corpus_utils.create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
corpus_utils.remove_cqpserver_container(corpus)
db.session.commit()
def check_jobs():
jobs = Job.query.all()
for job in filter(lambda job: job.status == 'submitted', jobs):
job_utils.create_job_service(job)
for job in filter(lambda job: job.status == 'queued', jobs):
job_utils.checkout_job_service(job)
for job in filter(lambda job: job.status == 'running', jobs):
job_utils.checkout_job_service(job)
db.session.commit()
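With this commit the checks run inside the web application itself rather than in the separate nopaqued process. The diff does not show how check_corpora() and check_jobs() are scheduled (see the new tasks CLI command in nopaque.py further below); a minimal sketch of a one-off invocation under an application context, with the config name 'development' assumed purely for illustration:

from app import create_app
from app.tasks import check_corpora, check_jobs

# Hypothetical one-off run, e.g. from a management script or `flask shell`;
# the config name is an assumption, not taken from this commit.
app = create_app('development')
with app.app_context():
    check_corpora()
    check_jobs()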


@@ -0,0 +1,120 @@
from flask import current_app
from . import docker_client
import docker
import logging
import os
import shutil
def create_build_corpus_service(corpus):
corpus_dir = os.path.join(current_app.config['DATA_DIR'],
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
if os.path.exists(corpus_data_dir):
shutil.rmtree(corpus_data_dir)
if os.path.exists(corpus_registry_dir):
shutil.rmtree(corpus_registry_dir)
os.mkdir(corpus_data_dir)
os.mkdir(corpus_registry_dir)
service_args = {
'command': 'docker-entrypoint.sh build-corpus',
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'corpus.prepare',
'corpus_id': str(corpus.id)},
'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'build-corpus_{}'.format(corpus.id),
'restart_policy': docker.types.RestartPolicy()
}
service_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.APIError as e:
logging.error('create_build_corpus_service({}): '.format(corpus.id)
+ '{} (status: {} -> failed)'.format(e, corpus.status))
corpus.status = 'failed'
else:
corpus.status = 'queued'
finally:
# TODO: send email
pass
def checkout_build_corpus_service(corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound as e:
logging.error('checkout_build_corpus_service({}):'.format(corpus.id)
+ ' {} (status: {} -> failed)'.format(e, corpus.status))
corpus.status = 'failed'
# TODO: handle docker.errors.APIError and docker.errors.InvalidVersion
else:
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
elif corpus.status == 'running' and task_state == 'complete':
service.remove()
corpus.status = 'prepared'
elif corpus.status == 'running' and task_state == 'failed':
service.remove()
corpus.status = task_state
finally:
# TODO: send email
pass
def create_cqpserver_container(corpus):
corpus_dir = os.path.join(current_app.config['DATA_DIR'],
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
container_args = {
'command': 'cqpserver',
'detach': True,
'volumes': [corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id),
'network': 'nopaque_default'
}
container_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
container = docker_client.containers.get(container_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
try:
docker_client.containers.run(container_image, **container_args)
except docker.errors.DockerException:
return
else:
corpus.status = 'analysing'
def remove_cqpserver_container(corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
try:
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
corpus.status = 'prepared'

web/app/tasks/job_utils.py (new file)

@@ -0,0 +1,101 @@
from datetime import datetime
from flask import current_app
from . import docker_client
from .. import db
from ..models import JobResult
import docker
import logging
import json
import os
def create_job_service(job):
job_dir = os.path.join(current_app.config['DATA_DIR'],
str(job.user_id),
'jobs',
str(job.id))
cmd = '{} -i /files -o /files/output'.format(job.service)
if job.service == 'file-setup':
cmd += ' -f {}'.format(job.secure_filename)
cmd += ' --log-dir /files'
cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename)
cmd += ' ' + ' '.join(json.loads(job.service_args))
service_args = {'command': cmd,
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'service.{}'.format(job.service),
'job_id': str(job.id)},
'mounts': [job_dir + ':/files:rw'],
'name': 'job_{}'.format(job.id),
'resources': docker.types.Resources(
cpu_reservation=job.n_cores * (10 ** 9),
mem_reservation=job.mem_mb * (10 ** 6)),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
+ job.service + ':' + job.service_version)
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.APIError as e:
logging.error('create_job_service({}): {} '.format(job.id, e)
+ '(status: {} -> failed)'.format(job.status))
job.status = 'failed'
else:
job.status = 'queued'
finally:
# TODO: send email
pass
def checkout_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound as e:
logging.error('checkout_job_service({}): {} '.format(job.id, e)
+ '(status: {} -> submitted)'.format(job.status))
job.status = 'submitted'
# TODO: handle docker.errors.APIError and docker.errors.InvalidVersion
else:
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
elif job.status == 'queued' and task_state == 'complete':
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
results_dir = os.path.join(current_app.config['DATA_DIR'],
str(job.user_id),
'jobs',
str(job.id),
'output')
results = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result in results:
job_result = JobResult(dir=results_dir,
filename=result,
job_id=job.id)
db.session.add(job_result)
elif job.status == 'running' and task_state == 'failed':
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
finally:
# TODO: send email
pass
def remove_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
# TODO: send email
job.status = 'canceled'
# TODO: handle docker.errors.APIError and docker.errors.InvalidVersion
else:
service.update(mounts=None)
service.remove()


@@ -0,0 +1,9 @@
<p>Dear <b>{{ user.username }}</b>,</p>
<p>The status of your Job/Corpus({{ job.id }}) with the title <b>"{{ job.title }}"</b> has changed!</p>
<p>It is now <b>{{ job.status }}</b>!</p>
<p>Time of this status update was: <b>{time} UTC</b></p>
<p>You can access your Job/Corpus here: <a href="{{ url_for('jobs.job', job_id=job.id) }}">{{ url_for('jobs.job', job_id=job.id) }}</a></p>
<p>Kind regards!<br>Your nopaque team</p>


@@ -0,0 +1,10 @@
Dear {{ user.username }},
The status of your Job/Corpus({{ job.id }}) with the title "{{ job.title }}" has changed!
It is now {{ job.status }}!
Time of this status update was: {time} UTC
You can access your Job/Corpus here: {{ url_for('jobs.job', job_id=job.id) }}
Kind regards!
Your nopaque team


@@ -1,15 +1,15 @@
#!/bin/bash
source venv/bin/activate
export FLASK_APP=nopaque.py
while true; do
if [[ "$#" -eq 0 ]]; then
while true; do
flask deploy
if [[ "$?" == "0" ]]; then
break
fi
echo Deploy command failed, retrying in 5 secs...
sleep 5
done
if [[ "$#" -eq 0 ]]; then
python nopaque.py
elif [[ "$1" == "flask" ]]; then
exec ${@:1}


@@ -51,6 +51,13 @@ def deploy():
Role.insert_roles()
@app.cli.command()
def tasks():
from app.tasks import process_corpora, process_jobs
process_corpora()
process_jobs()
@app.cli.command()
def test():
"""Run the unit tests."""