standardize the use of service name handles, e.g. file-setup instead of file_setup

This commit is contained in:
Patrick Jentsch 2021-08-04 12:26:49 +02:00
parent ec7a88f36e
commit da350474fb
10 changed files with 316 additions and 87 deletions

View File

@ -2,10 +2,7 @@ from flask import Blueprint
SERVICES = { SERVICES = {
'corpus_analysis': { 'file-setup': {
'name': 'Corpus analysis'
},
'file_setup': {
'name': 'File setup', 'name': 'File setup',
'versions': { 'versions': {
'latest': '1.0.0b', 'latest': '1.0.0b',

View File

@ -77,7 +77,14 @@ class AddFileSetupJobForm(AddJobForm):
'| .tif') '| .tif')
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
version = kwargs.pop('version', SERVICES['file_setup']['versions']['latest']) version = kwargs.pop('version', SERVICES['file-setup']['versions']['latest'])
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.version.choices = [(x, x) for x in SERVICES['file_setup']['versions'] if x != 'latest'] # noqa self.version.choices = [(x, x) for x in SERVICES['file-setup']['versions'] if x != 'latest'] # noqa
self.version.default = version self.version.default = version
AddJobForms = {
'file-setup': AddFileSetupJobForm,
'ocr': AddOCRJobForm,
'nlp': AddNLPJobForm
}

View File

@ -4,41 +4,36 @@ from flask_login import current_user, login_required
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from . import services from . import services
from . import SERVICES from . import SERVICES
from .forms import AddFileSetupJobForm, AddNLPJobForm, AddOCRJobForm
from .. import db, socketio from .. import db, socketio
from .forms import AddJobForms
from ..models import Job, JobInput from ..models import Job, JobInput
import json import json
import logging import logging
import os import os
@services.route('/corpus-analysis')
@login_required
def corpus_analysis():
return render_template('services/corpus_analysis.html.j2')
@services.route('/<service>', methods=['GET', 'POST']) @services.route('/<service>', methods=['GET', 'POST'])
@login_required @login_required
def service(service): def service(service):
if service not in SERVICES: # Check if the requested service exist
if service not in SERVICES or service not in AddJobForms:
abort(404) abort(404)
if service == 'corpus_analysis': version = request.args.get('version',
return render_template('services/{}.html.j2'.format(service), SERVICES[service]['versions']['latest'])
title=SERVICES[service]['name']) if version not in SERVICES[service]['versions']:
elif service == 'file_setup': abort(404)
form = AddFileSetupJobForm(prefix='add-file-setup-job-form') form = AddJobForms[service](prefix='add-job-form', version=version)
elif service == 'nlp': form.version.data = version
version = request.args.get('version') title = SERVICES[service]['name']
if version is None or version not in SERVICES[service]['versions']: versions = SERVICES[service]['versions']
form = AddNLPJobForm(prefix='add-nlp-job-form')
else:
form = AddNLPJobForm(prefix='add-nlp-job-form', version=version)
form.version.data = version
elif service == 'ocr':
version = request.args.get('version')
if version is None or version not in SERVICES[service]['versions']:
form = AddOCRJobForm(prefix='add-ocr-job-form')
else:
form = AddOCRJobForm(prefix='add-ocr-job-form', version=version)
form.version.data = version
if form.is_submitted(): if form.is_submitted():
if not form.validate(): if not form.validate():
logging.error(form.errors)
return make_response(form.errors, 400) return make_response(form.errors, 400)
service_args = [] service_args = []
if service == 'nlp': if service == 'nlp':
@ -80,6 +75,5 @@ def service(service):
socketio.emit(event, jsonpatch, room=room) socketio.emit(event, jsonpatch, room=room)
return make_response( return make_response(
{'redirect_url': url_for('jobs.job', job_id=job.id)}, 201) {'redirect_url': url_for('jobs.job', job_id=job.id)}, 201)
return render_template('services/{}.html.j2'.format(service), return render_template('services/{}.html.j2'.format(service.replace('-', '_')),
form=form, title=SERVICES[service]['name'], form=form, title=title, versions=versions)
versions=SERVICES[service]['versions'])

View File

@ -1,9 +1,3 @@
/* Fix material icon vertical alignment when nested in various elements */
h1 .nopaque-icons, h2 .nopaque-icons, h3 .nopaque-icons, h4 .nopaque-icons,
.tab .nopaque-icons, .tab .material-icons {
line-height: inherit;
}
.parallax-container .parallax { .parallax-container .parallax {
z-index: 0; z-index: 0;
} }

View File

@ -101,6 +101,11 @@ indicator will show up how the column is sorted right now.; */
font-size: 2.5rem; font-size: 2.5rem;
} }
/* Fix material icon vertical alignment when nested in various elements */
h1 .nopaque-icons, h2 .nopaque-icons, h3 .nopaque-icons, h4 .nopaque-icons,
.tab .nopaque-icons, .tab .material-icons {
line-height: inherit;
}
.nopaque-icons.service-icon[data-service="corpus-analysis"]:empty:before {content: "H";} .nopaque-icons.service-icon[data-service="corpus-analysis"]:empty:before {content: "H";}
.nopaque-icons.service-icon[data-service="file-setup"]:empty:before {content: "E";} .nopaque-icons.service-icon[data-service="file-setup"]:empty:before {content: "E";}
.nopaque-icons.service-icon[data-service="nlp"]:empty:before {content: "G";} .nopaque-icons.service-icon[data-service="nlp"]:empty:before {content: "G";}

218
app/tasks/job_utils.bak.py Normal file
View File

@ -0,0 +1,218 @@
from datetime import datetime
from werkzeug.utils import secure_filename
from .. import db, mail
from ..email import create_message
from ..models import Job, JobResult
import docker
import logging
import json
import os
DOCKER_REGISTRY = 'gitlab.ub.uni-bielefeld.de:4567'
class CheckJobsMixin:
def check_jobs(self):
jobs = Job.query.all()
canceling_jobs = list(filter(lambda job: job.status == 'canceling', jobs)) # noqa
queued_jobs = list(filter(lambda job: job.status == 'queued', jobs))
running_jobs = list(filter(lambda job: job.status == 'running', jobs))
submitted_jobs = list(filter(lambda job: job.status == 'submitted', jobs)) # noqa
for job in submitted_jobs:
self.create_job_service(job)
for job in queued_jobs + running_jobs:
self.checkout_job_service(job)
for job in canceling_jobs:
self.remove_job_service(job)
def create_job_service(self, job):
if job.service == 'file-setup':
mem_mb = 2048
n_cores = 2
executable = 'file-setup'
image = '{}/sfb1288inf/file-setup:{}'.format(DOCKER_REGISTRY, job.service_version) # noqa
elif job.service == 'ocr':
mem_mb = 4096
n_cores = 4
executable = 'ocr'
image = '{}/sfb1288inf/ocr:{}'.format(DOCKER_REGISTRY, job.service_version) # noqa
elif job.service == 'nlp':
mem_mb = 2048
n_cores = 2
executable = 'nlp'
image = '{}/sfb1288inf/nlp:{}'.format(DOCKER_REGISTRY, job.service_version) # noqa
# Command
command = '{} -i /input -o /output'.format(executable)
command += ' --log-dir /input'
command += ' --mem-mb {}'.format(mem_mb)
command += ' --n-cores {}'.format(n_cores)
command += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title))
command += ' ' + ' '.join(json.loads(job.service_args))
# Constraints
constraints = ['node.role==worker']
# Labels
labels = {'origin': 'nopaque', 'type': 'job', 'job_id': str(job.id)}
# Mounts
## Input mount
input_mount_source = job.path
input_mount_target = os.path.abspath('/input')
if job.service == 'file-setup':
input_mount_target = os.path.join(input_mount_target, secure_filename(job.title)) # noqa
input_mount = '{}:{}:rw'.format(input_mount_source, input_mount_target)
## Output mount
output_mount_source = os.path.join(job.path, 'output')
output_mount_target = os.path.abspath('/output')
output_mount = '{}:{}:rw'.format(output_mount_source, output_mount_target) # noqa
os.makedirs(output_mount_src)
mounts = [input_mount, output_mount]
# Name
name = 'job_{}'.format(job.id)
# Ressources
ressources = docker.types.Resources(
cpu_reservation=n_cores * (10 ** 9),
mem_reservation=mem_mb * (10 ** 6)
)
# Restart policy
restart_policy = docker.types.RestartPolicy()
try:
self.docker.services.create(
image,
command=command,
constraints=constraints,
labels=labels,
mounts=mounts,
name=name,
ressources=ressources,
restart_policy=restart_policy
)
except docker.errors.APIError as e:
logging.error(
'Create "{}" service raised '.format(service_kwargs['name'])
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
return
else:
job.status = 'queued'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa
self.buffer_user_patch_operation(job, patch_operation)
finally:
self.send_job_notification(job)
def checkout_job_service(self, job):
service_name = 'job_{}'.format(job.id)
try:
service = self.docker.services.get(service_name)
except docker.errors.NotFound:
logging.error('Get "{}" service raised '.format(service_name)
+ '"docker.errors.NotFound" The service does not exist. '
+ '(job.status: {} -> failed)'.format(job.status))
job.status = 'failed'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa
self.buffer_user_patch_operation(job, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" service raised '.format(service_name)
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
return
except docker.errors.InvalidVersion:
logging.error(
'Get "{}" service raised '.format(service_name)
+ '"docker.errors.InvalidVersion" One of the arguments is '
+ 'not supported with the current API version.'
)
return
else:
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == 'queued' and task_state != 'pending':
job.status = 'running'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa
self.buffer_user_patch_operation(job, patch_operation)
elif job.status == 'running' and task_state in ['complete', 'failed']:
try:
service.remove()
except docker.errors.APIError as e:
logging.error(
'Remove "{}" service raised '.format(service_name)
+ '"docker.errors.APIError" The server returned an error. ' # noqa
+ 'Details: {}'.format(e)
)
return
else:
if task_state == 'complete':
results_dir = os.path.join(job.path, 'output')
result_files = filter(lambda x: x.endswith('.zip'),
os.listdir(results_dir))
for result_file in result_files:
job_result = JobResult(filename=result_file, job=job) # noqa
db.session.add(job_result)
db.session.flush()
db.session.refresh(job_result)
patch_operation = {'op': 'add', 'path': '/jobs/{}/results/{}'.format(job.id, job_result.id), 'value': job_result.to_dict()} # noqa
self.buffer_user_patch_operation(job, patch_operation) # noqa
job.end_date = datetime.utcnow()
patch_operation = {'op': 'replace', 'path': '/jobs/{}/end_date'.format(job.id), 'value': job.end_date.timestamp()} # noqa
self.buffer_user_patch_operation(job, patch_operation)
job.status = task_state
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa
self.buffer_user_patch_operation(job, patch_operation)
finally:
self.send_job_notification(job)
def remove_job_service(self, job):
service_name = 'job_{}'.format(job.id)
try:
service = self.docker.services.get(service_name)
except docker.errors.NotFound:
job.status = 'canceled'
patch_operation = {'op': 'replace', 'path': '/jobs/{}/status'.format(job.id), 'value': job.status} # noqa
self.buffer_user_patch_operation(job, patch_operation)
except docker.errors.APIError as e:
logging.error(
'Get "{}" service raised '.format(service_name)
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
return
except docker.errors.InvalidVersion:
logging.error(
'Get "{}" service raised '.format(service_name)
+ '"docker.errors.InvalidVersion" One of the arguments is '
+ 'not supported with the current API version.'
)
return
else:
try:
service.update(mounts=None)
except docker.errors.APIError as e:
logging.error(
'Update "{}" service raised '.format(service_name)
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
return
try:
service.remove()
except docker.errors.APIError as e:
logging.error(
'Remove "{}" service raised '.format(service_name)
+ '"docker.errors.APIError" The server returned an error. '
+ 'Details: {}'.format(e)
)
def send_job_notification(self, job):
if job.creator.setting_job_status_mail_notifications == 'none':
return
if (job.creator.setting_job_status_mail_notifications == 'end'
and job.status not in ['complete', 'failed']):
return
msg = create_message(job.creator.email,
'Status update for your Job "{}"'.format(job.title), # noqa
'tasks/email/notification', job=job)
mail.send(msg)

View File

@ -9,24 +9,8 @@ import json
import os import os
# TODO: Integrate the service_settings into app/services/__init__.py DOCKER_REGISTRY = 'gitlab.ub.uni-bielefeld.de:4567'
service_settings = { DOCKER_IMAGE_PREFIX = '{}/sfb1288inf/'.format(DOCKER_REGISTRY)
'file-setup': {
'default_args': ' --mem-mb 2048 --n-cores 2',
'ressources': docker.types.Resources(cpu_reservation=2 * (10 ** 9),
mem_reservation=2048 * (10 ** 6))
},
'nlp': {
'default_args': ' --mem-mb 2048 --n-cores 2',
'ressources': docker.types.Resources(cpu_reservation=2 * (10 ** 9),
mem_reservation=2048 * (10 ** 6))
},
'ocr': {
'default_args': ' --mem-mb 4096 --n-cores 4',
'ressources': docker.types.Resources(cpu_reservation=4 * (10 ** 9),
mem_reservation=4096 * (10 ** 6))
}
}
class CheckJobsMixin: class CheckJobsMixin:
@ -44,36 +28,66 @@ class CheckJobsMixin:
self.remove_job_service(job) self.remove_job_service(job)
def create_job_service(self, job): def create_job_service(self, job):
cmd = job.service # Service specific settings
cmd += ' -i /input'
cmd += ' -o /output'
cmd += ' --log-dir /input'
cmd += ' --zip [{}]_{}'.format(job.service, secure_filename(job.title))
cmd += service_settings[job.service]['default_args']
cmd += ' ' + ' '.join(json.loads(job.service_args))
# Setup input mount
input_mount_src = job.path
input_mount_dest = os.path.abspath('/input')
if job.service == 'file-setup': if job.service == 'file-setup':
input_mount_dest = os.path.join(input_mount_dest, secure_filename(job.title)) # noqa mem_mb = 2048
input_mount = '{}:{}:rw'.format(input_mount_src, input_mount_dest) n_cores = 2
# Setup output mount executable = 'file-setup'
output_mount_src = os.path.join(job.path, 'output') image = DOCKER_IMAGE_PREFIX + 'file-setup:' + job.service_version
output_mount_dest = os.path.abspath('/output') elif job.service == 'ocr':
os.makedirs(output_mount_src) mem_mb = 4096
output_mount = '{}:{}:rw'.format(output_mount_src, output_mount_dest) n_cores = 4
service_kwargs = {'command': cmd, executable = 'ocr'
'constraints': ['node.role==worker'], image = DOCKER_IMAGE_PREFIX + 'ocr:' + job.service_version
'labels': {'origin': 'nopaque', elif job.service == 'nlp':
'type': 'job', mem_mb = 2048
'job_id': str(job.id)}, n_cores = 2
'mounts': [input_mount, output_mount], executable = 'nlp'
'name': 'job_{}'.format(job.id), image = DOCKER_IMAGE_PREFIX + 'nlp:' + job.service_version
'resources': service_settings[job.service]['ressources'], # noqa # Command
'restart_policy': docker.types.RestartPolicy()} command = '{} -i /input -o /output'.format(executable)
service_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(job.service, job.service_version) # noqa command += ' --log-dir /input'
command += ' --mem-mb {}'.format(mem_mb)
command += ' --n-cores {}'.format(n_cores)
command += ' --zip [' + job.service + ']_' + secure_filename(job.title)
command += ' ' + ' '.join(json.loads(job.service_args))
# Constraints
constraints = ['node.role==worker']
# Labels
labels = {'origin': 'nopaque', 'type': 'job', 'job_id': str(job.id)}
# Mounts
## Input mount
input_mount_source = job.path
input_mount_target = '/input'
if job.service == 'file-setup':
input_mount_target += '/' + secure_filename(job.title)
input_mount = input_mount_source + ':' + input_mount_target + ':rw'
## Output mount
output_mount_source = os.path.join(job.path, 'output')
output_mount_target = '/output'
output_mount = output_mount_source + ':' + output_mount_target + ':rw'
os.makedirs(output_mount_source)
mounts = [input_mount, output_mount]
# Name
name = 'job_{}'.format(job.id)
# Resources
resources = docker.types.Resources(
cpu_reservation=n_cores * (10 ** 9),
mem_reservation=mem_mb * (10 ** 6)
)
# Restart policy
restart_policy = docker.types.RestartPolicy()
try: try:
self.docker.services.create(service_image, **service_kwargs) self.docker.services.create(
image,
command=command,
constraints=constraints,
labels=labels,
mounts=mounts,
name=name,
resources=resources,
restart_policy=restart_policy
)
except docker.errors.APIError as e: except docker.errors.APIError as e:
logging.error( logging.error(
'Create "{}" service raised '.format(service_kwargs['name']) 'Create "{}" service raised '.format(service_kwargs['name'])

View File

@ -14,7 +14,7 @@
<li><a href="{{ url_for('main.dashboard', _anchor='jobs') }}" style="padding-left: 47px;"><i class="nopaque-icons">J</i>My Jobs</a></li> <li><a href="{{ url_for('main.dashboard', _anchor='jobs') }}" style="padding-left: 47px;"><i class="nopaque-icons">J</i>My Jobs</a></li>
<li><div class="divider"></div></li> <li><div class="divider"></div></li>
<li><a class="subheader">Processes & Services</a></li> <li><a class="subheader">Processes & Services</a></li>
<li class="service-color service-color-border border-darken" data-service="file-setup" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='file_setup') }}"><i class="nopaque-icons service-icon" data-service="file-setup"></i>File setup</a></li> <li class="service-color service-color-border border-darken" data-service="file-setup" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='file-setup') }}"><i class="nopaque-icons service-icon" data-service="file-setup"></i>File setup</a></li>
<li class="service-color service-color-border border-darken" data-service="ocr" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='ocr') }}"><i class="nopaque-icons service-icon" data-service="ocr"></i>OCR</a></li> <li class="service-color service-color-border border-darken" data-service="ocr" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='ocr') }}"><i class="nopaque-icons service-icon" data-service="ocr"></i>OCR</a></li>
<li class="service-color service-color-border border-darken" data-service="nlp" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='nlp') }}"><i class="nopaque-icons service-icon" data-service="nlp"></i>NLP</a></li> <li class="service-color service-color-border border-darken" data-service="nlp" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='nlp') }}"><i class="nopaque-icons service-icon" data-service="nlp"></i>NLP</a></li>
<li class="service-color service-color-border border-darken" data-service="corpus-analysis" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='corpus_analysis') }}"><i class="nopaque-icons service-icon" data-service="corpus-analysis"></i>Corpus analysis</a></li> <li class="service-color service-color-border border-darken" data-service="corpus-analysis" style="border-left: 10px solid; margin-top: 5px;"><a href="{{ url_for('services.service', service='corpus_analysis') }}"><i class="nopaque-icons service-icon" data-service="corpus-analysis"></i>Corpus analysis</a></li>

