diff --git a/web/app/tasks/corpus_utils.py b/web/app/tasks/corpus_utils.py index 5d51f342..ba018cf5 100644 --- a/web/app/tasks/corpus_utils.py +++ b/web/app/tasks/corpus_utils.py @@ -8,17 +8,20 @@ import shutil class CheckCorporaMixin: def check_corpora(self): corpora = Corpus.query.all() - queued_corpora = list(filter(lambda corpus: corpus.status == 'queued', corpora)) - running_corpora = list(filter(lambda corpus: corpus.status == 'running', corpora)) - start_analysis_corpora = list(filter(lambda corpus: corpus.status == 'start analysis', corpora)) - stop_analysis_corpora = list(filter(lambda corpus: corpus.status == 'stop analysis', corpora)) - submitted_corpora = list(filter(lambda corpus: corpus.status == 'submitted', corpora)) + queued_corpora = list(filter(lambda corpus: corpus.status == 'queued', corpora)) # noqa + running_corpora = list(filter(lambda corpus: corpus.status == 'running', corpora)) # noqa + start_analysis_corpora = list(filter(lambda corpus: corpus.status == 'start analysis', corpora)) # noqa + analysing_corpora = list(filter(lambda corpus: corpus.status == 'analysing', corpora)) # noqa + stop_analysis_corpora = list(filter(lambda corpus: corpus.status == 'stop analysis', corpora)) # noqa + submitted_corpora = list(filter(lambda corpus: corpus.status == 'submitted', corpora)) # noqa for corpus in submitted_corpora: self.create_build_corpus_service(corpus) for corpus in queued_corpora + running_corpora: self.checkout_build_corpus_service(corpus) for corpus in start_analysis_corpora: self.create_cqpserver_container(corpus) + for corpus in analysing_corpora: + self.checkout_analysing_corpus_container(corpus) for corpus in stop_analysis_corpora: self.remove_cqpserver_container(corpus) @@ -54,7 +57,8 @@ class CheckCorporaMixin: ) else: corpus.status = 'queued' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation) def checkout_build_corpus_service(self, corpus): @@ -68,7 +72,8 @@ class CheckCorporaMixin: + '(corpus.status: {} -> failed)'.format(corpus.status) ) corpus.status = 'failed' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( @@ -89,7 +94,8 @@ class CheckCorporaMixin: task_state = service_tasks[0].get('Status').get('State') if corpus.status == 'queued' and task_state != 'pending': corpus.status = 'running' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation) elif (corpus.status == 'running' and task_state in ['complete', 'failed']): @@ -105,7 +111,8 @@ class CheckCorporaMixin: else: corpus.status = 'prepared' if task_state == 'complete' \ else 'failed' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation) def create_cqpserver_container(self, corpus): @@ -139,7 +146,8 @@ class CheckCorporaMixin: container.remove(force=True) except docker.errors.APIError as e: logging.error( - 'Remove "{}" container raised '.format(container_kwargs['name']) + 'Remove "{}" container raised '.format( + container_kwargs['name']) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) @@ -154,7 +162,8 @@ class CheckCorporaMixin: + 'non-zero exit code and detach is False.' ) corpus.status = 'failed' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.ImageNotFound: logging.error( @@ -163,7 +172,8 @@ class CheckCorporaMixin: + 'exist.' ) corpus.status = 'failed' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( @@ -173,9 +183,25 @@ class CheckCorporaMixin: ) else: corpus.status = 'analysing' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation) + def checkout_analysing_corpus_container(self, corpus): + container_name = 'cqpserver_{}'.format(corpus.id) + try: + self.docker.containers.get(container_name) + except docker.errors.NotFound: + logging.error('Could not find "{}" but the corpus state is "analysing".') # noqa + corpus.status = 'prepared' + except docker.errors.APIError as e: + logging.error( + 'Get "{}" container raised '.format(container_name) + + '"docker.errors.APIError" The server returned an error. ' + + 'Details: {}'.format(e) + ) + return + def remove_cqpserver_container(self, corpus): container_name = 'cqpserver_{}'.format(corpus.id) try: @@ -200,5 +226,6 @@ class CheckCorporaMixin: ) return corpus.status = 'prepared' - patch_operation = {'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + patch_operation = { + 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} self.buffer_user_patch_operation(corpus, patch_operation)