from flask import current_app from ..models import Corpus import docker import logging import os import shutil class CheckCorporaMixin: def check_corpora(self): corpora = Corpus.query.all() queued_corpora = list(filter(lambda corpus: corpus.status == 'queued', corpora)) # noqa running_corpora = list(filter(lambda corpus: corpus.status == 'running', corpora)) # noqa start_analysis_corpora = list(filter(lambda corpus: corpus.status == 'start analysis', corpora)) # noqa analysing_corpora = list(filter(lambda corpus: corpus.status == 'analysing', corpora)) # noqa stop_analysis_corpora = list(filter(lambda corpus: corpus.status == 'stop analysis', corpora)) # noqa submitted_corpora = list(filter(lambda corpus: corpus.status == 'submitted', corpora)) # noqa for corpus in submitted_corpora: self.create_build_corpus_service(corpus) for corpus in queued_corpora + running_corpora: self.checkout_build_corpus_service(corpus) for corpus in start_analysis_corpora: self.create_cqpserver_container(corpus) for corpus in analysing_corpora: self.checkout_analysing_corpus_container(corpus) for corpus in stop_analysis_corpora: self.remove_cqpserver_container(corpus) def create_build_corpus_service(self, corpus): ''' # Docker service settings # ''' ''' ## Command ## ''' command = 'docker-entrypoint.sh build-corpus' ''' ## Constraints ## ''' constraints = ['node.role==worker'] ''' ## Image ## ''' image = current_app.config['DOCKER_IMAGE_PREFIX'] + 'cqpserver:latest' ''' ## Labels ## ''' labels = { 'origin': current_app.config['SERVER_NAME'], 'type': 'build-corpus', 'corpus_id': str(corpus.id) } ''' ## Mounts ## ''' ''' ### Corpus file mount ### ''' corpus_file_source = os.path.join(corpus.path, 'merged', 'corpus.vrt') corpus_file_target = '/root/files/corpus.vrt' corpus_file_mount = \ corpus_file_source + ':' + corpus_file_target + ':ro' ''' ### Corpus data mount ### ''' corpus_data_source = os.path.join(corpus.path, 'data') corpus_data_target = '/corpora/data' corpus_data_mount = \ corpus_data_source + ':' + corpus_data_target + ':rw' # Make sure that their is no data in the corpus data directory shutil.rmtree(corpus_data_source, ignore_errors=True) os.mkdir(corpus_data_source) ''' ### Corpus registry mount ### ''' corpus_registry_source = os.path.join(corpus.path, 'registry') corpus_registry_target = '/usr/local/share/cwb/registry' corpus_registry_mount = \ corpus_registry_source + ':' + corpus_registry_target + ':rw' # Make sure that their is no data in the corpus registry directory shutil.rmtree(corpus_registry_source, ignore_errors=True) os.mkdir(corpus_registry_source) mounts = [corpus_file_mount, corpus_data_mount, corpus_registry_mount] ''' ## Name ## ''' name = 'build-corpus_{}'.format(corpus.id) ''' ## Restart policy ## ''' restart_policy = docker.types.RestartPolicy() try: self.docker.services.create( image, command=command, constraints=constraints, labels=labels, mounts=mounts, name=name, restart_policy=restart_policy ) except docker.errors.APIError as e: logging.error( 'Create "{}" service raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) else: corpus.status = 'queued' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) def checkout_build_corpus_service(self, corpus): service_name = 'build-corpus_{}'.format(corpus.id) try: service = self.docker.services.get(service_name) except docker.errors.NotFound: logging.error( 'Get "{}" service raised '.format(service_name) + '"docker.errors.NotFound" The service does not exist. ' + '(corpus.status: {} -> failed)'.format(corpus.status) ) corpus.status = 'failed' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( 'Get "{}" service raised '.format(service_name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) except docker.errors.InvalidVersion: logging.error( 'Get "{}" service raised '.format(service_name) + '"docker.errors.InvalidVersion" One of the arguments is ' + 'not supported with the current API version.' ) else: service_tasks = service.tasks() if not service_tasks: return task_state = service_tasks[0].get('Status').get('State') if corpus.status == 'queued' and task_state != 'pending': corpus.status = 'running' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) elif (corpus.status == 'running' and task_state in ['complete', 'failed']): try: service.remove() except docker.errors.APIError as e: logging.error( 'Remove "{}" service raised '.format(service_name) + '"docker.errors.APIError" The server returned an error. ' # noqa + 'Details: {}'.format(e) ) return else: corpus.status = \ 'prepared' if task_state == 'complete' else 'failed' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) def create_cqpserver_container(self, corpus): ''' # Docker container settings # ''' ''' ## Command ## ''' command = 'cqpserver' ''' ## Detach ## ''' detach = True ''' ## Image ## ''' image = current_app.config['DOCKER_IMAGE_PREFIX'] + 'cqpserver:latest' ''' ## Name ## ''' name = 'cqpserver_{}'.format(corpus.id) ''' ## Network ## ''' network = 'nopaque_default' ''' ## Volumes ## ''' ''' ### Corpus data volume ### ''' corpus_data_source = os.path.join(corpus.path, 'data') corpus_data_target = '/corpora/data' corpus_data_volume = \ corpus_data_source + ':' + corpus_data_target + ':rw' ''' ### Corpus registry volume ### ''' corpus_registry_source = os.path.join(corpus.path, 'registry') corpus_registry_target = '/usr/local/share/cwb/registry' corpus_registry_volume = \ corpus_registry_source + ':' + corpus_registry_target + ':rw' volumes = [corpus_data_volume, corpus_registry_volume] # Check if a cqpserver container already exists. If this is the case, # remove it and create a new one try: container = self.docker.containers.get(name) except docker.errors.NotFound: pass except docker.errors.APIError as e: logging.error( 'Get "{}" container raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return else: try: container.remove(force=True) except docker.errors.APIError as e: logging.error( 'Remove "{}" container raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return try: self.docker.containers.run(image, command=command, detach=detach, volumes=volumes, name=name, network=network) except docker.errors.ContainerError: # This case should not occur, because detach is True. logging.error( 'Run "{}" container raised '.format(name) + '"docker.errors.ContainerError" The container exits with a ' + 'non-zero exit code and detach is False.' ) corpus.status = 'failed' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.ImageNotFound: logging.error( 'Run "{}" container raised '.format(name) + '"docker.errors.ImageNotFound" The specified image does not ' + 'exist.' ) corpus.status = 'failed' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( 'Run "{}" container raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) else: corpus.status = 'analysing' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) def checkout_analysing_corpus_container(self, corpus): container_name = 'cqpserver_{}'.format(corpus.id) try: self.docker.containers.get(container_name) except docker.errors.NotFound: logging.error('Could not find "{}" but the corpus state is "analysing".') # noqa corpus.status = 'prepared' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( 'Get "{}" container raised '.format(container_name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return def remove_cqpserver_container(self, corpus): container_name = 'cqpserver_{}'.format(corpus.id) try: container = self.docker.containers.get(container_name) except docker.errors.NotFound: pass except docker.errors.APIError as e: logging.error( 'Get "{}" container raised '.format(container_name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return else: try: container.remove(force=True) except docker.errors.APIError as e: logging.error( 'Remove "{}" container raised '.format(container_name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return corpus.status = 'prepared' patch_operation = { 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status } self.buffer_user_patch_operation(corpus, patch_operation)