mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
				synced 2025-10-31 10:42:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			200 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			200 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from datetime import datetime
 | |
| from flask import current_app
 | |
| from werkzeug.utils import secure_filename
 | |
| from .. import db
 | |
| from ..models import Job, JobResult
 | |
| import docker
 | |
| import json
 | |
| import os
 | |
| import shutil
 | |
| 
 | |
| 
 | |
class CheckJobsMixin:
    """Reconcile ``Job`` rows with Docker swarm services.

    Meant to be mixed into a class that provides a ``docker`` attribute (a
    Docker client exposing ``services.create`` and ``services.get``).
    ``check_jobs`` is the entry point; the other methods each handle a single
    job in one lifecycle state. Status changes are made on the ORM objects
    only — nothing here commits the session (presumably the caller does;
    TODO confirm).
    """

    # Resource reservation per supported service: name -> (mem_mb, n_cores).
    # For every supported service the image name and the executable inside
    # the image are the service name itself.
    _SERVICE_RESOURCES = {
        'file-setup': (2048, 2),
        'nlp': (2048, 2),
        'ocr': (4096, 4),
    }

    def check_jobs(self):
        """Advance every job by one step according to its current status.

        The status buckets are computed before any job is touched, so a job
        moved from 'submitted' to 'queued' by ``create_job_service`` is not
        checked out until the next call.
        """
        jobs = Job.query.all()
        canceling_jobs = [job for job in jobs if job.status == 'canceling']
        queued_jobs = [job for job in jobs if job.status == 'queued']
        running_jobs = [job for job in jobs if job.status == 'running']
        submitted_jobs = [job for job in jobs if job.status == 'submitted']
        for job in submitted_jobs:
            self.create_job_service(job)
        for job in queued_jobs + running_jobs:
            self.checkout_job_service(job)
        for job in canceling_jobs:
            self.remove_job_service(job)

    def create_job_service(self, job):
        """Create the swarm service ``job_<id>`` that runs ``job``.

        On success the job status becomes 'queued'. If the Docker API returns
        an error the status is left unchanged, so creation is retried on the
        next ``check_jobs`` call. A job whose service is not listed in
        ``_SERVICE_RESOURCES`` is logged and marked 'failed'.
        """
        # Docker service settings
        # Service specific settings
        try:
            mem_mb, n_cores = self._SERVICE_RESOURCES[job.service]
        except KeyError:
            # Without this guard the settings below were unbound and the
            # resulting UnboundLocalError aborted the whole check_jobs pass.
            current_app.logger.error(
                'Unknown job service "{}" '.format(job.service)
                + '(job.status: {} -> failed)'.format(job.status)
            )
            job.status = 'failed'
            return
        executable = job.service
        image = (current_app.config['DOCKER_IMAGE_PREFIX']
                 + job.service + ':' + job.service_version)
        # Command
        command = '{} -i /input -o /output'.format(executable)
        command += ' --log-dir /input'
        command += ' --mem-mb {}'.format(mem_mb)
        command += ' --n-cores {}'.format(n_cores)
        command += ' --zip [' + job.service + ']_' + secure_filename(job.title)
        command += ' ' + ' '.join(json.loads(job.service_args))
        # Constraints
        constraints = ['node.role==worker']
        # Labels
        labels = {
            'origin': current_app.config['SERVER_NAME'],
            'type': 'job',
            'job_id': str(job.id)
        }
        # Mounts
        # Input mount
        input_mount_source = job.path
        input_mount_target = '/input'
        if job.service == 'file-setup':
            input_mount_target += '/' + secure_filename(job.title)
        input_mount = input_mount_source + ':' + input_mount_target + ':rw'
        # Output mount
        output_mount_source = os.path.join(job.path, 'output')
        output_mount_target = '/output'
        output_mount = output_mount_source + ':' + output_mount_target + ':rw'
        # Make sure that there is no data in the output directory
        shutil.rmtree(output_mount_source, ignore_errors=True)
        os.makedirs(output_mount_source)
        mounts = [input_mount, output_mount]
        # Name
        name = 'job_{}'.format(job.id)
        # Resources: Docker expects nano-CPUs and bytes.
        resources = docker.types.Resources(
            cpu_reservation=n_cores * (10 ** 9),
            mem_reservation=mem_mb * (10 ** 6)
        )
        # Restart policy
        restart_policy = docker.types.RestartPolicy()
        try:
            self.docker.services.create(
                image,
                command=command,
                constraints=constraints,
                labels=labels,
                mounts=mounts,
                name=name,
                resources=resources,
                restart_policy=restart_policy
            )
        except docker.errors.APIError as e:
            current_app.logger.error(
                'Create "{}" service raised '.format(name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
            return
        else:
            job.status = 'queued'

    def checkout_job_service(self, job):
        """Poll the service of a 'queued' or 'running' job and update it.

        Transitions:

        - A missing service marks the job 'failed'.
        - 'queued' becomes 'running' once the first listed task is in any
          state other than 'pending'.
        - 'running' becomes 'complete' or 'failed' once that task reaches one
          of those states. The service is then removed and ``end_date`` set.
          For 'complete', each ``.zip`` in ``<job.path>/output`` is recorded
          as a ``JobResult``.

        Docker API errors are logged and leave the job unchanged.
        """
        service_name = 'job_{}'.format(job.id)
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound:
            current_app.logger.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.NotFound" The service does not exist. '
                + '(job.status: {} -> failed)'.format(job.status)
            )
            job.status = 'failed'
        except docker.errors.APIError as e:
            current_app.logger.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
            return
        except docker.errors.InvalidVersion:
            current_app.logger.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.InvalidVersion" One of the arguments is '
                + 'not supported with the current API version.'
            )
            return
        else:
            service_tasks = service.tasks()
            if not service_tasks:
                return
            # Only the first listed task is inspected.
            task_state = service_tasks[0].get('Status').get('State')
            if job.status == 'queued' and task_state != 'pending':
                job.status = 'running'
            elif job.status == 'running' and task_state in ['complete', 'failed']:  # noqa
                try:
                    service.remove()
                except docker.errors.APIError as e:
                    current_app.logger.error(
                        'Remove "{}" service raised '.format(service_name)
                        + '"docker.errors.APIError" The server returned an error. '  # noqa
                        + 'Details: {}'.format(e)
                    )
                    return
                else:
                    if task_state == 'complete':
                        results_dir = os.path.join(job.path, 'output')
                        result_files = [
                            filename for filename in os.listdir(results_dir)
                            if filename.endswith('.zip')
                        ]
                        for result_file in result_files:
                            job_result = JobResult(filename=result_file, job=job)  # noqa
                            db.session.add(job_result)
                            db.session.flush()
                            db.session.refresh(job_result)
                    job.end_date = datetime.utcnow()
                    job.status = task_state

    def remove_job_service(self, job):
        """Tear down the service of a 'canceling' job.

        If the service no longer exists, the job is marked 'canceled'.
        Otherwise the service is updated with ``mounts=None`` and then
        removed. In that case the status stays 'canceling', so a later call
        finds no service and marks the job 'canceled'. Docker API errors are
        logged and leave the job unchanged.
        """
        service_name = 'job_{}'.format(job.id)
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound:
            job.status = 'canceled'
        except docker.errors.APIError as e:
            current_app.logger.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.APIError" The server returned an error. '
                + 'Details: {}'.format(e)
            )
            return
        except docker.errors.InvalidVersion:
            current_app.logger.error(
                'Get "{}" service raised '.format(service_name)
                + '"docker.errors.InvalidVersion" One of the arguments is '
                + 'not supported with the current API version.'
            )
            return
        else:
            # NOTE(review): the reason for clearing mounts before removal is
            # not evident from this file — confirm with the original author.
            try:
                service.update(mounts=None)
            except docker.errors.APIError as e:
                current_app.logger.error(
                    'Update "{}" service raised '.format(service_name)
                    + '"docker.errors.APIError" The server returned an error. '
                    + 'Details: {}'.format(e)
                )
                return
            try:
                service.remove()
            except docker.errors.APIError as e:
                current_app.logger.error(
                    'Remove "{}" service raised '.format(service_name)
                    + '"docker.errors.APIError" The server returned an error. '
                    + 'Details: {}'.format(e)
                )
 |