Restructure code and use APScheduler for daemon functionality

This commit is contained in:
Patrick Jentsch
2022-06-28 12:30:02 +02:00
parent 7c52d3f392
commit b8bf004684
20 changed files with 755 additions and 710 deletions

View File

@ -1,23 +1,9 @@
from app import db
from flask import current_app
from time import sleep
from .corpus_utils import CheckCorporaMixin
from .job_utils import CheckJobsMixin
import docker
from .corpus_utils import check_corpora
from .job_utils import check_jobs
class Daemon(CheckCorporaMixin, CheckJobsMixin):
def __init__(self):
self.docker = docker.from_env()
self.docker.login(
username=current_app.config['NOPAQUE_DOCKER_REGISTRY_USERNAME'],
password=current_app.config['NOPAQUE_DOCKER_REGISTRY_PASSWORD'],
registry=current_app.config['NOPAQUE_DOCKER_REGISTRY']
)
def run(self):
while True:
self.check_corpora()
self.check_jobs()
db.session.commit()
sleep(1.5)
def daemon():
check_corpora()
check_jobs()
db.session.commit()

View File

@ -1,3 +1,4 @@
from app import docker_client
from app.models import Corpus, CorpusStatus
from flask import current_app
import docker
@ -5,250 +6,249 @@ import os
import shutil
class CheckCorporaMixin:
def check_corpora(self):
corpora = Corpus.query.all()
for corpus in (x for x in corpora if x.status == CorpusStatus.SUBMITTED): # noqa
self.create_build_corpus_service(corpus)
for corpus in (x for x in corpora if x.status == CorpusStatus.QUEUED or x.status == CorpusStatus.BUILDING): # noqa
self.checkout_build_corpus_service(corpus)
for corpus in (x for x in corpora if x.status == CorpusStatus.BUILT and x.num_analysis_sessions > 0): # noqa
corpus.status = CorpusStatus.STARTING_ANALYSIS_SESSION
for corpus in (x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION and x.num_analysis_sessions == 0): # noqa
corpus.status = CorpusStatus.CANCELING_ANALYSIS_SESSION
for corpus in (x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION): # noqa
self.checkout_analysing_corpus_container(corpus)
for corpus in (x for x in corpora if x.status == CorpusStatus.STARTING_ANALYSIS_SESSION): # noqa
self.create_cqpserver_container(corpus)
for corpus in (x for x in corpora if x.status == CorpusStatus.CANCELING_ANALYSIS_SESSION): # noqa
self.remove_cqpserver_container(corpus)
def check_corpora():
corpora = Corpus.query.all()
for corpus in [x for x in corpora if x.status == CorpusStatus.SUBMITTED]:
_create_build_corpus_service(corpus)
for corpus in [x for x in corpora if x.status in [CorpusStatus.QUEUED, CorpusStatus.BUILDING]]:
_checkout_build_corpus_service(corpus)
for corpus in [x for x in corpora if x.status == CorpusStatus.BUILT and x.num_analysis_sessions > 0]:
corpus.status = CorpusStatus.STARTING_ANALYSIS_SESSION
for corpus in [x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION and x.num_analysis_sessions == 0]:
corpus.status = CorpusStatus.CANCELING_ANALYSIS_SESSION
for corpus in [x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION]:
_checkout_analysing_corpus_container(corpus)
for corpus in [x for x in corpora if x.status == CorpusStatus.STARTING_ANALYSIS_SESSION]:
_create_cqpserver_container(corpus)
for corpus in [x for x in corpora if x.status == CorpusStatus.CANCELING_ANALYSIS_SESSION]:
_remove_cqpserver_container(corpus)
def create_build_corpus_service(self, corpus):
''' # Docker service settings # '''
''' ## Command ## '''
command = ['bash', '-c']
command.append(
f'mkdir /corpora/data/nopaque_{corpus.id}'
' && '
'cwb-encode'
' -c utf8'
f' -d /corpora/data/nopaque_{corpus.id}'
' -f /root/files/corpus.vrt'
f' -R /usr/local/share/cwb/registry/nopaque_{corpus.id}'
' -P pos -P lemma -P simple_pos'
' -S ent:0+type -S s:0'
' -S text:0+address+author+booktitle+chapter+editor+institution+journal+pages+publisher+publishing_year+school+title' # noqa
' -xsB -9'
' && '
f'cwb-make -V NOPAQUE_{corpus.id}'
def _create_build_corpus_service(corpus):
''' # Docker service settings # '''
''' ## Command ## '''
command = ['bash', '-c']
command.append(
f'mkdir /corpora/data/nopaque_{corpus.id}'
' && '
'cwb-encode'
' -c utf8'
f' -d /corpora/data/nopaque_{corpus.id}'
' -f /root/files/corpus.vrt'
f' -R /usr/local/share/cwb/registry/nopaque_{corpus.id}'
' -P pos -P lemma -P simple_pos'
' -S ent:0+type -S s:0'
' -S text:0+address+author+booktitle+chapter+editor+institution+journal+pages+publisher+publishing_year+school+title'
' -xsB -9'
' && '
f'cwb-make -V NOPAQUE_{corpus.id}'
)
''' ## Constraints ## '''
constraints = ['node.role==worker']
''' ## Image ## '''
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702'
''' ## Labels ## '''
labels = {
'origin': current_app.config['SERVER_NAME'],
'type': 'corpus.build',
'corpus_id': str(corpus.id)
}
''' ## Mounts ## '''
mounts = []
''' ### Data mount ### '''
data_mount_source = os.path.join(corpus.path, 'cwb', 'data')
data_mount_target = '/corpora/data'
data_mount = f'{data_mount_source}:{data_mount_target}:rw'
# Make sure that their is no data in the data directory
shutil.rmtree(data_mount_source, ignore_errors=True)
os.makedirs(data_mount_source)
mounts.append(data_mount)
''' ### File mount ### '''
file_mount_source = os.path.join(corpus.path, 'cwb', 'corpus.vrt')
file_mount_target = '/root/files/corpus.vrt'
file_mount = f'{file_mount_source}:{file_mount_target}:ro'
mounts.append(file_mount)
''' ### Registry mount ### '''
registry_mount_source = os.path.join(corpus.path, 'cwb', 'registry')
registry_mount_target = '/usr/local/share/cwb/registry'
registry_mount = f'{registry_mount_source}:{registry_mount_target}:rw'
# Make sure that their is no data in the registry directory
shutil.rmtree(registry_mount_source, ignore_errors=True)
os.makedirs(registry_mount_source)
mounts.append(registry_mount)
''' ## Name ## '''
name = f'build-corpus_{corpus.id}'
''' ## Restart policy ## '''
restart_policy = docker.types.RestartPolicy()
try:
docker_client.services.create(
image,
command=command,
constraints=constraints,
labels=labels,
mounts=mounts,
name=name,
restart_policy=restart_policy,
user='0:0'
)
''' ## Constraints ## '''
constraints = ['node.role==worker']
''' ## Image ## '''
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702'
''' ## Labels ## '''
labels = {
'origin': current_app.config['SERVER_NAME'],
'type': 'corpus.build',
'corpus_id': str(corpus.id)
}
''' ## Mounts ## '''
mounts = []
''' ### Data mount ### '''
data_mount_source = os.path.join(corpus.path, 'cwb', 'data')
data_mount_target = '/corpora/data'
data_mount = f'{data_mount_source}:{data_mount_target}:rw'
# Make sure that their is no data in the data directory
shutil.rmtree(data_mount_source, ignore_errors=True)
os.makedirs(data_mount_source)
mounts.append(data_mount)
''' ### File mount ### '''
file_mount_source = os.path.join(corpus.path, 'cwb', 'corpus.vrt')
file_mount_target = '/root/files/corpus.vrt'
file_mount = f'{file_mount_source}:{file_mount_target}:ro'
mounts.append(file_mount)
''' ### Registry mount ### '''
registry_mount_source = os.path.join(corpus.path, 'cwb', 'registry')
registry_mount_target = '/usr/local/share/cwb/registry'
registry_mount = f'{registry_mount_source}:{registry_mount_target}:rw'
# Make sure that their is no data in the registry directory
shutil.rmtree(registry_mount_source, ignore_errors=True)
os.makedirs(registry_mount_source)
mounts.append(registry_mount)
''' ## Name ## '''
name = f'build-corpus_{corpus.id}'
''' ## Restart policy ## '''
restart_policy = docker.types.RestartPolicy()
try:
self.docker.services.create(
image,
command=command,
constraints=constraints,
labels=labels,
mounts=mounts,
name=name,
restart_policy=restart_policy,
user='0:0'
)
except docker.errors.APIError as e:
current_app.logger.error(
f'Create service "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
corpus.status = CorpusStatus.QUEUED
def checkout_build_corpus_service(self, corpus):
service_name = f'build-corpus_{corpus.id}'
try:
service = self.docker.services.get(service_name)
except docker.errors.NotFound as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.NotFound": {e}'
)
corpus.status = CorpusStatus.FAILED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == CorpusStatus.QUEUED and task_state != 'pending': # noqa
corpus.status = CorpusStatus.BUILDING
return
elif corpus.status == CorpusStatus.BUILDING and task_state == 'complete': # noqa
corpus.status = CorpusStatus.BUILT
elif corpus.status == CorpusStatus.BUILDING and task_state == 'failed': # noqa
corpus.status = CorpusStatus.FAILED
else:
return
try:
service.remove()
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
def create_cqpserver_container(self, corpus):
''' # Docker container settings # '''
''' ## Command ## '''
command = []
command.append(
'echo "host *;" > cqpserver.init'
' && '
'echo "user anonymous \\"\\";" >> cqpserver.init'
' && '
'cqpserver -I cqpserver.init'
except docker.errors.APIError as e:
current_app.logger.error(
f'Create service "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
''' ## Detach ## '''
detach = True
''' ## Entrypoint ## '''
entrypoint = ['bash', '-c']
''' ## Image ## '''
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702'
''' ## Name ## '''
name = f'cqpserver_{corpus.id}'
''' ## Network ## '''
network = 'nopaque_default'
''' ## Volumes ## '''
volumes = []
''' ### Corpus data volume ### '''
data_volume_source = os.path.join(corpus.path, 'cwb', 'data')
data_volume_target = '/corpora/data'
data_volume = f'{data_volume_source}:{data_volume_target}:rw'
volumes.append(data_volume)
''' ### Corpus registry volume ### '''
registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry')
registry_volume_target = '/usr/local/share/cwb/registry'
registry_volume = f'{registry_volume_source}:{registry_volume_target}:rw' # noqa
volumes.append(registry_volume)
# Check if a cqpserver container already exists. If this is the case,
# remove it and create a new one
try:
container = self.docker.containers.get(name)
except docker.errors.NotFound:
pass
except docker.errors.APIError as e:
current_app.logger.error(
f'Get container "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
else:
try:
container.remove(force=True)
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove container "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
try:
self.docker.containers.run(
image,
command=command,
detach=detach,
entrypoint=entrypoint,
name=name,
network=network,
user='0:0',
volumes=volumes
)
except docker.errors.ImageNotFound as e:
current_app.logger.error(
f'Run container "{name}" failed '
f'due to "docker.errors.ImageNotFound" error: {e}'
)
corpus.status = CorpusStatus.FAILED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Run container "{name}" failed '
f'due to "docker.errors.APIError" error: {e}'
)
return
corpus.status = CorpusStatus.RUNNING_ANALYSIS_SESSION
return
corpus.status = CorpusStatus.QUEUED
def checkout_analysing_corpus_container(self, corpus):
container_name = f'cqpserver_{corpus.id}'
try:
self.docker.containers.get(container_name)
except docker.errors.NotFound as e:
current_app.logger.error(
f'Get container "{container_name}" failed '
f'due to "docker.errors.NotFound": {e}'
)
corpus.num_analysis_sessions = 0
corpus.status = CorpusStatus.BUILT
except docker.errors.APIError as e:
current_app.logger.error(
f'Get container "{container_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
def _checkout_build_corpus_service(corpus):
service_name = f'build-corpus_{corpus.id}'
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.NotFound": {e}'
)
corpus.status = CorpusStatus.FAILED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if corpus.status == CorpusStatus.QUEUED and task_state != 'pending':
corpus.status = CorpusStatus.BUILDING
return
elif corpus.status == CorpusStatus.BUILDING and task_state == 'complete':
corpus.status = CorpusStatus.BUILT
elif corpus.status == CorpusStatus.BUILDING and task_state == 'failed':
corpus.status = CorpusStatus.FAILED
else:
return
try:
service.remove()
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
def remove_cqpserver_container(self, corpus):
container_name = f'cqpserver_{corpus.id}'
try:
container = self.docker.containers.get(container_name)
except docker.errors.NotFound:
corpus.status = CorpusStatus.BUILT
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get container "{container_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
def _create_cqpserver_container(corpus):
''' # Docker container settings # '''
''' ## Command ## '''
command = []
command.append(
'echo "host *;" > cqpserver.init'
' && '
'echo "user anonymous \\"\\";" >> cqpserver.init'
' && '
'cqpserver -I cqpserver.init'
)
''' ## Detach ## '''
detach = True
''' ## Entrypoint ## '''
entrypoint = ['bash', '-c']
''' ## Image ## '''
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702'
''' ## Name ## '''
name = f'cqpserver_{corpus.id}'
''' ## Network ## '''
network = 'nopaque_default'
''' ## Volumes ## '''
volumes = []
''' ### Corpus data volume ### '''
data_volume_source = os.path.join(corpus.path, 'cwb', 'data')
data_volume_target = '/corpora/data'
data_volume = f'{data_volume_source}:{data_volume_target}:rw'
volumes.append(data_volume)
''' ### Corpus registry volume ### '''
registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry')
registry_volume_target = '/usr/local/share/cwb/registry'
registry_volume = f'{registry_volume_source}:{registry_volume_target}:rw'
volumes.append(registry_volume)
# Check if a cqpserver container already exists. If this is the case,
# remove it and create a new one
try:
container = docker_client.containers.get(name)
except docker.errors.NotFound:
pass
except docker.errors.APIError as e:
current_app.logger.error(
f'Get container "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
else:
try:
container.remove(force=True)
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove container "{container_name}" failed '
f'Remove container "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
try:
docker_client.containers.run(
image,
command=command,
detach=detach,
entrypoint=entrypoint,
name=name,
network=network,
user='0:0',
volumes=volumes
)
except docker.errors.ImageNotFound as e:
current_app.logger.error(
f'Run container "{name}" failed '
f'due to "docker.errors.ImageNotFound" error: {e}'
)
corpus.status = CorpusStatus.FAILED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Run container "{name}" failed '
f'due to "docker.errors.APIError" error: {e}'
)
return
corpus.status = CorpusStatus.RUNNING_ANALYSIS_SESSION
def _checkout_analysing_corpus_container(corpus):
container_name = f'cqpserver_{corpus.id}'
try:
docker_client.containers.get(container_name)
except docker.errors.NotFound as e:
current_app.logger.error(
f'Get container "{container_name}" failed '
f'due to "docker.errors.NotFound": {e}'
)
corpus.num_analysis_sessions = 0
corpus.status = CorpusStatus.BUILT
except docker.errors.APIError as e:
current_app.logger.error(
f'Get container "{container_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
def _remove_cqpserver_container(corpus):
container_name = f'cqpserver_{corpus.id}'
try:
container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
corpus.status = CorpusStatus.BUILT
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get container "{container_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
try:
container.remove(force=True)
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove container "{container_name}" failed '
f'due to "docker.errors.APIError": {e}'
)

View File

@ -1,4 +1,4 @@
from app import db
from app import db, docker_client
from app.models import (
Job,
JobResult,
@ -15,217 +15,216 @@ import os
import shutil
class CheckJobsMixin:
def check_jobs(self):
jobs = Job.query.all()
for job in (x for x in jobs if x.status == JobStatus.SUBMITTED):
self.create_job_service(job)
for job in (x for x in jobs if x.status in [JobStatus.QUEUED, JobStatus.RUNNING]): # noqa
self.checkout_job_service(job)
for job in (x for x in jobs if x.status == JobStatus.CANCELING):
self.remove_job_service(job)
def check_jobs():
jobs = Job.query.all()
for job in [x for x in jobs if x.status == JobStatus.SUBMITTED]:
_create_job_service(job)
for job in [x for x in jobs if x.status in [JobStatus.QUEUED, JobStatus.RUNNING]]:
_checkout_job_service(job)
for job in [x for x in jobs if x.status == JobStatus.CANCELING]:
_remove_job_service(job)
def create_job_service(self, job):
''' # Docker service settings # '''
''' ## Service specific settings ## '''
if job.service == 'file-setup-pipeline':
mem_mb = 512
n_cores = 2
executable = 'file-setup-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}file-setup-pipeline:v{job.service_version}' # noqa
elif job.service == 'tesseract-ocr-pipeline':
mem_mb = 1024
n_cores = 4
executable = 'tesseract-ocr-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}tesseract-ocr-pipeline:v{job.service_version}' # noqa
elif job.service == 'transkribus-htr-pipeline':
mem_mb = 1024
n_cores = 4
executable = 'transkribus-htr-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}transkribus-htr-pipeline:v{job.service_version}' # noqa
elif job.service == 'spacy-nlp-pipeline':
mem_mb = 1024
n_cores = 1
executable = 'spacy-nlp-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}spacy-nlp-pipeline:v{job.service_version}' # noqa
''' ## Command ## '''
command = f'{executable} -i /input -o /output'
command += ' --log-dir /logs'
command += f' --mem-mb {mem_mb}'
command += f' --n-cores {n_cores}'
if job.service == 'spacy-nlp-pipeline':
command += f' -m {job.service_args["model"]}'
if 'encoding_detection' in job.service_args and job.service_args['encoding_detection']: # noqa
command += ' --check-encoding'
elif job.service == 'tesseract-ocr-pipeline':
command += f' -m {job.service_args["model"]}'
if 'binarization' in job.service_args and job.service_args['binarization']:
command += ' --binarize'
elif job.service == 'transkribus-htr-pipeline':
transkribus_htr_model = TranskribusHTRModel.query.get(job.service_args['model'])
command += f' -m {transkribus_htr_model.transkribus_model_id}'
readcoop_username = current_app.config.get('NOPAQUE_READCOOP_USERNAME')
command += f' --readcoop-username "{readcoop_username}"'
readcoop_password = current_app.config.get('NOPAQUE_READCOOP_PASSWORD')
command += f' --readcoop-password "{readcoop_password}"'
if 'binarization' in job.service_args and job.service_args['binarization']:
command += ' --binarize'
''' ## Constraints ## '''
constraints = ['node.role==worker']
''' ## Labels ## '''
labels = {
'origin': current_app.config['SERVER_NAME'],
'type': 'job',
'job_id': str(job.id)
}
''' ## Mounts ## '''
mounts = []
''' ### Input mount(s) ### '''
input_mount_target_base = '/input'
if job.service == 'file-setup-pipeline':
input_mount_target_base += f'/{secure_filename(job.title)}'
for job_input in job.inputs:
input_mount_source = job_input.path
input_mount_target = f'{input_mount_target_base}/{job_input.filename}' # noqa
input_mount = f'{input_mount_source}:{input_mount_target}:ro'
mounts.append(input_mount)
if job.service == 'tesseract-ocr-pipeline':
model = TesseractOCRModel.query.get(job.service_args['model'])
if model is None:
job.status = JobStatus.FAILED
return
models_mount_source = model.path
models_mount_target = f'/usr/local/share/tessdata/{model.filename}'
models_mount = f'{models_mount_source}:{models_mount_target}:ro'
mounts.append(models_mount)
''' ### Output mount ### '''
output_mount_source = os.path.join(job.path, 'results')
output_mount_target = '/output'
output_mount = f'{output_mount_source}:{output_mount_target}:rw'
# Make sure that their is no data in the output directory
shutil.rmtree(output_mount_source, ignore_errors=True)
os.makedirs(output_mount_source)
mounts.append(output_mount)
''' ### Pipeline data mount ### '''
pyflow_data_mount_source = os.path.join(job.path, 'pipeline_data')
pyflow_data_mount_target = '/logs/pyflow.data'
pyflow_data_mount = f'{pyflow_data_mount_source}:{pyflow_data_mount_target}:rw' # noqa
# Make sure that their is no data in the output directory
shutil.rmtree(pyflow_data_mount_source, ignore_errors=True)
os.makedirs(pyflow_data_mount_source)
mounts.append(pyflow_data_mount)
''' ## Name ## '''
name = f'job_{job.id}'
''' ## Resources ## '''
resources = docker.types.Resources(
cpu_reservation=n_cores * (10 ** 9),
mem_reservation=mem_mb * (10 ** 6)
def _create_job_service(job):
''' # Docker service settings # '''
''' ## Service specific settings ## '''
if job.service == 'file-setup-pipeline':
mem_mb = 512
n_cores = 2
executable = 'file-setup-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}file-setup-pipeline:v{job.service_version}'
elif job.service == 'tesseract-ocr-pipeline':
mem_mb = 1024
n_cores = 4
executable = 'tesseract-ocr-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}tesseract-ocr-pipeline:v{job.service_version}'
elif job.service == 'transkribus-htr-pipeline':
mem_mb = 1024
n_cores = 4
executable = 'transkribus-htr-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}transkribus-htr-pipeline:v{job.service_version}'
elif job.service == 'spacy-nlp-pipeline':
mem_mb = 1024
n_cores = 1
executable = 'spacy-nlp-pipeline'
image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}spacy-nlp-pipeline:v{job.service_version}'
''' ## Command ## '''
command = f'{executable} -i /input -o /output'
command += ' --log-dir /logs'
command += f' --mem-mb {mem_mb}'
command += f' --n-cores {n_cores}'
if job.service == 'spacy-nlp-pipeline':
command += f' -m {job.service_args["model"]}'
if 'encoding_detection' in job.service_args and job.service_args['encoding_detection']:
command += ' --check-encoding'
elif job.service == 'tesseract-ocr-pipeline':
command += f' -m {job.service_args["model"]}'
if 'binarization' in job.service_args and job.service_args['binarization']:
command += ' --binarize'
elif job.service == 'transkribus-htr-pipeline':
transkribus_htr_model = TranskribusHTRModel.query.get(job.service_args['model'])
command += f' -m {transkribus_htr_model.transkribus_model_id}'
readcoop_username = current_app.config.get('NOPAQUE_READCOOP_USERNAME')
command += f' --readcoop-username "{readcoop_username}"'
readcoop_password = current_app.config.get('NOPAQUE_READCOOP_PASSWORD')
command += f' --readcoop-password "{readcoop_password}"'
if 'binarization' in job.service_args and job.service_args['binarization']:
command += ' --binarize'
''' ## Constraints ## '''
constraints = ['node.role==worker']
''' ## Labels ## '''
labels = {
'origin': current_app.config['SERVER_NAME'],
'type': 'job',
'job_id': str(job.id)
}
''' ## Mounts ## '''
mounts = []
''' ### Input mount(s) ### '''
input_mount_target_base = '/input'
if job.service == 'file-setup-pipeline':
input_mount_target_base += f'/{secure_filename(job.title)}'
for job_input in job.inputs:
input_mount_source = job_input.path
input_mount_target = f'{input_mount_target_base}/{job_input.filename}'
input_mount = f'{input_mount_source}:{input_mount_target}:ro'
mounts.append(input_mount)
if job.service == 'tesseract-ocr-pipeline':
model = TesseractOCRModel.query.get(job.service_args['model'])
if model is None:
job.status = JobStatus.FAILED
return
models_mount_source = model.path
models_mount_target = f'/usr/local/share/tessdata/{model.filename}'
models_mount = f'{models_mount_source}:{models_mount_target}:ro'
mounts.append(models_mount)
''' ### Output mount ### '''
output_mount_source = os.path.join(job.path, 'results')
output_mount_target = '/output'
output_mount = f'{output_mount_source}:{output_mount_target}:rw'
# Make sure that their is no data in the output directory
shutil.rmtree(output_mount_source, ignore_errors=True)
os.makedirs(output_mount_source)
mounts.append(output_mount)
''' ### Pipeline data mount ### '''
pyflow_data_mount_source = os.path.join(job.path, 'pipeline_data')
pyflow_data_mount_target = '/logs/pyflow.data'
pyflow_data_mount = f'{pyflow_data_mount_source}:{pyflow_data_mount_target}:rw'
# Make sure that their is no data in the output directory
shutil.rmtree(pyflow_data_mount_source, ignore_errors=True)
os.makedirs(pyflow_data_mount_source)
mounts.append(pyflow_data_mount)
''' ## Name ## '''
name = f'job_{job.id}'
''' ## Resources ## '''
resources = docker.types.Resources(
cpu_reservation=n_cores * (10 ** 9),
mem_reservation=mem_mb * (10 ** 6)
)
''' ## Restart policy ## '''
restart_policy = docker.types.RestartPolicy()
try:
docker_client.services.create(
image,
command=command,
constraints=constraints,
labels=labels,
mounts=mounts,
name=name,
resources=resources,
restart_policy=restart_policy,
user='0:0'
)
''' ## Restart policy ## '''
restart_policy = docker.types.RestartPolicy()
try:
self.docker.services.create(
image,
command=command,
constraints=constraints,
labels=labels,
mounts=mounts,
name=name,
resources=resources,
restart_policy=restart_policy,
user='0:0'
)
except docker.errors.APIError as e:
current_app.logger.error(
f'Create service "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
job.status = JobStatus.QUEUED
except docker.errors.APIError as e:
current_app.logger.error(
f'Create service "{name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
job.status = JobStatus.QUEUED
def checkout_job_service(self, job):
service_name = f'job_{job.id}'
try:
service = self.docker.services.get(service_name)
except docker.errors.NotFound as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.NotFound": {e}'
def _checkout_job_service(job):
service_name = f'job_{job.id}'
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.NotFound": {e}'
)
job.status = JobStatus.FAILED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == JobStatus.QUEUED and task_state != 'pending':
job.status = JobStatus.RUNNING
return
elif job.status == JobStatus.RUNNING and task_state == 'complete':
job.status = JobStatus.COMPLETED
results_dir = os.path.join(job.path, 'results')
with open(os.path.join(results_dir, 'outputs.json')) as f:
outputs = json.load(f)
for output in outputs:
filename = os.path.basename(output['file'])
job_result = JobResult(
filename=filename,
job=job,
mimetype=output['mimetype']
)
job.status = JobStatus.FAILED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
service_tasks = service.tasks()
if not service_tasks:
return
task_state = service_tasks[0].get('Status').get('State')
if job.status == JobStatus.QUEUED and task_state != 'pending':
job.status = JobStatus.RUNNING
return
elif job.status == JobStatus.RUNNING and task_state == 'complete': # noqa
job.status = JobStatus.COMPLETED
results_dir = os.path.join(job.path, 'results')
with open(os.path.join(results_dir, 'outputs.json')) as f:
outputs = json.load(f)
for output in outputs:
filename = os.path.basename(output['file'])
job_result = JobResult(
filename=filename,
job=job,
mimetype=output['mimetype']
)
if 'description' in output:
job_result.description = output['description']
db.session.add(job_result)
db.session.flush(objects=[job_result])
db.session.refresh(job_result)
os.rename(
os.path.join(results_dir, output['file']),
job_result.path
)
elif job.status == JobStatus.RUNNING and task_state == 'failed':
job.status = JobStatus.FAILED
else:
return
job.end_date = datetime.utcnow()
try:
service.remove()
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
if 'description' in output:
job_result.description = output['description']
db.session.add(job_result)
db.session.flush(objects=[job_result])
db.session.refresh(job_result)
os.rename(
os.path.join(results_dir, output['file']),
job_result.path
)
elif job.status == JobStatus.RUNNING and task_state == 'failed':
job.status = JobStatus.FAILED
else:
return
job.end_date = datetime.utcnow()
try:
service.remove()
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
def remove_job_service(self, job):
service_name = f'job_{job.id}'
try:
service = self.docker.services.get(service_name)
except docker.errors.NotFound:
job.status = JobStatus.CANCELED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
try:
service.update(mounts=None)
except docker.errors.APIError as e:
current_app.logger.error(
f'Update service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
try:
service.remove()
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove "{service_name}" service failed '
f'due to "docker.errors.APIError": {e}'
)
def _remove_job_service(job):
service_name = f'job_{job.id}'
try:
service = docker_client.services.get(service_name)
except docker.errors.NotFound:
job.status = JobStatus.CANCELED
return
except docker.errors.APIError as e:
current_app.logger.error(
f'Get service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
try:
service.update(mounts=None)
except docker.errors.APIError as e:
current_app.logger.error(
f'Update service "{service_name}" failed '
f'due to "docker.errors.APIError": {e}'
)
return
try:
service.remove()
except docker.errors.APIError as e:
current_app.logger.error(
f'Remove "{service_name}" service failed '
f'due to "docker.errors.APIError": {e}'
)