Huge config update and smtp fix for daemon

This commit is contained in:
Patrick Jentsch
2020-10-08 12:34:02 +02:00
parent 5e221d90ad
commit dff92cbf4d
43 changed files with 613 additions and 1204 deletions

View File

@ -1,6 +1,6 @@
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import relationship
from tasks import engine
from . import engine
Base = automap_base()

View File

@ -1,22 +1,11 @@
from config import Config
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import os
import docker
''' Global constants '''
NOPAQUE_STORAGE = os.environ.get('NOPAQUE_STORAGE')
''' Docker client '''
config = Config()
config.init_app()
docker_client = docker.from_env()
docker_client.login(password=os.environ.get('GITLAB_PASSWORD'),
registry="gitlab.ub.uni-bielefeld.de:4567",
username=os.environ.get('GITLAB_USERNAME'))
''' Scoped session '''
engine = create_engine(
'postgresql://{}:{}@db/{}'.format(
os.environ.get('POSTGRES_USER'),
os.environ.get('POSTGRES_PASSWORD'),
os.environ.get('POSTGRES_DB_NAME')))
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
engine = create_engine(config.SQLALCHEMY_DATABASE_URI)
Session = scoped_session(sessionmaker(bind=engine))

View File

@ -1,16 +1,16 @@
from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.decorators import background
from tasks.Models import Corpus
from . import config, docker_client, Session
from .decorators import background
from .models import Corpus
import docker
import logging
import os
import shutil
@background
def check_corpora():
c_session = Session()
corpora = c_session.query(Corpus).all()
session = Session()
corpora = session.query(Corpus).all()
for corpus in filter(lambda corpus: corpus.status == 'submitted', corpora):
__create_build_corpus_service(corpus)
for corpus in filter(lambda corpus: (corpus.status == 'queued'
@ -23,13 +23,15 @@ def check_corpora():
for corpus in filter(lambda corpus: corpus.status == 'stop analysis',
corpora):
__remove_cqpserver_container(corpus)
c_session.commit()
session.commit()
Session.remove()
def __create_build_corpus_service(corpus):
corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
'corpora', str(corpus.id))
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
@ -49,7 +51,8 @@ def __create_build_corpus_service(corpus):
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'build-corpus_{}'.format(corpus.id),
'restart_policy': docker.types.RestartPolicy()}
service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest')
service_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
service = docker_client.services.get(service_args['name'])
except docker.errors.NotFound:
@ -67,14 +70,13 @@ def __create_build_corpus_service(corpus):
def __checkout_build_corpus_service(corpus):
logger = init_logger()
service_name = 'build-corpus_{}'.format(corpus.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logger.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ ' The service does not exist.'
+ ' (stauts: {} -> failed)'.format(corpus.status))
logging.error('__checkout_build_corpus_service({}):'.format(corpus.id)
+ ' The service does not exist.'
+ ' (stauts: {} -> failed)'.format(corpus.status))
corpus.status = 'failed'
return
except docker.errors.DockerException:
@ -94,8 +96,10 @@ def __checkout_build_corpus_service(corpus):
def __create_cqpserver_container(corpus):
corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
'corpora', str(corpus.id))
corpus_dir = os.path.join(config.DATA_DIR,
str(corpus.user_id),
'corpora',
str(corpus.id))
corpus_data_dir = os.path.join(corpus_dir, 'data')
corpus_registry_dir = os.path.join(corpus_dir, 'registry')
container_args = {'command': 'cqpserver',
@ -104,7 +108,8 @@ def __create_cqpserver_container(corpus):
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id),
'network': 'nopaque_default'}
container_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest')
container_image = \
'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
try:
container = docker_client.containers.get(container_args['name'])
except docker.errors.NotFound:

View File

