"""Status-driven management of the Docker resources that back corpora.

:class:`CheckCorporaMixin` reads every corpus from the database and, based on
its ``status`` field, creates/inspects/removes the Docker Swarm service that
builds the corpus and the ``cqpserver`` container that serves it.
"""
import logging
import os
import shutil
from collections import defaultdict

import docker

from ..models import Corpus


# Image used both for the build-corpus service and the cqpserver container.
_CQPSERVER_IMAGE = (
    'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest'
)


class CheckCorporaMixin:
    """Drive corpus Docker resources according to ``Corpus.status``.

    NOTE(review): this mixin expects the host class to provide
    ``self.docker`` (presumably a ``docker.DockerClient``) and
    ``self.buffer_user_patch_operation(corpus, patch_operation)``; neither is
    defined here — confirm against the class this is mixed into.
    """

    def _update_corpus_status(self, corpus, status):
        """Set ``corpus.status`` and buffer a JSON-patch 'replace' for it.

        The patch targets ``/corpora/<id>/status`` and is handed to
        ``self.buffer_user_patch_operation`` together with the corpus.
        """
        corpus.status = status
        patch_operation = {
            'op': 'replace',
            'path': '/corpora/{}/status'.format(corpus.id),
            'value': corpus.status
        }
        self.buffer_user_patch_operation(corpus, patch_operation)

    def check_corpora(self):
        """Run the status-specific handler for every corpus.

        Corpora are grouped by the status they have *before* any handler
        runs, so a corpus whose status changes during this pass (e.g.
        'submitted' -> 'queued') is only handled under its new status on the
        next call.
        """
        corpora_by_status = defaultdict(list)
        for corpus in Corpus.query.all():
            corpora_by_status[corpus.status].append(corpus)
        for corpus in corpora_by_status['submitted']:
            self.create_build_corpus_service(corpus)
        for corpus in (corpora_by_status['queued']
                       + corpora_by_status['running']):
            self.checkout_build_corpus_service(corpus)
        for corpus in corpora_by_status['start analysis']:
            self.create_cqpserver_container(corpus)
        for corpus in corpora_by_status['analysing']:
            self.checkout_analysing_corpus_container(corpus)
        for corpus in corpora_by_status['stop analysis']:
            self.remove_cqpserver_container(corpus)

    def create_build_corpus_service(self, corpus):
        """Create the Swarm service that builds ``corpus``.

        Recreates empty ``data`` and ``registry`` directories under
        ``corpus.path``, then starts a ``build-corpus_<id>`` service that
        reads ``merged/corpus.vrt`` read-only and writes into those two
        directories. On success the corpus becomes 'queued'; on an API error
        the status is left unchanged.
        """
        corpus_data_dir = os.path.join(corpus.path, 'data')
        shutil.rmtree(corpus_data_dir, ignore_errors=True)
        os.mkdir(corpus_data_dir)
        corpus_registry_dir = os.path.join(corpus.path, 'registry')
        shutil.rmtree(corpus_registry_dir, ignore_errors=True)
        os.mkdir(corpus_registry_dir)
        corpus_file = os.path.join(corpus.path, 'merged', 'corpus.vrt')
        service_kwargs = {
            'command': 'docker-entrypoint.sh build-corpus',
            'constraints': ['node.role==worker'],
            'labels': {'origin': 'nopaque',
                       'type': 'corpus.build',
                       'corpus_id': str(corpus.id)},
            'mounts': [corpus_file + ':/root/files/corpus.vrt:ro',
                       corpus_data_dir + ':/corpora/data:rw',
                       corpus_registry_dir
                       + ':/usr/local/share/cwb/registry:rw'],
            'name': 'build-corpus_{}'.format(corpus.id),
            'restart_policy': docker.types.RestartPolicy()
        }
        try:
            self.docker.services.create(_CQPSERVER_IMAGE, **service_kwargs)
        except docker.errors.APIError as e:
            logging.error(
                'Create "{}" service raised '.format(service_kwargs['name'])
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
        else:
            self._update_corpus_status(corpus, 'queued')

    def checkout_build_corpus_service(self, corpus):
        """Track the build service of a 'queued' or 'running' corpus.

        - service missing: corpus -> 'failed'
        - 'queued' and first task no longer 'pending': corpus -> 'running'
        - 'running' and first task 'complete'/'failed': the service is
          removed and the corpus becomes 'prepared' or 'failed' accordingly.
        Docker API errors are logged and the status is left unchanged so the
        check is retried on the next call.
        """
        service_name = 'build-corpus_{}'.format(corpus.id)
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound:
            logging.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.NotFound" The service does not exist. '
                + '(corpus.status: {} -> failed)'.format(corpus.status)
            )
            self._update_corpus_status(corpus, 'failed')
            return
        except docker.errors.APIError as e:
            logging.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
            return
        except docker.errors.InvalidVersion:
            logging.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.InvalidVersion" One of the arguments is '
                + 'not supported with the current API version.'
            )
            return
        # Listing tasks is another API round-trip; an error here must not
        # abort the checks of all remaining corpora.
        try:
            service_tasks = service.tasks()
        except docker.errors.APIError as e:
            logging.error(
                'Get "{}" service tasks raised '.format(service_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
            return
        if not service_tasks:
            return
        task_state = service_tasks[0].get('Status', {}).get('State')
        if corpus.status == 'queued' and task_state != 'pending':
            self._update_corpus_status(corpus, 'running')
        elif (corpus.status == 'running'
                and task_state in ['complete', 'failed']):
            try:
                service.remove()
            except docker.errors.APIError as e:
                logging.error(
                    'Remove "{}" service raised '.format(service_name)
                    + '"docker.errors.APIError" The server returned an '
                    + 'error. Details: {}'.format(e)
                )
                return
            self._update_corpus_status(
                corpus, 'prepared' if task_state == 'complete' else 'failed')

    def create_cqpserver_container(self, corpus):
        """Start a fresh detached ``cqpserver_<id>`` container for ``corpus``.

        Any existing container with that name is force-removed first. The
        corpus ``data`` and ``registry`` directories are mounted and the
        container joins the 'nopaque_default' network. On success the corpus
        becomes 'analysing'; ContainerError/ImageNotFound mark it 'failed';
        other API errors leave the status unchanged.
        """
        corpus_data_dir = os.path.join(corpus.path, 'data')
        corpus_registry_dir = os.path.join(corpus.path, 'registry')
        container_kwargs = {
            'command': 'cqpserver',
            'detach': True,
            'volumes': [corpus_data_dir + ':/corpora/data:rw',
                        corpus_registry_dir
                        + ':/usr/local/share/cwb/registry:rw'],
            'name': 'cqpserver_{}'.format(corpus.id),
            'network': 'nopaque_default'
        }
        container_name = container_kwargs['name']
        # Check if a cqpserver container already exists. If this is the
        # case, remove it and create a new one.
        try:
            container = self.docker.containers.get(container_name)
        except docker.errors.NotFound:
            pass
        except docker.errors.APIError as e:
            logging.error(
                'Get "{}" container raised '.format(container_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
            return
        else:
            try:
                container.remove(force=True)
            except docker.errors.APIError as e:
                logging.error(
                    'Remove "{}" container raised '.format(container_name)
                    + '"docker.errors.APIError" The server returned an '
                    + 'error. Details: {}'.format(e)
                )
                return
        try:
            self.docker.containers.run(_CQPSERVER_IMAGE, **container_kwargs)
        except docker.errors.ContainerError:
            # This case should not occur, because detach is True.
            logging.error(
                'Run "{}" container raised '.format(container_name)
                + '"docker.errors.ContainerError" The container exits with a '
                + 'non-zero exit code and detach is False.'
            )
            self._update_corpus_status(corpus, 'failed')
        except docker.errors.ImageNotFound:
            # Must precede APIError: ImageNotFound is a subclass of it.
            logging.error(
                'Run "{}" container raised '.format(container_name)
                + '"docker.errors.ImageNotFound" The specified image does not '
                + 'exist.'
            )
            self._update_corpus_status(corpus, 'failed')
        except docker.errors.APIError as e:
            logging.error(
                'Run "{}" container raised '.format(container_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
        else:
            self._update_corpus_status(corpus, 'analysing')

    def checkout_analysing_corpus_container(self, corpus):
        """Verify that an 'analysing' corpus still has its cqpserver.

        If the container is gone, the corpus falls back to 'prepared' and the
        change is buffered for the user like every other status transition.
        API errors are logged and the status is left unchanged.
        """
        container_name = 'cqpserver_{}'.format(corpus.id)
        try:
            self.docker.containers.get(container_name)
        except docker.errors.NotFound:
            logging.error(
                'Could not find "{}" but the corpus state is '
                '"analysing".'.format(container_name)
            )
            self._update_corpus_status(corpus, 'prepared')
        except docker.errors.APIError as e:
            logging.error(
                'Get "{}" container raised '.format(container_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )

    def remove_cqpserver_container(self, corpus):
        """Stop analysis: force-remove ``cqpserver_<id>`` if it exists.

        A missing container counts as already removed. In both cases the
        corpus becomes 'prepared'. API errors are logged and the status is
        left unchanged so removal is retried on the next call.
        """
        container_name = 'cqpserver_{}'.format(corpus.id)
        try:
            container = self.docker.containers.get(container_name)
        except docker.errors.NotFound:
            pass
        except docker.errors.APIError as e:
            logging.error(
                'Get "{}" container raised '.format(container_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
            return
        else:
            try:
                container.remove(force=True)
            except docker.errors.APIError as e:
                logging.error(
                    'Remove "{}" container raised '.format(container_name)
                    + '"docker.errors.APIError" The server returned an '
                    + 'error. Details: {}'.format(e)
                )
                return
        self._update_corpus_status(corpus, 'prepared')