Handle job creation with the new Job class.

This commit is contained in:
Stephan Porada 2019-08-06 14:27:41 +02:00
parent 175dbceac6
commit 6f5bf674c6
2 changed files with 54 additions and 46 deletions

View File

@@ -3,10 +3,13 @@ from flask import current_app, flash, redirect, render_template, url_for
 from . import services
 from flask_login import current_user, login_required
 from .forms import NewOCRJobForm, NewNLPJobForm
+from ..models import Job
 from ..import swarm
+from .. import db
 from threading import Thread
 import hashlib
 import os
+import json

 @services.route('/ocr', methods=['GET', 'POST'])
@@ -15,24 +18,23 @@ def ocr():
     new_ocr_job_form = NewOCRJobForm()
     if new_ocr_job_form.validate_on_submit():
         app = current_app._get_current_object()
-        id = hashlib.md5(
-            (current_user.username + '_' + datetime.now().isoformat()).encode()
-        ).hexdigest()
-        '''
-        ' TODO: Implement a Job class. For now a dictionary representation
-        ' is enough.
-        '''
-        job = {'creator': current_user.id,
-               'id': id,
-               'requested_cpus': 2,
-               'requested_memory': 2048,
-               'service': 'ocr',
-               'service_args': {'lang': new_ocr_job_form.language.data,
-                                'version': new_ocr_job_form.version.data
-                                },
-               'status': 'queued'
-               }
-        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', id)
+        ocr_job = Job()
+        ocr_job.title = new_ocr_job_form.title.data
+        ocr_job.description = new_ocr_job_form.description.data
+        ocr_job.user_id = current_user.id
+        ocr_job.creation_date = datetime.utcnow()
+        ocr_job.service = "ocr"
+        ocr_job.ressources = json.dumps({"n_cores": 2,
+                                         "mem_mb": 4096})
+        ocr_job.service_args = json.dumps({"args": ["--keep-intermediates",
+                                                    "--skip-binarisation"],
+                                           "lang": new_ocr_job_form.language.data,
+                                           "version": new_ocr_job_form.version.data})
+        ocr_job.status = "queued"
+        db.session.add(ocr_job)
+        db.session.commit()
+        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', str(ocr_job.id))
         try:
             os.makedirs(dir)
@@ -47,7 +49,7 @@ def ocr():
         ' NOTE: Using self created threads is just for testing purpose as
         ' there is no scheduler available.
         '''
-        thread = Thread(target=swarm.run, args=(job,))
+        thread = Thread(target=swarm.run, args=(ocr_job,))
         thread.start()
         flash('Job created!')
         return redirect(url_for('services.ocr'))
@@ -68,21 +70,21 @@ def nlp():
         id = hashlib.md5(
             (current_user.username + '_' + datetime.now().isoformat()).encode()
         ).hexdigest()
-        '''
-        ' TODO: Implement a Job class. For now a dictionary representation
-        ' is enough.
-        '''
-        job = {'creator': current_user.id,
-               'id': id,
-               'requested_cpus': 2,
-               'requested_memory': 2048,
-               'service': 'nlp',
-               'service_args': {'lang': new_nlp_job_form.language.data,
-                                'version': new_nlp_job_form.version.data
-                                },
-               'status': 'queued'
-               }
-        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', id)
+        nlp_job = Job()
+        nlp_job.title = new_nlp_job_form.title.data
+        nlp_job.description = new_nlp_job_form.description.data
+        nlp_job.user_id = current_user.id
+        nlp_job.creation_date = datetime.utcnow()
+        nlp_job.service = "nlp"
+        nlp_job.ressources = json.dumps({"n_cores": 2,
+                                         "mem_mb": 4096})
+        nlp_job.service_args = json.dumps({"args": [],
+                                           "lang": new_nlp_job_form.language.data,
+                                           "version": new_nlp_job_form.version.data})
+        nlp_job.status = "queued"
+        db.session.add(nlp_job)
+        db.session.commit()
+        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', str(nlp_job.id))
         try:
             os.makedirs(dir)
@@ -97,7 +99,7 @@ def nlp():
         ' NOTE: Using self created threads is just for testing purpose as
         ' there is no scheduler available.
         '''
-        thread = Thread(target=swarm.run, args=(job,))
+        thread = Thread(target=swarm.run, args=(nlp_job,))
         thread.start()
         flash('Job created!')
         return redirect(url_for('services.nlp'))

View File

@@ -1,5 +1,6 @@
 import docker
 import time
+import json

 class Swarm:
@@ -19,22 +20,27 @@ class Swarm:
     '''
     def run(self, job):
+        '''
+        Input is a job object. From this the _command is built.
+        '''
         # Prepare argument values needed for the service creation.
-        _command = job['service'] \
-            + ' -i /files/{}'.format(job['id']) \
-            + ' -l {}'.format(job['service_args']['lang']) \
-            + ' -o /files/{}/output'.format(job['id'])
-        # + ' --keep-intermediates'
+        service_args = json.loads(job.service_args)
+        ressources = json.loads(job.ressources)
+        _command = (job.service
+                    + ' -i /files/{}'.format(job.id)
+                    + ' -l {}'.format(service_args['lang'])
+                    + ' -o /files/{}/output'.format(job.id)
+                    + ' ' + ' '.join(service_args['args']))
         _constraints = ['node.role==worker']
         _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
-            job['service'],
-            job['service_args']['version']
+            job.service,
+            service_args['version']
         )
-        _labels = {'service': job['service']}
+        _labels = {'service': job.service}
         _mounts = [
             '/home/compute/mnt/opaque/jobs:/files:rw',
         ]
-        _name = job['id']
+        _name = job.id
         '''
         ' The Docker SDK for Python expects the cpu_reservation value to be
         ' scaled to nanos (10^9). Because the job object contains unscaled
@@ -46,8 +52,8 @@ class Swarm:
         ' in megabytes, it is also necessary to convert the value.
         '''
         _resources = docker.types.Resources(
-            cpu_reservation=job['requested_cpus'] * (10 ** 9),
-            mem_reservation=job['requested_memory'] * (10 ** 6)
+            cpu_reservation=ressources['n_cores'] * (10 ** 9),
+            mem_reservation=ressources['mem_mb'] * (10 ** 6)
         )
         _restart_policy = docker.types.RestartPolicy(condition='none')
         '''