From 5771e156ce441d92425d2500f34ac6d339dda5e4 Mon Sep 17 00:00:00 2001 From: Patrick Jentsch Date: Mon, 4 Jul 2022 14:11:10 +0200 Subject: [PATCH] Change the APScheduler Logic and try to catch more errors in Daemon --- app/__init__.py | 8 ++--- app/daemon/__init__.py | 10 +++--- app/daemon/corpus_utils.py | 73 +++++++++++--------------------------- app/daemon/job_utils.py | 47 +++++++----------------- app/tasks.py | 9 ----- config.py | 13 +++++++ 6 files changed, 56 insertions(+), 104 deletions(-) delete mode 100644 app/tasks.py diff --git a/app/__init__.py b/app/__init__.py index cef8fa19..27233b15 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -33,7 +33,7 @@ def create_app(config: Config = Config) -> Flask: app.config.from_object(config) config.init_app(app) docker_client.login( - username=app.config['NOPAQUE_DOCKER_REGISTRY_USERNAME'], + app.config['NOPAQUE_DOCKER_REGISTRY_USERNAME'], password=app.config['NOPAQUE_DOCKER_REGISTRY_PASSWORD'], registry=app.config['NOPAQUE_DOCKER_REGISTRY'] ) @@ -48,9 +48,6 @@ def create_app(config: Config = Config) -> Flask: scheduler.init_app(app) socketio.init_app(app, message_queue=app.config['NOPAQUE_SOCKETIO_MESSAGE_QUEUE_URI']) # noqa - from . import tasks - tasks.register(app, scheduler) - from .admin import bp as admin_blueprint app.register_blueprint(admin_blueprint, url_prefix='/admin') @@ -81,4 +78,7 @@ def create_app(config: Config = Config) -> Flask: from .settings import bp as settings_blueprint app.register_blueprint(settings_blueprint, url_prefix='/settings') + from .users import bp as users_blueprint + app.register_blueprint(users_blueprint, url_prefix='/users') + return app diff --git a/app/daemon/__init__.py b/app/daemon/__init__.py index 3d038dce..9cf16bc8 100644 --- a/app/daemon/__init__.py +++ b/app/daemon/__init__.py @@ -1,9 +1,11 @@ from app import db +from flask import Flask from .corpus_utils import check_corpora from .job_utils import check_jobs -def daemon(): - check_corpora() - check_jobs() - db.session.commit() +def daemon(app: Flask): + with app.app_context(): + check_corpora() + check_jobs() + db.session.commit() diff --git a/app/daemon/corpus_utils.py b/app/daemon/corpus_utils.py index 08712f1f..1703521a 100644 --- a/app/daemon/corpus_utils.py +++ b/app/daemon/corpus_utils.py @@ -90,11 +90,8 @@ def _create_build_corpus_service(corpus): restart_policy=restart_policy, user='0:0' ) - except docker.errors.APIError as e: - current_app.logger.error( - f'Create service "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Create service "{name}" failed: {e}') return corpus.status = CorpusStatus.QUEUED @@ -103,17 +100,11 @@ def _checkout_build_corpus_service(corpus): try: service = docker_client.services.get(service_name) except docker.errors.NotFound as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.NotFound": {e}' - ) + current_app.logger.error(f'Get service "{service_name}" failed: {e}') corpus.status = CorpusStatus.FAILED return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Get service "{service_name}" failed: {e}') service_tasks = service.tasks() if not service_tasks: return @@ -129,11 +120,8 @@ def _checkout_build_corpus_service(corpus): return try: service.remove() - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Remove service "{service_name}" failed: {e}') def _create_cqpserver_container(corpus): ''' # Docker container settings # ''' @@ -174,20 +162,14 @@ def _create_cqpserver_container(corpus): container = docker_client.containers.get(name) except docker.errors.NotFound: pass - except docker.errors.APIError as e: - current_app.logger.error( - f'Get container "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Get container "{name}" failed: {e}') return else: try: container.remove(force=True) - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove container "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Remove container "{name}" failed: {e}') return try: docker_client.containers.run( @@ -207,11 +189,8 @@ def _create_cqpserver_container(corpus): ) corpus.status = CorpusStatus.FAILED return - except docker.errors.APIError as e: - current_app.logger.error( - f'Run container "{name}" failed ' - f'due to "docker.errors.APIError" error: {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Run container "{name}" failed: {e}') return corpus.status = CorpusStatus.RUNNING_ANALYSIS_SESSION @@ -220,17 +199,11 @@ def _checkout_analysing_corpus_container(corpus): try: docker_client.containers.get(container_name) except docker.errors.NotFound as e: - current_app.logger.error( - f'Get container "{container_name}" failed ' - f'due to "docker.errors.NotFound": {e}' - ) + current_app.logger.error(f'Get container "{container_name}" failed: {e}') corpus.num_analysis_sessions = 0 corpus.status = CorpusStatus.BUILT - except docker.errors.APIError as e: - current_app.logger.error( - f'Get container "{container_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Get container "{container_name}" failed: {e}') def _remove_cqpserver_container(corpus): container_name = f'cqpserver_{corpus.id}' @@ -239,16 +212,10 @@ def _remove_cqpserver_container(corpus): except docker.errors.NotFound: corpus.status = CorpusStatus.BUILT return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get container "{container_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Get container "{container_name}" failed: {e}') return try: container.remove(force=True) - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove container "{container_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Remove container "{container_name}" failed: {e}') diff --git a/app/daemon/job_utils.py b/app/daemon/job_utils.py index 9624f253..02f6bb9e 100644 --- a/app/daemon/job_utils.py +++ b/app/daemon/job_utils.py @@ -134,11 +134,8 @@ def _create_job_service(job): restart_policy=restart_policy, user='0:0' ) - except docker.errors.APIError as e: - current_app.logger.error( - f'Create service "{name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Create service "{name}" failed: {e}') return job.status = JobStatus.QUEUED @@ -147,17 +144,11 @@ def _checkout_job_service(job): try: service = docker_client.services.get(service_name) except docker.errors.NotFound as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.NotFound": {e}' - ) + current_app.logger.error(f'Get service "{service_name}" failed: {e}') job.status = JobStatus.FAILED return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Get service "{service_name}" failed: {e}') return service_tasks = service.tasks() if not service_tasks: @@ -194,11 +185,8 @@ def _checkout_job_service(job): job.end_date = datetime.utcnow() try: service.remove() - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Remove service "{service_name}" failed: {e}') def _remove_job_service(job): service_name = f'job_{job.id}' @@ -207,24 +195,15 @@ def _remove_job_service(job): except docker.errors.NotFound: job.status = JobStatus.CANCELED return - except docker.errors.APIError as e: - current_app.logger.error( - f'Get service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Get service "{service_name}" failed: {e}') return try: service.update(mounts=None) - except docker.errors.APIError as e: - current_app.logger.error( - f'Update service "{service_name}" failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Update service "{service_name}" failed: {e}') return try: service.remove() - except docker.errors.APIError as e: - current_app.logger.error( - f'Remove "{service_name}" service failed ' - f'due to "docker.errors.APIError": {e}' - ) + except docker.errors.DockerException as e: + current_app.logger.error(f'Remove "{service_name}" service failed: {e}') diff --git a/app/tasks.py b/app/tasks.py deleted file mode 100644 index a43f171b..00000000 --- a/app/tasks.py +++ /dev/null @@ -1,9 +0,0 @@ -from app.daemon import daemon - - -def register(app, scheduler): - if app.config['NOPAQUE_IS_PRIMARY_INSTANCE']: - @scheduler.task('interval', id='daemon', seconds=3) - def daemon_task(): - with app.app_context(): - daemon() diff --git a/config.py b/config.py index 773f8089..ec27529f 100644 --- a/config.py +++ b/config.py @@ -18,6 +18,9 @@ class Config: SESSION_COOKIE_SECURE = \ os.environ.get('SESSION_COOKIE_SECURE', 'false').lower() == 'true' + ''' # Flask-APScheduler # ''' + JOBS = [] + ''' # Flask-Assets ''' ASSETS_DEBUG = os.environ.get('ASSETS_DEBUG', 'false').lower() == 'true' @@ -112,6 +115,16 @@ class Config: fmt=app.config['NOPAQUE_LOG_FORMAT'], datefmt=app.config['NOPAQUE_LOG_DATE_FORMAT'] ) + if app.config['NOPAQUE_IS_PRIMARY_INSTANCE']: + app.config['JOBS'].append( + { + "id": "daemon", + "func": "app.daemon:daemon", + "args": (app,), + "trigger": "interval", + "seconds": 3, + } + ) if app.config['NOPAQUE_LOG_STDERR_ENABLED']: stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter)