Delete old daemon package and fix typos

This commit is contained in:
Patrick Jentsch 2020-11-17 14:58:03 +01:00
parent d187a83b54
commit ddee38e2a5
20 changed files with 6 additions and 696 deletions

View File

@ -39,7 +39,7 @@ HOST_DOCKER_GID=
# HINT: Use this bash command `python -c "import uuid; print(uuid.uuid4().hex)"`
# SECRET_KEY=
# Example: nopaque.example.com nopaque.example.com:5000
# Example: nopaque.example.com/nopaque.example.com:5000
# HINT: If your instance is publicly available on a different Port then 80/443,
# you will have to add this to the server name
SERVER_NAME=
@ -124,9 +124,12 @@ NOPAQUE_ADMIN=
# DEFAULT: False
# NOPAQUE_DAEMON_ENABLED=
# The hostname or IP address for the server to listen on.
# HINT: To use a domain locally, add any names that should route to the app to your hosts file.
# DEFAULT: 0.0.0.0
# NOPAQUE_HOST=
# The port number for the server to listen on.
# DEFAULT: 5000
# NOPAQUE_PORT=

View File

@ -1,6 +0,0 @@
# Docker related files
Dockerfile
.dockerignore
# Packages
__pycache__

View File

@ -1,32 +0,0 @@
FROM python:3.9.0-slim-buster
LABEL authors="Patrick Jentsch <p.jentsch@uni-bielefeld.de>, Stephan Porada <sporada@uni-bielefeld.de>"
ARG DOCKER_GID
ARG GID
ARG UID
ENV LANG=C.UTF-8
RUN apt-get update \
&& apt-get install --no-install-recommends --yes \
build-essential \
libpq-dev \
&& rm -r /var/lib/apt/lists/*
RUN groupadd --gid ${DOCKER_GID} --system docker \
&& groupadd --gid ${GID} --system nopaqued \
&& useradd --create-home --gid ${GID} --groups ${DOCKER_GID} --no-log-init --system --uid ${UID} nopaqued
USER nopaqued
WORKDIR /home/nopaqued
COPY --chown=nopaqued:nopaqued [".", "."]
RUN python -m venv venv \
&& venv/bin/pip install --requirement requirements.txt
ENTRYPOINT ["./boot.sh"]

View File

@ -1,31 +0,0 @@
from config import config
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from time import sleep
import docker
import os
configuration = config[os.environ.get('NOPAQUE_CONFIG', 'development')]
configuration.init()
docker_client = docker.from_env()
engine = create_engine(configuration.SQLALCHEMY_DATABASE_URI)
Session = scoped_session(sessionmaker(bind=engine))
def run():
from .tasks.check_corpora import check_corpora
check_corpora_thread = check_corpora()
from .tasks.check_jobs import check_jobs
check_jobs_thread = check_jobs()
from .tasks.notify import notify
notify_thread = notify()
while True:
if not check_corpora_thread.is_alive():
check_corpora_thread = check_corpora()
if not check_jobs_thread.is_alive():
check_jobs_thread = check_jobs()
if not notify_thread.is_alive():
notify_thread = notify()
sleep(3)

View File

@ -1,14 +0,0 @@
from functools import wraps
from threading import Thread
def background(f):
'''
' This decorator executes a function in a Thread.
'''
@wraps(f)
def wrapped(*args, **kwargs):
thread = Thread(target=f, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapped

View File

@ -1,52 +0,0 @@
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import relationship
from . import engine
Base = automap_base()
# Classes for database models
class Corpus(Base):
__tablename__ = 'corpora'
files = relationship('CorpusFile', collection_class=set)
class CorpusFile(Base):
__tablename__ = 'corpus_files'
class Job(Base):
__tablename__ = 'jobs'
inputs = relationship('JobInput', collection_class=set)
results = relationship('JobResult', collection_class=set)
notification_data = relationship('NotificationData', collection_class=list)
notification_email_data = relationship('NotificationEmailData',
collection_class=list)
class JobInput(Base):
__tablename__ = 'job_results'
class JobResult(Base):
__tablename__ = 'job_results'
class NotificationData(Base):
__tablename__ = 'notification_data'
job = relationship('Job', collection_class=set)
class NotificationEmailData(Base):
__tablename__ = 'notification_email_data'
job = relationship('Job', collection_class=set)
class User(Base):
__tablename__ = 'users'
jobs = relationship('Job', collection_class=set)
corpora = relationship('Corpus', collection_class=set)
Base.prepare(engine, reflect=True)

View File

@ -1,140 +0,0 @@
from .. import configuration as config
from .. import docker_client, Session
from ..decorators import background
from ..models import Corpus
import docker
import logging
import os
import shutil
@background
def check_corpora():
session = Session()
corpora = session.query(Corpus).all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
or corpus.status == 'running'),
corpora):
__checkout_build_corpus_service(corpus)
for corpus in filter(lambda corpus: corpus.status == 'start analysis',
corpora):
__create_cqpserver_container(corpus)
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)
session.commit()
Session.remove()
def __create_build_corpus_service(corpus):
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
if os.path.exists(corpus_data_dir):
shutil.rmtree(corpus_data_dir)
if os.path.exists(corpus_registry_dir):
shutil.rmtree(corpus_registry_dir)
os.mkdir(corpus_data_dir)
os.mkdir(corpus_registry_dir)
service_args = {'command': 'docker-entrypoint.sh build-corpus',
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'corpus.prepare',
'corpus_id': str(corpus.id)},
'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'build-corpus_{}'.format(corpus.id),
'restart_policy': docker.types.RestartPolicy()}
service_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
corpus.status = 'failed'
else:
corpus.status = 'queued'
def __checkout_build_corpus_service(corpus):
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logging.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ ' The service does not exist.'
+ ' (stauts: {} -> failed)'.format(corpus.status))
corpus.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == 'queued' and task_state != 'pending':
corpus.status = 'running'
elif corpus.status == 'running' and task_state == 'complete':
service.remove()
corpus.status = 'prepared'
elif corpus.status == 'running' and task_state == 'failed':
service.remove()
corpus.status = task_state
def __create_cqpserver_container(corpus):
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
container_args = {'command': 'cqpserver',
'detach': True,
'volumes': [corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id),
'network': 'nopaque_default'}
container_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
container = docker_client.containers.get(container_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
try:
docker_client.containers.run(container_image, **container_args)
except docker.errors.DockerException:
return
else:
corpus.status = 'analysing'
def __remove_cqpserver_container(corpus):
container_name = 'cqpserver_{}'.format(corpus.id)
try:
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
container.remove(force=True)
corpus.status = 'prepared'

View File

@ -1,147 +0,0 @@
from datetime import datetime
from .. import configuration as config
from .. import docker_client, Session
from ..decorators import background
from ..models import Job, JobResult, NotificationData, NotificationEmailData
import docker
import logging
import json
import os
@background
def check_jobs():
session = Session()
jobs = session.query(Job).all()
for job in filter(lambda job: job.status == 'submitted', jobs):
__create_job_service(job)
for job in filter(lambda job: job.status == 'queued', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'queued', session)
for job in filter(lambda job: job.status == 'running', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'running', session)
for job in filter(lambda job: job.status == 'complete', jobs):
__add_notification_data(job, 'complete', session)
for job in filter(lambda job: job.status == 'failed', jobs):
__add_notification_data(job, 'failed', session)
for job in filter(lambda job: job.status == 'canceling', jobs):
__remove_job_service(job)
session.commit()
Session.remove()
def __add_notification_data(job, notified_on_status, session):
# checks if user wants any notifications at all
if (job.user.setting_job_status_mail_notifications == 'none'):
return
# checks if user wants only notification on completed jobs
elif (job.user.setting_job_status_mail_notifications == 'end'
and notified_on_status != 'complete'):
return
else:
# check if a job already has associated NotificationData
notification_exists = len(job.notification_data)
# create notification_data for current job if there is none
if (notification_exists == 0):
notification_data = NotificationData(job_id=job.id)
session.add(notification_data)
# If no commit job will have no NotificationData
session.commit()
if (job.notification_data[0].notified_on != notified_on_status):
notification_email_data = NotificationEmailData(job_id=job.id)
notification_email_data.notify_status = notified_on_status
notification_email_data.creation_date = datetime.utcnow()
job.notification_data[0].notified_on = notified_on_status
session.add(notification_email_data)
def __create_job_service(job):
job_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id))
cmd = '{} -i /files -o /files/output'.format(job.service)
if job.service == 'file-setup':
cmd += ' -f {}'.format(job.secure_filename)
cmd += ' --log-dir /files'
cmd += ' --zip [{}]_{}'.format(job.service, job.secure_filename)
cmd += ' ' + ' '.join(json.loads(job.service_args))
service_args = {'command': cmd,
'constraints': ['node.role==worker'],
'labels': {'origin': 'nopaque',
'type': 'service.{}'.format(job.service),
'job_id': str(job.id)},
'mounts': [job_dir + ':/files:rw'],
'name': 'job_{}'.format(job.id),
'resources': docker.types.Resources(
cpu_reservation=job.n_cores * (10 ** 9),
mem_reservation=job.mem_mb * (10 ** 6)),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
+ job.service + ':' + job.service_version)
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
pass
except docker.errors.DockerException:
return
else:
service.remove()
try:
docker_client.services.create(service_image, **service_args)
except docker.errors.DockerException:
job.status = 'failed'
else:
job.status = 'queued'
def __checkout_job_service(job, session):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logging.error('__checkout_job_service({}): '.format(job.id)
+ 'The service does not exist. '
+ '(status: {} -> failed)'.format(job.status))
job.status = 'failed'
return
except docker.errors.DockerException:
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
elif (job.status == 'running'
and (task_state == 'complete' or task_state == 'failed')):
service.remove()
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
results_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id),
'output')
results = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result in results:
job_result = JobResult(dir=results_dir,
filename=result,
job_id=job.id)
session.add(job_result)
def __remove_job_service(job):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
job.status = 'canceled'
except docker.errors.DockerException:
return
else:
service.update(mounts=None)
service.remove()

View File

@ -1,28 +0,0 @@
from email.message import EmailMessage
class Notification(EmailMessage):
"""docstring for Email."""
def set_notification_content(self,
subject_template,
subject_template_values_dict,
body_txt_template_path,
body_html_template_path,
body_template_values_dict):
# Create subject with subject_template_values_dict
self['subject'] = subject_template.format(
**subject_template_values_dict)
# Open template files and insert values from body_template_values_dict
with open(body_txt_template_path) as nfile:
self.body = nfile.read().format(**body_template_values_dict)
with open(body_html_template_path) as nfile:
self.html = nfile.read().format(**body_template_values_dict)
# Set txt of email
self.set_content(self.body)
# Set html alternative
self.add_alternative(self.html, subtype='html')
def set_addresses(self, sender, recipient):
self['From'] = sender
self['to'] = recipient

View File

@ -1,16 +0,0 @@
class NotificationService:
"""This is a nopaque notifcation service object."""
def __init__(self, smtp):
# Bool to show if the mail server stoped sending mails due to exceeding
# its sending limit
self.mail_limit_exceeded = False
# Holds due to an error unsent email notifications
self.not_sent = {}
self.smtp = smtp
def send(self, email):
self.smtp.send_message(email)
def quit(self):
self.smtp.quit()

View File

@ -1,15 +0,0 @@
<html>
<body>
<p>Dear <b>{username}</b>,</p>
<p>The status of your Job/Corpus({id}) with the title <b>"{title}"</b> has changed!</p>
<p>It is now <b>{status}</b>!</p>
<p>Time of this status update was: <b>{time} UTC</b></p>
<p>You can access your Job/Corpus here: <a href="{url}">{url}</a>
</p>
<p>Kind regards!<br>
Your nopaque team</p>
</body>
</html>

View File

@ -1,10 +0,0 @@
Dear {username},
The status of your Job/Corpus({id}) with the title "{title}" has changed!
It is now {status}!
Time of this status update was: {time} UTC
You can access your Job/Corpus here: {url}
Kind regards!
Your nopaque team

View File

@ -1,111 +0,0 @@
from sqlalchemy import asc
from .libnotify.notification import Notification
from .libnotify.service import NotificationService
from .. import configuration as config
from .. import Session
from ..decorators import background
from ..models import NotificationEmailData
import logging
import os
import smtplib
ROOT_DIR = os.path.abspath(os.path.dirname(__file__))
@background
def notify():
session = Session()
if config.SMTP_USE_SSL:
smtp = smtplib.SMTP_SSL(host=config.SMTP_SERVER, port=config.SMTP_PORT)
else:
smtp = smtplib.SMTP(host=config.SMTP_SERVER, port=config.SMTP_PORT)
if config.SMTP_USE_TLS:
smtp.starttls()
try:
smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
except smtplib.SMTPHeloError:
logging.warning('The server didnt reply properly to the HELO '
'greeting.')
return
except smtplib.SMTPAuthenticationError as e:
logging.warning('The server didnt accept the username/password '
'combination.')
logging.warning(e)
return
except smtplib.SMTPNotSupportedError:
logging.warning('The AUTH command is not supported by the server.')
return
except smtplib.SMTPException:
logging.warning('No suitable authentication method was found.')
return
notification_service = NotificationService(smtp)
# create notifications (content, recipient etc.)
notifications = __create_mail_notifications(notification_service, session)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
smtp.quit()
Session.remove()
# Email notification functions
def __create_mail_notifications(notification_service, session):
notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() # noqa
notifications = {}
for data in notification_email_data:
notification = Notification()
notification.set_addresses(config.SMTP_DEFAULT_SENDER,
data.job.user.email)
subject_template = ('[nopaque] Status update for your Job/Corpora: '
'{title}!')
subject_template_values_dict = {'title': data.job.title}
url = '{}://{}/{}/{}'.format(config.PROTOCOL,
config.DOMAIN,
'jobs',
data.job.id)
body_template_values_dict = {'username': data.job.user.username,
'id': data.job.id,
'title': data.job.title,
'status': data.notify_status,
'time': data.creation_date,
'url': url}
txt_tmplt = os.path.join(ROOT_DIR,
'libnotify/templates/notification.txt')
html_tmplt = os.path.join(ROOT_DIR,
'libnotify/templates/notification.html')
notification.set_notification_content(subject_template,
subject_template_values_dict,
txt_tmplt,
html_tmplt,
body_template_values_dict)
notifications[data.job.id] = notification
# Using a dictionary for notifications avoids sending multiple mails
# if the status of a job changes in a few seconds. The user will not
# get swamped with mails for queued, running and complete if those
# happen in in a few seconds. Only the last update will be sent.
# This depends on the sleep time interval though.
session.delete(data)
session.commit()
return notifications
def __send_mail_notifications(notifications, notification_service):
for key, notification in notifications.items():
try:
notification_service.send(notification)
notification_service.mail_limit_exceeded = False
except Exception:
# Adds notifications to unsent if mail server exceded limit for
# consecutive mail sending
logging.warning('limit')
notification_service.not_sent[key] = notification
notification_service.mail_limit_exceeded = True
notification_service.not_sent.update(notifications)

View File

@ -1,3 +0,0 @@
#!/bin/bash
source venv/bin/activate
python nopaqued.py

View File

@ -1,71 +0,0 @@
import logging
import os
ROOT_DIR = os.path.abspath(os.path.dirname(__file__))
class Config:
''' # Email # '''
SMTP_DEFAULT_SENDER = os.environ.get('NOPAQUE_SMTP_DEFAULT_SENDER')
SMTP_PASSWORD = os.environ.get('NOPAQUE_SMTP_PASSWORD')
SMTP_PORT = int(os.environ.get('NOPAQUE_SMTP_PORT'))
SMTP_SERVER = os.environ.get('NOPAQUE_SMTP_SERVER')
SMTP_USERNAME = os.environ.get('NOPAQUE_SMTP_USERNAME')
SMTP_USE_SSL = os.environ.get(
'NOPAQUE_SMTP_USE_SSL', 'false').lower() == 'true'
SMTP_USE_TLS = os.environ.get(
'NOPAQUE_SMTP_USE_TLS', 'false').lower() == 'true'
''' # General # '''
DATA_DIR = os.environ.get('NOPAQUE_DATA_DIR', '/mnt/nopaque')
DOMAIN = os.environ.get('NOPAQUE_DOMAIN', 'localhost')
PROTOCOL = os.environ.get('NOPAQUE_PROTOCOL', 'http')
SECRET_KEY = os.environ.get('NOPAQUE_SECRET_KEY', 'hard to guess string')
''' # Logging # '''
LOG_DATE_FORMAT = os.environ.get('NOPAQUE_LOG_DATE_FORMAT',
'%Y-%m-%d %H:%M:%S')
LOG_FILE = os.environ.get('NOPAQUED_LOG_FILE',
os.path.join(ROOT_DIR, 'nopaqued.log'))
LOG_FORMAT = os.environ.get(
'NOPAQUE_LOG_FORMAT',
'[%(asctime)s] %(levelname)s in '
'%(pathname)s (function: %(funcName)s, line: %(lineno)d): %(message)s'
)
LOG_LEVEL = os.environ.get('NOPAQUE_LOG_LEVEL', 'WARNING')
@classmethod
def init(cls):
# Set up logging according to the corresponding (LOG_*) variables
logging.basicConfig(datefmt=cls.LOG_DATE_FORMAT,
filename=cls.LOG_FILE,
format=cls.LOG_FORMAT,
level=cls.LOG_LEVEL)
class DevelopmentConfig(Config):
''' # Database # '''
SQLALCHEMY_DATABASE_URI = os.environ.get(
'NOPAQUE_DEV_DATABASE_URL',
'sqlite:///' + os.path.join(ROOT_DIR, 'data-dev.sqlite')
)
class ProductionConfig(Config):
''' # Database # '''
SQLALCHEMY_DATABASE_URI = os.environ.get(
'NOPAQUE_DATABASE_URL',
'sqlite:///' + os.path.join(ROOT_DIR, 'data.sqlite')
)
class TestingConfig(Config):
''' # Database # '''
SQLALCHEMY_DATABASE_URI = os.environ.get(
'NOPAQUE_TEST_DATABASE_URL', 'sqlite://')
config = {'development': DevelopmentConfig,
'production': ProductionConfig,
'testing': TestingConfig}

View File

@ -1,13 +0,0 @@
from dotenv import load_dotenv
from app import run
import os
# Load environment variables
DOTENV_FILE = os.path.join(os.path.dirname(__file__), '.env')
if os.path.exists(DOTENV_FILE):
load_dotenv(DOTENV_FILE)
if __name__ == '__main__':
run()

View File

@ -1,4 +0,0 @@
docker
psycopg2
python-dotenv
SQLAlchemy

View File

@ -1,7 +1,7 @@
#!/bin/bash
if [[ "${NOPAQUE_DAEMON_ENABLED}" == "True" ]]; then
echo "Starting nopaque daemon..."
echo "INFO Starting nopaque daemon process..."
./nopaque-daemon.sh &
fi
@ -13,7 +13,7 @@ if [[ "${#}" -eq 0 ]]; then
if [[ "${?}" == "0" ]]; then
break
fi
echo Deploy command failed, retrying in 5 secs...
echo "Deploy command failed, retrying in 5 secs..."
sleep 5
done
python nopaque.py