from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import docker
import time
import json
import os


class Swarm:
    """Execute one-shot job containers on a Docker Swarm.

    Swarm mode is intended to run containers which serve a non-terminating
    service like a webserver. For processing a job it is necessary to use a
    one-shot container, which stops after the wrapped job process has
    completely executed. To run such one-shot containers in Swarm mode, the
    ``run`` method is implemented analogously to Alex Ellis' blog post
    "One-shot containers on Docker Swarm":
    https://blog.alexellis.io/containers-on-swarm/
    """

    def __init__(self, app=None):
        # Flask-extension style deferred initialisation: the app may be
        # bound now or later via init_app().
        self.app = app
        if app is not None:
            self.init_app(app)
        # Client for the local Docker daemon (honours DOCKER_* env vars).
        self.docker = docker.from_env()

    def init_app(self, app):
        """Bind the database configured on *app* to this instance.

        Creates a session factory from the app's
        ``SQLALCHEMY_DATABASE_URI``; ``run`` uses it to persist job status
        updates.
        """
        engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
        self.Session = sessionmaker(bind=engine)

    def run(self, job):
        """Run *job* as a one-shot swarm service and block until it ends.

        Builds the service command from the job's ``service_args``, creates
        the service, polls it until its single task reaches a terminal
        state ('complete' or 'failed'), mirrors the state into the job row,
        and finally removes the service.

        :param job: job object providing ``service``, ``service_args``
            (JSON), ``ressources`` (JSON, keys ``n_cores``/``mem_mb``),
            ``user_id``, ``id``, and a mutable ``status`` attribute.
        """
        # Prepare argument values needed for the service creation.
        service_args = json.loads(job.service_args)
        ressources = json.loads(job.ressources)
        _command = (job.service
                    + ' -i /files'
                    + ' -l {}'.format(service_args['lang'])
                    + ' -o /files/output'
                    + ' ' + ' '.join(service_args['args']))
        _constraints = ['node.role==worker']
        _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
            job.service,
            service_args['version']
        )
        _labels = {'service': job.service}
        _mounts = [os.path.join('/home/compute/mnt/opaque',
                                str(job.user_id),
                                'jobs',
                                str(job.id)) + ':/files:rw']
        # TODO: The name argument should be used on services.create
        #       (name=_name). Because there is no id generator for now, it
        #       is not set, so the Docker engine assigns a random name.
        _name = job.id
        # The Docker SDK for Python expects the cpu_reservation value to be
        # scaled to nanos (10^9). Because the job object contains unscaled
        # (10^0) values, it must be converted.
        #
        # While the cpu_reservation value has to be in nanos, the
        # mem_reservation value must be given in bytes. Because the job
        # object provides the memory value in megabytes, it also has to be
        # converted.
        _resources = docker.types.Resources(
            cpu_reservation=ressources['n_cores'] * (10 ** 9),
            mem_reservation=ressources['mem_mb'] * (10 ** 6)
        )
        # 'none' prevents the swarm from restarting the finished one-shot
        # container, which would turn it into a perpetually re-run job.
        _restart_policy = docker.types.RestartPolicy(condition='none')
        # Create the service with the prepared values.
        #
        # Note: A service reserves hardware resources. In case no worker
        #       node has the required resources available (not reserved),
        #       the service gets queued by the Docker engine until a node
        #       is able to meet the requirements.
        service = self.docker.services.create(
            _image,
            command=_command,
            constraints=_constraints,
            labels=_labels,
            mounts=_mounts,
            resources=_resources,
            restart_policy=_restart_policy
        )
        # Because it takes some time until all data in the service object
        # is initialized (especially the task list returned by the
        # service.tasks method), poll until the task list is non-empty.
        while not service.tasks():
            time.sleep(1)
            service.reload()
        # Poll the service until the job is completely executed, mirroring
        # its state into the database. The session is closed in a finally
        # block so it cannot leak if polling or the commits raise.
        session = self.Session()
        try:
            job.status = 'running'
            session.add(job)
            session.commit()
            current_state = None
            while True:
                current_state = service.tasks()[0].get('Status').get('State')
                if current_state in ('complete', 'failed'):
                    break
                time.sleep(1)
                service.reload()
            job.status = current_state
            session.add(job)
            session.commit()
        finally:
            session.close()
        # Remove the service from the swarm.
        service.remove()
        return