Change job model.

This commit is contained in:
Patrick Jentsch 2019-08-09 11:48:43 +02:00
parent 50273ea4d1
commit a82b7292ed
4 changed files with 114 additions and 123 deletions

View File

@ -221,24 +221,17 @@ class Job(db.Model):
# Primary key # Primary key
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
creation_date = db.Column(db.DateTime(), default=datetime.utcnow) creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
description = db.Column(db.String(64)) description = db.Column(db.String(255))
''' mem_mb = db.Column(db.Integer)
' Requested ressources. n_cores = db.Column(db.Integer)
' Example: {"n_cores": 2,
' "mem_mb": 4096
' }
'''
ressources = db.Column(db.String(255))
service = db.Column(db.String(64)) service = db.Column(db.String(64))
''' '''
' Service specific arguments in JSON format. ' Service specific arguments as string list.
' Example: {"args": ["--keep-intermediates", "skip-binarization"], ' Example: ["-l eng", "--keep-intermediates", "--skip-binarization"]
' "lang": "eng",
' "version": "latest"
' }
''' '''
service_args = db.Column(db.String(255)) service_args = db.Column(db.String(255))
status = db.Column(db.String(8)) service_version = db.Column(db.String(16))
status = db.Column(db.String(16))
title = db.Column(db.String(32)) title = db.Column(db.String(32))
user_id = db.Column(db.Integer, db.ForeignKey('users.id')) user_id = db.Column(db.Integer, db.ForeignKey('users.id'))

View File

@ -3,10 +3,50 @@ from wtforms import MultipleFileField, SelectField, StringField, SubmitField, Va
from wtforms.validators import DataRequired, Length from wtforms.validators import DataRequired, Length
class NewNLPJobForm(FlaskForm):
description = StringField(
'Description',
validators=[DataRequired(), Length(1, 255)]
)
files = MultipleFileField('Files', validators=[DataRequired()])
language = SelectField(
'Language',
choices=[('', 'Choose your option'),
('en', 'English'),
('fr', 'French'),
('de', 'German'),
('it', 'Italian'),
('pt', 'Portuguese'),
('es', 'Spanish')
],
validators=[DataRequired()]
)
submit = SubmitField('Submit')
title = StringField(
'Title',
validators=[DataRequired(), Length(1, 32)]
)
version = SelectField(
'Version',
choices=[('', 'Choose your option'),
('latest', 'Latest'),
],
validators=[DataRequired()]
)
def validate_files(form, field):
for file in field.data:
if not file.filename.lower().endswith('.txt'):
raise ValidationError(
'File does not have an approved extension: '
'.txt'
)
class NewOCRJobForm(FlaskForm): class NewOCRJobForm(FlaskForm):
description = StringField( description = StringField(
'Description', 'Description',
validators=[DataRequired(), Length(1, 64)] validators=[DataRequired(), Length(1, 255)]
) )
files = MultipleFileField('Files', validators=[DataRequired()]) files = MultipleFileField('Files', validators=[DataRequired()])
language = SelectField( language = SelectField(
@ -44,43 +84,3 @@ class NewOCRJobForm(FlaskForm):
'File does not have an approved extension: ' 'File does not have an approved extension: '
'.pdf | .tif | .tiff' '.pdf | .tif | .tiff'
) )
class NewNLPJobForm(FlaskForm):
description = StringField(
'Description',
validators=[DataRequired(), Length(1, 64)]
)
files = MultipleFileField('Files', validators=[DataRequired()])
language = SelectField(
'Language',
choices=[('', 'Choose your option'),
('en', 'English'),
('fr', 'French'),
('de', 'German'),
('it', 'Italian'),
('pt', 'Portuguese'),
('es', 'Spanish')
],
validators=[DataRequired()]
)
submit = SubmitField('Submit')
title = StringField(
'Title',
validators=[DataRequired(), Length(1, 32)]
)
version = SelectField(
'Version',
choices=[('', 'Choose your option'),
('latest', 'Latest'),
],
validators=[DataRequired()]
)
def validate_files(form, field):
for file in field.data:
if not file.filename.lower().endswith('.txt'):
raise ValidationError(
'File does not have an approved extension: '
'.txt'
)

View File

