diff --git a/app/services/views.py b/app/services/views.py
index c580bb32..c3be8ac5 100644
--- a/app/services/views.py
+++ b/app/services/views.py
@@ -3,10 +3,13 @@ from flask import current_app, flash, redirect, render_template, url_for
 from . import services
 from flask_login import current_user, login_required
 from .forms import NewOCRJobForm, NewNLPJobForm
+from ..models import Job
 from ..import swarm
+from .. import db
 from threading import Thread
 import hashlib
 import os
+import json
 
 
 @services.route('/ocr', methods=['GET', 'POST'])
@@ -15,24 +18,23 @@ def ocr():
     new_ocr_job_form = NewOCRJobForm()
     if new_ocr_job_form.validate_on_submit():
         app = current_app._get_current_object()
-        id = hashlib.md5(
-            (current_user.username + '_' + datetime.now().isoformat()).encode()
-        ).hexdigest()
-        '''
-        ' TODO: Implement a Job class. For now a dictionary representation
-        ' is enough.
-        '''
-        job = {'creator': current_user.id,
-               'id': id,
-               'requested_cpus': 2,
-               'requested_memory': 2048,
-               'service': 'ocr',
-               'service_args': {'lang': new_ocr_job_form.language.data,
-                                'version': new_ocr_job_form.version.data
-                                },
-               'status': 'queued'
-               }
-        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', id)
+        ocr_job = Job()
+        ocr_job.title = new_ocr_job_form.title.data
+        ocr_job.description = new_ocr_job_form.description.data
+        ocr_job.user_id = current_user.id
+        ocr_job.creation_date = datetime.utcnow()
+        ocr_job.service = "ocr"
+        ocr_job.ressources = json.dumps({"n_cores": 2,
+                                         "mem_mb": 4096})
+        ocr_job.service_args = json.dumps({"args": ["--keep-intermediates",
+                                                    "--skip-binarisation"],
+                                           "lang": new_ocr_job_form.language.data,
+                                           "version": new_ocr_job_form.version.data})
+        ocr_job.status = "queued"
+        db.session.add(ocr_job)
+        db.session.commit()
+
+        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', str(ocr_job.id))
 
         try:
             os.makedirs(dir)
@@ -47,7 +49,7 @@ def ocr():
         ' NOTE: Using self created threads is just for testing purpose as
         ' there is no scheduler available.
         '''
-        thread = Thread(target=swarm.run, args=(job,))
+        thread = Thread(target=swarm.run, args=(ocr_job,))
         thread.start()
         flash('Job created!')
         return redirect(url_for('services.ocr'))
@@ -68,21 +70,21 @@ def nlp():
         id = hashlib.md5(
             (current_user.username + '_' + datetime.now().isoformat()).encode()
         ).hexdigest()
-        '''
-        ' TODO: Implement a Job class. For now a dictionary representation
-        ' is enough.
-        '''
-        job = {'creator': current_user.id,
-               'id': id,
-               'requested_cpus': 2,
-               'requested_memory': 2048,
-               'service': 'nlp',
-               'service_args': {'lang': new_nlp_job_form.language.data,
-                                'version': new_nlp_job_form.version.data
-                                },
-               'status': 'queued'
-               }
-        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', id)
+        nlp_job = Job()
+        nlp_job.title = new_nlp_job_form.title.data
+        nlp_job.description = new_nlp_job_form.description.data
+        nlp_job.user_id = current_user.id
+        nlp_job.creation_date = datetime.utcnow()
+        nlp_job.service = "nlp"
+        nlp_job.ressources = json.dumps({"n_cores": 2,
+                                         "mem_mb": 4096})
+        nlp_job.service_args = json.dumps({"args": [],
+                                           "lang": new_nlp_job_form.language.data,
+                                           "version": new_nlp_job_form.version.data})
+        nlp_job.status = "queued"
+        db.session.add(nlp_job)
+        db.session.commit()
+        dir = os.path.join(app.config['OPAQUE_STORAGE'], 'jobs', str(nlp_job.id))
 
         try:
             os.makedirs(dir)
@@ -97,7 +99,7 @@ def nlp():
         ' NOTE: Using self created threads is just for testing purpose as
         ' there is no scheduler available.
         '''
-        thread = Thread(target=swarm.run, args=(job,))
+        thread = Thread(target=swarm.run, args=(nlp_job,))
         thread.start()
         flash('Job created!')
         return redirect(url_for('services.nlp'))
diff --git a/app/swarm.py b/app/swarm.py
index a5a9e540..859a9f91 100644
--- a/app/swarm.py
+++ b/app/swarm.py
@@ -1,5 +1,6 @@
 import docker
 import time
+import json
 
 
 class Swarm:
@@ -19,22 +20,27 @@
     '''
 
     def run(self, job):
+        '''
+        Input is a Job object from which the service command is built.
+        '''
         # Prepare argument values needed for the service creation.
-        _command = job['service'] \
-            + ' -i /files/{}'.format(job['id']) \
-            + ' -l {}'.format(job['service_args']['lang']) \
-            + ' -o /files/{}/output'.format(job['id'])
-        # + ' --keep-intermediates'
+        service_args = json.loads(job.service_args)
+        ressources = json.loads(job.ressources)
+        _command = (job.service
+                    + ' -i /files/{}'.format(job.id)
+                    + ' -l {}'.format(service_args['lang'])
+                    + ' -o /files/{}/output'.format(job.id)
+                    + ' ' + ' '.join(service_args['args']))
         _constraints = ['node.role==worker']
         _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
-            job['service'],
-            job['service_args']['version']
+            job.service,
+            service_args['version']
         )
-        _labels = {'service': job['service']}
+        _labels = {'service': job.service}
         _mounts = [
             '/home/compute/mnt/opaque/jobs:/files:rw',
         ]
-        _name = job['id']
+        _name = str(job.id)
         '''
         ' The Docker SDK for Python expects the cpu_reservation value to be
         ' scaled to nanos (10^9). Because the job object contains unscaled
@@ -46,8 +52,8 @@
         ' in megabytes, it is also necessary to convert the value.
         '''
         _resources = docker.types.Resources(
-            cpu_reservation=job['requested_cpus'] * (10 ** 9),
-            mem_reservation=job['requested_memory'] * (10 ** 6)
+            cpu_reservation=ressources['n_cores'] * (10 ** 9),
+            mem_reservation=ressources['mem_mb'] * (10 ** 6)
        )
         _restart_policy = docker.types.RestartPolicy(condition='none')
         '''
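
Note: app/models.py is not part of this diff, so the Job model is only implied by the attributes used above. A minimal Flask-SQLAlchemy sketch that would satisfy this usage follows; column names are inferred from the diff, while the types, string lengths, table names, and the users.id foreign key are assumptions:

    from datetime import datetime
    from . import db


    class Job(db.Model):
        __tablename__ = 'jobs'
        # Integer primary key: views.py builds the job directory from
        # str(job.id) and swarm.py derives the service name from it.
        id = db.Column(db.Integer, primary_key=True)
        title = db.Column(db.String(64))
        description = db.Column(db.String(255))
        user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
        creation_date = db.Column(db.DateTime, default=datetime.utcnow)
        service = db.Column(db.String(16))        # 'ocr' or 'nlp'
        ressources = db.Column(db.String(255))    # JSON: {"n_cores": ..., "mem_mb": ...}
        service_args = db.Column(db.String(255))  # JSON: {"args": [...], "lang": ..., "version": ...}
        status = db.Column(db.String(16))         # e.g. 'queued'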
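For reference, here are the values Swarm.run derives from such a row, worked through with hypothetical sample data (job id 42 and language "eng" are illustrative, not from the diff):

    import json

    # Hypothetical row contents, as written by the ocr view above.
    job_id = 42
    service = 'ocr'
    service_args = json.loads('{"args": ["--keep-intermediates",'
                              ' "--skip-binarisation"],'
                              ' "lang": "eng", "version": "latest"}')
    ressources = json.loads('{"n_cores": 2, "mem_mb": 4096}')

    _command = (service
                + ' -i /files/{}'.format(job_id)
                + ' -l {}'.format(service_args['lang'])
                + ' -o /files/{}/output'.format(job_id)
                + ' ' + ' '.join(service_args['args']))
    # -> 'ocr -i /files/42 -l eng -o /files/42/output
    #     --keep-intermediates --skip-binarisation'

    # docker.types.Resources expects nano-CPUs and bytes, hence the scaling:
    cpu_reservation = ressources['n_cores'] * (10 ** 9)  # 2 -> 2000000000
    mem_reservation = ressources['mem_mb'] * (10 ** 6)   # 4096 -> 4096000000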
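The last hunk ends before the service is actually created. With the Docker SDK for Python, the prepared values would feed a call roughly like the one below; this is a sketch that assumes run() obtains a client via docker.from_env(), as the file's actual call is not shown in the diff:

    client = docker.from_env()
    client.services.create(
        _image,                      # e.g. ...sfb1288inf/ocr:latest
        command=_command,
        constraints=_constraints,
        labels=_labels,
        mounts=_mounts,
        name=_name,
        resources=_resources,
        restart_policy=_restart_policy
    )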