'''Scheduler task that maps nopaque jobs onto Docker Swarm services.'''
from app import db, docker_client, hashids, scheduler
from app.models import (
    Job,
    JobResult,
    JobStatus,
    TesseractOCRPipelineModel,
    SpaCyNLPPipelineModel
)
from datetime import datetime
from flask import current_app
from werkzeug.utils import secure_filename
import docker
import json
import os
import shutil


def job():
    with scheduler.app.app_context():
        _handle_jobs()


def _handle_jobs():
    jobs = Job.query.all()
    for job in [x for x in jobs if x.status == JobStatus.SUBMITTED]:
        _create_job_service(job)
    for job in [x for x in jobs if x.status in [JobStatus.QUEUED, JobStatus.RUNNING]]:
        _checkout_job_service(job)
    for job in [x for x in jobs if x.status == JobStatus.CANCELING]:
        _remove_job_service(job)
    db.session.commit()


def _create_job_service(job):
    ''' # Docker service settings # '''
    ''' ## Service specific settings ## '''
    if job.service == 'file-setup-pipeline':
        mem_mb = 512
        n_cores = 2
        executable = 'file-setup-pipeline'
        image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}file-setup-pipeline:v{job.service_version}'
    elif job.service == 'tesseract-ocr-pipeline':
        mem_mb = 1024
        n_cores = 4
        executable = 'tesseract-ocr-pipeline'
        image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}tesseract-ocr-pipeline:v{job.service_version}'
    elif job.service == 'transkribus-htr-pipeline':
        mem_mb = 1024
        n_cores = 4
        executable = 'transkribus-htr-pipeline'
        image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}transkribus-htr-pipeline:v{job.service_version}'
    elif job.service == 'spacy-nlp-pipeline':
        mem_mb = 1024
        n_cores = 1
        executable = 'spacy-nlp-pipeline'
        image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}spacy-nlp-pipeline:v{job.service_version}'
    else:
        # Unknown service: fail the job instead of raising a NameError below
        job.status = JobStatus.FAILED
        return
    ''' ## Command ## '''
    command = f'{executable} -i /input -o /output'
    command += ' --log-dir /logs'
    command += f' --mem-mb {mem_mb}'
    command += f' --n-cores {n_cores}'
    if job.service == 'spacy-nlp-pipeline':
        model_id = hashids.decode(job.service_args['model'])
        model = SpaCyNLPPipelineModel.query.get(model_id)
        if model is None:
            job.status = JobStatus.FAILED
            return
        command += f' -m {model.pipeline_name}'
        if 'encoding_detection' in job.service_args and job.service_args['encoding_detection']:
            command += ' --check-encoding'
    elif job.service == 'tesseract-ocr-pipeline':
        command += f' -m {job.service_args["model"]}'
        if 'binarization' in job.service_args and job.service_args['binarization']:
            command += ' --binarize'
        if 'ocropus_nlbin_threshold' in job.service_args and job.service_args['ocropus_nlbin_threshold']:
            value = job.service_args['ocropus_nlbin_threshold']
            command += f' --ocropus-nlbin-threshold {value}'
    elif job.service == 'transkribus-htr-pipeline':
        transkribus_htr_pipeline_model_id = job.service_args['model']
        command += f' -m {transkribus_htr_pipeline_model_id}'
        readcoop_username = current_app.config.get('NOPAQUE_READCOOP_USERNAME')
        command += f' --readcoop-username "{readcoop_username}"'
        readcoop_password = current_app.config.get('NOPAQUE_READCOOP_PASSWORD')
        command += f' --readcoop-password "{readcoop_password}"'
        if 'binarization' in job.service_args and job.service_args['binarization']:
            command += ' --binarize'
    ''' ## Constraints ## '''
    constraints = ['node.role==worker']
    ''' ## Labels ## '''
    labels = {
        'origin': current_app.config['SERVER_NAME'],
        'type': 'job',
        'job_id': str(job.id)
    }
    ''' ## Mounts ## '''
    mounts = []
    ''' ### Input mount(s) ### '''
    input_mount_target_base = '/input'
    if job.service == 'file-setup-pipeline':
        input_mount_target_base += f'/{secure_filename(job.title)}'
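    # Bind-mount every job input file read-only into the container under /input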
    for job_input in job.inputs:
        input_mount_source = job_input.path
        input_mount_target = f'{input_mount_target_base}/{job_input.filename}'
        input_mount = f'{input_mount_source}:{input_mount_target}:ro'
        mounts.append(input_mount)
    if job.service == 'tesseract-ocr-pipeline':
        if isinstance(job.service_args['model'], str):
            model_id = hashids.decode(job.service_args['model'])
        elif isinstance(job.service_args['model'], int):
            model_id = job.service_args['model']
        else:
            job.status = JobStatus.FAILED
            return
        model = TesseractOCRPipelineModel.query.get(model_id)
        if model is None:
            job.status = JobStatus.FAILED
            return
        models_mount_source = model.path
        models_mount_target = f'/usr/local/share/tessdata/{model.id}.traineddata'
        models_mount = f'{models_mount_source}:{models_mount_target}:ro'
        mounts.append(models_mount)
    elif job.service == 'spacy-nlp-pipeline':
        model_id = hashids.decode(job.service_args['model'])
        model = SpaCyNLPPipelineModel.query.get(model_id)
        if model is None:
            job.status = JobStatus.FAILED
            return
        models_mount_source = model.path
        models_mount_target = f'/usr/local/share/spacy/models/{model.filename}'
        models_mount = f'{models_mount_source}:{models_mount_target}:ro'
        mounts.append(models_mount)
    ''' ### Output mount ### '''
    output_mount_source = os.path.join(job.path, 'results')
    output_mount_target = '/output'
    output_mount = f'{output_mount_source}:{output_mount_target}:rw'
    # Make sure that there is no data in the output directory
    shutil.rmtree(output_mount_source, ignore_errors=True)
    os.makedirs(output_mount_source)
    mounts.append(output_mount)
    ''' ### Pipeline data mount ### '''
    pyflow_data_mount_source = os.path.join(job.path, 'pipeline_data')
    pyflow_data_mount_target = '/logs/pyflow.data'
    pyflow_data_mount = f'{pyflow_data_mount_source}:{pyflow_data_mount_target}:rw'
    # Make sure that there is no data in the pipeline data directory
    shutil.rmtree(pyflow_data_mount_source, ignore_errors=True)
    os.makedirs(pyflow_data_mount_source)
    mounts.append(pyflow_data_mount)
    ''' ## Name ## '''
    name = f'job_{job.id}'
    ''' ## Resources ## '''
    resources = docker.types.Resources(
        cpu_reservation=n_cores * (10 ** 9),
        mem_reservation=mem_mb * (10 ** 6)
    )
    ''' ## Restart policy ## '''
    restart_policy = docker.types.RestartPolicy()
    try:
        docker_client.services.create(
            image,
            command=command,
            constraints=constraints,
            labels=labels,
            mounts=mounts,
            name=name,
            resources=resources,
            restart_policy=restart_policy,
            user='0:0'
        )
    except docker.errors.DockerException as e:
        current_app.logger.error(f'Create service "{name}" failed: {e}')
        return
    job.status = JobStatus.QUEUED


def _checkout_job_service(job):
    service_name = f'job_{job.id}'
    try:
        service = docker_client.services.get(service_name)
    except docker.errors.NotFound as e:
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
        job.status = JobStatus.FAILED
        return
    except docker.errors.DockerException as e:
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
        return
    service_tasks = service.tasks()
    if not service_tasks:
        return
    task_state = service_tasks[0].get('Status').get('State')
    if job.status == JobStatus.QUEUED and task_state != 'pending':
        job.status = JobStatus.RUNNING
        return
    elif job.status == JobStatus.RUNNING and task_state == 'complete':
        job.status = JobStatus.COMPLETED
        results_dir = os.path.join(job.path, 'results')
        with open(os.path.join(results_dir, 'outputs.json')) as f:
            outputs = json.load(f)
        for output in outputs:
            filename = os.path.basename(output['file'])
            job_result = JobResult(
                filename=filename,
                job=job,
                mimetype=output['mimetype']
            )
            if 'description' in output:
                job_result.description = output['description']
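            # Flush the INSERT and refresh so job_result carries its
            # database-generated values before job_result.path is used below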
            db.session.add(job_result)
            db.session.flush(objects=[job_result])
            db.session.refresh(job_result)
            os.rename(
                os.path.join(results_dir, output['file']),
                job_result.path
            )
    elif job.status == JobStatus.RUNNING and task_state == 'failed':
        job.status = JobStatus.FAILED
    else:
        return
    job.end_date = datetime.utcnow()
    try:
        service.remove()
    except docker.errors.DockerException as e:
        current_app.logger.error(f'Remove service "{service_name}" failed: {e}')


def _remove_job_service(job):
    service_name = f'job_{job.id}'
    try:
        service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        # The service is already gone, so the job counts as canceled
        job.status = JobStatus.CANCELED
        return
    except docker.errors.DockerException as e:
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
        return
    try:
        service.update(mounts=None)
    except docker.errors.DockerException as e:
        current_app.logger.error(f'Update service "{service_name}" failed: {e}')
        return
    try:
        service.remove()
    except docker.errors.DockerException as e:
        current_app.logger.error(f'Remove service "{service_name}" failed: {e}')