@ -6,61 +6,8 @@ from ..models import Job
from ..import swarm from ..import swarm
from .. import db from .. import db
from threading import Thread from threading import Thread
import os
import json import json
import os
@services.route('/ocr', methods=['GET', 'POST'])
@login_required
def ocr():
new_ocr_job_form = NewOCRJobForm()
if new_ocr_job_form.validate_on_submit():
ocr_job = Job(creator=current_user._get_current_object(),
description=new_ocr_job_form.description.data,
service="ocr",
ressources=json.dumps({"n_cores": 4,
"mem_mb": 8192}),
service_args=json.dumps({"args": ["--keep-intermediates",
"--skip-binarisation"],
"lang": new_ocr_job_form.language.data,
"version": new_ocr_job_form.version.data}),
status="pending",
title=new_ocr_job_form.title.data)
db.session.add(ocr_job)
db.session.commit()
dir = os.path.join(current_app.config['OPAQUE_STORAGE'],
str(ocr_job.user_id),
'jobs',
str(ocr_job.id))
try:
os.makedirs(dir)
except OSError:
flash('OSError!')
db.session.remove(ocr_job)
db.session.commit()
else:
for file in new_ocr_job_form.files.data:
file.save(os.path.join(dir, file.filename))
'''
' TODO: Let the scheduler run this job in the background.
'
' NOTE: Using self created threads is just for testing purpose as
' there is no scheduler available.
'''
db.session.expunge(ocr_job)
thread = Thread(target=swarm.run, args=(ocr_job,))
thread.start()
flash('Job created!')
return redirect(url_for('services.ocr'))
return render_template(
'services/ocr.html.j2',
title='Optical Character Recognition',
new_ocr_job_form=new_ocr_job_form
)
@services.route('/nlp', methods=['GET', 'POST']) @services.route('/nlp', methods=['GET', 'POST'])
@ -68,15 +15,14 @@ def ocr():
def nlp(): def nlp():
new_nlp_job_form = NewNLPJobForm() new_nlp_job_form = NewNLPJobForm()
if new_nlp_job_form.validate_on_submit(): if new_nlp_job_form.validate_on_submit():
nlp_job = Job(creator=current_user._get_current_object(), nlp_job = Job(creator=current_user,
description=new_nlp_job_form.description.data, description=new_nlp_job_form.description.data,
service="nlp", mem_mb=4096,
ressources=json.dumps({"n_cores": 2, n_cores=2,
"mem_mb": 4096}), service='nlp',
service_args=json.dumps({"args": [], service_args=json.dumps(['-l {}'.format(new_nlp_job_form.language.data)]),
"lang": new_nlp_job_form.language.data, service_version=new_nlp_job_form.version.data,
"version": new_nlp_job_form.version.data}), status='submitted',
status="pending",
title=new_nlp_job_form.title.data) title=new_nlp_job_form.title.data)
db.session.add(nlp_job) db.session.add(nlp_job)
@ -113,3 +59,57 @@ def nlp():
title='Natrual Language Processing', title='Natrual Language Processing',
new_nlp_job_form=new_nlp_job_form new_nlp_job_form=new_nlp_job_form
) )
@services.route('/ocr', methods=['GET', 'POST'])
@login_required
def ocr():
new_ocr_job_form = NewOCRJobForm()
if new_ocr_job_form.validate_on_submit():
ocr_job = Job(creator=current_user,
description=new_ocr_job_form.description.data,
mem_mb=8192,
n_cores=4,
service='ocr',
service_args=json.dumps([
'-l {}'.format(new_ocr_job_form.language.data),
'--keep-intermediates',
'--skip-binarisation']),
service_version=new_ocr_job_form.version.data,
status='submitted',
title=new_ocr_job_form.title.data)
db.session.add(ocr_job)
db.session.commit()
dir = os.path.join(current_app.config['OPAQUE_STORAGE'],
str(ocr_job.user_id),
'jobs',
str(ocr_job.id))
try:
os.makedirs(dir)
except OSError:
flash('OSError!')
db.session.remove(ocr_job)
db.session.commit()
else:
for file in new_ocr_job_form.files.data:
file.save(os.path.join(dir, file.filename))
'''
' TODO: Let the scheduler run this job in the background.
'
' NOTE: Using self created threads is just for testing purpose as
' there is no scheduler available.
'''
db.session.expunge(ocr_job)
thread = Thread(target=swarm.run, args=(ocr_job,))
thread.start()
flash('Job created!')
return redirect(url_for('services.ocr'))
return render_template(
'services/ocr.html.j2',
title='Optical Character Recognition',
new_ocr_job_form=new_ocr_job_form
)

View File

@ -35,16 +35,14 @@ class Swarm:
''' '''
# Prepare argument values needed for the service creation. # Prepare argument values needed for the service creation.
service_args = json.loads(job.service_args) service_args = json.loads(job.service_args)
ressources = json.loads(job.ressources)
_command = (job.service _command = (job.service
+ ' -i /files' + ' -i /files'
+ ' -l {}'.format(service_args['lang'])
+ ' -o /files/output' + ' -o /files/output'
+ ' ' + ' '.join(service_args['args'])) + ' ' + ' '.join(service_args))
_constraints = ['node.role==worker'] _constraints = ['node.role==worker']
_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format( _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
job.service, job.service,
service_args['version'] job.service_version
) )
_labels = {'service': job.service} _labels = {'service': job.service}
_mounts = [os.path.join('/home/compute/mnt/opaque', _mounts = [os.path.join('/home/compute/mnt/opaque',
@ -64,8 +62,8 @@ class Swarm:
' in megabytes, it is also necessary to convert the value. ' in megabytes, it is also necessary to convert the value.
''' '''
_resources = docker.types.Resources( _resources = docker.types.Resources(
cpu_reservation=ressources['n_cores'] * (10 ** 9), cpu_reservation=job.n_cores * (10 ** 9),
mem_reservation=ressources['mem_mb'] * (10 ** 6) mem_reservation=job.mem_mb * (10 ** 6)
) )
_restart_policy = docker.types.RestartPolicy(condition='none') _restart_policy = docker.types.RestartPolicy(condition='none')
''' '''