mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
				synced 2025-11-04 12:22:47 +00:00 
			
		
		
		
	Don't use jaas, as it doesn't offer enough configuration possibilities.
This commit is contained in:
		
							
								
								
									
										162
									
								
								app/swarm.py
									
									
									
									
									
								
							
							
						
						
									
										162
									
								
								app/swarm.py
									
									
									
									
									
								
							@@ -1,98 +1,88 @@
 | 
				
			|||||||
import docker
import time


class Swarm:
    """Thin wrapper around a Docker Swarm cluster used to execute jobs.

    Swarm mode is intended to run containers which serve a non-terminating
    service like a webserver. For processing a job it is necessary to use a
    one-shot container, which stops after the wrapped job process has
    completely finished. In order to run such one-shot containers in Swarm
    mode, the run method is implemented analogous to the approach presented
    in Alex Ellis' blog post "One-shot containers on Docker Swarm"¹.

    ¹ https://blog.alexellis.io/containers-on-swarm/
    """

    def __init__(self):
        # Client talking to the Docker daemon configured via the environment
        # (DOCKER_HOST etc.).
        self.docker = docker.from_env()

    def run(self, job):
        """Execute *job* as a one-shot swarm service and block until it ends.

        The service is created on a worker node, polled until its task
        reaches a terminal state, and then removed from the swarm.

        # NOTE(review): ``job`` appears to be a mapping with at least the
        # keys 'id', 'service', 'service_args' ('lang', 'version'),
        # 'requested_cpus' and 'requested_memory' — confirm against caller.
        """
        # Prepare argument values needed for the service creation.
        _command = 'ocr' \
                  + ' -i /input/{}'.format(job['id']) \
                  + ' -l {}'.format(job['service_args']['lang']) \
                  + ' -o /output' \
                  + ' --keep-intermediates' \
                  + ' --nCores {}'.format(job['requested_cpus'])
        _constraints = ['node.role==worker']
        _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
            job['service'],
            job['service_args']['version']
        )
        _labels = {'service': 'ocr'}
        _mounts = ['/media/sf_files:/input:rw']
        _name = job['id']
        # The Docker SDK for Python expects the cpu_reservation value to be
        # scaled to nanos (10^9). Because the job object contains unscaled
        # (10^0) values, it must be converted.
        #
        # While the cpu_reservation value has to be in nanos, the
        # mem_reservation value must be given in bytes (intuitive, right?).
        # Because the job object provides the memory value in megabytes, it
        # is also necessary to convert that value.
        _resources = docker.types.Resources(
            cpu_reservation=job['requested_cpus'] * (10 ** 9),
            mem_reservation=job['requested_memory'] * (10 ** 6)
        )
        # One-shot semantics: never restart the container after it exits.
        _restart_policy = docker.types.RestartPolicy(condition='none')
        # Create the service with the prepared values.
        #
        # Note: A service reserves hardware resources. In case no worker node
        #       has the required resources available (not reserved), the
        #       service gets queued by the Docker engine until a node is able
        #       to meet the requirements.
        #
        # TODO: The name argument should be used with the prepared value
        #       (name=_name). Because there is no id generator for now, it is
        #       not set, so that the Docker engine assigns a random name.
        service = self.docker.services.create(
            _image,
            command=_command,
            constraints=_constraints,
            labels=_labels,
            mounts=_mounts,
            resources=_resources,
            restart_policy=_restart_policy
        )
        # Poll the service until its task reaches a terminal state.
        #
        # Note: Because it takes some time until all data in the service
        #       object is initialized (especially the task list returned by
        #       the tasks method), an empty task list is treated as
        #       "not finished yet" (the ``not service.tasks()`` condition).
        #
        # Waiting for any terminal state — not only 'complete' — prevents an
        # endless loop when the task fails instead of completing.
        _terminal_states = {
            'complete', 'failed', 'rejected', 'shutdown', 'orphaned'
        }
        while not service.tasks() or \
                service.tasks()[0].get('Status').get('State') \
                not in _terminal_states:
            time.sleep(1)
            service.reload()
        # Remove the (stopped) one-shot service from the swarm.
        service.remove()

        return
				
			|||||||
		Reference in New Issue
	
	Block a user