from datetime import datetime
from flask import current_app
from werkzeug.utils import secure_filename
from .. import db, mail
from ..email import create_message
from ..models import Job, JobResult
import docker
import json
import logging
import os
import shutil


# Resource reservations per supported service: name -> (memory in MB, cores).
# The service name doubles as the executable name and the image name.
_SERVICE_RESOURCES = {
    'file-setup': (2048, 2),
    'ocr': (4096, 4),
    'nlp': (2048, 2),
}


def _log_api_error(action, service_name, error):
    '''Log a docker.errors.APIError raised while performing `action`.'''
    logging.error(
        '%s "%s" service raised "docker.errors.APIError" '
        'The server returned an error. Details: %s',
        action, service_name, error
    )


def _log_invalid_version(action, service_name):
    '''Log a docker.errors.InvalidVersion raised while performing `action`.'''
    logging.error(
        '%s "%s" service raised "docker.errors.InvalidVersion" '
        'One of the arguments is not supported with the current API version.',
        action, service_name
    )


class CheckJobsMixin:
    '''
    Mixin that drives jobs through their life cycle using Docker services.

    The class mixing this in has to provide `self.docker` (used as a Docker
    client) and `self.buffer_user_patch_operation(job, patch_operation)`;
    neither is defined in this part of the file.
    '''

    def check_jobs(self):
        '''
        Process all jobs once, depending on their current status.

        Submitted jobs get a service created, queued and running jobs are
        checked against their service, canceling jobs get their service
        removed. Jobs are grouped before any of them is processed, so a job
        that changes its status during this pass is not handled twice.
        '''
        jobs_by_status = {
            'submitted': [],
            'queued': [],
            'running': [],
            'canceling': []
        }
        for job in Job.query.all():
            if job.status in jobs_by_status:
                jobs_by_status[job.status].append(job)
        for job in jobs_by_status['submitted']:
            self.create_job_service(job)
        for job in jobs_by_status['queued'] + jobs_by_status['running']:
            self.checkout_job_service(job)
        for job in jobs_by_status['canceling']:
            self.remove_job_service(job)

    def _set_job_status(self, job, status):
        '''Set the status of `job` and buffer the matching patch operation.'''
        job.status = status
        patch_operation = {
            'op': 'replace',
            'path': '/jobs/{}/status'.format(job.id),
            'value': job.status
        }
        self.buffer_user_patch_operation(job, patch_operation)

    def create_job_service(self, job):
        '''
        Create the Docker service for `job` and mark the job as queued.

        Nothing is created (and the job keeps its status) if the job's
        service is unknown or the Docker API reports an error; both cases
        are logged. A notification is only sent after the status changed.
        '''
        ''' # Docker service settings # '''
        ''' ## Service specific settings ## '''
        try:
            mem_mb, n_cores = _SERVICE_RESOURCES[job.service]
        except KeyError:
            # Previously this ended in a NameError that aborted check_jobs.
            logging.error(
                'Create service for job %s failed: unknown service "%s"',
                job.id, job.service
            )
            return
        executable = job.service
        image = (current_app.config['DOCKER_IMAGE_PREFIX']
                 + job.service + ':' + job.service_version)
        ''' ## Command ## '''
        command = ' '.join([
            '{} -i /input -o /output'.format(executable),
            '--log-dir /input',
            '--mem-mb {}'.format(mem_mb),
            '--n-cores {}'.format(n_cores),
            '--zip [' + job.service + ']_' + secure_filename(job.title),
            ' '.join(json.loads(job.service_args))
        ])
        ''' ## Constraints ## '''
        constraints = ['node.role==worker']
        ''' ## Labels ## '''
        labels = {
            'origin': current_app.config['SERVER_NAME'],
            'type': 'job',
            'job_id': str(job.id)
        }
        ''' ## Mounts ## '''
        ''' ### Input mount ### '''
        input_mount_source = job.path
        input_mount_target = '/input'
        if job.service == 'file-setup':
            input_mount_target += '/' + secure_filename(job.title)
        input_mount = input_mount_source + ':' + input_mount_target + ':rw'
        ''' ### Output mount ### '''
        output_mount_source = os.path.join(job.path, 'output')
        output_mount_target = '/output'
        output_mount = output_mount_source + ':' + output_mount_target + ':rw'
        # Make sure that there is no data in the output directory
        shutil.rmtree(output_mount_source, ignore_errors=True)
        os.makedirs(output_mount_source)
        mounts = [input_mount, output_mount]
        ''' ## Name ## '''
        name = 'job_{}'.format(job.id)
        ''' ## Resources ## '''
        resources = docker.types.Resources(
            cpu_reservation=n_cores * (10 ** 9),
            mem_reservation=mem_mb * (10 ** 6)
        )
        ''' ## Restart policy ## '''
        restart_policy = docker.types.RestartPolicy()
        try:
            self.docker.services.create(
                image,
                command=command,
                constraints=constraints,
                labels=labels,
                mounts=mounts,
                name=name,
                resources=resources,
                restart_policy=restart_policy
            )
        except docker.errors.APIError as e:
            _log_api_error('Create', name, e)
            return
        self._set_job_status(job, 'queued')
        self.send_job_notification(job)

    def checkout_job_service(self, job):
        '''
        Synchronize the status of a queued or running `job` with its service.

        A missing service marks the job as failed. A queued job whose first
        task left the "pending" state becomes running. A running job whose
        first task is "complete" or "failed" gets its service removed, its
        results registered (on completion), its end date set and the task
        state taken over as job status. A notification is only sent when the
        job status changed during this call.
        '''
        service_name = 'job_{}'.format(job.id)
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound:
            logging.error(
                'Get "%s" service raised "docker.errors.NotFound" '
                'The service does not exist. (job.status: %s -> failed)',
                service_name, job.status
            )
            self._set_job_status(job, 'failed')
            self.send_job_notification(job)
            return
        except docker.errors.APIError as e:
            _log_api_error('Get', service_name, e)
            return
        except docker.errors.InvalidVersion:
            _log_invalid_version('Get', service_name)
            return
        service_tasks = service.tasks()
        if not service_tasks:
            return
        task_state = service_tasks[0].get('Status').get('State')
        if job.status == 'queued' and task_state != 'pending':
            self._set_job_status(job, 'running')
        elif job.status == 'running' and task_state in ['complete', 'failed']:
            try:
                service.remove()
            except docker.errors.APIError as e:
                _log_api_error('Remove', service_name, e)
                return
            if task_state == 'complete':
                self._register_job_results(job)
            # NOTE(review): utcnow() is naive and timestamp() interprets
            # naive datetimes as local time, so the value below is only
            # correct on servers running in UTC - confirm against how other
            # timestamps of the project are produced before changing it.
            job.end_date = datetime.utcnow()
            patch_operation = {
                'op': 'replace',
                'path': '/jobs/{}/end_date'.format(job.id),
                'value': job.end_date.timestamp()
            }
            self.buffer_user_patch_operation(job, patch_operation)
            self._set_job_status(job, task_state)
        else:
            # Nothing changed, so there is nothing to notify about.
            return
        self.send_job_notification(job)

    def _register_job_results(self, job):
        '''Create a JobResult for every zip file in the job's output dir.'''
        results_dir = os.path.join(job.path, 'output')
        for result_file in os.listdir(results_dir):
            if not result_file.endswith('.zip'):
                continue
            job_result = JobResult(filename=result_file, job=job)
            db.session.add(job_result)
            # Flush and refresh so that job_result.id is available below.
            db.session.flush()
            db.session.refresh(job_result)
            patch_operation = {
                'op': 'add',
                'path': '/jobs/{}/results/{}'.format(job.id, job_result.id),
                'value': job_result.to_dict()
            }
            self.buffer_user_patch_operation(job, patch_operation)

    def remove_job_service(self, job):
        '''
        Remove the service of a canceling `job`.

        If the service does not exist (anymore) the job is marked as
        canceled. Otherwise the service is updated with `mounts=None` and
        then removed; the job status is left untouched in that case. Docker
        API errors are logged and end the call.
        '''
        service_name = 'job_{}'.format(job.id)
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound:
            self._set_job_status(job, 'canceled')
            return
        except docker.errors.APIError as e:
            _log_api_error('Get', service_name, e)
            return
        except docker.errors.InvalidVersion:
            _log_invalid_version('Get', service_name)
            return
        try:
            service.update(mounts=None)
        except docker.errors.APIError as e:
            _log_api_error('Update', service_name, e)
            return
        try:
            service.remove()
        except docker.errors.APIError as e:
            _log_api_error('Remove', service_name, e)

    def send_job_notification(self, job):
        '''
        Mail the current status of `job` to its creator.

        Nothing is sent if the creator's notification setting is 'none', or
        if it is 'end' and the job is neither complete nor failed.
        '''
        setting = job.creator.setting_job_status_mail_notifications
        if setting == 'none':
            return
        if setting == 'end' and job.status not in ['complete', 'failed']:
            return
        msg = create_message(
            job.creator.email,
            'Status update for your Job "{}"'.format(job.title),
            'tasks/email/notification',
            job=job
        )
        mail.send(msg)