import docker
import time


class Swarm:
    def __init__(self):
        self.docker = docker.from_env()

    '''
    ' Swarm mode is intended to run containers which serve a non-terminating
    ' service, like a webserver. For processing an occurring job it is
    ' necessary to use a one-shot container, which stops after the wrapped
    ' job process is completely executed. In order to run these one-shot
    ' containers in Swarm mode, the following run method is implemented
    ' analogous to the implementation presented in Alex Ellis' blog post
    ' "One-shot containers on Docker Swarm"¹.
    '
    ' ¹ https://blog.alexellis.io/containers-on-swarm/
    '''
    def run(self, job):
        # Prepare argument values needed for the service creation.
        _command = job['service'] \
            + ' -i /files/{}'.format(job['id']) \
            + ' -l {}'.format(job['service_args']['lang']) \
            + ' -o /files/{}/output'.format(job['id']) \
            + ' --keep-intermediates' \
            + ' --nCores {}'.format(job['requested_cpus'])
        _constraints = ['node.role==worker']
        _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
            job['service'],
            job['service_args']['version']
        )
        _labels = {'service': job['service']}
        _mounts = [
            '/home/compute/mnt/opaque/jobs:/files:rw',
        ]
        _name = job['id']
        '''
        ' The Docker SDK for Python expects the cpu_reservation value to be
        ' scaled to nanos (10^9). Because the job object contains unscaled
        ' (10^0) values, it must be converted.
        '
        ' While the cpu_reservation value has to be in nanos, the
        ' mem_reservation value must be given in an unscaled form, i.e. in
        ' bytes (intuitive, right?). Because the job object provides the
        ' memory value in megabytes, it is also necessary to convert it.
        '''
        _resources = docker.types.Resources(
            cpu_reservation=job['requested_cpus'] * (10 ** 9),
            mem_reservation=job['requested_memory'] * (10 ** 6)
        )
        _restart_policy = docker.types.RestartPolicy(condition='none')
        '''
        ' Create the service with the prepared values.
        '
        ' Note: A service reserves hardware resources. In case no worker node
        ' has the required resources available (not reserved), the service
        ' gets queued by the Docker engine until a node is able to meet the
        ' requirements.
        '
        ' TODO: The name argument should be used with the prepared value
        ' (name=_name). Because there is no id generator for now, it is not
        ' set, so that the Docker engine assigns a random name.
        '''
        service = self.docker.services.create(
            _image,
            command=_command,
            constraints=_constraints,
            labels=_labels,
            mounts=_mounts,
            resources=_resources,
            restart_policy=_restart_policy
        )
        '''
        ' Poll the service until the job is completely executed.
        '
        ' Note: Because it takes some time until all data in the service
        ' object is initialized (especially the task list returned by the
        ' tasks method), the poll loop also checks whether the task list is
        ' empty (the "not tasks" condition implements this). The task list
        ' is fetched once per iteration so both checks see the same data.
        '''
        tasks = service.tasks()
        while not tasks or tasks[0].get('Status').get('State') != 'complete':
            time.sleep(1)
            service.reload()
            tasks = service.tasks()
        # Remove the service from the swarm.
        service.remove()
        return
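

# A minimal sketch of the unit conversions explained in Swarm.run, pulled
# out into a hypothetical helper (to_swarm_reservations is not part of the
# original module) so the arithmetic can be checked without a Docker
# daemon. It assumes, as run() does, that requested_cpus is a plain CPU
# count and that requested_memory is in megabytes with 1 MB == 10**6 bytes.
def to_swarm_reservations(requested_cpus, requested_memory_mb):
    '''Return (cpu_reservation, mem_reservation) for docker.types.Resources.

    cpu_reservation is expressed in NanoCPUs (1 CPU == 10**9) and
    mem_reservation in bytes.
    '''
    cpu_reservation = int(requested_cpus * 10 ** 9)
    mem_reservation = int(requested_memory_mb * 10 ** 6)
    return cpu_reservation, mem_reservation


# Example: a job requesting 4 CPUs and 2048 MB of memory.
#     to_swarm_reservations(4, 2048)  ->  (4000000000, 2048000000)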
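

# A minimal sketch, not part of the original module: run() only stops
# polling when the first task reaches the 'complete' state. With the restart
# policy condition 'none', a task that ends in another terminal state (for
# example 'failed' or 'rejected') is not restarted, so that loop would keep
# polling. The hypothetical helper job_finished treats every terminal Swarm
# task state as the end of the job. The set of states below is an
# assumption based on the Docker task lifecycle; verify it against the
# Docker Engine API version in use.
TERMINAL_TASK_STATES = frozenset(
    ['complete', 'failed', 'shutdown', 'rejected', 'orphaned', 'remove']
)


def job_finished(tasks):
    '''Return True once the first task of a one-shot service has ended.

    `tasks` is the list returned by service.tasks(); an empty list means
    the service has not been fully initialized yet.
    '''
    if not tasks:
        return False
    state = tasks[0].get('Status', {}).get('State')
    return state in TERMINAL_TASK_STATES


# Possible use inside Swarm.run (sketch):
#     while not job_finished(service.tasks()):
#         time.sleep(1)
#         service.reload()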