diff --git a/web/app/tasks/job_utils.py b/web/app/tasks/job_utils.py index e2635dda..9baadde7 100644 --- a/web/app/tasks/job_utils.py +++ b/web/app/tasks/job_utils.py @@ -27,6 +27,11 @@ class CheckJobsMixin: cmd = '{} -i /files -o /files/output'.format(job.service) if job.service == 'file-setup': cmd += ' -f {}'.format(secure_filename(job.title)) + ressources = docker.types.Resources(cpu_reservation=4 * (10 ** 9), mem_reservation=4096 * (10 ** 6)) # noqa + elif job.service == 'nlp': + ressources = docker.types.Resources(cpu_reservation=2 * (10 ** 9), mem_reservation=4096 * (10 ** 6)) # noqa + elif job.service == 'ocr': + ressources = docker.types.Resources(cpu_reservation=4 * (10 ** 9), mem_reservation=8192 * (10 ** 6)) # noqa cmd += ' --log-dir /files' cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title)) cmd += ' ' + ' '.join(json.loads(job.service_args)) @@ -37,12 +42,9 @@ class CheckJobsMixin: 'job_id': str(job.id)}, 'mounts': [job.path + ':/files:rw'], 'name': 'job_{}'.format(job.id), - 'resources': docker.types.Resources( - cpu_reservation=job.n_cores * (10 ** 9), - mem_reservation=job.mem_mb * (10 ** 6) - ), + 'resources': ressources, 'restart_policy': docker.types.RestartPolicy()} - service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(job.service, job.service_version) + service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(job.service, job.service_version) # noqa try: self.docker.services.create(service_image, **service_kwargs) except docker.errors.APIError as e: @@ -54,7 +56,7 @@ class CheckJobsMixin: return else: job.status = 'queued' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa self.buffer_user_patch_operation(job, patch_operation) finally: self.send_job_notification(job) @@ -68,7 +70,7 @@ class CheckJobsMixin: + '"docker.errors.NotFound" The service does not exist. ' + '(job.status: {} -> failed)'.format(job.status)) job.status = 'failed' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa self.buffer_user_patch_operation(job, patch_operation) except docker.errors.APIError as e: logging.error( @@ -91,7 +93,7 @@ class CheckJobsMixin: task_state = service_tasks[0].get('Status').get('State') if job.status == 'queued' and task_state != 'pending': job.status = 'running' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa self.buffer_user_patch_operation(job, patch_operation) elif job.status == 'running' and task_state in ['complete', 'failed']: try: @@ -99,7 +101,7 @@ class CheckJobsMixin: except docker.errors.APIError as e: logging.error( 'Remove "{}" service raised '.format(service_name) - + '"docker.errors.APIError" The server returned an error. ' + + '"docker.errors.APIError" The server returned an error. ' # noqa + 'Details: {}'.format(e) ) return @@ -109,17 +111,17 @@ class CheckJobsMixin: result_files = filter(lambda x: x.endswith('.zip'), os.listdir(results_dir)) for result_file in result_files: - job_result = JobResult(filename=result_file, job=job) + job_result = JobResult(filename=result_file, job=job) # noqa db.session.add(job_result) db.session.flush() db.session.refresh(job_result) - patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()} - self.buffer_user_patch_operation(job, patch_operation) + patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()} # noqa + self.buffer_user_patch_operation(job, patch_operation) # noqa job.end_date = datetime.utcnow() - patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()} + patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()} # noqa self.buffer_user_patch_operation(job, patch_operation) job.status = task_state - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa self.buffer_user_patch_operation(job, patch_operation) finally: self.send_job_notification(job) @@ -130,7 +132,7 @@ class CheckJobsMixin: service = self.docker.services.get(service_name) except docker.errors.NotFound: job.status = 'canceled' - patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} + patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa self.buffer_user_patch_operation(job, patch_operation) except docker.errors.APIError as e: logging.error( @@ -172,6 +174,6 @@ class CheckJobsMixin: and job.status not in ['complete', 'failed']): return msg = create_message(job.creator.email, - 'Status update for your Job "{}"'.format(job.title), + 'Status update for your Job "{}"'.format(job.title), # noqa 'tasks/email/notification', job=job) mail.send(msg)