from decorators import background
from logger.logger import init_logger
from tasks import Session, docker_client, NOPAQUE_STORAGE
from tasks.Models import Corpus
import docker
import os
import shutil


@background
def check_corpora():
    """Advance every corpus according to its current status.

    All ``Corpus`` rows are loaded once and then processed in four
    consecutive passes:

    * ``'submitted'``           -> ``__create_build_corpus_service``
    * ``'queued'``/``'running'`` -> ``__checkout_build_corpus_service``
    * ``'start analysis'``      -> ``__create_cqpserver_container``
    * ``'stop analysis'``       -> ``__remove_cqpserver_container``

    The passes run one after another on the same objects, so a corpus
    that an earlier pass moved to a new status (for example
    ``'submitted'`` -> ``'queued'``) is picked up by a later pass within
    the same call.

    Status changes are committed once at the end. The session is released
    even when a handler raises, in which case nothing is committed.
    """
    c_session = Session()
    try:
        corpora = c_session.query(Corpus).all()
        for corpus in corpora:
            if corpus.status == 'submitted':
                __create_build_corpus_service(corpus)
        for corpus in corpora:
            if corpus.status in ('queued', 'running'):
                __checkout_build_corpus_service(corpus)
        for corpus in corpora:
            if corpus.status == 'start analysis':
                __create_cqpserver_container(corpus)
        for corpus in corpora:
            if corpus.status == 'stop analysis':
                __remove_cqpserver_container(corpus)
        c_session.commit()
    finally:
        # Always hand the session back, otherwise an exception in any
        # handler would leave it open.
        # NOTE(review): Session exposes remove(), like a SQLAlchemy
        # scoped_session - confirm against the tasks package.
        Session.remove()


def __create_build_corpus_service(corpus):
    """Create the Docker service that builds ``corpus``.

    The corpus' ``data`` and ``registry`` directories are recreated
    empty, a stale service with the same name is removed, and a new
    ``build-corpus_<id>`` service is created.

    Sets ``corpus.status`` to ``'queued'`` when the service was created
    and to ``'failed'`` when the creation raised a ``DockerException``.
    If the stale service cannot be looked up or removed, the status is
    left untouched so the corpus is retried on the next check.

    :param corpus: object providing ``id``, ``user_id`` and ``status``.
    """
    corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
                              'corpora', str(corpus.id))
    corpus_data_dir = os.path.join(corpus_dir, 'data')
    corpus_file = os.path.join(corpus_dir, 'merged', 'corpus.vrt')
    corpus_registry_dir = os.path.join(corpus_dir, 'registry')
    # Start from empty output directories so that leftovers of a previous
    # build do not end up in the new one.
    for directory in (corpus_data_dir, corpus_registry_dir):
        if os.path.exists(directory):
            shutil.rmtree(directory)
        os.mkdir(directory)
    service_args = {
        'command': 'docker-entrypoint.sh build-corpus',
        'constraints': ['node.role==worker'],
        'labels': {'origin': 'nopaque',
                   'type': 'corpus.prepare',
                   'corpus_id': str(corpus.id)},
        'mounts': [
            corpus_file + ':/root/files/corpus.vrt:ro',
            corpus_data_dir + ':/corpora/data:rw',
            corpus_registry_dir + ':/usr/local/share/cwb/registry:rw',
        ],
        'name': 'build-corpus_{}'.format(corpus.id),
        'restart_policy': docker.types.RestartPolicy(),
    }
    service_image = \
        'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
    # Remove a stale service of the same name before creating a new one.
    try:
        docker_client.services.get(service_args['name']).remove()
    except docker.errors.NotFound:
        # Nothing to clean up (or it vanished in the meantime).
        pass
    except docker.errors.DockerException:
        # Docker is not usable right now; keep the status and retry later.
        return
    try:
        docker_client.services.create(service_image, **service_args)
    except docker.errors.DockerException:
        corpus.status = 'failed'
    else:
        corpus.status = 'queued'


