"""Helpers that create, poll and remove the Docker service backing a job."""

from datetime import datetime
import json
import logging
import os

from werkzeug.utils import secure_filename
import docker

from . import docker_client
from .. import db, mail
from ..email import create_message
from ..models import JobResult


def _log_api_error(action, service_name, error):
    """Log a ``docker.errors.APIError`` raised while running *action*."""
    logging.error(
        '%s "%s" service raised [docker-APIError] The server returned an '
        'error. Details: %s',
        action, service_name, error
    )


def _log_invalid_version(service_name):
    """Log a ``docker.errors.InvalidVersion`` raised while getting a service."""
    logging.error(
        'Get "%s" service raised [docker-InvalidVersion] One of the '
        'arguments is not supported with the current API version.',
        service_name
    )


def _notify_if_status_changed(job, previous_status):
    """Send a status mail only if ``job.status`` differs from before.

    The service functions are invoked repeatedly for the same job; mailing
    unconditionally would re-send the same status on every invocation.
    """
    if job.status != previous_status:
        send_notification(job)


def create_job_service(job):
    """Create the Docker service that executes *job*.

    Builds the service command line from the job's service name, title and
    JSON-encoded ``service_args`` and asks the Docker client to create a
    service named ``job_<id>``. On success ``job.status`` becomes
    ``'queued'`` and the creator is notified; on a Docker API error the
    error is logged and the job is left untouched.
    """
    safe_title = secure_filename(job.title)
    cmd = '{} -i /files -o /files/output'.format(job.service)
    if job.service == 'file-setup':
        cmd += ' -f {}'.format(safe_title)
    cmd += ' --log-dir /files'
    cmd += ' --zip [{}]_{}'.format(job.service, safe_title)
    cmd += ' ' + ' '.join(json.loads(job.service_args))
    service_kwargs = {
        'command': cmd,
        'constraints': ['node.role==worker'],
        'labels': {
            'origin': 'nopaque',
            'type': 'service.{}'.format(job.service),
            'job_id': str(job.id),
        },
        'mounts': [job.path + ':/files:rw'],
        'name': 'job_{}'.format(job.id),
        # NOTE(review): the Docker SDK documents these reservations in
        # nano-CPUs and bytes, hence the scaling factors - confirm.
        'resources': docker.types.Resources(
            cpu_reservation=job.n_cores * (10 ** 9),
            mem_reservation=job.mem_mb * (10 ** 6)
        ),
        'restart_policy': docker.types.RestartPolicy(),
    }
    service_image = ('gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/'
                     + job.service + ':' + job.service_version)
    previous_status = job.status
    try:
        docker_client.services.create(service_image, **service_kwargs)
    except docker.errors.APIError as e:
        _log_api_error('Create', service_kwargs['name'], e)
        return
    job.status = 'queued'
    _notify_if_status_changed(job, previous_status)


def _sync_job_with_service(job):
    """Update *job* from the state of its Docker service.

    Performs the state transitions for ``checkout_job_service`` and returns
    early whenever there is nothing to update.
    """
    service_name = 'job_{}'.format(job.id)
    try:
        service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        # NotFound must be handled before APIError, which is its base class.
        logging.error(
            'Get "%s" service raised [docker-NotFound] The service does not '
            'exist. (job.status: %s -> failed)',
            service_name, job.status
        )
        job.status = 'failed'
        return
    except docker.errors.APIError as e:
        _log_api_error('Get', service_name, e)
        return
    except docker.errors.InvalidVersion:
        _log_invalid_version(service_name)
        return

    service_tasks = service.tasks()
    if not service_tasks:
        return
    task_state = service_tasks[0].get('Status').get('State')

    if job.status == 'queued' and task_state != 'pending':
        job.status = 'running'
        return
    if job.status != 'running' or task_state not in ('complete', 'failed'):
        return

    # The task has finished: remove the service before recording the outcome
    # so that a failed removal is retried on the next check.
    try:
        service.remove()
    except docker.errors.APIError as e:
        _log_api_error('Remove', service_name, e)
        return
    if task_state == 'complete':
        results_dir = os.path.join(job.path, 'output')
        for filename in os.listdir(results_dir):
            if filename.endswith('.zip'):
                db.session.add(JobResult(filename=filename, job=job))
    job.end_date = datetime.utcnow()
    job.status = task_state


def checkout_job_service(job):
    """Synchronise ``job.status`` with the Docker service ``job_<id>``.

    * service not found -> ``'failed'``
    * ``'queued'`` and the first task is no longer ``pending`` -> ``'running'``
    * ``'running'`` and the first task is ``complete``/``failed`` -> the
      service is removed, ``.zip`` files in ``<job.path>/output`` are added
      to the session as ``JobResult`` rows (only for ``complete``),
      ``job.end_date`` is set and the task state becomes the job status.

    Docker API errors are logged and leave the job unchanged. The creator is
    notified only when the status actually changed during this call.
    """
    previous_status = job.status
    try:
        _sync_job_with_service(job)
    finally:
        _notify_if_status_changed(job, previous_status)


def remove_job_service(job):
    """Remove the Docker service ``job_<id>`` of a job.

    If the service no longer exists ``job.status`` is set to ``'canceled'``.
    Otherwise the service's mounts are cleared and the service is removed;
    the status is not changed in that case. Docker API errors are logged.
    """
    service_name = 'job_{}'.format(job.id)
    try:
        service = docker_client.services.get(service_name)
    except docker.errors.NotFound:
        job.status = 'canceled'
        return
    except docker.errors.APIError as e:
        _log_api_error('Get', service_name, e)
        return
    except docker.errors.InvalidVersion:
        _log_invalid_version(service_name)
        return

    try:
        service.update(mounts=None)
    except docker.errors.APIError as e:
        _log_api_error('Update', service_name, e)
        return
    try:
        service.remove()
    except docker.errors.APIError as e:
        _log_api_error('Remove', service_name, e)


def send_notification(job):
    """Mail the job's creator about the job's current status.

    Honours ``job.creator.setting_job_status_mail_notifications``:
    ``'none'`` never sends, ``'end'`` sends only for the ``'complete'`` and
    ``'failed'`` statuses, any other value always sends.
    """
    setting = job.creator.setting_job_status_mail_notifications
    if setting == 'none':
        return
    if setting == 'end' and job.status not in ['complete', 'failed']:
        return
    msg = create_message(job.creator.email,
                         'Status update for your Job "{}"'.format(job.title),
                         'tasks/email/notification',
                         job=job)
    mail.send(msg)