Update daemon

This commit is contained in:
Patrick Jentsch 2020-06-10 14:18:02 +02:00
parent 6e26e38326
commit b3e467beac
5 changed files with 40 additions and 12 deletions

View File

@ -1,25 +1,33 @@
from concurrent.futures import ThreadPoolExecutor from logger.logger import init_logger
from tasks.check_corpora import check_corpora from tasks.check_corpora import check_corpora
from tasks.check_jobs import check_jobs from tasks.check_jobs import check_jobs
from tasks.notify import notify from tasks.notify import notify
from time import sleep
import os import os
# TODO: Check if thread is still alive and execute next thread after that
# TODO: Check line length
def nopaqued(): def nopaqued():
execute_notifications = bool(os.environ.get('NOPAQUE_EXECUTE_NOTIFICATIONS', True)) # noqa logger = init_logger()
# executing background functions NOPAQUE_EXECUTE_NOTIFICATIONS = os.environ.get('NOPAQUE_EXECUTE_NOTIFICATIONS', 'True').lower() == 'true' # noqa
threads = {'check_corpora': None, 'check_jobs': None, 'notify': None}
threads['check_corpora'] = check_corpora()
threads['check_jobs'] = check_jobs()
threads['notify'] = notify(NOPAQUE_EXECUTE_NOTIFICATIONS)
while True: while True:
with ThreadPoolExecutor(max_workers=3) as executor: logger.warning('check_corpora: {}'.format(threads['check_corpora'].is_alive()))
executor.submit(check_jobs) if not threads['check_corpora'].is_alive():
executor.submit(check_corpora) threads['check_corpora'] = check_corpora()
executor.submit(notify, execute_notifications) logger.warning('check_jobs: {}'.format(threads['check_jobs'].is_alive()))
if not threads['check_jobs'].is_alive():
threads['check_jobs'] = check_jobs()
logger.warning('notify: {}'.format(threads['notify'].is_alive()))
if not threads['notify'].is_alive():
threads['notify'] = notify(NOPAQUE_EXECUTE_NOTIFICATIONS)
# If execute_notifications True mails are sent. # If execute_notifications True mails are sent.
# If execute_notifications False no mails are sent. # If execute_notifications False no mails are sent.
# But notification status will be set nonetheless. # But notification status will be set nonetheless.
sleep(3)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,11 +1,13 @@
from logger.logger import init_logger from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.decorators import background
from tasks.Models import Corpus from tasks.Models import Corpus
import docker import docker
import os import os
import shutil import shutil
@background
def check_corpora(): def check_corpora():
c_session = Session() c_session = Session()
corpora = c_session.query(Corpus).all() corpora = c_session.query(Corpus).all()
@ -101,7 +103,7 @@ def __create_cqpserver_container(corpus):
'volumes': [corpus_data_dir + ':/corpora/data:rw', 'volumes': [corpus_data_dir + ':/corpora/data:rw',
corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'],
'name': 'cqpserver_{}'.format(corpus.id), 'name': 'cqpserver_{}'.format(corpus.id),
'network': 'opaque_default'} 'network': 'nopaque_default'}
container_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest') container_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest')
try: try:
container = docker_client.containers.get(container_args['name']) container = docker_client.containers.get(container_args['name'])

View File

@ -1,12 +1,14 @@
from datetime import datetime from datetime import datetime
from logger.logger import init_logger from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.decorators import background
from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult from tasks.Models import Job, NotificationData, NotificationEmailData, JobResult
import docker import docker
import json import json
import os import os
@background
def check_jobs(): def check_jobs():
# logger = init_logger() # logger = init_logger()
cj_session = Session() cj_session = Session()

View File

@ -0,0 +1,14 @@
from functools import wraps
from threading import Thread
def background(f):
'''
' This decorator executes a function in a Thread.
'''
@wraps(f)
def wrapped(*args, **kwargs):
thread = Thread(target=f, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapped

View File

@ -2,10 +2,12 @@ from notify.notification import Notification
from notify.service import NotificationService from notify.service import NotificationService
from sqlalchemy import asc from sqlalchemy import asc
from tasks import Session from tasks import Session
from tasks.decorators import background
from tasks.Models import NotificationEmailData from tasks.Models import NotificationEmailData
import os import os
@background
def notify(execute_flag): def notify(execute_flag):
# If True mails are sent normaly # If True mails are sent normaly
# If False mails are not sent. Used to avoid sending mails for jobs that # If False mails are not sent. Used to avoid sending mails for jobs that