diff --git a/app/tasks/corpus_utils.py b/app/tasks/corpus_utils.py index ba018cf5..0281177b 100644 --- a/app/tasks/corpus_utils.py +++ b/app/tasks/corpus_utils.py @@ -1,3 +1,4 @@ +from flask import current_app from ..models import Corpus import docker import logging @@ -26,39 +27,69 @@ class CheckCorporaMixin: self.remove_cqpserver_container(corpus) def create_build_corpus_service(self, corpus): - corpus_data_dir = os.path.join(corpus.path, 'data') - shutil.rmtree(corpus_data_dir, ignore_errors=True) - os.mkdir(corpus_data_dir) - corpus_registry_dir = os.path.join(corpus.path, 'registry') - shutil.rmtree(corpus_registry_dir, ignore_errors=True) - os.mkdir(corpus_registry_dir) - corpus_file = os.path.join(corpus.path, 'merged', 'corpus.vrt') - service_kwargs = { - 'command': 'docker-entrypoint.sh build-corpus', - 'constraints': ['node.role==worker'], - 'labels': {'origin': 'nopaque', - 'type': 'corpus.build', - 'corpus_id': str(corpus.id)}, - 'mounts': [corpus_file + ':/root/files/corpus.vrt:ro', - corpus_data_dir + ':/corpora/data:rw', - corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], - 'name': 'build-corpus_{}'.format(corpus.id), - 'restart_policy': docker.types.RestartPolicy() + ''' # Docker service settings # ''' + ''' ## Command ## ''' + command = 'docker-entrypoint.sh build-corpus' + ''' ## Constraints ## ''' + constraints = ['node.role==worker'] + ''' ## Image ## ''' + image = current_app.config['DOCKER_IMAGE_PREFIX'] + 'cqpserver:latest' + ''' ## Labels ## ''' + labels = { + 'origin': current_app.config['SERVER_NAME'], + 'type': 'build-corpus', + 'corpus_id': str(corpus.id) } - service_image = \ - 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + ''' ## Mounts ## ''' + ''' ### Corpus file mount ### ''' + corpus_file_source = os.path.join(corpus.path, 'merged', 'corpus.vrt') + corpus_file_target = '/root/files/corpus.vrt' + corpus_file_mount = \ + corpus_file_source + ':' + corpus_file_target + ':ro' + ''' ### Corpus data mount ### ''' + corpus_data_source = os.path.join(corpus.path, 'data') + corpus_data_target = '/corpora/data' + corpus_data_mount = \ + corpus_data_source + ':' + corpus_data_target + ':rw' + # Make sure that their is no data in the corpus data directory + shutil.rmtree(corpus_data_source, ignore_errors=True) + os.mkdir(corpus_data_source) + ''' ### Corpus registry mount ### ''' + corpus_registry_source = os.path.join(corpus.path, 'registry') + corpus_registry_target = '/usr/local/share/cwb/registry' + corpus_registry_mount = \ + corpus_registry_source + ':' + corpus_registry_target + ':rw' + # Make sure that their is no data in the corpus registry directory + shutil.rmtree(corpus_registry_source, ignore_errors=True) + os.mkdir(corpus_registry_source) + mounts = [corpus_file_mount, corpus_data_mount, corpus_registry_mount] + ''' ## Name ## ''' + name = 'build-corpus_{}'.format(corpus.id) + ''' ## Restart policy ## ''' + restart_policy = docker.types.RestartPolicy() try: - self.docker.services.create(service_image, **service_kwargs) + self.docker.services.create( + image, + command=command, + constraints=constraints, + labels=labels, + mounts=mounts, + name=name, + restart_policy=restart_policy + ) except docker.errors.APIError as e: logging.error( - 'Create "{}" service raised '.format(service_kwargs['name']) + 'Create "{}" service raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) else: corpus.status = 'queued' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) def checkout_build_corpus_service(self, corpus): @@ -73,7 +104,10 @@ class CheckCorporaMixin: ) corpus.status = 'failed' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( @@ -95,7 +129,10 @@ class CheckCorporaMixin: if corpus.status == 'queued' and task_state != 'pending': corpus.status = 'running' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) elif (corpus.status == 'running' and task_state in ['complete', 'failed']): @@ -104,39 +141,53 @@ class CheckCorporaMixin: except docker.errors.APIError as e: logging.error( 'Remove "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' + + '"docker.errors.APIError" The server returned an error. ' # noqa + 'Details: {}'.format(e) ) return else: - corpus.status = 'prepared' if task_state == 'complete' \ - else 'failed' + corpus.status = \ + 'prepared' if task_state == 'complete' else 'failed' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) def create_cqpserver_container(self, corpus): - corpus_data_dir = os.path.join(corpus.path, 'data') - corpus_registry_dir = os.path.join(corpus.path, 'registry') - container_kwargs = { - 'command': 'cqpserver', - 'detach': True, - 'volumes': [corpus_data_dir + ':/corpora/data:rw', - corpus_registry_dir + ':/usr/local/share/cwb/registry:rw'], - 'name': 'cqpserver_{}'.format(corpus.id), - 'network': 'nopaque_default' - } - container_image = \ - 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/cqpserver:latest' + ''' # Docker container settings # ''' + ''' ## Command ## ''' + command = 'cqpserver' + ''' ## Detach ## ''' + detach = True + ''' ## Image ## ''' + image = current_app.config['DOCKER_IMAGE_PREFIX'] + 'cqpserver:latest' + ''' ## Name ## ''' + name = 'cqpserver_{}'.format(corpus.id), + ''' ## Network ## ''' + network = 'nopaque_default' + ''' ## Volumes ## ''' + ''' ### Corpus data volume ### ''' + corpus_data_source = os.path.join(corpus.path, 'data') + corpus_data_target = '/corpora/data' + corpus_data_volume = \ + corpus_data_source + ':' + corpus_data_target + ':rw' + ''' ### Corpus registry volume ### ''' + corpus_registry_source = os.path.join(corpus.path, 'registry') + corpus_registry_target = '/usr/local/share/cwb/registry' + corpus_registry_volume = \ + corpus_registry_source + ':' + corpus_registry_target + ':rw' + volumes = [corpus_data_volume, corpus_registry_volume] # Check if a cqpserver container already exists. If this is the case, # remove it and create a new one try: - container = self.docker.containers.get(container_kwargs['name']) + container = self.docker.containers.get(name) except docker.errors.NotFound: pass except docker.errors.APIError as e: logging.error( - 'Get "{}" container raised '.format(container_kwargs['name']) + 'Get "{}" container raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) @@ -146,45 +197,55 @@ class CheckCorporaMixin: container.remove(force=True) except docker.errors.APIError as e: logging.error( - 'Remove "{}" container raised '.format( - container_kwargs['name']) + 'Remove "{}" container raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return try: - self.docker.containers.run(container_image, **container_kwargs) + self.docker.containers.run(image, command=command, detach=detach, + volumes=volumes, name=name, + network=network) except docker.errors.ContainerError: # This case should not occur, because detach is True. logging.error( - 'Run "{}" container raised '.format(container_kwargs['name']) + 'Run "{}" container raised '.format(name) + '"docker.errors.ContainerError" The container exits with a ' + 'non-zero exit code and detach is False.' ) corpus.status = 'failed' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.ImageNotFound: logging.error( - 'Run "{}" container raised '.format(container_kwargs['name']) + 'Run "{}" container raised '.format(name) + '"docker.errors.ImageNotFound" The specified image does not ' + 'exist.' ) corpus.status = 'failed' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( - 'Run "{}" container raised '.format(container_kwargs['name']) + 'Run "{}" container raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) else: corpus.status = 'analysing' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) def checkout_analysing_corpus_container(self, corpus): @@ -194,6 +255,12 @@ class CheckCorporaMixin: except docker.errors.NotFound: logging.error('Could not find "{}" but the corpus state is "analysing".') # noqa corpus.status = 'prepared' + patch_operation = { + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } + self.buffer_user_patch_operation(corpus, patch_operation) except docker.errors.APIError as e: logging.error( 'Get "{}" container raised '.format(container_name) @@ -227,5 +294,8 @@ class CheckCorporaMixin: return corpus.status = 'prepared' patch_operation = { - 'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status} + 'op': 'replace', + 'path': '/corpora/{}/status'.format(corpus.id), + 'value': corpus.status + } self.buffer_user_patch_operation(corpus, patch_operation) diff --git a/app/tasks/job_utils.py b/app/tasks/job_utils.py index 8b06ee4e..c3fd5fd2 100644 --- a/app/tasks/job_utils.py +++ b/app/tasks/job_utils.py @@ -1,16 +1,14 @@ from datetime import datetime +from flask import current_app from werkzeug.utils import secure_filename from .. import db, mail from ..email import create_message from ..models import Job, JobResult import docker -import logging import json +import logging import os - - -DOCKER_REGISTRY = 'gitlab.ub.uni-bielefeld.de:4567' -DOCKER_IMAGE_PREFIX = '{}/sfb1288inf/'.format(DOCKER_REGISTRY) +import shutil class CheckJobsMixin: @@ -28,54 +26,64 @@ class CheckJobsMixin: self.remove_job_service(job) def create_job_service(self, job): - # Service specific settings + ''' # Docker service settings # ''' + ''' ## Service specific settings ## ''' if job.service == 'file-setup': mem_mb = 2048 n_cores = 2 executable = 'file-setup' - image = DOCKER_IMAGE_PREFIX + 'file-setup:' + job.service_version + image = (current_app.config['DOCKER_IMAGE_PREFIX'] + + 'file-setup:' + job.service_version) elif job.service == 'ocr': mem_mb = 4096 n_cores = 4 executable = 'ocr' - image = DOCKER_IMAGE_PREFIX + 'ocr:' + job.service_version + image = (current_app.config['DOCKER_IMAGE_PREFIX'] + + 'ocr:' + job.service_version) elif job.service == 'nlp': mem_mb = 2048 n_cores = 2 executable = 'nlp' - image = DOCKER_IMAGE_PREFIX + 'nlp:' + job.service_version - # Command + image = (current_app.config['DOCKER_IMAGE_PREFIX'] + + 'nlp:' + job.service_version) + ''' ## Command ## ''' command = '{} -i /input -o /output'.format(executable) command += ' --log-dir /input' command += ' --mem-mb {}'.format(mem_mb) command += ' --n-cores {}'.format(n_cores) command += ' --zip [' + job.service + ']_' + secure_filename(job.title) command += ' ' + ' '.join(json.loads(job.service_args)) - # Constraints + ''' ## Constraints ## ''' constraints = ['node.role==worker'] - # Labels - labels = {'origin': 'nopaque', 'type': 'job', 'job_id': str(job.id)} - # Mounts - ## Input mount + ''' ## Labels ## ''' + labels = { + 'origin': current_app.config['SERVER_NAME'], + 'type': 'job', + 'job_id': str(job.id) + } + ''' ## Mounts ## ''' + ''' ### Input mount ### ''' input_mount_source = job.path input_mount_target = '/input' if job.service == 'file-setup': input_mount_target += '/' + secure_filename(job.title) input_mount = input_mount_source + ':' + input_mount_target + ':rw' - ## Output mount + ''' ### Output mount ### ''' output_mount_source = os.path.join(job.path, 'output') output_mount_target = '/output' output_mount = output_mount_source + ':' + output_mount_target + ':rw' + # Make sure that their is no data in the output directory + shutil.rmtree(output_mount_source, ignore_errors=True) os.makedirs(output_mount_source) mounts = [input_mount, output_mount] - # Name + ''' ## Name ## ''' name = 'job_{}'.format(job.id) - # Resources + ''' ## Resources ## ''' resources = docker.types.Resources( cpu_reservation=n_cores * (10 ** 9), mem_reservation=mem_mb * (10 ** 6) ) - # Restart policy + ''' ## Restart policy ## ''' restart_policy = docker.types.RestartPolicy() try: self.docker.services.create( @@ -90,14 +98,18 @@ class CheckJobsMixin: ) except docker.errors.APIError as e: logging.error( - 'Create "{}" service raised '.format(service_kwargs['name']) + 'Create "{}" service raised '.format(name) + '"docker.errors.APIError" The server returned an error. ' + 'Details: {}'.format(e) ) return else: job.status = 'queued' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa + patch_operation = { + 'op': 'replace', + 'path': '/jobs/{}/status'.format(job.id), + 'value': job.status + } self.buffer_user_patch_operation(job, patch_operation) finally: self.send_job_notification(job) @@ -107,11 +119,17 @@ class CheckJobsMixin: try: service = self.docker.services.get(service_name) except docker.errors.NotFound: - logging.error('Get "{}" service raised '.format(service_name) - + '"docker.errors.NotFound" The service does not exist. ' - + '(job.status: {} -> failed)'.format(job.status)) + logging.error( + 'Get "{}" service raised '.format(service_name) + + '"docker.errors.NotFound" The service does not exist. ' + + '(job.status: {} -> failed)'.format(job.status) + ) job.status = 'failed' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa + patch_operation = { + 'op': 'replace', + 'path': '/jobs/{}/status'.format(job.id), + 'value': job.status + } self.buffer_user_patch_operation(job, patch_operation) except docker.errors.APIError as e: logging.error( @@ -134,9 +152,13 @@ class CheckJobsMixin: task_state = service_tasks[0].get('Status').get('State') if job.status == 'queued' and task_state != 'pending': job.status = 'running' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa + patch_operation = { + 'op': 'replace', + 'path': '/jobs/{}/status'.format(job.id), + 'value': job.status + } self.buffer_user_patch_operation(job, patch_operation) - elif job.status == 'running' and task_state in ['complete', 'failed']: + elif job.status == 'running' and task_state in ['complete', 'failed']: # noqa try: service.remove() except docker.errors.APIError as e: @@ -156,13 +178,25 @@ class CheckJobsMixin: db.session.add(job_result) db.session.flush() db.session.refresh(job_result) - patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()} # noqa + patch_operation = { + 'op': 'add', + 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), # noqa + 'value': job_result.to_dict() + } self.buffer_user_patch_operation(job, patch_operation) # noqa job.end_date = datetime.utcnow() - patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()} # noqa + patch_operation = { + 'op': 'replace', + 'path': '/jobs/{}/end_date'.format(job.id), + 'value': job.end_date.timestamp() + } self.buffer_user_patch_operation(job, patch_operation) job.status = task_state - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa + patch_operation = { + 'op': 'replace', + 'path': '/jobs/{}/status'.format(job.id), + 'value': job.status + } self.buffer_user_patch_operation(job, patch_operation) finally: self.send_job_notification(job) @@ -173,7 +207,11 @@ class CheckJobsMixin: service = self.docker.services.get(service_name) except docker.errors.NotFound: job.status = 'canceled' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa + patch_operation = { + 'op': 'replace', + 'path': '/jobs/{}/status'.format(job.id), + 'value': job.status + } self.buffer_user_patch_operation(job, patch_operation) except docker.errors.APIError as e: logging.error( @@ -214,7 +252,10 @@ class CheckJobsMixin: if (job.creator.setting_job_status_mail_notifications == 'end' and job.status not in ['complete', 'failed']): return - msg = create_message(job.creator.email, - 'Status update for your Job "{}"'.format(job.title), # noqa - 'tasks/email/notification', job=job) + msg = create_message( + job.creator.email, + 'Status update for your Job "{}"'.format(job.title), # noqa + 'tasks/email/notification', + job=job + ) mail.send(msg) diff --git a/config.py b/config.py index 7740ef96..3af786f9 100644 --- a/config.py +++ b/config.py @@ -7,6 +7,10 @@ ROOT_DIR = os.path.abspath(os.path.dirname(__file__)) class Config: + ''' # Docker # ''' + DOCKER_REGISTRY = 'gitlab.ub.uni-bielefeld.de:4567' + DOCKER_IMAGE_PREFIX = DOCKER_REGISTRY + '/sfb1288inf/' + ''' # Flask # ''' PREFERRED_URL_SCHEME = os.environ.get('PREFERRED_URL_SCHEME', 'http') SECRET_KEY = os.environ.get('SECRET_KEY', 'hard to guess string')