@ -1,46 +1,42 @@
from datetime import datetime
from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.decorators import background
from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult
from . import config, docker_client, Session
from .decorators import background
from .models import Job, JobResult, NotificationData, NotificationEmailData
import docker
import logging
import json
import os
@background
def check_jobs():
# logger = init_logger()
cj_session = Session()
jobs = cj_session.query(Job).all()
session = Session()
jobs = session.query(Job).all()
for job in filter(lambda job: job.status == 'submitted', jobs):
__create_job_service(job)
for job in filter(lambda job: (job.status == 'queued'), jobs):
__checkout_job_service(job, cj_session)
__add_notification_data(job, 'queued', cj_session)
for job in filter(lambda job: (job.status == 'running'), jobs):
__checkout_job_service(job, cj_session)
__add_notification_data(job, 'running', cj_session)
for job in filter(lambda job: job.status == 'queued', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'queued', session)
for job in filter(lambda job: job.status == 'running', jobs):
__checkout_job_service(job, session)
__add_notification_data(job, 'running', session)
for job in filter(lambda job: job.status == 'complete', jobs):
__add_notification_data(job, 'complete', cj_session)
__add_notification_data(job, 'complete', session)
for job in filter(lambda job: job.status == 'failed', jobs):
__add_notification_data(job, 'failed', cj_session)
__add_notification_data(job, 'failed', session)
for job in filter(lambda job: job.status == 'canceling', jobs):
__remove_job_service(job)
cj_session.commit()
session.commit()
Session.remove()
def __add_notification_data(job, notified_on_status, scoped_session):
logger = init_logger()
def __add_notification_data(job, notified_on_status, session):
# checks if user wants any notifications at all
if (job.user.setting_job_status_mail_notifications == 'none'):
# logger.warning('User does not want any notifications!')
return
# checks if user wants only notification on completed jobs
elif (job.user.setting_job_status_mail_notifications == 'end'
and notified_on_status != 'complete'):
# logger.warning('User only wants notifications on job completed!')
return
else:
# check if a job already has associated NotificationData
@ -48,27 +44,21 @@ def __add_notification_data(job, notified_on_status, scoped_session):
# create notification_data for current job if there is none
if (notification_exists == 0):
notification_data = NotificationData(job_id=job.id)
scoped_session.add(notification_data)
scoped_session.commit()
session.add(notification_data)
# If no commit job will have no NotificationData
# logger.warning('Created NotificationData for current Job.'))
else:
pass
# logger.warning('Job already had notification: {}'.format(notification_exists))
session.commit()
if (job.notification_data[0].notified_on != notified_on_status):
notification_email_data = NotificationEmailData(job_id=job.id)
notification_email_data.notify_status = notified_on_status
notification_email_data.creation_date = datetime.utcnow()
job.notification_data[0].notified_on = notified_on_status
scoped_session.add(notification_email_data)
logger.warning('Created NotificationEmailData for current Job.')
else:
# logger.warning('NotificationEmailData has already been created for current Job!')
pass
session.add(notification_email_data)
def __create_job_service(job):
job_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id), 'jobs',
job_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id))
cmd = '{} -i /files -o /files/output'.format(job.service)
if job.service == 'file-setup':
@ -105,15 +95,14 @@ def __create_job_service(job):
job.status = 'queued'
def __checkout_job_service(job, scoped_session):
logger = init_logger()
def __checkout_job_service(job, session):
service_name = 'job_{}'.format(job.id)
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
logger.error('__checkout_job_service({}):'.format(job.id)
+ ' The service does not exist.'
+ ' (stauts: {} -> failed)'.format(job.status))
logging.error('__checkout_job_service({}): '.format(job.id)
+ 'The service does not exist. '
+ '(status: {} -> failed)'.format(job.status))
job.status = 'failed'
return
except docker.errors.DockerException:
@ -130,14 +119,18 @@ def __checkout_job_service(job, scoped_session):
job.end_date = datetime.utcnow()
job.status = task_state
if task_state == 'complete':
results_dir = os.path.join(NOPAQUE_STORAGE, str(job.user_id),
'jobs', str(job.id), 'output')
results_dir = os.path.join(config.DATA_DIR,
str(job.user_id),
'jobs',
str(job.id),
'output')
results = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result in results:
job_result = JobResult(dir=results_dir, filename=result,
job_result = JobResult(dir=results_dir,
filename=result,
job_id=job.id)
scoped_session.add(job_result)
session.add(job_result)
def __remove_job_service(job):

View File

@ -1,58 +1,71 @@
from notify.notification import Notification
from notify.service import NotificationService
from sqlalchemy import asc
from tasks import Session
from tasks.decorators import background
from tasks.Models import NotificationEmailData
import os
from . import config, Session
from .decorators import background
from .models import NotificationEmailData
import logging
import smtplib
@background
def notify(execute_flag):
# If True mails are sent normaly
# If False mails are not sent. Used to avoid sending mails for jobs that
# have been completed a long time ago. Use this if you implement notify
# into an already existing nopaque instance. Change it to True after the
# daemon has run one time with the flag set to False.
# Initialize notification service
notification_service = NotificationService(execute_flag)
notification_service.get_smtp_configs()
notification_service.set_server()
def notify():
session = Session()
if config.SMTP_USE_SSL:
smtp = smtplib.SMTP_SSL(host=config.SMTP_SERVER, port=config.SMTP_PORT)
else:
smtp = smtplib.SMTP(host=config.SMTP_SERVER, port=config.SMTP_PORT)
if config.SMTP_USE_TLS:
smtp.starttls()
try:
smtp.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
except smtplib.SMTPHeloError:
logging.warning('The server didnt reply properly to the HELO '
'greeting.')
return
except smtplib.SMTPAuthenticationError as e:
logging.warning('The server didnt accept the username/password '
'combination.')
logging.warning(e)
return
except smtplib.SMTPNotSupportedError:
logging.warning('The AUTH command is not supported by the server.')
return
except smtplib.SMTPException:
logging.warning('No suitable authentication method was found.')
return
notification_service = NotificationService(smtp)
# create notifications (content, recipient etc.)
notifications = __create_mail_notifications(notification_service)
notifications = __create_mail_notifications(notification_service, session)
# only login and send mails if there are any notifications
if (len(notifications) > 0):
try:
notification_service.login()
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
notification_service.quit()
except Exception as e:
notification_service.not_sent.update(notifications)
notification_service.quit()
# combine new and unsent notifications
notifications.update(notification_service.not_sent)
# send all notifications
__send_mail_notifications(notifications, notification_service)
# remove unsent notifications because they have been sent now
# but only if mail limit has not been exceeded
if (notification_service.mail_limit_exceeded is not True):
notification_service.not_sent = {}
smtp.quit()
Session.remove()
# Email notification functions
def __create_mail_notifications(notification_service):
mn_session = Session()
notification_email_data = mn_session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all()
def __create_mail_notifications(notification_service, session):
notification_email_data = session.query(NotificationEmailData).order_by(asc(NotificationEmailData.creation_date)).all() # noqa
notifications = {}
for data in notification_email_data:
notification = Notification()
notification.set_addresses(notification_service.email_address,
notification.set_addresses(config.SMTP_DEFAULT_SENDER,
data.job.user.email)
subject_template = '[nopaque] Status update for your Job/Corpora: {title}!'
subject_template = ('[nopaque] Status update for your Job/Corpora: '
'{title}!')
subject_template_values_dict = {'title': data.job.title}
protocol = os.environ.get('NOPAQUE_PROTOCOL')
domain = os.environ.get('NOPAQUE_DOMAIN')
url = '{protocol}://{domain}/{jobs}/{id}'.format(
protocol=protocol, domain=domain, jobs='jobs', id=data.job.id)
url = '{}://{}/{}/{}'.format(config.PROTOCOL,
config.DOMAIN,
'jobs',
data.job.id)
body_template_values_dict = {'username': data.job.user.username,
'id': data.job.id,
'title': data.job.title,
@ -72,9 +85,8 @@ def __create_mail_notifications(notification_service):
# get swamped with mails for queued, running and complete if those
# happen in in a few seconds. Only the last update will be sent.
# This depends on the sleep time interval though.
mn_session.delete(data)
mn_session.commit()
Session.remove()
session.delete(data)
session.commit()
return notifications
@ -83,8 +95,10 @@ def __send_mail_notifications(notifications, notification_service):
try:
notification_service.send(notification)
notification_service.mail_limit_exceeded = False
except Exception as e:
except Exception:
# Adds notifications to unsent if mail server exceded limit for
# consecutive mail sending
logging.warning('limit')
notification_service.not_sent[key] = notification
notification_service.mail_limit_exceeded = True
notification_service.not_sent.update(notifications)