from datetime import datetime
import json
import os
import shutil

import docker
from flask import current_app
from werkzeug.utils import secure_filename

from .. import db
from ..models import Job, JobResult


class CheckJobsMixin:
    '''Mixin that advances Job rows through their Docker Swarm service states.

    The host class must provide ``self.docker``, a Docker client whose
    ``services`` collection is used here (presumably a
    ``docker.DockerClient`` -- confirm at the call site).
    '''

    # Container settings per ``Job.service`` value:
    # - executable: program run inside the container;
    # - image: image repository name; it gets the
    #   NOPAQUE_DOCKER_IMAGE_PREFIX prefix and the job.service_version tag;
    # - mem_mb / n_cores: resources reserved for the service, also passed
    #   to the executable as --mem-mb / --n-cores.
    _JOB_SERVICE_SETTINGS = {
        'file-setup': {
            'executable': 'file-setup',
            'image': 'file-setup',
            'mem_mb': 2048,
            'n_cores': 2,
        },
        'ocr': {
            'executable': 'ocr',
            'image': 'ocr',
            'mem_mb': 4096,
            'n_cores': 4,
        },
        'nlp': {
            'executable': 'nlp',
            'image': 'nlp',
            'mem_mb': 2048,
            'n_cores': 2,
        },
    }

    def check_jobs(self):
        '''Advance every job one step through its service lifecycle.

        'submitted' jobs get a service created, 'queued'/'running' jobs are
        polled, and 'canceling' jobs have their service torn down. Changes
        are made on the ORM objects; this method does not commit the
        database session (presumably the caller does -- confirm).
        '''
        jobs = Job.query.all()
        # The generator expressions are evaluated lazily, so a job that the
        # first loop moves to 'queued' is already polled by the second loop
        # in the same pass.
        for job in (x for x in jobs if x.status == 'submitted'):
            self.create_job_service(job)
        for job in (x for x in jobs if x.status in ['queued', 'running']):
            self.checkout_job_service(job)
        for job in (x for x in jobs if x.status == 'canceling'):
            self.remove_job_service(job)

    def create_job_service(self, job):
        '''Create the Docker Swarm service ``job_<id>`` that runs ``job``.

        On success the job's status becomes 'queued'. If the Docker API call
        fails, the error is logged and the status is left unchanged, so the
        next check retries. A job whose ``service`` has no entry in
        ``_JOB_SERVICE_SETTINGS`` is logged and marked 'failed'.

        Side effect: ``<job.path>/output`` is deleted and recreated empty.
        '''
        settings = self._JOB_SERVICE_SETTINGS.get(job.service)
        if settings is None:
            # Without this guard the settings below would be unbound and
            # raise, aborting check_jobs() for every other job as well.
            current_app.logger.error(
                f'Create service "job_{job.id}" failed '
                + f'due to unknown service "{job.service}"'
            )
            job.status = 'failed'
            job.end_date = datetime.utcnow()
            return

        # Service specific settings
        executable = settings['executable']
        mem_mb = settings['mem_mb']
        n_cores = settings['n_cores']
        image = (
            current_app.config['NOPAQUE_DOCKER_IMAGE_PREFIX']
            + f'{settings["image"]}:{job.service_version}'
        )

        # Command. It is passed to docker as one string; service_args
        # elements may themselves contain spaces (e.g. an option and its
        # value), so they are joined rather than passed as a list.
        command = f'{executable} -i /input -o /output'
        command += ' --log-dir /input'
        command += f' --mem-mb {mem_mb}'
        command += f' --n-cores {n_cores}'
        command += f' --zip [{job.service}]_{secure_filename(job.title)}'
        command += ' ' + ' '.join(json.loads(job.service_args))

        # Constraints: only schedule on swarm worker nodes
        constraints = ['node.role==worker']

        # Labels
        labels = {
            'origin': current_app.config['SERVER_NAME'],
            'type': 'job',
            'job_id': str(job.id)
        }

        # Mounts
        # Input mount: the job directory. For file-setup it is mounted one
        # level deeper, under a directory named after the job title.
        input_mount_source = job.path
        input_mount_target = '/input'
        if job.service == 'file-setup':
            input_mount_target += f'/{secure_filename(job.title)}'
        input_mount = f'{input_mount_source}:{input_mount_target}:rw'
        # Output mount
        output_mount_source = os.path.join(job.path, 'output')
        output_mount_target = '/output'
        output_mount = f'{output_mount_source}:{output_mount_target}:rw'
        # Make sure that there is no data in the output directory
        shutil.rmtree(output_mount_source, ignore_errors=True)
        os.makedirs(output_mount_source)
        mounts = [input_mount, output_mount]

        # Name
        name = f'job_{job.id}'

        # Resources: Docker expects nano-CPUs and bytes
        resources = docker.types.Resources(
            cpu_reservation=n_cores * (10 ** 9),
            mem_reservation=mem_mb * (10 ** 6)
        )

        # Restart policy: library default
        restart_policy = docker.types.RestartPolicy()

        try:
            self.docker.services.create(
                image,
                command=command,
                constraints=constraints,
                labels=labels,
                mounts=mounts,
                name=name,
                resources=resources,
                restart_policy=restart_policy
            )
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Create service "{name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        job.status = 'queued'

    def checkout_job_service(self, job):
        '''Poll the service of a 'queued' or 'running' job and update it.

        Transitions:
        - queued -> running once the first task has left 'pending';
        - running -> complete when the task is 'complete'. Every ``*.zip``
          in ``<job.path>/output`` is then recorded as a JobResult;
        - running -> failed when the task is 'failed' or 'rejected'.

        On complete/failed, ``end_date`` is set and the service is removed.
        If the service no longer exists, the job is marked 'failed'. Other
        Docker API errors are logged and leave the job unchanged.
        '''
        service_name = f'job_{job.id}'
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound as e:
            current_app.logger.error(
                f'Get service "{service_name}" failed '
                + f'due to "docker.errors.NotFound": {e}'
            )
            job.status = 'failed'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get service "{service_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        service_tasks = service.tasks()
        if not service_tasks:
            return
        task_state = service_tasks[0].get('Status').get('State')
        if job.status == 'queued' and task_state != 'pending':
            job.status = 'running'
            return
        elif job.status == 'running' and task_state == 'complete':
            job.status = 'complete'
            results_dir = os.path.join(job.path, 'output')
            result_files = [x for x in os.listdir(results_dir) if x.endswith('.zip')]  # noqa
            for result_file in result_files:
                job_result = JobResult(filename=result_file, job=job)
                db.session.add(job_result)
                db.session.flush()
                db.session.refresh(job_result)
        elif job.status == 'running' and task_state in ('failed', 'rejected'):
            # A 'rejected' task (e.g. the node could not pull the image) is
            # not restarted under the default restart policy. Without this
            # the job would stay 'running' forever.
            job.status = 'failed'
        else:
            return
        job.end_date = datetime.utcnow()
        try:
            service.remove()
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Remove service "{service_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )

    def remove_job_service(self, job):
        '''Tear down the service of a job in status 'canceling'.

        The job is only marked 'canceled' once its service can no longer be
        found, i.e. on a later check after the removal has gone through.
        Docker API errors are logged and the job is left unchanged.
        '''
        service_name = f'job_{job.id}'
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound:
            job.status = 'canceled'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get service "{service_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        # NOTE(review): presumably detaches the bind mounts before removing
        # the service; the intent is not visible here -- confirm.
        try:
            service.update(mounts=None)
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Update service "{service_name}" failed '
                + f'due to "docker.errors.APIError": {e}'
            )
            return
        try:
            service.remove()
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Remove "{service_name}" service failed '
                + f'due to "docker.errors.APIError": {e}'
            )