from datetime import datetime from . import db, scheduler from .models import Job import docker import json import os def checkout_jobs(): with scheduler.app.app_context(): client = docker.from_env() jobs = db.session.query(Job) for job in jobs.filter_by(status='submitted').all(): _command = (job.service + ' -i /files' + ' -o /files/output' + ' ' + ' '.join(json.loads(job.service_args))) _constraints = ['node.role==worker'] _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format( job.service, job.service_version ) _labels = {'service': job.service} _mounts = [os.path.join('/home/compute/mnt/opaque', str(job.user_id), 'jobs', str(job.id)) + ':/files:rw'] _name = str(job.id) ''' ' The Docker SDK for Python expects the cpu_reservation value to be ' scaled to nanos (10^9). Because the job object contains unscaled ' (10^0) values, it must be conveted. ' ' While the cpu_reservation value has to be in nanos, the ' mem_reservation value must be presented in an unscaled form ' (intuitive right?). Bacause the job object provides the memory ' value in megabytes, it is also necessary to convert the value. ''' _resources = docker.types.Resources( cpu_reservation=job.n_cores * (10 ** 9), mem_reservation=job.mem_mb * (10 ** 6) ) _restart_policy = docker.types.RestartPolicy(condition='none') try: service = client.services.create( _image, command=_command, constraints=_constraints, labels=_labels, mounts=_mounts, name=_name, resources=_resources, restart_policy=_restart_policy ) job.status = 'scheduled' except docker.errors.APIError: job.status = 'failed' print('[ERROR] {}: client.services.create raised APIError' .format(job.id)) for job in jobs.filter(Job.status != 'complete', Job.status != 'failed', Job.status != 'submitted').all(): try: service = client.services.get(str(job.id)) job.status = service.tasks()[0].get('Status').get('State') if job.status == 'complete' or job.status == 'failed': job.end_date = datetime.utcnow() service.remove() except docker.errors.APIError: job.status = 'failed' print('[ERROR] {}: client.services.get raised APIError' .format(job.id)) except docker.errors.NotFound: job.status = 'failed' print('[ERROR] {}: client.services.get raised NotFound' .format(job.id)) except docker.errors.InvalidVersion: job.status = 'failed' print('[ERROR] {}: client.services.get raised InvalidVersion' .format(job.id)) db.session.commit()