View File

@ -9,7 +9,7 @@
<div class="col s12" data-job-id="{{ job.id }}" data-user-id="{{ job.creator.id }}" id="job-display"> <div class="col s12" data-job-id="{{ job.id }}" data-user-id="{{ job.creator.id }}" id="job-display">
<div class="row"> <div class="row">
<div class="col s8 m9 l10"> <div class="col s8 m9 l10">
<h1 id="title">[<span class="job-service"></span>] <span class="job-title"></span></h1> <h1 id="title"><i style="font-size: inherit;" class="nopaque-icons service-icon" data-service="{{ job.service }}"></i> <span class="job-title"></span></h1>
</div> </div>
<div class="col s4 m3 l2 right-align"> <div class="col s4 m3 l2 right-align">
<p>&nbsp;</p> <p>&nbsp;</p>

View File

@ -4,8 +4,8 @@
<li class="tab disabled"><i class="material-icons">navigate_next</i></li> <li class="tab disabled"><i class="material-icons">navigate_next</i></li>
{% if request.path == url_for('.service', service='corpus_analysis') %} {% if request.path == url_for('.service', service='corpus_analysis') %}
<li class="tab"><a class="active" href="{{ url_for('.service', service='corpus_analysis') }}" target="_self">{{ title }}</a></li> <li class="tab"><a class="active" href="{{ url_for('.service', service='corpus_analysis') }}" target="_self">{{ title }}</a></li>
{% elif request.path == url_for('.service', service='file_setup') %} {% elif request.path == url_for('.service', service='file-setup') %}
<li class="tab"><a class="active" href="{{ url_for('.service', service='file_setup') }}" target="_self">{{ title }}</a></li> <li class="tab"><a class="active" href="{{ url_for('.service', service='file-setup') }}" target="_self">{{ title }}</a></li>
{% elif request.path == url_for('.service', service='nlp') %} {% elif request.path == url_for('.service', service='nlp') %}
<li class="tab"><a class="active" href="{{ url_for('.service', service='nlp') }}" target="_self">{{ title }}</a></li> <li class="tab"><a class="active" href="{{ url_for('.service', service='nlp') }}" target="_self">{{ title }}</a></li>
{% elif request.path == url_for('.service', service='ocr') %} {% elif request.path == url_for('.service', service='ocr') %}