nopaque/daemon/tasks/check_corpora.py

135 lines
5.2 KiB
Python
Raw Normal View History

2020-06-05 14:42:04 +02:00
from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE
2020-06-10 14:18:02 +02:00
from tasks.decorators import background
2020-06-05 14:42:04 +02:00
from tasks.Models import Corpus
import docker
import os
import shutil
2020-06-10 14:18:02 +02:00
@background
def check_corpora():
    """Dispatch every corpus to the handler matching its current status.

    All ``Corpus`` rows are loaded once and then walked in four ordered
    passes:

    * ``'submitted'``            -> ``__create_build_corpus_service``
    * ``'queued'`` / ``'running'`` -> ``__checkout_build_corpus_service``
    * ``'start analysis'``       -> ``__create_cqpserver_container``
    * ``'stop analysis'``        -> ``__remove_cqpserver_container``

    The status is read at the moment each pass reaches a corpus, so a
    corpus that the first pass moves to ``'queued'`` is also inspected by
    the second pass within the same run.

    Status changes made by the handlers are committed once at the end.
    The session is released even when the query or a handler raises.
    """
    c_session = Session()
    try:
        corpora = c_session.query(Corpus).all()
        for corpus in corpora:
            if corpus.status == 'submitted':
                __create_build_corpus_service(corpus)
        for corpus in corpora:
            if corpus.status in ('queued', 'running'):
                __checkout_build_corpus_service(corpus)
        for corpus in corpora:
            if corpus.status == 'start analysis':
                __create_cqpserver_container(corpus)
        for corpus in corpora:
            if corpus.status == 'stop analysis':
                __remove_cqpserver_container(corpus)
        c_session.commit()
    finally:
        # NOTE(review): Session is presumably a SQLAlchemy scoped_session;
        # remove() must run on every exit path so a failed pass does not
        # leave its session (and uncommitted state) behind for the next one.
        Session.remove()
def __create_build_corpus_service(corpus):
    """Start a ``build-corpus_<id>`` service for *corpus*.

    The corpus' ``data`` and ``registry`` directories below
    ``NOPAQUE_STORAGE/<user_id>/corpora/<id>`` are emptied and recreated,
    a service with the same name is removed if one already exists, and a
    new service is created from the cqpserver image.

    Sets ``corpus.status`` to ``'queued'`` when the service was created and
    to ``'failed'`` when creation raised a ``DockerException``. If looking
    up an existing service fails with a ``DockerException`` other than
    ``NotFound``, the function returns with the status left untouched.
    """
    base_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
                            'corpora', str(corpus.id))
    data_dir = os.path.join(base_dir, 'data')
    vrt_file = os.path.join(base_dir, 'merged', 'corpus.vrt')
    registry_dir = os.path.join(base_dir, 'registry')

    # Wipe both output directories first, then recreate them empty.
    output_dirs = (data_dir, registry_dir)
    for stale_dir in output_dirs:
        if os.path.exists(stale_dir):
            shutil.rmtree(stale_dir)
    for fresh_dir in output_dirs:
        os.mkdir(fresh_dir)

    create_kwargs = {
        'command': 'docker-entrypoint.sh build-corpus',
        'constraints': ['node.role==worker'],
        'labels': {
            'origin': 'nopaque',
            'type': 'corpus.prepare',
            'corpus_id': str(corpus.id),
        },
        'mounts': [
            vrt_file + ':/root/files/corpus.vrt:ro',
            data_dir + ':/corpora/data:rw',
            registry_dir + ':/usr/local/share/cwb/registry:rw',
        ],
        'name': 'build-corpus_{}'.format(corpus.id),
        'restart_policy': docker.types.RestartPolicy(),
    }
    image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'

    # NotFound is listed before the generic DockerException so that a
    # missing service is treated as the normal case.
    try:
        leftover = docker_client.services.get(create_kwargs['name'])
    except docker.errors.NotFound:
        leftover = None
    except docker.errors.DockerException:
        return
    if leftover is not None:
        leftover.remove()

    try:
        docker_client.services.create(image, **create_kwargs)
    except docker.errors.DockerException:
        corpus.status = 'failed'
        return
    corpus.status = 'queued'
