From 99c43cc1a6d4ba429171364a6124547f1e42e8d3 Mon Sep 17 00:00:00 2001 From: Patrick Jentsch Date: Mon, 9 Aug 2021 11:02:09 +0200 Subject: [PATCH] some style updates --- app/tasks/job_utils.bak.py | 218 ------------------------------------- app/tasks/job_utils.py | 2 +- 2 files changed, 1 insertion(+), 219 deletions(-) delete mode 100644 app/tasks/job_utils.bak.py diff --git a/app/tasks/job_utils.bak.py b/app/tasks/job_utils.bak.py deleted file mode 100644 index 71b03bfd..00000000 --- a/app/tasks/job_utils.bak.py +++ /dev/null @@ -1,218 +0,0 @@ -from datetime import datetime -from werkzeug.utils import secure_filename -from .. import db, mail -from ..email import create_message -from ..models import Job, JobResult -import docker -import logging -import json -import os - - -DOCKER_REGISTRY = 'gitlab.ub.uni-bielefeld.de:4567' - - -class CheckJobsMixin: - def check_jobs(self): - jobs = Job.query.all() - canceling_jobs = list(filter(lambda job: job.status == 'canceling', jobs)) # noqa - queued_jobs = list(filter(lambda job: job.status == 'queued', jobs)) - running_jobs = list(filter(lambda job: job.status == 'running', jobs)) - submitted_jobs = list(filter(lambda job: job.status == 'submitted', jobs)) # noqa - for job in submitted_jobs: - self.create_job_service(job) - for job in queued_jobs + running_jobs: - self.checkout_job_service(job) - for job in canceling_jobs: - self.remove_job_service(job) - - def create_job_service(self, job): - if job.service == 'file-setup': - mem_mb = 2048 - n_cores = 2 - executable = 'file-setup' - image = '{}/sfb1288inf/file-setup:{}'.format(DOCKER_REGISTRY, job.service_version) # noqa - elif job.service == 'ocr': - mem_mb = 4096 - n_cores = 4 - executable = 'ocr' - image = '{}/sfb1288inf/ocr:{}'.format(DOCKER_REGISTRY, job.service_version) # noqa - elif job.service == 'nlp': - mem_mb = 2048 - n_cores = 2 - executable = 'nlp' - image = '{}/sfb1288inf/nlp:{}'.format(DOCKER_REGISTRY, job.service_version) # noqa - # Command - command = '{} -i /input -o /output'.format(executable) - command += ' --log-dir /input' - command += ' --mem-mb {}'.format(mem_mb) - command += ' --n-cores {}'.format(n_cores) - command += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title)) - command += ' ' + ' '.join(json.loads(job.service_args)) - # Constraints - constraints = ['node.role==worker'] - # Labels - labels = {'origin': 'nopaque', 'type': 'job', 'job_id': str(job.id)} - # Mounts - ## Input mount - input_mount_source = job.path - input_mount_target = os.path.abspath('/input') - if job.service == 'file-setup': - input_mount_target = os.path.join(input_mount_target, secure_filename(job.title)) # noqa - input_mount = '{}:{}:rw'.format(input_mount_source, input_mount_target) - ## Output mount - output_mount_source = os.path.join(job.path, 'output') - output_mount_target = os.path.abspath('/output') - output_mount = '{}:{}:rw'.format(output_mount_source, output_mount_target) # noqa - os.makedirs(output_mount_src) - mounts = [input_mount, output_mount] - # Name - name = 'job_{}'.format(job.id) - # Ressources - ressources = docker.types.Resources( - cpu_reservation=n_cores * (10 ** 9), - mem_reservation=mem_mb * (10 ** 6) - ) - # Restart policy - restart_policy = docker.types.RestartPolicy() - try: - self.docker.services.create( - image, - command=command, - constraints=constraints, - labels=labels, - mounts=mounts, - name=name, - ressources=ressources, - restart_policy=restart_policy - ) - except docker.errors.APIError as e: - logging.error( - 'Create "{}" service raised '.format(service_kwargs['name']) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - else: - job.status = 'queued' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa - self.buffer_user_patch_operation(job, patch_operation) - finally: - self.send_job_notification(job) - - def checkout_job_service(self, job): - service_name = 'job_{}'.format(job.id) - try: - service = self.docker.services.get(service_name) - except docker.errors.NotFound: - logging.error('Get "{}" service raised '.format(service_name) - + '"docker.errors.NotFound" The service does not exist. ' - + '(job.status: {} -> failed)'.format(job.status)) - job.status = 'failed' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa - self.buffer_user_patch_operation(job, patch_operation) - except docker.errors.APIError as e: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - except docker.errors.InvalidVersion: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.InvalidVersion" One of the arguments is ' - + 'not supported with the current API version.' - ) - return - else: - service_tasks = service.tasks() - if not service_tasks: - return - task_state = service_tasks[0].get('Status').get('State') - if job.status == 'queued' and task_state != 'pending': - job.status = 'running' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa - self.buffer_user_patch_operation(job, patch_operation) - elif job.status == 'running' and task_state in ['complete', 'failed']: - try: - service.remove() - except docker.errors.APIError as e: - logging.error( - 'Remove "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' # noqa - + 'Details: {}'.format(e) - ) - return - else: - if task_state == 'complete': - results_dir = os.path.join(job.path, 'output') - result_files = filter(lambda x: x.endswith('.zip'), - os.listdir(results_dir)) - for result_file in result_files: - job_result = JobResult(filename=result_file, job=job) # noqa - db.session.add(job_result) - db.session.flush() - db.session.refresh(job_result) - patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()} # noqa - self.buffer_user_patch_operation(job, patch_operation) # noqa - job.end_date = datetime.utcnow() - patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()} # noqa - self.buffer_user_patch_operation(job, patch_operation) - job.status = task_state - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa - self.buffer_user_patch_operation(job, patch_operation) - finally: - self.send_job_notification(job) - - def remove_job_service(self, job): - service_name = 'job_{}'.format(job.id) - try: - service = self.docker.services.get(service_name) - except docker.errors.NotFound: - job.status = 'canceled' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa - self.buffer_user_patch_operation(job, patch_operation) - except docker.errors.APIError as e: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - except docker.errors.InvalidVersion: - logging.error( - 'Get "{}" service raised '.format(service_name) - + '"docker.errors.InvalidVersion" One of the arguments is ' - + 'not supported with the current API version.' - ) - return - else: - try: - service.update(mounts=None) - except docker.errors.APIError as e: - logging.error( - 'Update "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - return - try: - service.remove() - except docker.errors.APIError as e: - logging.error( - 'Remove "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' - + 'Details: {}'.format(e) - ) - - def send_job_notification(self, job): - if job.creator.setting_job_status_mail_notifications == 'none': - return - if (job.creator.setting_job_status_mail_notifications == 'end' - and job.status not in ['complete', 'failed']): - return - msg = create_message(job.creator.email, - 'Status update for your Job "{}"'.format(job.title), # noqa - 'tasks/email/notification', job=job) - mail.send(msg) diff --git a/app/tasks/job_utils.py b/app/tasks/job_utils.py index c3fd5fd2..ee804bbc 100644 --- a/app/tasks/job_utils.py +++ b/app/tasks/job_utils.py @@ -254,7 +254,7 @@ class CheckJobsMixin: return msg = create_message( job.creator.email, - 'Status update for your Job "{}"'.format(job.title), # noqa + 'Status update for your Job "{}"'.format(job.title), 'tasks/email/notification', job=job )