diff --git a/app/__init__.py b/app/__init__.py index d9311c9a..cef8fa19 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,4 +1,5 @@ from config import Config +from docker import DockerClient from flask import Flask from flask_apscheduler import APScheduler from flask_assets import Environment @@ -11,18 +12,19 @@ from flask_sqlalchemy import SQLAlchemy from flask_hashids import Hashids -assets: Environment = Environment() -db: SQLAlchemy = SQLAlchemy() -hashids: Hashids = Hashids() -login: LoginManager = LoginManager() +assets = Environment() +db = SQLAlchemy() +docker_client = DockerClient() +hashids = Hashids() +login = LoginManager() login.login_view = 'auth.login' login.login_message = 'Please log in to access this page.' -mail: Mail = Mail() -migrate: Migrate = Migrate() -paranoid: Paranoid = Paranoid() +mail = Mail() +migrate = Migrate() +paranoid = Paranoid() paranoid.redirect_view = '/' -scheduler: APScheduler = APScheduler() # TODO: Use this! -socketio: SocketIO = SocketIO() +scheduler = APScheduler() +socketio = SocketIO() def create_app(config: Config = Config) -> Flask: @@ -30,6 +32,11 @@ def create_app(config: Config = Config) -> Flask: app: Flask = Flask(__name__) app.config.from_object(config) config.init_app(app) + docker_client.login( + username=app.config['NOPAQUE_DOCKER_REGISTRY_USERNAME'], + password=app.config['NOPAQUE_DOCKER_REGISTRY_PASSWORD'], + registry=app.config['NOPAQUE_DOCKER_REGISTRY'] + ) assets.init_app(app) db.init_app(app) @@ -38,10 +45,11 @@ def create_app(config: Config = Config) -> Flask: mail.init_app(app) migrate.init_app(app, db) paranoid.init_app(app) + scheduler.init_app(app) socketio.init_app(app, message_queue=app.config['NOPAQUE_SOCKETIO_MESSAGE_QUEUE_URI']) # noqa - from app import socketio_event_listeners - from app import sqlalchemy_event_listeners + from . import tasks + tasks.register(app, scheduler) from .admin import bp as admin_blueprint app.register_blueprint(admin_blueprint, url_prefix='/admin') diff --git a/app/cli.py b/app/cli.py index 64cf4fb7..2692996d 100644 --- a/app/cli.py +++ b/app/cli.py @@ -1,7 +1,6 @@ +from app.models import Role, User, TesseractOCRModel, TranskribusHTRModel from flask import current_app from flask_migrate import upgrade -from . import db -from .models import Corpus, Role, User, TesseractOCRModel, TranskribusHTRModel import click import os @@ -41,22 +40,6 @@ def register(app): current_app.logger.info('Insert/Update default TranskribusHTRModels') TranskribusHTRModel.insert_defaults() - @app.cli.group() - def daemon(): - ''' Daemon commands. ''' - pass - - @daemon.command('run') - def run_daemon(): - ''' Run daemon ''' - corpus: Corpus - for corpus in Corpus.query.filter(Corpus.num_analysis_sessions > 0): - corpus.num_analysis_sessions = 0 - db.session.commit() - from app.daemon import Daemon - daemon: Daemon = Daemon() - daemon.run() - @app.cli.group() def converter(): ''' Converter commands. 
''' diff --git a/app/daemon/__init__.py b/app/daemon/__init__.py index 84ed0efe..3d038dce 100644 --- a/app/daemon/__init__.py +++ b/app/daemon/__init__.py @@ -1,23 +1,9 @@ from app import db -from flask import current_app -from time import sleep -from .corpus_utils import CheckCorporaMixin -from .job_utils import CheckJobsMixin -import docker +from .corpus_utils import check_corpora +from .job_utils import check_jobs -class Daemon(CheckCorporaMixin, CheckJobsMixin): - def __init__(self): - self.docker = docker.from_env() - self.docker.login( - username=current_app.config['NOPAQUE_DOCKER_REGISTRY_USERNAME'], - password=current_app.config['NOPAQUE_DOCKER_REGISTRY_PASSWORD'], - registry=current_app.config['NOPAQUE_DOCKER_REGISTRY'] - ) - - def run(self): - while True: - self.check_corpora() - self.check_jobs() - db.session.commit() - sleep(1.5) +def daemon(): + check_corpora() + check_jobs() + db.session.commit() diff --git a/app/daemon/corpus_utils.py b/app/daemon/corpus_utils.py index 228eb64e..08712f1f 100644 --- a/app/daemon/corpus_utils.py +++ b/app/daemon/corpus_utils.py @@ -1,3 +1,4 @@ +from app import docker_client from app.models import Corpus, CorpusStatus from flask import current_app import docker @@ -5,250 +6,249 @@ import os import shutil -class CheckCorporaMixin: - def check_corpora(self): - corpora = Corpus.query.all() - for corpus in (x for x in corpora if x.status == CorpusStatus.SUBMITTED): # noqa - self.create_build_corpus_service(corpus) - for corpus in (x for x in corpora if x.status == CorpusStatus.QUEUED or x.status == CorpusStatus.BUILDING): # noqa - self.checkout_build_corpus_service(corpus) - for corpus in (x for x in corpora if x.status == CorpusStatus.BUILT and x.num_analysis_sessions > 0): # noqa - corpus.status = CorpusStatus.STARTING_ANALYSIS_SESSION - for corpus in (x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION and x.num_analysis_sessions == 0): # noqa - corpus.status = CorpusStatus.CANCELING_ANALYSIS_SESSION - for corpus in (x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION): # noqa - self.checkout_analysing_corpus_container(corpus) - for corpus in (x for x in corpora if x.status == CorpusStatus.STARTING_ANALYSIS_SESSION): # noqa - self.create_cqpserver_container(corpus) - for corpus in (x for x in corpora if x.status == CorpusStatus.CANCELING_ANALYSIS_SESSION): # noqa - self.remove_cqpserver_container(corpus) +def check_corpora(): + corpora = Corpus.query.all() + for corpus in [x for x in corpora if x.status == CorpusStatus.SUBMITTED]: + _create_build_corpus_service(corpus) + for corpus in [x for x in corpora if x.status in [CorpusStatus.QUEUED, CorpusStatus.BUILDING]]: + _checkout_build_corpus_service(corpus) + for corpus in [x for x in corpora if x.status == CorpusStatus.BUILT and x.num_analysis_sessions > 0]: + corpus.status = CorpusStatus.STARTING_ANALYSIS_SESSION + for corpus in [x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION and x.num_analysis_sessions == 0]: + corpus.status = CorpusStatus.CANCELING_ANALYSIS_SESSION + for corpus in [x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION]: + _checkout_analysing_corpus_container(corpus) + for corpus in [x for x in corpora if x.status == CorpusStatus.STARTING_ANALYSIS_SESSION]: + _create_cqpserver_container(corpus) + for corpus in [x for x in corpora if x.status == CorpusStatus.CANCELING_ANALYSIS_SESSION]: + _remove_cqpserver_container(corpus) - def create_build_corpus_service(self, corpus): - ''' # Docker service 
settings # ''' - ''' ## Command ## ''' - command = ['bash', '-c'] - command.append( - f'mkdir /corpora/data/nopaque_{corpus.id}' - ' && ' - 'cwb-encode' - ' -c utf8' - f' -d /corpora/data/nopaque_{corpus.id}' - ' -f /root/files/corpus.vrt' - f' -R /usr/local/share/cwb/registry/nopaque_{corpus.id}' - ' -P pos -P lemma -P simple_pos' - ' -S ent:0+type -S s:0' - ' -S text:0+address+author+booktitle+chapter+editor+institution+journal+pages+publisher+publishing_year+school+title' # noqa - ' -xsB -9' - ' && ' - f'cwb-make -V NOPAQUE_{corpus.id}' +def _create_build_corpus_service(corpus): + ''' # Docker service settings # ''' + ''' ## Command ## ''' + command = ['bash', '-c'] + command.append( + f'mkdir /corpora/data/nopaque_{corpus.id}' + ' && ' + 'cwb-encode' + ' -c utf8' + f' -d /corpora/data/nopaque_{corpus.id}' + ' -f /root/files/corpus.vrt' + f' -R /usr/local/share/cwb/registry/nopaque_{corpus.id}' + ' -P pos -P lemma -P simple_pos' + ' -S ent:0+type -S s:0' + ' -S text:0+address+author+booktitle+chapter+editor+institution+journal+pages+publisher+publishing_year+school+title' + ' -xsB -9' + ' && ' + f'cwb-make -V NOPAQUE_{corpus.id}' + ) + ''' ## Constraints ## ''' + constraints = ['node.role==worker'] + ''' ## Image ## ''' + image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702' + ''' ## Labels ## ''' + labels = { + 'origin': current_app.config['SERVER_NAME'], + 'type': 'corpus.build', + 'corpus_id': str(corpus.id) + } + ''' ## Mounts ## ''' + mounts = [] + ''' ### Data mount ### ''' + data_mount_source = os.path.join(corpus.path, 'cwb', 'data') + data_mount_target = '/corpora/data' + data_mount = f'{data_mount_source}:{data_mount_target}:rw' + # Make sure that there is no data in the data directory + shutil.rmtree(data_mount_source, ignore_errors=True) + os.makedirs(data_mount_source) + mounts.append(data_mount) + ''' ### File mount ### ''' + file_mount_source = os.path.join(corpus.path, 'cwb', 'corpus.vrt') + file_mount_target = '/root/files/corpus.vrt' + file_mount = f'{file_mount_source}:{file_mount_target}:ro' + mounts.append(file_mount) + ''' ### Registry mount ### ''' + registry_mount_source = os.path.join(corpus.path, 'cwb', 'registry') + registry_mount_target = '/usr/local/share/cwb/registry' + registry_mount = f'{registry_mount_source}:{registry_mount_target}:rw' + # Make sure that there is no data in the registry directory + shutil.rmtree(registry_mount_source, ignore_errors=True) + os.makedirs(registry_mount_source) + mounts.append(registry_mount) + ''' ## Name ## ''' + name = f'build-corpus_{corpus.id}' + ''' ## Restart policy ## ''' + restart_policy = docker.types.RestartPolicy() + try: + docker_client.services.create( + image, + command=command, + constraints=constraints, + labels=labels, + mounts=mounts, + name=name, + restart_policy=restart_policy, + user='0:0' ) - ''' ## Constraints ## ''' - constraints = ['node.role==worker'] - ''' ## Image ## ''' - image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702' - ''' ## Labels ## ''' - labels = { - 'origin': current_app.config['SERVER_NAME'], - 'type': 'corpus.build', - 'corpus_id': str(corpus.id) - } - ''' ## Mounts ## ''' - mounts = [] - ''' ### Data mount ### ''' - data_mount_source = os.path.join(corpus.path, 'cwb', 'data') - data_mount_target = '/corpora/data' - data_mount = f'{data_mount_source}:{data_mount_target}:rw' - # Make sure that their is no data in the data directory - shutil.rmtree(data_mount_source, ignore_errors=True) - os.makedirs(data_mount_source) - mounts.append(data_mount)
- ''' ### File mount ### ''' - file_mount_source = os.path.join(corpus.path, 'cwb', 'corpus.vrt') - file_mount_target = '/root/files/corpus.vrt' - file_mount = f'{file_mount_source}:{file_mount_target}:ro' - mounts.append(file_mount) - ''' ### Registry mount ### ''' - registry_mount_source = os.path.join(corpus.path, 'cwb', 'registry') - registry_mount_target = '/usr/local/share/cwb/registry' - registry_mount = f'{registry_mount_source}:{registry_mount_target}:rw' - # Make sure that their is no data in the registry directory - shutil.rmtree(registry_mount_source, ignore_errors=True) - os.makedirs(registry_mount_source) - mounts.append(registry_mount) - ''' ## Name ## ''' - name = f'build-corpus_{corpus.id}' - ''' ## Restart policy ## ''' - restart_policy = docker.types.RestartPolicy() - try: - self.docker.services.create( - image, - command=command, - constraints=constraints, - labels=labels, - mounts=mounts, - name=name, - restart_policy=restart_policy, - user='0:0' - ) - except docker.errors.APIError as e: - current_app.logger.error( - f'Create service "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return - corpus.status = CorpusStatus.QUEUED - - def checkout_build_corpus_service(self, corpus): - service_name = f'build-corpus_{corpus.id}' - try: - service = self.docker.services.get(service_name) - except docker.errors.NotFound as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.NotFound": {e}' - ) - corpus.status = CorpusStatus.FAILED - return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - service_tasks = service.tasks() - if not service_tasks: - return - task_state = service_tasks[0].get('Status').get('State') - if corpus.status == CorpusStatus.QUEUED and task_state != 'pending': # noqa - corpus.status = CorpusStatus.BUILDING - return - elif corpus.status == CorpusStatus.BUILDING and task_state == 'complete': # noqa - corpus.status = CorpusStatus.BUILT - elif corpus.status == CorpusStatus.BUILDING and task_state == 'failed': # noqa - corpus.status = CorpusStatus.FAILED - else: - return - try: - service.remove() - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - - def create_cqpserver_container(self, corpus): - ''' # Docker container settings # ''' - ''' ## Command ## ''' - command = [] - command.append( - 'echo "host *;" > cqpserver.init' - ' && ' - 'echo "user anonymous \\"\\";" >> cqpserver.init' - ' && ' - 'cqpserver -I cqpserver.init' + except docker.errors.APIError as e: + current_app.logger.error( + f'Create service "{name}" failed ' + f'due to "docker.errors.APIError": {e}' ) - ''' ## Detach ## ''' - detach = True - ''' ## Entrypoint ## ''' - entrypoint = ['bash', '-c'] - ''' ## Image ## ''' - image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702' - ''' ## Name ## ''' - name = f'cqpserver_{corpus.id}' - ''' ## Network ## ''' - network = 'nopaque_default' - ''' ## Volumes ## ''' - volumes = [] - ''' ### Corpus data volume ### ''' - data_volume_source = os.path.join(corpus.path, 'cwb', 'data') - data_volume_target = '/corpora/data' - data_volume = f'{data_volume_source}:{data_volume_target}:rw' - volumes.append(data_volume) - ''' ### Corpus registry volume ### ''' - registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry') - registry_volume_target = 
'/usr/local/share/cwb/registry' - registry_volume = f'{registry_volume_source}:{registry_volume_target}:rw' # noqa - volumes.append(registry_volume) - # Check if a cqpserver container already exists. If this is the case, - # remove it and create a new one - try: - container = self.docker.containers.get(name) - except docker.errors.NotFound: - pass - except docker.errors.APIError as e: - current_app.logger.error( - f'Get container "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return - else: - try: - container.remove(force=True) - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove container "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return - try: - self.docker.containers.run( - image, - command=command, - detach=detach, - entrypoint=entrypoint, - name=name, - network=network, - user='0:0', - volumes=volumes - ) - except docker.errors.ImageNotFound as e: - current_app.logger.error( - f'Run container "{name}" failed ' - f'due to "docker.errors.ImageNotFound" error: {e}' - ) - corpus.status = CorpusStatus.FAILED - return - except docker.errors.APIError as e: - current_app.logger.error( - f'Run container "{name}" failed ' - f'due to "docker.errors.APIError" error: {e}' - ) - return - corpus.status = CorpusStatus.RUNNING_ANALYSIS_SESSION + return + corpus.status = CorpusStatus.QUEUED - def checkout_analysing_corpus_container(self, corpus): - container_name = f'cqpserver_{corpus.id}' - try: - self.docker.containers.get(container_name) - except docker.errors.NotFound as e: - current_app.logger.error( - f'Get container "{container_name}" failed ' - f'due to "docker.errors.NotFound": {e}' - ) - corpus.num_analysis_sessions = 0 - corpus.status = CorpusStatus.BUILT - except docker.errors.APIError as e: - current_app.logger.error( - f'Get container "{container_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) +def _checkout_build_corpus_service(corpus): + service_name = f'build-corpus_{corpus.id}' + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound as e: + current_app.logger.error( + f'Get service "{service_name}" failed ' + f'due to "docker.errors.NotFound": {e}' + ) + corpus.status = CorpusStatus.FAILED + return + except docker.errors.APIError as e: + current_app.logger.error( + f'Get service "{service_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + return + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if corpus.status == CorpusStatus.QUEUED and task_state != 'pending': + corpus.status = CorpusStatus.BUILDING + return + elif corpus.status == CorpusStatus.BUILDING and task_state == 'complete': + corpus.status = CorpusStatus.BUILT + elif corpus.status == CorpusStatus.BUILDING and task_state == 'failed': + corpus.status = CorpusStatus.FAILED + else: + return + try: + service.remove() + except docker.errors.APIError as e: + current_app.logger.error( + f'Remove service "{service_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) - def remove_cqpserver_container(self, corpus): - container_name = f'cqpserver_{corpus.id}' - try: - container = self.docker.containers.get(container_name) - except docker.errors.NotFound: - corpus.status = CorpusStatus.BUILT - return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get container "{container_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return +def _create_cqpserver_container(corpus): + ''' # Docker container 
settings # ''' + ''' ## Command ## ''' + command = [] + command.append( + 'echo "host *;" > cqpserver.init' + ' && ' + 'echo "user anonymous \\"\\";" >> cqpserver.init' + ' && ' + 'cqpserver -I cqpserver.init' + ) + ''' ## Detach ## ''' + detach = True + ''' ## Entrypoint ## ''' + entrypoint = ['bash', '-c'] + ''' ## Image ## ''' + image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}cwb:r1702' + ''' ## Name ## ''' + name = f'cqpserver_{corpus.id}' + ''' ## Network ## ''' + network = 'nopaque_default' + ''' ## Volumes ## ''' + volumes = [] + ''' ### Corpus data volume ### ''' + data_volume_source = os.path.join(corpus.path, 'cwb', 'data') + data_volume_target = '/corpora/data' + data_volume = f'{data_volume_source}:{data_volume_target}:rw' + volumes.append(data_volume) + ''' ### Corpus registry volume ### ''' + registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry') + registry_volume_target = '/usr/local/share/cwb/registry' + registry_volume = f'{registry_volume_source}:{registry_volume_target}:rw' + volumes.append(registry_volume) + # Check if a cqpserver container already exists. If this is the case, + # remove it and create a new one + try: + container = docker_client.containers.get(name) + except docker.errors.NotFound: + pass + except docker.errors.APIError as e: + current_app.logger.error( + f'Get container "{name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + return + else: try: container.remove(force=True) except docker.errors.APIError as e: current_app.logger.error( - f'Remove container "{container_name}" failed ' + f'Remove container "{name}" failed ' f'due to "docker.errors.APIError": {e}' ) + return + try: + docker_client.containers.run( + image, + command=command, + detach=detach, + entrypoint=entrypoint, + name=name, + network=network, + user='0:0', + volumes=volumes + ) + except docker.errors.ImageNotFound as e: + current_app.logger.error( + f'Run container "{name}" failed ' + f'due to "docker.errors.ImageNotFound" error: {e}' + ) + corpus.status = CorpusStatus.FAILED + return + except docker.errors.APIError as e: + current_app.logger.error( + f'Run container "{name}" failed ' + f'due to "docker.errors.APIError" error: {e}' + ) + return + corpus.status = CorpusStatus.RUNNING_ANALYSIS_SESSION + +def _checkout_analysing_corpus_container(corpus): + container_name = f'cqpserver_{corpus.id}' + try: + docker_client.containers.get(container_name) + except docker.errors.NotFound as e: + current_app.logger.error( + f'Get container "{container_name}" failed ' + f'due to "docker.errors.NotFound": {e}' + ) + corpus.num_analysis_sessions = 0 + corpus.status = CorpusStatus.BUILT + except docker.errors.APIError as e: + current_app.logger.error( + f'Get container "{container_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + +def _remove_cqpserver_container(corpus): + container_name = f'cqpserver_{corpus.id}' + try: + container = docker_client.containers.get(container_name) + except docker.errors.NotFound: + corpus.status = CorpusStatus.BUILT + return + except docker.errors.APIError as e: + current_app.logger.error( + f'Get container "{container_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + return + try: + container.remove(force=True) + except docker.errors.APIError as e: + current_app.logger.error( + f'Remove container "{container_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) diff --git a/app/daemon/job_utils.py b/app/daemon/job_utils.py index 5f05681f..9624f253 100644 --- a/app/daemon/job_utils.py +++ 
b/app/daemon/job_utils.py @@ -1,4 +1,4 @@ -from app import db +from app import db, docker_client from app.models import ( Job, JobResult, @@ -15,217 +15,216 @@ import os import shutil -class CheckJobsMixin: - def check_jobs(self): - jobs = Job.query.all() - for job in (x for x in jobs if x.status == JobStatus.SUBMITTED): - self.create_job_service(job) - for job in (x for x in jobs if x.status in [JobStatus.QUEUED, JobStatus.RUNNING]): # noqa - self.checkout_job_service(job) - for job in (x for x in jobs if x.status == JobStatus.CANCELING): - self.remove_job_service(job) +def check_jobs(): + jobs = Job.query.all() + for job in [x for x in jobs if x.status == JobStatus.SUBMITTED]: + _create_job_service(job) + for job in [x for x in jobs if x.status in [JobStatus.QUEUED, JobStatus.RUNNING]]: + _checkout_job_service(job) + for job in [x for x in jobs if x.status == JobStatus.CANCELING]: + _remove_job_service(job) - def create_job_service(self, job): - ''' # Docker service settings # ''' - ''' ## Service specific settings ## ''' - if job.service == 'file-setup-pipeline': - mem_mb = 512 - n_cores = 2 - executable = 'file-setup-pipeline' - image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}file-setup-pipeline:v{job.service_version}' # noqa - elif job.service == 'tesseract-ocr-pipeline': - mem_mb = 1024 - n_cores = 4 - executable = 'tesseract-ocr-pipeline' - image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}tesseract-ocr-pipeline:v{job.service_version}' # noqa - elif job.service == 'transkribus-htr-pipeline': - mem_mb = 1024 - n_cores = 4 - executable = 'transkribus-htr-pipeline' - image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}transkribus-htr-pipeline:v{job.service_version}' # noqa - elif job.service == 'spacy-nlp-pipeline': - mem_mb = 1024 - n_cores = 1 - executable = 'spacy-nlp-pipeline' - image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}spacy-nlp-pipeline:v{job.service_version}' # noqa - ''' ## Command ## ''' - command = f'{executable} -i /input -o /output' - command += ' --log-dir /logs' - command += f' --mem-mb {mem_mb}' - command += f' --n-cores {n_cores}' - if job.service == 'spacy-nlp-pipeline': - command += f' -m {job.service_args["model"]}' - if 'encoding_detection' in job.service_args and job.service_args['encoding_detection']: # noqa - command += ' --check-encoding' - elif job.service == 'tesseract-ocr-pipeline': - command += f' -m {job.service_args["model"]}' - if 'binarization' in job.service_args and job.service_args['binarization']: - command += ' --binarize' - elif job.service == 'transkribus-htr-pipeline': - transkribus_htr_model = TranskribusHTRModel.query.get(job.service_args['model']) - command += f' -m {transkribus_htr_model.transkribus_model_id}' - readcoop_username = current_app.config.get('NOPAQUE_READCOOP_USERNAME') - command += f' --readcoop-username "{readcoop_username}"' - readcoop_password = current_app.config.get('NOPAQUE_READCOOP_PASSWORD') - command += f' --readcoop-password "{readcoop_password}"' - if 'binarization' in job.service_args and job.service_args['binarization']: - command += ' --binarize' - ''' ## Constraints ## ''' - constraints = ['node.role==worker'] - ''' ## Labels ## ''' - labels = { - 'origin': current_app.config['SERVER_NAME'], - 'type': 'job', - 'job_id': str(job.id) - } - ''' ## Mounts ## ''' - mounts = [] - ''' ### Input mount(s) ### ''' - input_mount_target_base = '/input' - if job.service == 'file-setup-pipeline': - input_mount_target_base += f'/{secure_filename(job.title)}' - for job_input in 
job.inputs: - input_mount_source = job_input.path - input_mount_target = f'{input_mount_target_base}/{job_input.filename}' # noqa - input_mount = f'{input_mount_source}:{input_mount_target}:ro' - mounts.append(input_mount) - if job.service == 'tesseract-ocr-pipeline': - model = TesseractOCRModel.query.get(job.service_args['model']) - if model is None: - job.status = JobStatus.FAILED - return - models_mount_source = model.path - models_mount_target = f'/usr/local/share/tessdata/{model.filename}' - models_mount = f'{models_mount_source}:{models_mount_target}:ro' - mounts.append(models_mount) - ''' ### Output mount ### ''' - output_mount_source = os.path.join(job.path, 'results') - output_mount_target = '/output' - output_mount = f'{output_mount_source}:{output_mount_target}:rw' - # Make sure that their is no data in the output directory - shutil.rmtree(output_mount_source, ignore_errors=True) - os.makedirs(output_mount_source) - mounts.append(output_mount) - ''' ### Pipeline data mount ### ''' - pyflow_data_mount_source = os.path.join(job.path, 'pipeline_data') - pyflow_data_mount_target = '/logs/pyflow.data' - pyflow_data_mount = f'{pyflow_data_mount_source}:{pyflow_data_mount_target}:rw' # noqa - # Make sure that their is no data in the output directory - shutil.rmtree(pyflow_data_mount_source, ignore_errors=True) - os.makedirs(pyflow_data_mount_source) - mounts.append(pyflow_data_mount) - ''' ## Name ## ''' - name = f'job_{job.id}' - ''' ## Resources ## ''' - resources = docker.types.Resources( - cpu_reservation=n_cores * (10 ** 9), - mem_reservation=mem_mb * (10 ** 6) +def _create_job_service(job): + ''' # Docker service settings # ''' + ''' ## Service specific settings ## ''' + if job.service == 'file-setup-pipeline': + mem_mb = 512 + n_cores = 2 + executable = 'file-setup-pipeline' + image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}file-setup-pipeline:v{job.service_version}' + elif job.service == 'tesseract-ocr-pipeline': + mem_mb = 1024 + n_cores = 4 + executable = 'tesseract-ocr-pipeline' + image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}tesseract-ocr-pipeline:v{job.service_version}' + elif job.service == 'transkribus-htr-pipeline': + mem_mb = 1024 + n_cores = 4 + executable = 'transkribus-htr-pipeline' + image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}transkribus-htr-pipeline:v{job.service_version}' + elif job.service == 'spacy-nlp-pipeline': + mem_mb = 1024 + n_cores = 1 + executable = 'spacy-nlp-pipeline' + image = f'{current_app.config["NOPAQUE_DOCKER_IMAGE_PREFIX"]}spacy-nlp-pipeline:v{job.service_version}' + ''' ## Command ## ''' + command = f'{executable} -i /input -o /output' + command += ' --log-dir /logs' + command += f' --mem-mb {mem_mb}' + command += f' --n-cores {n_cores}' + if job.service == 'spacy-nlp-pipeline': + command += f' -m {job.service_args["model"]}' + if 'encoding_detection' in job.service_args and job.service_args['encoding_detection']: + command += ' --check-encoding' + elif job.service == 'tesseract-ocr-pipeline': + command += f' -m {job.service_args["model"]}' + if 'binarization' in job.service_args and job.service_args['binarization']: + command += ' --binarize' + elif job.service == 'transkribus-htr-pipeline': + transkribus_htr_model = TranskribusHTRModel.query.get(job.service_args['model']) + command += f' -m {transkribus_htr_model.transkribus_model_id}' + readcoop_username = current_app.config.get('NOPAQUE_READCOOP_USERNAME') + command += f' --readcoop-username "{readcoop_username}"' + readcoop_password = 
current_app.config.get('NOPAQUE_READCOOP_PASSWORD') + command += f' --readcoop-password "{readcoop_password}"' + if 'binarization' in job.service_args and job.service_args['binarization']: + command += ' --binarize' + ''' ## Constraints ## ''' + constraints = ['node.role==worker'] + ''' ## Labels ## ''' + labels = { + 'origin': current_app.config['SERVER_NAME'], + 'type': 'job', + 'job_id': str(job.id) + } + ''' ## Mounts ## ''' + mounts = [] + ''' ### Input mount(s) ### ''' + input_mount_target_base = '/input' + if job.service == 'file-setup-pipeline': + input_mount_target_base += f'/{secure_filename(job.title)}' + for job_input in job.inputs: + input_mount_source = job_input.path + input_mount_target = f'{input_mount_target_base}/{job_input.filename}' + input_mount = f'{input_mount_source}:{input_mount_target}:ro' + mounts.append(input_mount) + if job.service == 'tesseract-ocr-pipeline': + model = TesseractOCRModel.query.get(job.service_args['model']) + if model is None: + job.status = JobStatus.FAILED + return + models_mount_source = model.path + models_mount_target = f'/usr/local/share/tessdata/{model.filename}' + models_mount = f'{models_mount_source}:{models_mount_target}:ro' + mounts.append(models_mount) + ''' ### Output mount ### ''' + output_mount_source = os.path.join(job.path, 'results') + output_mount_target = '/output' + output_mount = f'{output_mount_source}:{output_mount_target}:rw' + # Make sure that there is no data in the output directory + shutil.rmtree(output_mount_source, ignore_errors=True) + os.makedirs(output_mount_source) + mounts.append(output_mount) + ''' ### Pipeline data mount ### ''' + pyflow_data_mount_source = os.path.join(job.path, 'pipeline_data') + pyflow_data_mount_target = '/logs/pyflow.data' + pyflow_data_mount = f'{pyflow_data_mount_source}:{pyflow_data_mount_target}:rw' + # Make sure that there is no data in the pipeline data directory + shutil.rmtree(pyflow_data_mount_source, ignore_errors=True) + os.makedirs(pyflow_data_mount_source) + mounts.append(pyflow_data_mount) + ''' ## Name ## ''' + name = f'job_{job.id}' + ''' ## Resources ## ''' + resources = docker.types.Resources( + cpu_reservation=n_cores * (10 ** 9), + mem_reservation=mem_mb * (10 ** 6) + ) + ''' ## Restart policy ## ''' + restart_policy = docker.types.RestartPolicy() + try: + docker_client.services.create( + image, + command=command, + constraints=constraints, + labels=labels, + mounts=mounts, + name=name, + resources=resources, + restart_policy=restart_policy, + user='0:0' ) - ''' ## Restart policy ## ''' - restart_policy = docker.types.RestartPolicy() - try: - self.docker.services.create( - image, - command=command, - constraints=constraints, - labels=labels, - mounts=mounts, - name=name, - resources=resources, - restart_policy=restart_policy, - user='0:0' - ) - except docker.errors.APIError as e: - current_app.logger.error( - f'Create service "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return - job.status = JobStatus.QUEUED + except docker.errors.APIError as e: + current_app.logger.error( + f'Create service "{name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + return + job.status = JobStatus.QUEUED - def checkout_job_service(self, job): - service_name = f'job_{job.id}' - try: - service = self.docker.services.get(service_name) - except docker.errors.NotFound as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.NotFound": {e}' +def _checkout_job_service(job): + service_name = f'job_{job.id}' + try: + service 
= docker_client.services.get(service_name) + except docker.errors.NotFound as e: + current_app.logger.error( + f'Get service "{service_name}" failed ' + f'due to "docker.errors.NotFound": {e}' + ) + job.status = JobStatus.FAILED + return + except docker.errors.APIError as e: + current_app.logger.error( + f'Get service "{service_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + return + service_tasks = service.tasks() + if not service_tasks: + return + task_state = service_tasks[0].get('Status').get('State') + if job.status == JobStatus.QUEUED and task_state != 'pending': + job.status = JobStatus.RUNNING + return + elif job.status == JobStatus.RUNNING and task_state == 'complete': + job.status = JobStatus.COMPLETED + results_dir = os.path.join(job.path, 'results') + with open(os.path.join(results_dir, 'outputs.json')) as f: + outputs = json.load(f) + for output in outputs: + filename = os.path.basename(output['file']) + job_result = JobResult( + filename=filename, + job=job, + mimetype=output['mimetype'] ) - job.status = JobStatus.FAILED - return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return - service_tasks = service.tasks() - if not service_tasks: - return - task_state = service_tasks[0].get('Status').get('State') - if job.status == JobStatus.QUEUED and task_state != 'pending': - job.status = JobStatus.RUNNING - return - elif job.status == JobStatus.RUNNING and task_state == 'complete': # noqa - job.status = JobStatus.COMPLETED - results_dir = os.path.join(job.path, 'results') - with open(os.path.join(results_dir, 'outputs.json')) as f: - outputs = json.load(f) - for output in outputs: - filename = os.path.basename(output['file']) - job_result = JobResult( - filename=filename, - job=job, - mimetype=output['mimetype'] - ) - if 'description' in output: - job_result.description = output['description'] - db.session.add(job_result) - db.session.flush(objects=[job_result]) - db.session.refresh(job_result) - os.rename( - os.path.join(results_dir, output['file']), - job_result.path - ) - elif job.status == JobStatus.RUNNING and task_state == 'failed': - job.status = JobStatus.FAILED - else: - return - job.end_date = datetime.utcnow() - try: - service.remove() - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' + if 'description' in output: + job_result.description = output['description'] + db.session.add(job_result) + db.session.flush(objects=[job_result]) + db.session.refresh(job_result) + os.rename( + os.path.join(results_dir, output['file']), + job_result.path ) + elif job.status == JobStatus.RUNNING and task_state == 'failed': + job.status = JobStatus.FAILED + else: + return + job.end_date = datetime.utcnow() + try: + service.remove() + except docker.errors.APIError as e: + current_app.logger.error( + f'Remove service "{service_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) - def remove_job_service(self, job): - service_name = f'job_{job.id}' - try: - service = self.docker.services.get(service_name) - except docker.errors.NotFound: - job.status = JobStatus.CANCELED - return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return - try: - service.update(mounts=None) - except docker.errors.APIError as e: - current_app.logger.error( - f'Update 
service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) - return - try: - service.remove() - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove "{service_name}" service failed ' - f'due to "docker.errors.APIError": {e}' - ) +def _remove_job_service(job): + service_name = f'job_{job.id}' + try: + service = docker_client.services.get(service_name) + except docker.errors.NotFound: + job.status = JobStatus.CANCELED + return + except docker.errors.APIError as e: + current_app.logger.error( + f'Get service "{service_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + return + try: + service.update(mounts=None) + except docker.errors.APIError as e: + current_app.logger.error( + f'Update service "{service_name}" failed ' + f'due to "docker.errors.APIError": {e}' + ) + return + try: + service.remove() + except docker.errors.APIError as e: + current_app.logger.error( + f'Remove "{service_name}" service failed ' + f'due to "docker.errors.APIError": {e}' + ) diff --git a/app/decorators.py b/app/decorators.py index 8c1ba90a..c4767e80 100644 --- a/app/decorators.py +++ b/app/decorators.py @@ -1,8 +1,8 @@ +from app.models import Permission from flask import abort, current_app from flask_login import current_user from functools import wraps from threading import Thread -from .models import Permission def permission_required(permission): diff --git a/app/email.py b/app/email.py index 50c41caa..b002defa 100644 --- a/app/email.py +++ b/app/email.py @@ -1,27 +1,27 @@ -from flask import current_app, render_template +from app import mail +from flask import current_app, Flask, render_template from flask_mail import Message -from typing import Any, Text -from . import mail -from .decorators import background +from threading import Thread +from typing import Any -def create_message( - recipient: str, - subject: str, - template: str, - **kwargs: Any -) -> Message: +def create_message(recipient: str, subject: str, template: str, **kwargs: Any) -> Message: subject_prefix: str = current_app.config['NOPAQUE_MAIL_SUBJECT_PREFIX'] msg: Message = Message( - f'{subject_prefix} {subject}', - recipients=[recipient] + body=render_template(f'{template}.txt.j2', **kwargs), + html=render_template(f'{template}.html.j2', **kwargs), + recipients=[recipient], + subject=f'{subject_prefix} {subject}' ) - msg.body = render_template(f'{template}.txt.j2', **kwargs) - msg.html = render_template(f'{template}.html.j2', **kwargs) return msg -@background -def send(msg: Message, *args, **kwargs): - with kwargs['app'].app_context(): +def _send(app: Flask, msg): + with app.app_context(): mail.send(msg) + + +def send(msg: Message, *args, **kwargs): + thread = Thread(target=_send, args=[current_app._get_current_object(), msg]) + thread.start() + return thread diff --git a/app/jobs/routes.py b/app/jobs/routes.py index ac484958..8d78aa6b 100644 --- a/app/jobs/routes.py +++ b/app/jobs/routes.py @@ -58,6 +58,23 @@ def download_job_input(job_id, job_input_id): ) +@bp.route('//log') +@login_required +@admin_required +def job_log(job_id): + job = Job.query.get_or_404(job_id) + if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: + flash( + f'Can\'t restart job "{job.title}": Status is not "Completed/Failed"', # noqa + category='error' + ) + return send_from_directory( + attachment_filename=f'job_{job.hashid}_log.txt', + directory=os.path.join(job.path, 'pipeline_data'), + filename=os.path.join('logs', 'pyflow_log.txt') + ) + + @bp.route('//restart') @login_required @admin_required diff 
--git a/app/main/__init__.py b/app/main/__init__.py index 65630224..aa4f232e 100644 --- a/app/main/__init__.py +++ b/app/main/__init__.py @@ -2,4 +2,4 @@ from flask import Blueprint bp = Blueprint('main', __name__) -from . import routes +from . import events, routes diff --git a/app/socketio_event_listeners.py b/app/main/events.py similarity index 95% rename from app/socketio_event_listeners.py rename to app/main/events.py index 92cfc1fa..c727d171 100644 --- a/app/socketio_event_listeners.py +++ b/app/main/events.py @@ -1,8 +1,8 @@ from app import hashids, socketio -from app.decorators import socketio_login_required -from app.models import TesseractOCRModel, TranskribusHTRModel, User +from app.models import User from flask_login import current_user from flask_socketio import join_room +from app.decorators import socketio_login_required @socketio.on('users.user.get') diff --git a/app/models.py b/app/models.py index f5cd0009..94b57453 100644 --- a/app/models.py +++ b/app/models.py @@ -1,8 +1,8 @@ -from app import db, login +from app import db, login, mail, socketio from app.converters.vrt import normalize_vrt_file -from app.sqlalchemy_type_decorators import ContainerColumn, IntEnumColumn +from app.email import create_message from datetime import datetime, timedelta -from enum import IntEnum +from enum import Enum, IntEnum from flask import current_app, url_for from flask_hashids import HashidMixin from flask_login import UserMixin @@ -20,9 +20,57 @@ import yaml TRANSKRIBUS_HTR_MODELS = \ - json.loads(requests.get('https://transkribus.eu/TrpServer/rest/models/text').content)['trpModelMetadata'] # noqa + json.loads(requests.get('https://transkribus.eu/TrpServer/rest/models/text', params={'docType': 'handwritten'}).content)['trpModelMetadata'] # noqa +############################################################################## +# enums # +############################################################################## +# region enums +class CorpusStatus(IntEnum): + UNPREPARED = 1 + SUBMITTED = 2 + QUEUED = 3 + BUILDING = 4 + BUILT = 5 + FAILED = 6 + STARTING_ANALYSIS_SESSION = 7 + RUNNING_ANALYSIS_SESSION = 8 + CANCELING_ANALYSIS_SESSION = 9 + + +class JobStatus(IntEnum): + INITIALIZING = 1 + SUBMITTED = 2 + QUEUED = 3 + RUNNING = 4 + CANCELING = 5 + CANCELED = 6 + COMPLETED = 7 + FAILED = 8 + + +class Permission(IntEnum): + ''' + Defines User permissions as integers by the power of 2. User permission + can be evaluated using the bitwise operator &. + ''' + ADMINISTRATE = 1 + CONTRIBUTE = 2 + USE_API = 4 + + +class UserSettingJobStatusMailNotificationLevel(IntEnum): + NONE = 1 + END = 2 + ALL = 3 +# endregion enums + + +############################################################################## +# mixins # +############################################################################## +# region mixins class FileMixin: ''' Mixin for db.Model classes. All file related models should use this. @@ -39,18 +87,59 @@ class FileMixin: 'last_edited_date': self.last_edited_date.isoformat() + 'Z', 'mimetype': self.mimetype } +# endregion mixins -class Permission(IntEnum): - ''' - Defines User permissions as integers by the power of 2. User permission - can be evaluated using the bitwise operator &. 
- ''' - ADMINISTRATE = 1 - CONTRIBUTE = 2 - USE_API = 4 +############################################################################## +# type_decorators # +############################################################################## +# region type_decorators +class IntEnumColumn(db.TypeDecorator): + impl = db.Integer + + def __init__(self, enum_type, *args, **kwargs): + super().__init__(*args, **kwargs) + self.enum_type = enum_type + + def process_bind_param(self, value, dialect): + if isinstance(value, self.enum_type) and isinstance(value.value, int): + return value.value + elif isinstance(value, int): + return self.enum_type(value).value + else: + raise TypeError() + + def process_result_value(self, value, dialect): + return self.enum_type(value) +class ContainerColumn(db.TypeDecorator): + impl = db.String + + def __init__(self, container_type, *args, **kwargs): + super().__init__(*args, **kwargs) + self.container_type = container_type + + def process_bind_param(self, value, dialect): + if isinstance(value, self.container_type): + return json.dumps(value) + elif ( + isinstance(value, str) + and isinstance(json.loads(value), self.container_type) + ): + return value + else: + raise TypeError() + + def process_result_value(self, value, dialect): + return json.loads(value) +# endregion type_decorators + + +############################################################################## +# Models # +############################################################################## +# region models class Role(HashidMixin, db.Model): __tablename__ = 'roles' # Primary key @@ -123,12 +212,6 @@ class Role(HashidMixin, db.Model): db.session.commit() -class UserSettingJobStatusMailNotificationLevel(IntEnum): - NONE = 1 - END = 2 - ALL = 3 - - class User(HashidMixin, UserMixin, db.Model): __tablename__ = 'users' # Primary key @@ -449,7 +532,6 @@ class TranskribusHTRModel(HashidMixin, db.Model): 'user_id': self.user.hashid, 'shared': self.shared, 'transkribus_model_id': self.transkribus_model_id, - 'transkribus_name': self.transkribus_name } if backrefs: dict_tesseract_ocr_model['user'] = \ @@ -466,20 +548,14 @@ class TranskribusHTRModel(HashidMixin, db.Model): # and 'creator' in m and m['creator'] == 'Transkribus Team' # and 'docType' in m and m['docType'] == 'handwritten' # ] - models = [ - m for m in TRANSKRIBUS_HTR_MODELS - if m['modelId'] in [35909, 33744, 33597, 29820, 37789, 13685, 37855, 26124, 37738, 30919, 34763] - ] - for m in models: + for m in TRANSKRIBUS_HTR_MODELS: model = TranskribusHTRModel.query.filter_by(transkribus_model_id=m['modelId']).first() # noqa if model is not None: model.shared = True model.transkribus_model_id = m['modelId'] - model.transkribus_name = m['name'] continue model = TranskribusHTRModel( shared=True, - transkribus_name=m['name'], transkribus_model_id=m['modelId'], user=user, ) @@ -605,17 +681,6 @@ class JobResult(FileMixin, HashidMixin, db.Model): return self.job.user_id -class JobStatus(IntEnum): - INITIALIZING = 1 - SUBMITTED = 2 - QUEUED = 3 - RUNNING = 4 - CANCELING = 5 - CANCELED = 6 - COMPLETED = 7 - FAILED = 8 - - class Job(HashidMixin, db.Model): ''' Class to define Jobs. ''' @@ -828,18 +893,6 @@ class CorpusFile(FileMixin, HashidMixin, db.Model): return dict_corpus_file -class CorpusStatus(IntEnum): - UNPREPARED = 1 - SUBMITTED = 2 - QUEUED = 3 - BUILDING = 4 - BUILT = 5 - FAILED = 6 - STARTING_ANALYSIS_SESSION = 7 - RUNNING_ANALYSIS_SESSION = 8 - CANCELING_ANALYSIS_SESSION = 9 - - class Corpus(HashidMixin, db.Model): ''' Class to define a corpus. 
@@ -964,8 +1017,95 @@ class Corpus(HashidMixin, db.Model): for x in self.files } return dict_corpus +# endregion models + +############################################################################## +# event_handlers # +############################################################################## +# region event_handlers +@db.event.listens_for(Corpus, 'after_delete') +@db.event.listens_for(CorpusFile, 'after_delete') +@db.event.listens_for(Job, 'after_delete') +@db.event.listens_for(JobInput, 'after_delete') +@db.event.listens_for(JobResult, 'after_delete') +def ressource_after_delete(mapper, connection, ressource): + jsonpatch = [{'op': 'remove', 'path': ressource.jsonpatch_path}] + room = f'users.{ressource.user_hashid}' + socketio.emit('users.patch', jsonpatch, room=room) + + +@db.event.listens_for(Corpus, 'after_insert') +@db.event.listens_for(CorpusFile, 'after_insert') +@db.event.listens_for(Job, 'after_insert') +@db.event.listens_for(JobInput, 'after_insert') +@db.event.listens_for(JobResult, 'after_insert') +def ressource_after_insert_handler(mapper, connection, ressource): + value = ressource.to_dict(backrefs=False, relationships=False) + for attr in mapper.relationships: + value[attr.key] = {} + jsonpatch = [ + {'op': 'add', 'path': ressource.jsonpatch_path, 'value': value} + ] + room = f'users.{ressource.user_hashid}' + socketio.emit('users.patch', jsonpatch, room=room) + + +@db.event.listens_for(Corpus, 'after_update') +@db.event.listens_for(CorpusFile, 'after_update') +@db.event.listens_for(Job, 'after_update') +@db.event.listens_for(JobInput, 'after_update') +@db.event.listens_for(JobResult, 'after_update') +def ressource_after_update_handler(mapper, connection, ressource): + jsonpatch = [] + for attr in db.inspect(ressource).attrs: + if attr.key in mapper.relationships: + continue + if not attr.load_history().has_changes(): + continue + if isinstance(attr.value, datetime): + value = attr.value.isoformat() + 'Z' + elif isinstance(attr.value, Enum): + value = attr.value.name + else: + value = attr.value + jsonpatch.append( + { + 'op': 'replace', + 'path': f'{ressource.jsonpatch_path}/{attr.key}', + 'value': value + } + ) + if jsonpatch: + room = f'users.{ressource.user_hashid}' + socketio.emit('users.patch', jsonpatch, room=room) + + +@db.event.listens_for(Job, 'after_update') +def job_after_update_handler(mapper, connection, job): + for attr in db.inspect(job).attrs: + if attr.key != 'status': + continue + if not attr.load_history().has_changes(): + return + if job.user.setting_job_status_mail_notification_level == UserSettingJobStatusMailNotificationLevel.NONE: + return + if job.user.setting_job_status_mail_notification_level == UserSettingJobStatusMailNotificationLevel.END: + if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: + return + msg = create_message( + job.user.email, + f'Status update for your Job "{job.title}"', + 'tasks/email/notification', + job=job + ) + mail.send(msg) +# endregion event_handlers + + +############################################################################## +# misc # +############################################################################## +# region misc @login.user_loader def load_user(user_id): return User.query.get(int(user_id)) +# endregion misc diff --git a/app/services/routes.py b/app/services/routes.py index a3386091..805dc9f4 100644 --- a/app/services/routes.py +++ b/app/services/routes.py @@ -144,7 +144,6 @@ def tesseract_ocr_pipeline(): x for x in TesseractOCRModel.query.filter().all() if version in x.compatible_service_versions and (x.shared == True or x.user == 
current_user) ] - current_app.logger.warning(tesseract_ocr_models) return render_template( 'services/tesseract_ocr_pipeline.html.j2', form=form, diff --git a/app/sqlalchemy_event_listeners.py b/app/sqlalchemy_event_listeners.py deleted file mode 100644 index 94470591..00000000 --- a/app/sqlalchemy_event_listeners.py +++ /dev/null @@ -1,87 +0,0 @@ -from app import db, mail, socketio -from app.email import create_message -from app.models import ( - Corpus, - CorpusFile, - Job, - JobInput, - JobResult, - JobStatus, - UserSettingJobStatusMailNotificationLevel -) -from datetime import datetime -from enum import Enum - - -@db.event.listens_for(Corpus, 'after_delete') -@db.event.listens_for(CorpusFile, 'after_delete') -@db.event.listens_for(Job, 'after_delete') -@db.event.listens_for(JobInput, 'after_delete') -@db.event.listens_for(JobResult, 'after_delete') -def ressource_after_delete(mapper, connection, ressource): - jsonpatch = [{'op': 'remove', 'path': ressource.jsonpatch_path}] - room = f'users.{ressource.user_hashid}' - socketio.emit('users.patch', jsonpatch, room=room) - - -@db.event.listens_for(Corpus, 'after_insert') -@db.event.listens_for(CorpusFile, 'after_insert') -@db.event.listens_for(Job, 'after_insert') -@db.event.listens_for(JobInput, 'after_insert') -@db.event.listens_for(JobResult, 'after_insert') -def ressource_after_insert_handler(mapper, connection, ressource): - value = ressource.to_dict(backrefs=False, relationships=False) - for attr in mapper.relationships: - value[attr.key] = {} - jsonpatch = [ - {'op': 'add', 'path': ressource.jsonpatch_path, 'value': value} - ] - room = f'users.{ressource.user_hashid}' - socketio.emit('users.patch', jsonpatch, room=room) - - -@db.event.listens_for(Corpus, 'after_update') -@db.event.listens_for(CorpusFile, 'after_update') -@db.event.listens_for(Job, 'after_update') -@db.event.listens_for(JobInput, 'after_update') -@db.event.listens_for(JobResult, 'after_update') -def ressource_after_update_handler(mapper, connection, ressource): - jsonpatch = [] - for attr in db.inspect(ressource).attrs: - if attr.key in mapper.relationships: - continue - if not attr.load_history().has_changes(): - continue - if isinstance(attr.value, datetime): - value = attr.value.isoformat() + 'Z' - elif isinstance(attr.value, Enum): - value = attr.value.name - else: - value = attr.value - jsonpatch.append( - { - 'op': 'replace', - 'path': f'{ressource.jsonpatch_path}/{attr.key}', - 'value': value - } - ) - if isinstance(ressource, Job) and attr.key == 'status': - _job_status_email_handler(ressource) - if jsonpatch: - room = f'users.{ressource.user_hashid}' - socketio.emit('users.patch', jsonpatch, room=room) - - -def _job_status_email_handler(job): - if job.user.setting_job_status_mail_notification_level == UserSettingJobStatusMailNotificationLevel.NONE: # noqa - return - if job.user.setting_job_status_mail_notification_level == UserSettingJobStatusMailNotificationLevel.END: # noqa - if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: - return - msg = create_message( - job.user.email, - f'Status update for your Job "{job.title}"', - 'tasks/email/notification', - job=job - ) - mail.send(msg) diff --git a/app/sqlalchemy_type_decorators.py b/app/sqlalchemy_type_decorators.py deleted file mode 100644 index ac97e308..00000000 --- a/app/sqlalchemy_type_decorators.py +++ /dev/null @@ -1,43 +0,0 @@ -from app import db -import json - - -class IntEnumColumn(db.TypeDecorator): - impl = db.Integer - - def __init__(self, enum_type, *args, **kwargs): - 
super().__init__(*args, **kwargs) - self.enum_type = enum_type - - def process_bind_param(self, value, dialect): - if isinstance(value, self.enum_type) and isinstance(value.value, int): - return value.value - elif isinstance(value, int): - return self.enum_type(value).value - else: - return TypeError() - - def process_result_value(self, value, dialect): - return self.enum_type(value) - - -class ContainerColumn(db.TypeDecorator): - impl = db.String - - def __init__(self, container_type, *args, **kwargs): - super().__init__(*args, **kwargs) - self.container_type = container_type - - def process_bind_param(self, value, dialect): - if isinstance(value, self.container_type): - return json.dumps(value) - elif ( - isinstance(value, str) - and isinstance(json.loads(value), self.container_type) - ): - return value - else: - return TypeError() - - def process_result_value(self, value, dialect): - return json.loads(value) \ No newline at end of file diff --git a/app/static/js/RessourceDisplays/JobDisplay.js b/app/static/js/RessourceDisplays/JobDisplay.js index f00242a5..1c252837 100644 --- a/app/static/js/RessourceDisplays/JobDisplay.js +++ b/app/static/js/RessourceDisplays/JobDisplay.js @@ -69,6 +69,14 @@ class JobDisplay extends RessourceDisplay { element.classList.remove('hide'); } } + elements = this.displayElement.querySelectorAll('.job-log-trigger'); + for (element of elements) { + if (['COMPLETED', 'FAILED'].includes(status)) { + element.classList.remove('hide'); + } else { + element.classList.add('hide'); + } + } elements = this.displayElement.querySelectorAll('.job-restart-trigger'); for (element of elements) { if (['COMPLETED', 'FAILED'].includes(status)) { diff --git a/app/tasks.py b/app/tasks.py new file mode 100644 index 00000000..a43f171b --- /dev/null +++ b/app/tasks.py @@ -0,0 +1,9 @@ +from app.daemon import daemon + + +def register(app, scheduler): + if app.config['NOPAQUE_IS_PRIMARY_INSTANCE']: + @scheduler.task('interval', id='daemon', seconds=3) + def daemon_task(): + with app.app_context(): + daemon() diff --git a/app/templates/jobs/job.html.j2 b/app/templates/jobs/job.html.j2 index 7a2caa10..0ec70a3f 100644 --- a/app/templates/jobs/job.html.j2 +++ b/app/templates/jobs/job.html.j2 @@ -79,6 +79,7 @@
{% if current_user.is_administrator() %} + repeatLog repeatRestart {% endif %} @@ -143,6 +144,16 @@ {% block modals %} {{ super() }} + +
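
Note on exercising the new scheduler-driven daemon: with the `daemon run` CLI command removed, a single daemon pass can still be triggered from a one-off script. A minimal sketch, assuming only the `create_app` factory and the `app.daemon.daemon` function introduced in this changeset (in normal operation, `app/tasks.py` registers the pass as an APScheduler interval task that runs every 3 seconds on the primary instance):

    # Minimal sketch: run one daemon pass by hand instead of waiting for
    # the APScheduler interval task registered in app/tasks.py.
    from app import create_app
    from app.daemon import daemon

    app = create_app()
    with app.app_context():
        daemon()  # one pass: check corpora, check jobs, commit the session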