def __checkout_build_corpus_service(corpus):
    """Update ``corpus.status`` from the state of its build service.

    * The service does not exist: log an error, status -> ``'failed'``.
    * ``'queued'`` and the first task is no longer ``'pending'``:
      status -> ``'running'``.
    * ``'running'`` and the first task is ``'complete'``: the service is
      removed, status -> ``'prepared'``.
    * ``'running'`` and the first task is ``'failed'``: the service is
      removed, status -> ``'failed'``.

    Any other Docker error, or a service without tasks, leaves the status
    untouched so the corpus is checked again on the next run.

    :param corpus: object providing ``id`` and ``status``.
    """
    logger = init_logger()
    service_name = 'build-corpus_{}'.format(corpus.id)
    try:
        service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        logger.error('__checkout_build_corpus_service({}):'.format(corpus.id)
                     + ' The service does not exist.'
                     + ' (stauts: {} -> failed)'.format(corpus.status))
        corpus.status = 'failed'
        return
    except docker.errors.DockerException:
        return
    try:
        service_tasks = service.tasks()
    except docker.errors.DockerException:
        return
    if not service_tasks:
        return
    # Only the first reported task is inspected.
    task_state = service_tasks[0].get('Status').get('State')
    if corpus.status == 'queued' and task_state != 'pending':
        corpus.status = 'running'
    elif corpus.status == 'running' and task_state in ('complete', 'failed'):
        try:
            service.remove()
        except docker.errors.NotFound:
            # Already gone, which is the state we want.
            pass
        except docker.errors.DockerException:
            # Keep the 'running' status so the removal is retried on the
            # next check instead of aborting the whole run.
            logger.error(
                '__checkout_build_corpus_service({}):'.format(corpus.id)
                + ' The service could not be removed.'
            )
            return
        corpus.status = 'prepared' if task_state == 'complete' else 'failed'


def __create_cqpserver_container(corpus):
    """Start the ``cqpserver_<id>`` container for ``corpus``.

    An existing container with the same name is force-removed first.
    Sets ``corpus.status`` to ``'analysing'`` when the container was
    started. On any Docker error the status is left untouched so the
    start is retried on the next check.

    :param corpus: object providing ``id``, ``user_id`` and ``status``.
    """
    corpus_dir = os.path.join(NOPAQUE_STORAGE, str(corpus.user_id),
                              'corpora', str(corpus.id))
    corpus_data_dir = os.path.join(corpus_dir, 'data')
    corpus_registry_dir = os.path.join(corpus_dir, 'registry')
    container_args = {
        'command': 'cqpserver',
        'detach': True,
        'volumes': [
            corpus_data_dir + ':/corpora/data:rw',
            corpus_registry_dir + ':/usr/local/share/cwb/registry:rw',
        ],
        'name': 'cqpserver_{}'.format(corpus.id),
        # NOTE(review): the rest of this module says "nopaque"; confirm
        # that 'opaque_default' is the intended network name.
        'network': 'opaque_default',
    }
    container_image = \
        'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
    # Remove a stale container of the same name before starting a new one.
    try:
        stale_container = docker_client.containers.get(container_args['name'])
        stale_container.remove(force=True)
    except docker.errors.NotFound:
        pass
    except docker.errors.DockerException:
        return
    try:
        docker_client.containers.run(container_image, **container_args)
    except docker.errors.DockerException:
        return
    else:
        corpus.status = 'analysing'


def __remove_cqpserver_container(corpus):
    """Stop the analysis of ``corpus`` by removing its container.

    Force-removes the ``cqpserver_<id>`` container if it exists and sets
    ``corpus.status`` to ``'prepared'``. A missing container counts as
    removed. On any other Docker error the status is left untouched so
    the removal is retried on the next check.

    :param corpus: object providing ``id`` and ``status``.
    """
    container_name = 'cqpserver_{}'.format(corpus.id)
    try:
        docker_client.containers.get(container_name).remove(force=True)
    except docker.errors.NotFound:
        pass
    except docker.errors.DockerException:
        return
    corpus.status = 'prepared'