def __checkout_build_corpus_service(corpus):
    """Update ``corpus.status`` from the state of its build service.

    Looks up the service named ``build-corpus_<id>`` and inspects the state
    of the first task it reports:

    * service not found -> an error is logged and the status becomes
      ``'failed'``;
    * any other ``DockerException`` during lookup, or no tasks yet -> the
      status is left unchanged;
    * ``'queued'`` and the task is no longer ``'pending'`` -> ``'running'``;
    * ``'running'`` and the task is ``'complete'`` -> the service is
      removed and the status becomes ``'prepared'``;
    * ``'running'`` and the task is ``'failed'`` -> the service is removed
      and the status becomes ``'failed'``.
    """
    logger = init_logger()
    service_name = 'build-corpus_{}'.format(corpus.id)
    try:
        service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        logger.error('__checkout_build_corpus_service({}):'.format(corpus.id)
                     + ' The service does not exist.'
                     + ' (status: {} -> failed)'.format(corpus.status))
        corpus.status = 'failed'
        return
    except docker.errors.DockerException:
        return
    service_tasks = service.tasks()
    if not service_tasks:
        return
    # Only the first reported task is considered.
    task_state = service_tasks[0].get('Status').get('State')
    if corpus.status == 'queued' and task_state != 'pending':
        corpus.status = 'running'
    elif corpus.status == 'running' and task_state == 'complete':
        service.remove()
        corpus.status = 'prepared'
    elif corpus.status == 'running' and task_state == 'failed':
        service.remove()
        corpus.status = task_state
def __create_cqpserver_container(corpus):
    """Run a detached ``cqpserver_<id>`` container for *corpus*.

    The corpus' ``data`` and ``registry`` directories are passed as
    read-write volumes and the container is attached to the
    ``nopaque_default`` network. A container with the same name is
    force-removed first if one exists.

    Sets ``corpus.status`` to ``'analysing'`` once the container has been
    started. On a ``DockerException`` (other than ``NotFound`` during the
    lookup) the function returns with the status left untouched.
    """
    base_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
                            'corpora', str(corpus.id))
    data_dir = os.path.join(base_dir, 'data')
    registry_dir = os.path.join(base_dir, 'registry')
    run_kwargs = {
        'command': 'cqpserver',
        'detach': True,
        'volumes': [
            data_dir + ':/corpora/data:rw',
            registry_dir + ':/usr/local/share/cwb/registry:rw',
        ],
        'name': 'cqpserver_{}'.format(corpus.id),
        'network': 'nopaque_default',
    }
    image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'

    # A missing container is the normal case; any other Docker error aborts.
    try:
        previous = docker_client.containers.get(run_kwargs['name'])
    except docker.errors.NotFound:
        previous = None
    except docker.errors.DockerException:
        return
    if previous is not None:
        previous.remove(force=True)

    try:
        docker_client.containers.run(image, **run_kwargs)
    except docker.errors.DockerException:
        return
    corpus.status = 'analysing'
def __remove_cqpserver_container(corpus):
    """Force-remove the ``cqpserver_<id>`` container of *corpus*.

    Sets ``corpus.status`` to ``'prepared'`` when the container was removed
    or did not exist in the first place. If the lookup fails with any other
    ``DockerException`` the function returns with the status left untouched.
    """
    name = 'cqpserver_{}'.format(corpus.id)
    try:
        running = docker_client.containers.get(name)
    except docker.errors.NotFound:
        # Nothing to remove; still counts as stopped.
        running = None
    except docker.errors.DockerException:
        return
    if running is not None:
        running.remove(force=True)
    corpus.status = 'prepared'