mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
				synced 2025-11-03 20:02:47 +00:00 
			
		
		
		
	Change the APScheduler Logic and try to catch more errors in Daemon
This commit is contained in:
		@@ -33,7 +33,7 @@ def create_app(config: Config = Config) -> Flask:
 | 
			
		||||
    app.config.from_object(config)
 | 
			
		||||
    config.init_app(app)
 | 
			
		||||
    docker_client.login(
 | 
			
		||||
        username=app.config['NOPAQUE_DOCKER_REGISTRY_USERNAME'],
 | 
			
		||||
        app.config['NOPAQUE_DOCKER_REGISTRY_USERNAME'],
 | 
			
		||||
        password=app.config['NOPAQUE_DOCKER_REGISTRY_PASSWORD'],
 | 
			
		||||
        registry=app.config['NOPAQUE_DOCKER_REGISTRY']
 | 
			
		||||
    )
 | 
			
		||||
@@ -48,9 +48,6 @@ def create_app(config: Config = Config) -> Flask:
 | 
			
		||||
    scheduler.init_app(app)
 | 
			
		||||
    socketio.init_app(app, message_queue=app.config['NOPAQUE_SOCKETIO_MESSAGE_QUEUE_URI'])  # noqa
 | 
			
		||||
 | 
			
		||||
    from . import tasks
 | 
			
		||||
    tasks.register(app, scheduler)
 | 
			
		||||
 | 
			
		||||
    from .admin import bp as admin_blueprint
 | 
			
		||||
    app.register_blueprint(admin_blueprint, url_prefix='/admin')
 | 
			
		||||
 | 
			
		||||
@@ -81,4 +78,7 @@ def create_app(config: Config = Config) -> Flask:
 | 
			
		||||
    from .settings import bp as settings_blueprint
 | 
			
		||||
    app.register_blueprint(settings_blueprint, url_prefix='/settings')
 | 
			
		||||
 | 
			
		||||
    from .users import bp as users_blueprint
 | 
			
		||||
    app.register_blueprint(users_blueprint, url_prefix='/users')
 | 
			
		||||
 | 
			
		||||
    return app
 | 
			
		||||
 
 | 
			
		||||
@@ -1,9 +1,11 @@
 | 
			
		||||
from app import db
 | 
			
		||||
from flask import Flask
 | 
			
		||||
from .corpus_utils import check_corpora
 | 
			
		||||
from .job_utils import check_jobs
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def daemon():
 | 
			
		||||
    check_corpora()
 | 
			
		||||
    check_jobs()
 | 
			
		||||
    db.session.commit()
 | 
			
		||||
def daemon(app: Flask):
 | 
			
		||||
    with app.app_context():
 | 
			
		||||
        check_corpora()
 | 
			
		||||
        check_jobs()
 | 
			
		||||
        db.session.commit()
 | 
			
		||||
 
 | 
			
		||||
@@ -90,11 +90,8 @@ def _create_build_corpus_service(corpus):
 | 
			
		||||
            restart_policy=restart_policy,
 | 
			
		||||
            user='0:0'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Create service "{name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Create service "{name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    corpus.status = CorpusStatus.QUEUED
 | 
			
		||||
 | 
			
		||||
@@ -103,17 +100,11 @@ def _checkout_build_corpus_service(corpus):
 | 
			
		||||
    try:
 | 
			
		||||
        service = docker_client.services.get(service_name)
 | 
			
		||||
    except docker.errors.NotFound as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.NotFound": {e}'
 | 
			
		||||
        )
 | 
			
		||||
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
 | 
			
		||||
        corpus.status = CorpusStatus.FAILED
 | 
			
		||||
        return
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
 | 
			
		||||
    service_tasks = service.tasks()
 | 
			
		||||
    if not service_tasks:
 | 
			
		||||
        return
 | 
			
		||||
@@ -129,11 +120,8 @@ def _checkout_build_corpus_service(corpus):
 | 
			
		||||
        return
 | 
			
		||||
    try:
 | 
			
		||||
        service.remove()
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Remove service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Remove service "{service_name}" failed: {e}')
 | 
			
		||||
 | 
			
		||||
def _create_cqpserver_container(corpus):
 | 
			
		||||
    ''' # Docker container settings # '''
 | 
			
		||||
