nopaque/app/daemon/corpus_utils.py

253 lines
9.9 KiB
Python
Raw Normal View History

from flask import current_app
from ..models import Corpus
2020-11-09 15:14:19 +00:00
import docker
import os
import shutil
class CheckCorporaMixin:
    '''
    Mixin that advances corpora through their build and analysis states.

    The host class has to provide a ``self.docker`` attribute. It is used
    here through ``self.docker.services`` and ``self.docker.containers``
    (presumably a ``docker.DockerClient`` - confirm against the host class).

    Corpus status values read or written by this mixin:
    'submitted', 'queued', 'running', 'prepared', 'failed',
    'start analysis', 'analysing' and 'stop analysis'.
    '''

    def check_corpora(self):
        '''
        Run one check pass over all corpora.

        The passes run in a fixed order and each pass re-reads the current
        status, so a corpus whose status is changed by an earlier pass can
        be picked up by a later pass of the same call. This method only
        mutates the Corpus objects; nothing is committed here.
        '''
        corpora = Corpus.query.all()
        # Build phase
        for corpus in (x for x in corpora if x.status == 'submitted'):
            self.create_build_corpus_service(corpus)
        for corpus in (
            x for x in corpora if x.status in ('queued', 'running')
        ):
            self.checkout_build_corpus_service(corpus)
        # Derive the analysis requests from the number of open sessions
        for corpus in (
            x for x in corpora
            if x.status == 'prepared' and x.num_analysis_sessions > 0
        ):
            corpus.status = 'start analysis'
        for corpus in (
            x for x in corpora
            if x.status == 'analysing' and x.num_analysis_sessions == 0
        ):
            corpus.status = 'stop analysis'
        # Analysis phase
        for corpus in (x for x in corpora if x.status == 'analysing'):
            self.checkout_analysing_corpus_container(corpus)
        for corpus in (x for x in corpora if x.status == 'start analysis'):
            self.create_cqpserver_container(corpus)
        for corpus in (x for x in corpora if x.status == 'stop analysis'):
            self.remove_cqpserver_container(corpus)

    def create_build_corpus_service(self, corpus):
        '''
        Create the Docker service that encodes the corpus with CWB.

        The data and registry directories below ``corpus.path`` are wiped
        and recreated before the service is created. On success the corpus
        status is set to 'queued'. If the service cannot be created, the
        error is logged and the status is left unchanged.

        :param corpus: Corpus object providing ``id``, ``path`` and
            ``status``.
        '''
        # # Docker service settings # #
        # ## Command ## #
        command = ['bash', '-c']
        command.append(
            f'mkdir /corpora/data/nopaque_{corpus.id}'
            ' && '
            'cwb-encode'
            ' -c utf8'
            f' -d /corpora/data/nopaque_{corpus.id}'
            ' -f /root/files/corpus.vrt'
            f' -R /usr/local/share/cwb/registry/nopaque_{corpus.id}'
            ' -P pos -P lemma -P simple_pos'
            ' -S ent:0+type -S s:0'
            ' -S text:0+address+author+booktitle+chapter+editor+institution+journal+pages+publisher+publishing_year+school+title'  # noqa
            ' -xsB -9'
            ' && '
            f'cwb-make -V NOPAQUE_{corpus.id}'
        )
        # ## Constraints ## #
        constraints = ['node.role==worker']
        # ## Image ## #
        image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702'
        # ## Labels ## #
        labels = {
            'origin': current_app.config['SERVER_NAME'],
            'type': 'corpus.build',
            'corpus_id': str(corpus.id)
        }
        # ## Mounts ## #
        mounts = []
        # ### Data mount ### #
        data_mount_source = os.path.join(corpus.path, 'cwb', 'data')
        data_mount_target = '/corpora/data'
        data_mount = f'{data_mount_source}:{data_mount_target}:rw'
        # Make sure that there is no data in the data directory
        shutil.rmtree(data_mount_source, ignore_errors=True)
        os.makedirs(data_mount_source)
        mounts.append(data_mount)
        # ### File mount ### #
        file_mount_source = os.path.join(corpus.path, 'cwb', 'corpus.vrt')
        file_mount_target = '/root/files/corpus.vrt'
        file_mount = f'{file_mount_source}:{file_mount_target}:ro'
        mounts.append(file_mount)
        # ### Registry mount ### #
        registry_mount_source = os.path.join(corpus.path, 'cwb', 'registry')
        registry_mount_target = '/usr/local/share/cwb/registry'
        registry_mount = f'{registry_mount_source}:{registry_mount_target}:rw'
        # Make sure that there is no data in the registry directory
        shutil.rmtree(registry_mount_source, ignore_errors=True)
        os.makedirs(registry_mount_source)
        mounts.append(registry_mount)
        # ## Name ## #
        name = f'build-corpus_{corpus.id}'
        # ## Restart policy ## #
        restart_policy = docker.types.RestartPolicy()
        try:
            self.docker.services.create(
                image,
                command=command,
                constraints=constraints,
                labels=labels,
                mounts=mounts,
                name=name,
                restart_policy=restart_policy
            )
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Create service "{name}" failed '
                f'due to "docker.errors.APIError": {e}'
            )
            return
        corpus.status = 'queued'

    def checkout_build_corpus_service(self, corpus):
        '''
        Sync the corpus status with the state of its build service.

        Transitions, based on the state of the first task of the service:
        'queued' -> 'running' once the task is no longer 'pending',
        'running' -> 'prepared' when the task is 'complete',
        'running' -> 'failed' when the task is 'failed'.
        A missing service marks the corpus as 'failed'. Any other API
        error is logged and the status is left unchanged.

        :param corpus: Corpus object providing ``id`` and ``status``.
        '''
        service_name = f'build-corpus_{corpus.id}'
        try:
            service = self.docker.services.get(service_name)
        except docker.errors.NotFound as e:
            current_app.logger.error(
                f'Get service "{service_name}" failed '
                f'due to "docker.errors.NotFound": {e}'
            )
            corpus.status = 'failed'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get service "{service_name}" failed '
                f'due to "docker.errors.APIError": {e}'
            )
            # No service object is available, so there is nothing to
            # inspect. Bail out instead of falling through to
            # service.tasks() with an unbound name.
            return
        service_tasks = service.tasks()
        if not service_tasks:
            return
        task_state = service_tasks[0].get('Status').get('State')
        if corpus.status == 'queued' and task_state != 'pending':
            corpus.status = 'running'
        elif corpus.status == 'running' and task_state == 'complete':
            corpus.status = 'prepared'
        elif corpus.status == 'running' and task_state == 'failed':
            corpus.status = 'failed'
        # NOTE(review): the service is left in place after the build has
        # finished. A service.remove() call (with APIError logging) used to
        # follow the 'prepared'/'failed' transitions but is disabled -
        # confirm whether the clean-up should be re-enabled.

    def create_cqpserver_container(self, corpus):
        '''
        Start a cqpserver container for the corpus.

        An already existing container with the same name is force-removed
        first. On success the corpus status is set to 'analysing'. A
        missing image marks the corpus as 'failed'. Any other API error is
        logged and the status is left unchanged.

        :param corpus: Corpus object providing ``id``, ``path`` and
            ``status``.
        '''
        # # Docker container settings # #
        # ## Command ## #
        command = []
        command.append(
            'echo "host *;" > cqpserver.init'
            ' && '
            'echo "user anonymous \\"\\";" >> cqpserver.init'
            ' && '
            'cqpserver -I cqpserver.init'
        )
        # ## Detach ## #
        detach = True
        # ## Entrypoint ## #
        entrypoint = ['bash', '-c']
        # ## Image ## #
        image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702'
        # ## Name ## #
        name = f'cqpserver_{corpus.id}'
        # ## Network ## #
        network = 'nopaque_default'
        # ## Volumes ## #
        volumes = []
        # ### Corpus data volume ### #
        data_volume_source = os.path.join(corpus.path, 'cwb', 'data')
        data_volume_target = '/corpora/data'
        data_volume = f'{data_volume_source}:{data_volume_target}:rw'
        volumes.append(data_volume)
        # ### Corpus registry volume ### #
        registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry')
        registry_volume_target = '/usr/local/share/cwb/registry'
        registry_volume = (
            f'{registry_volume_source}:{registry_volume_target}:rw'
        )
        volumes.append(registry_volume)
        # Check if a cqpserver container already exists. If this is the case,
        # remove it and create a new one
        try:
            container = self.docker.containers.get(name)
        except docker.errors.NotFound:
            pass
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get container "{name}" failed '
                f'due to "docker.errors.APIError": {e}'
            )
            return
        else:
            try:
                container.remove(force=True)
            except docker.errors.APIError as e:
                current_app.logger.error(
                    f'Remove container "{name}" failed '
                    f'due to "docker.errors.APIError": {e}'
                )
                return
        try:
            self.docker.containers.run(
                image,
                command=command,
                detach=detach,
                entrypoint=entrypoint,
                volumes=volumes,
                name=name,
                network=network
            )
        except docker.errors.ImageNotFound as e:
            current_app.logger.error(
                f'Run container "{name}" failed '
                f'due to "docker.errors.ImageNotFound" error: {e}'
            )
            corpus.status = 'failed'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Run container "{name}" failed '
                f'due to "docker.errors.APIError" error: {e}'
            )
            return
        corpus.status = 'analysing'

    def checkout_analysing_corpus_container(self, corpus):
        '''
        Verify that the cqpserver container of an analysed corpus exists.

        If the container is gone, the session counter is reset to 0 and
        the corpus status falls back to 'prepared'. Any other API error is
        only logged.

        :param corpus: Corpus object providing ``id``, ``status`` and
            ``num_analysis_sessions``.
        '''
        container_name = f'cqpserver_{corpus.id}'
        try:
            self.docker.containers.get(container_name)
        except docker.errors.NotFound as e:
            current_app.logger.error(
                f'Get container "{container_name}" failed '
                f'due to "docker.errors.NotFound": {e}'
            )
            corpus.num_analysis_sessions = 0
            corpus.status = 'prepared'
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get container "{container_name}" failed '
                f'due to "docker.errors.APIError": {e}'
            )

    def remove_cqpserver_container(self, corpus):
        '''
        Force-remove the cqpserver container of the corpus.

        The status is set to 'prepared' only when the container is not
        found. A successful removal leaves the status untouched, so the
        switch to 'prepared' happens on a later call that no longer finds
        the container. API errors are logged and leave the status
        unchanged.

        :param corpus: Corpus object providing ``id`` and ``status``.
        '''
        container_name = f'cqpserver_{corpus.id}'
        try:
            container = self.docker.containers.get(container_name)
        except docker.errors.NotFound:
            corpus.status = 'prepared'
            return
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Get container "{container_name}" failed '
                f'due to "docker.errors.APIError": {e}'
            )
            return
        try:
            container.remove(force=True)
        except docker.errors.APIError as e:
            current_app.logger.error(
                f'Remove container "{container_name}" failed '
                f'due to "docker.errors.APIError": {e}'
            )