import os
import shutil

import docker
from flask import current_app

from ..models import Corpus


class CheckCorporaMixin:
    '''
    Mixin that advances corpora through their status life cycle.

    Every method talks to Docker through ``self.docker``, which the class
    using this mixin has to provide (it is used here via ``.services`` and
    ``.containers``). The methods only assign ``corpus.status`` (and, in one
    case, ``corpus.num_analysis_sessions``); nothing in this class commits
    those changes to the database.
    '''

    def check_corpora(self):
        '''
        Inspect all corpora and trigger the action matching their status.

        The passes below run in a fixed order and later passes see the
        status values assigned by earlier ones, e.g. a corpus switched to
        'start analysis' in the third pass gets its cqpserver container
        created by the sixth pass of the same call.
        '''
        corpora = Corpus.query.all()
        # Newly submitted corpora get a build service.
        for corpus in (x for x in corpora if x.status == 'submitted'):
            self.create_build_corpus_service(corpus)
        # Corpora with a build service get their build progress checked.
        for corpus in (x for x in corpora
                       if x.status == 'queued' or x.status == 'running'):
            self.checkout_build_corpus_service(corpus)
        # Built corpora with at least one analysis session need a cqpserver.
        for corpus in (x for x in corpora
                       if x.status == 'prepared'
                       and x.num_analysis_sessions > 0):
            corpus.status = 'start analysis'
        # Analysed corpora without any session left can drop their cqpserver.
        for corpus in (x for x in corpora
                       if x.status == 'analysing'
                       and x.num_analysis_sessions == 0):
            corpus.status = 'stop analysis'
        for corpus in (x for x in corpora if x.status == 'analysing'):
            self.checkout_analysing_corpus_container(corpus)
        for corpus in (x for x in corpora if x.status == 'start analysis'):
            self.create_cqpserver_container(corpus)
        for corpus in (x for x in corpora if x.status == 'stop analysis'):
            self.remove_cqpserver_container(corpus)

    def create_build_corpus_service(self, corpus):
        '''
        Create the Docker service that builds ``corpus``.

        The corpus' ``data`` and ``registry`` directories are wiped and
        recreated before the service is created. On success the corpus
        status becomes 'queued'; if Docker rejects the request the error is
        logged and the status is left untouched.
        '''
        # Docker service settings
        command = 'docker-entrypoint.sh build-corpus'
        constraints = ['node.role==worker']
        image_prefix = current_app.config['NOPAQUE_DOCKER_IMAGE_PREFIX']
        image = f'{image_prefix}cqpserver:r1674'
        labels = {
            'origin': current_app.config['SERVER_NAME'],
            'type': 'build-corpus',
            'corpus_id': str(corpus.id)
        }
        # Mounts: corpus file (read only), corpus data, corpus registry
        corpus_file_source = os.path.join(corpus.path, 'merged', 'corpus.vrt')
        corpus_file_target = '/root/files/corpus.vrt'
        corpus_file_mount = f'{corpus_file_source}:{corpus_file_target}:ro'
        corpus_data_source = os.path.join(corpus.path, 'data')
        corpus_data_target = '/corpora/data'
        corpus_data_mount = f'{corpus_data_source}:{corpus_data_target}:rw'
        corpus_registry_source = os.path.join(corpus.path, 'registry')
        corpus_registry_target = '/usr/local/share/cwb/registry'
        corpus_registry_mount = (
            f'{corpus_registry_source}:{corpus_registry_target}:rw'
        )
        # Make sure that there is no data left in the corpus data and
        # registry directories
        for directory in (corpus_data_source, corpus_registry_source):
            shutil.rmtree(directory, ignore_errors=True)
            os.mkdir(directory)
        mounts = [corpus_file_mount, corpus_data_mount, corpus_registry_mount]
        name = f'build-corpus_{corpus.id}'
        restart_policy = docker.types.RestartPolicy()
        try:
            self.docker.services.create(
                image,
                command=command,
                constraints=constraints,
                labels=labels,
                mounts=mounts,
                name=name,
                restart_policy=restart_policy
            )
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Create service "{name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        corpus.status = 'queued'

    def checkout_build_corpus_service(self, corpus):
        '''
        Update the status of ``corpus`` from the state of its build service.

        A missing service marks the corpus as 'failed'. Otherwise the state
        of the first task reported by the service decides: a 'queued' corpus
        whose task is no longer 'pending' becomes 'running'; a 'running'
        corpus becomes 'prepared' or 'failed' when the task is 'complete' or
        'failed', and the service is removed afterwards. Docker API errors
        are logged and leave the corpus unchanged.
        '''
        service_name = f'build-corpus_{corpus.id}'
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound as e:
            current_app.logger.error(
                f'Get service "{service_name}" failed '
                + f'due to "docker.errors.NotFound": {e}'
            )
            corpus.status = 'failed'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get service "{service_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            # Without a service object there is nothing to inspect; falling
            # through would hit an unbound ``service`` variable.
            return
        try:
            service_tasks = service.tasks()
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get tasks of service "{service_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        if not service_tasks:
            return
        task_state = service_tasks[0].get('Status').get('State')
        if corpus.status == 'queued' and task_state != 'pending':
            corpus.status = 'running'
            return
        elif corpus.status == 'running' and task_state == 'complete':
            corpus.status = 'prepared'
        elif corpus.status == 'running' and task_state == 'failed':
            corpus.status = 'failed'
        else:
            return
        # Only reached once the build finished (successfully or not).
        try:
            service.remove()
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Remove service "{service_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )

    def create_cqpserver_container(self, corpus):
        '''
        Start a fresh cqpserver container for ``corpus``.

        An already existing container with the same name is force-removed
        first. On success the corpus status becomes 'analysing'; a missing
        image marks the corpus as 'failed'; any other Docker API error is
        logged and leaves the status unchanged.
        '''
        # Docker container settings
        command = 'cqpserver'
        detach = True
        image_prefix = current_app.config['NOPAQUE_DOCKER_IMAGE_PREFIX']
        image = f'{image_prefix}cqpserver:r1674'
        name = f'cqpserver_{corpus.id}'
        network = 'nopaque_default'
        # Volumes: corpus data and corpus registry
        corpus_data_source = os.path.join(corpus.path, 'data')
        corpus_data_target = '/corpora/data'
        corpus_data_volume = f'{corpus_data_source}:{corpus_data_target}:rw'
        corpus_registry_source = os.path.join(corpus.path, 'registry')
        corpus_registry_target = '/usr/local/share/cwb/registry'
        corpus_registry_volume = (
            f'{corpus_registry_source}:{corpus_registry_target}:rw'
        )
        volumes = [corpus_data_volume, corpus_registry_volume]
        # Check if a cqpserver container already exists. If this is the case,
        # remove it and create a new one
        try:
            container = self.docker.containers.get(name)
        except docker.errors.NotFound:
            pass
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get container "{name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        else:
            try:
                container.remove(force=True)
            except docker.errors.APIError as e:
                current_app.logger.error(
                    f'Remove container "{name}" failed '
                    + f'due to "docker.errors.APIError": {e}'
                )
                return
        try:
            self.docker.containers.run(
                image,
                command=command,
                detach=detach,
                volumes=volumes,
                name=name,
                network=network
            )
        except docker.errors.ImageNotFound as e:
            current_app.logger.error(
                f'Run container "{name}" failed '
                + f'due to "docker.errors.ImageNotFound" error: {e}'
            )
            corpus.status = 'failed'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Run container "{name}" failed '
                + f'due to "docker.errors.APIError" error: {e}'
            )
            return
        corpus.status = 'analysing'

    def checkout_analysing_corpus_container(self, corpus):
        '''
        Verify that the cqpserver container of ``corpus`` still exists.

        If the container is gone, the session counter is reset to 0 and the
        corpus falls back to 'prepared'. Other Docker API errors are only
        logged.
        '''
        container_name = f'cqpserver_{corpus.id}'
        try:
            self.docker.containers.get(container_name)
        except docker.errors.NotFound as e:
            current_app.logger.error(
                f'Get container "{container_name}" failed '
                + f'due to "docker.errors.NotFound": {e}'
            )
            corpus.num_analysis_sessions = 0
            corpus.status = 'prepared'
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get container "{container_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )

    def remove_cqpserver_container(self, corpus):
        '''
        Force-remove the cqpserver container of ``corpus``.

        The status only switches back to 'prepared' once the container can
        no longer be found, i.e. on a call after the one that removed it.
        Docker API errors are logged and leave the status unchanged.
        '''
        container_name = f'cqpserver_{corpus.id}'
        try:
            container = self.docker.containers.get(container_name)
        except docker.errors.NotFound:
            corpus.status = 'prepared'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get container "{container_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        try:
            container.remove(force=True)
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Remove container "{container_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )