diff --git a/app/swarm.py b/app/swarm.py
index 928a79cb..ccf0bf7d 100644
--- a/app/swarm.py
+++ b/app/swarm.py
@@ -1,98 +1,88 @@
 import docker
-import subprocess
+import time


 class Swarm:
     def __init__(self):
         self.docker = docker.from_env()
-        self.checkout()

-    def checkout(self):
-        cpus = 0
-        memory = 0
-        for node in self.docker.nodes.list(filters={'role': 'worker'}):
-            if node.attrs.get('Status').get('State') == 'ready':
-                cpus += 0 or node.attrs \
-                    .get('Description') \
-                    .get('Resources') \
-                    .get('NanoCPUs')
-                memory += 0 or node.attrs \
-                    .get('Description') \
-                    .get('Resources') \
-                    .get('MemoryBytes')
-        '''
-        ' For whatever reason the Python Docker SDK provides a CPU count in
-        ' nano (10^-6), whilst this is not that handy, it gets converted.
-        '''
-        cpus *= 10 ** -9
-        '''
-        ' For a more natural handling the memory information
-        ' gets converted from bytes to megabytes.
-        '''
-        memory *= 10 ** -6
-        self.cpus = int(cpus)
-        self.memory = int(memory)
-        self.available_cpus = self.cpus
-        self.available_memory = self.memory
-
+    '''
+    ' Swarm mode is intended to run containers which serve a non-terminating
+    ' service like a webserver. To process an incoming job it is necessary to
+    ' use a one-shot container, which stops after the wrapped job process has
+    ' completely executed. In order to run these one-shot containers in Swarm
+    ' mode, the following run method is implemented analogously to the
+    ' implementation presented in Alex Ellis' blog post "One-shot containers
+    ' on Docker Swarm"¹.
+    '
+    ' ¹ https://blog.alexellis.io/containers-on-swarm/
+    '''
     def run(self, job):
-        if self.available_cpus < job['requested_cpus'] or \
-           self.available_memory < job['requested_memory']:
-            print('Not enough ressources available.')
-            '''
-            ' TODO: At this point the scheduler thinks that the job gets
-            '       processed, which apparently is not the case. So the job
-            '       needs to get rescheduled and gain a new chance to get
-            '       processed (next).
-            '
-            ' Note: Maybe it is a good idea to create a method that checks if
-            '       enough ressources are available before the run method gets
-            '       executed. This would replace the need of the TODO mentioned
-            '       above.
-            '''
-            return
-
-        job['status'] = 'running'
-        # TODO: Push job changes to the database
-        self.available_cpus -= job['requested_cpus']
-        self.available_memory -= job['requested_memory']
-
-        container_command = 'ocr' \
-            + ' -i /input/{}'.format(job['id']) \
-            + ' -l {}'.format(job['service_args']['lang']) \
-            + ' -o /output' \
-            + ' --keep-intermediates' \
-            + ' --nCores {}'.format(job['requested_cpus'])
-        container_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/ocr'
-        container_mount = '/media/sf_files/=/input/'
-        '''
-        ' Swarm mode is intendet to run containers which are meant to serve a
-        ' non terminating service like a webserver. In order to process the
-        ' occuring jobs it is necessary to use one-shot (terminating)
-        ' containers. These one-shot containers are spawned with a programm
-        ' called JaaS¹ (Jobs as a Service), which is described in Alex Ellis'
-        ' short article "One-shot containers on Docker Swarm"².
-        '
-        ' ¹ https://github.com/alexellis/jaas
-        ' ² https://blog.alexellis.io/containers-on-swarm/
-        '''
-        cmd = ['jaas', 'run'] \
-            + ['--command', container_command] \
-            + ['--image', container_image] \
-            + ['--mount', container_mount] \
-            + ['--timeout', '86400s']
-        completed_process = subprocess.run(
-            cmd,
-            stderr=subprocess.DEVNULL,
-            stdout=subprocess.DEVNULL
+        # Prepare argument values needed for the service creation.
+        _command = 'ocr' \
+            + ' -i /input/{}'.format(job['id']) \
+            + ' -l {}'.format(job['service_args']['lang']) \
+            + ' -o /output' \
+            + ' --keep-intermediates' \
+            + ' --nCores {}'.format(job['requested_cpus'])
+        _constraints = ['node.role==worker']
+        _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
+            job['service'],
+            job['service_args']['version']
         )
-
-        self.available_cpus += job['requested_cpus']
-        self.available_memory += job['requested_memory']
-        if (completed_process.returncode == 0):
-            job['status'] = 'finished'
-        else:
-            job['status'] = 'failed'
-        # TODO: Push job changes to the database
+        _labels = {'service': job['service']}
+        _mounts = ['/media/sf_files:/input:rw']
+        _name = job['id']
+        '''
+        ' The Docker SDK for Python expects the cpu_reservation value to be
+        ' scaled to nano CPUs (10^9). Because the job object contains
+        ' unscaled (10^0) values, the value must be converted.
+        '
+        ' While the cpu_reservation value has to be in nanos, the
+        ' mem_reservation value must be given in bytes (intuitive, right?).
+        ' Because the job object provides the memory value in megabytes, it
+        ' is also necessary to convert this value.
+        '''
+        _resources = docker.types.Resources(
+            cpu_reservation=job['requested_cpus'] * (10 ** 9),
+            mem_reservation=job['requested_memory'] * (10 ** 6)
+        )
+        _restart_policy = docker.types.RestartPolicy(condition='none')
+        '''
+        ' Create the service with the prepared values.
+        '
+        ' Note: A service reserves hardware resources. In case no worker node
+        '       has the required resources available (not reserved), the
+        '       service gets queued by the Docker engine until a node is
+        '       able to meet the requirements.
+        '
+        ' TODO: The name argument should be used with the prepared value
+        '       (name=_name). Because there is no id generator for now, it
+        '       is not set, so that the Docker engine assigns a random name.
+        '''
+        service = self.docker.services.create(
+            _image,
+            command=_command,
+            constraints=_constraints,
+            labels=_labels,
+            mounts=_mounts,
+            resources=_resources,
+            restart_policy=_restart_policy
+        )
+        '''
+        ' Poll the service until the job is completely executed.
+        '
+        ' Note: Because it takes some time until all data in the service
+        '       object is initialized (especially the task list returned by
+        '       the tasks method), the poll loop also checks whether the
+        '       task list is empty (the not service.tasks() condition
+        '       implements this).
+        '''
+        while not service.tasks() or \
+                service.tasks()[0].get('Status').get('State') != 'complete':
+            time.sleep(1)
+            service.reload()
+        # Remove the service from the swarm.
+        service.remove()
         return
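
For context, a minimal sketch of how the reworked run method might be driven.
The job dict layout is inferred from the keys the method accesses; all
concrete values below are hypothetical.

    from app.swarm import Swarm

    # Hypothetical job object; only the keys read by run() are shown.
    job = {
        'id': 'job-0001',                 # placeholder identifier
        'service': 'ocr',
        'service_args': {'lang': 'deu', 'version': 'latest'},
        'requested_cpus': 4,              # cores, unscaled (10^0)
        'requested_memory': 4096,         # megabytes
        'status': 'queued'
    }

    swarm = Swarm()
    swarm.run(job)  # blocks until the service's task reaches 'complete'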
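
One caveat on the poll loop: it only exits when the task state equals
'complete', so a task that ends in a state like 'failed' or 'rejected' would
keep the loop spinning forever. A possible hardening, sketched under the
assumption that the usual Docker task lifecycle states apply
(wait_for_service is a hypothetical helper, not part of this patch):

    import time

    # Terminal states from the Docker task lifecycle; treating everything
    # other than 'complete' as a failure is an assumption of this sketch.
    TERMINAL_STATES = {'complete', 'failed', 'rejected', 'shutdown',
                       'orphaned'}

    def wait_for_service(service, interval=1):
        while True:
            tasks = service.tasks()
            if tasks:
                state = tasks[0].get('Status').get('State')
                if state in TERMINAL_STATES:
                    return state
            time.sleep(interval)
            service.reload()

run could then set job['status'] to 'finished' or 'failed' based on the
returned state, mirroring the returncode handling that the removed
jaas-based implementation provided.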