@@ -174,20 +162,14 @@ def _create_cqpserver_container(corpus):
 | 
			
		||||
        container = docker_client.containers.get(name)
 | 
			
		||||
    except docker.errors.NotFound:
 | 
			
		||||
        pass
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get container "{name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Get container "{name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    else:
 | 
			
		||||
        try:
 | 
			
		||||
            container.remove(force=True)
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
                f'Remove container "{name}" failed '
 | 
			
		||||
                f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
            )
 | 
			
		||||
        except docker.errors.DockerException as e:
 | 
			
		||||
            current_app.logger.error(f'Remove container "{name}" failed: {e}')
 | 
			
		||||
            return
 | 
			
		||||
    try:
 | 
			
		||||
        docker_client.containers.run(
 | 
			
		||||
@@ -207,11 +189,8 @@ def _create_cqpserver_container(corpus):
 | 
			
		||||
        )
 | 
			
		||||
        corpus.status = CorpusStatus.FAILED
 | 
			
		||||
        return
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Run container "{name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError" error: {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Run container "{name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    corpus.status = CorpusStatus.RUNNING_ANALYSIS_SESSION
 | 
			
		||||
 | 
			
		||||
@@ -220,17 +199,11 @@ def _checkout_analysing_corpus_container(corpus):
 | 
			
		||||
    try:
 | 
			
		||||
        docker_client.containers.get(container_name)
 | 
			
		||||
    except docker.errors.NotFound as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get container "{container_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.NotFound": {e}'
 | 
			
		||||
        )
 | 
			
		||||
        current_app.logger.error(f'Get container "{container_name}" failed: {e}')
 | 
			
		||||
        corpus.num_analysis_sessions = 0
 | 
			
		||||
        corpus.status = CorpusStatus.BUILT
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get container "{container_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Get container "{container_name}" failed: {e}')
 | 
			
		||||
 | 
			
		||||
def _remove_cqpserver_container(corpus):
 | 
			
		||||
    container_name = f'cqpserver_{corpus.id}'
 | 
			
		||||
@@ -239,16 +212,10 @@ def _remove_cqpserver_container(corpus):
 | 
			
		||||
    except docker.errors.NotFound:
 | 
			
		||||
        corpus.status = CorpusStatus.BUILT
 | 
			
		||||
        return
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get container "{container_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Get container "{container_name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    try:
 | 
			
		||||
        container.remove(force=True)
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Remove container "{container_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Remove container "{container_name}" failed: {e}')
 | 
			
		||||
 
 | 
			
		||||
@@ -134,11 +134,8 @@ def _create_job_service(job):
 | 
			
		||||
            restart_policy=restart_policy,
 | 
			
		||||
            user='0:0'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Create service "{name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Create service "{name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    job.status = JobStatus.QUEUED
 | 
			
		||||
 | 
			
		||||
@@ -147,17 +144,11 @@ def _checkout_job_service(job):
 | 
			
		||||
    try:
 | 
			
		||||
        service = docker_client.services.get(service_name)
 | 
			
		||||
    except docker.errors.NotFound as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.NotFound": {e}'
 | 
			
		||||
        )
 | 
			
		||||
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
 | 
			
		||||
        job.status = JobStatus.FAILED
 | 
			
		||||
        return
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    service_tasks = service.tasks()
 | 
			
		||||
    if not service_tasks:
 | 
			
		||||
@@ -194,11 +185,8 @@ def _checkout_job_service(job):
 | 
			
		||||
    job.end_date = datetime.utcnow()
 | 
			
		||||
    try:
 | 
			
		||||
        service.remove()
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Remove service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Remove service "{service_name}" failed: {e}')
 | 
			
		||||
 | 
			
		||||
def _remove_job_service(job):
 | 
			
		||||
    service_name = f'job_{job.id}'
 | 
			
		||||
@@ -207,24 +195,15 @@ def _remove_job_service(job):
 | 
			
		||||
    except docker.errors.NotFound:
 | 
			
		||||
        job.status = JobStatus.CANCELED
 | 
			
		||||
        return
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Get service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Get service "{service_name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    try:
 | 
			
		||||
        service.update(mounts=None)
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Update service "{service_name}" failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Update service "{service_name}" failed: {e}')
 | 
			
		||||
        return
 | 
			
		||||
    try:
 | 
			
		||||
        service.remove()
 | 
			
		||||
    except docker.errors.APIError as e:
 | 
			
		||||
        current_app.logger.error(
 | 
			
		||||
            f'Remove "{service_name}" service failed '
 | 
			
		||||
            f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        )
 | 
			
		||||
    except docker.errors.DockerException as e:
 | 
			
		||||
        current_app.logger.error(f'Remove "{service_name}" service failed: {e}')
 | 
			
		||||
 
 | 
			
		||||
@@ -1,9 +0,0 @@
 | 
			
		||||
from app.daemon import daemon
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def register(app, scheduler):
 | 
			
		||||
    if app.config['NOPAQUE_IS_PRIMARY_INSTANCE']:
 | 
			
		||||
        @scheduler.task('interval', id='daemon', seconds=3)
 | 
			
		||||
        def daemon_task():
 | 
			
		||||
            with app.app_context():
 | 
			
		||||
                daemon()
 | 
			
		||||
		Reference in New Issue
	
	Block a user