mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
				synced 2025-11-03 20:02:47 +00:00 
			
		
		
		
	Remove ugly workaround for scheduler function. Now the current app_context is used, instead of creating a new app instance.
This commit is contained in:
		@@ -40,13 +40,3 @@ def create_app(config_name):
 | 
			
		||||
    app.register_blueprint(admin_blueprint, url_prefix='/admin')
 | 
			
		||||
 | 
			
		||||
    return app
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def create_minimal_app(config_name):
 | 
			
		||||
    app = Flask(__name__)
 | 
			
		||||
    app.config.from_object(config[config_name])
 | 
			
		||||
 | 
			
		||||
    config[config_name].init_app(app)
 | 
			
		||||
    db.init_app(app)
 | 
			
		||||
 | 
			
		||||
    return app
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,5 @@
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from . import create_minimal_app, db
 | 
			
		||||
from . import db, scheduler
 | 
			
		||||
from .models import Job
 | 
			
		||||
import docker
 | 
			
		||||
import json
 | 
			
		||||
@@ -7,72 +7,71 @@ import os
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def checkout_jobs():
 | 
			
		||||
    app = create_minimal_app(os.getenv('FLASK_CONFIG') or 'default')
 | 
			
		||||
    app.app_context().push()
 | 
			
		||||
    client = docker.from_env()
 | 
			
		||||
    jobs = db.session.query(Job)
 | 
			
		||||
    submitted_jobs = jobs.filter_by(status='submitted').all()
 | 
			
		||||
    foo_jobs = jobs.filter(Job.status != 'complete',
 | 
			
		||||
                           Job.status != 'failed',
 | 
			
		||||
                           Job.status != 'submitted').all()
 | 
			
		||||
    for job in submitted_jobs:
 | 
			
		||||
        _command = (job.service
 | 
			
		||||
                    + ' -i /files'
 | 
			
		||||
                    + ' -o /files/output'
 | 
			
		||||
                    + ' ' + ' '.join(json.loads(job.service_args)))
 | 
			
		||||
        _constraints = ['node.role==worker']
 | 
			
		||||
        _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
 | 
			
		||||
            job.service,
 | 
			
		||||
            job.service_version
 | 
			
		||||
        )
 | 
			
		||||
        _labels = {'service': job.service}
 | 
			
		||||
        _mounts = [os.path.join('/home/compute/mnt/opaque',
 | 
			
		||||
                                str(job.user_id),
 | 
			
		||||
                                'jobs',
 | 
			
		||||
                                str(job.id))
 | 
			
		||||
                   + ':/files:rw']
 | 
			
		||||
        _name = str(job.id)
 | 
			
		||||
        '''
 | 
			
		||||
        ' The Docker SDK for Python expects the cpu_reservation value to be
 | 
			
		||||
        ' scaled to nanos (10^9). Because the job object contains unscaled
 | 
			
		||||
        ' (10^0) values, it must be conveted.
 | 
			
		||||
        '
 | 
			
		||||
        ' While the cpu_reservation value has to be in nanos, the
 | 
			
		||||
        ' mem_reservation value must be presented in an unscaled form
 | 
			
		||||
        ' (intuitive right?). Bacause the job object provides the memory value
 | 
			
		||||
        ' in megabytes, it is also necessary to convert the value.
 | 
			
		||||
        '''
 | 
			
		||||
        _resources = docker.types.Resources(
 | 
			
		||||
            cpu_reservation=job.n_cores * (10 ** 9),
 | 
			
		||||
            mem_reservation=job.mem_mb * (10 ** 6)
 | 
			
		||||
        )
 | 
			
		||||
        _restart_policy = docker.types.RestartPolicy(condition='none')
 | 
			
		||||
        '''
 | 
			
		||||
        ' Create the service with the prepared values.
 | 
			
		||||
        '
 | 
			
		||||
        ' Note: A service reserves hardware ressources. In case no worker node
 | 
			
		||||
        '       has the required ressources available (not reserved), the
 | 
			
		||||
        '       service gets queued by the Docker engine until a node is able
 | 
			
		||||
        '       to meet the requirements.
 | 
			
		||||
        '''
 | 
			
		||||
        service = client.services.create(
 | 
			
		||||
            _image,
 | 
			
		||||
            command=_command,
 | 
			
		||||
            constraints=_constraints,
 | 
			
		||||
            labels=_labels,
 | 
			
		||||
            mounts=_mounts,
 | 
			
		||||
            name=_name,
 | 
			
		||||
            resources=_resources,
 | 
			
		||||
            restart_policy=_restart_policy
 | 
			
		||||
        )
 | 
			
		||||
        job.status = 'scheduled'
 | 
			
		||||
    for job in foo_jobs:
 | 
			
		||||
        '''
 | 
			
		||||
        ' TODO: Handle service not found error.
 | 
			
		||||
        '''
 | 
			
		||||
        service = client.services.get(str(job.id))
 | 
			
		||||
        job.status = service.tasks()[0].get('Status').get('State')
 | 
			
		||||
        if job.status == 'complete' or job.status == 'failed':
 | 
			
		||||
            job.end_date = datetime.utcnow()
 | 
			
		||||
            service.remove()
 | 
			
		||||
    db.session.commit()
 | 
			
		||||
    with scheduler.app.app_context():
 | 
			
		||||
        client = docker.from_env()
 | 
			
		||||
        jobs = db.session.query(Job)
 | 
			
		||||
        submitted_jobs = jobs.filter_by(status='submitted').all()
 | 
			
		||||
        foo_jobs = jobs.filter(Job.status != 'complete',
 | 
			
		||||
                               Job.status != 'failed',
 | 
			
		||||
                               Job.status != 'submitted').all()
 | 
			
		||||
        for job in submitted_jobs:
 | 
			
		||||
            _command = (job.service
 | 
			
		||||
                        + ' -i /files'
 | 
			
		||||
                        + ' -o /files/output'
 | 
			
		||||
                        + ' ' + ' '.join(json.loads(job.service_args)))
 | 
			
		||||
            _constraints = ['node.role==worker']
 | 
			
		||||
            _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
 | 
			
		||||
                job.service,
 | 
			
		||||
                job.service_version
 | 
			
		||||
            )
 | 
			
		||||
            _labels = {'service': job.service}
 | 
			
		||||
            _mounts = [os.path.join('/home/compute/mnt/opaque',
 | 
			
		||||
                                    str(job.user_id),
 | 
			
		||||
                                    'jobs',
 | 
			
		||||
                                    str(job.id))
 | 
			
		||||
                       + ':/files:rw']
 | 
			
		||||
            _name = str(job.id)
 | 
			
		||||
            '''
 | 
			
		||||
            ' The Docker SDK for Python expects the cpu_reservation value to be
 | 
			
		||||
            ' scaled to nanos (10^9). Because the job object contains unscaled
 | 
			
		||||
            ' (10^0) values, it must be conveted.
 | 
			
		||||
            '
 | 
			
		||||
            ' While the cpu_reservation value has to be in nanos, the
 | 
			
		||||
            ' mem_reservation value must be presented in an unscaled form
 | 
			
		||||
            ' (intuitive right?). Bacause the job object provides the memory
 | 
			
		||||
            ' value in megabytes, it is also necessary to convert the value.
 | 
			
		||||
            '''
 | 
			
		||||
            _resources = docker.types.Resources(
 | 
			
		||||
                cpu_reservation=job.n_cores * (10 ** 9),
 | 
			
		||||
                mem_reservation=job.mem_mb * (10 ** 6)
 | 
			
		||||
            )
 | 
			
		||||
            _restart_policy = docker.types.RestartPolicy(condition='none')
 | 
			
		||||
            '''
 | 
			
		||||
            ' Create the service with the prepared values.
 | 
			
		||||
            '
 | 
			
		||||
            ' Note: A service reserves hardware ressources. In case no worker
 | 
			
		||||
            '       node has the required ressources available (not reserved),
 | 
			
		||||
            '       the service gets queued by the Docker engine until a node
 | 
			
		||||
            '       is able to meet the requirements.
 | 
			
		||||
            '''
 | 
			
		||||
            service = client.services.create(
 | 
			
		||||
                _image,
 | 
			
		||||
                command=_command,
 | 
			
		||||
                constraints=_constraints,
 | 
			
		||||
                labels=_labels,
 | 
			
		||||
                mounts=_mounts,
 | 
			
		||||
                name=_name,
 | 
			
		||||
                resources=_resources,
 | 
			
		||||
                restart_policy=_restart_policy
 | 
			
		||||
            )
 | 
			
		||||
            job.status = 'scheduled'
 | 
			
		||||
        for job in foo_jobs:
 | 
			
		||||
            '''
 | 
			
		||||
            ' TODO: Handle service not found error.
 | 
			
		||||
            '''
 | 
			
		||||
            service = client.services.get(str(job.id))
 | 
			
		||||
            job.status = service.tasks()[0].get('Status').get('State')
 | 
			
		||||
            if job.status == 'complete' or job.status == 'failed':
 | 
			
		||||
                job.end_date = datetime.utcnow()
 | 
			
		||||
                service.remove()
 | 
			
		||||
        db.session.commit()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user