From 3ccae085d286dadca8667a4b684dd6669e6efc1a Mon Sep 17 00:00:00 2001 From: Patrick Jentsch Date: Thu, 7 Nov 2019 14:33:48 +0100 Subject: [PATCH 1/3] Remove unused import --- app/events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/app/events.py b/app/events.py index b0ac4dc4..aded211f 100644 --- a/app/events.py +++ b/app/events.py @@ -5,7 +5,6 @@ from .decorators import admin_required from .models import User import json import jsonpatch -import logging ''' From ebf2f00e0d5ddcb07d3de96da7246faf1ee8491c Mon Sep 17 00:00:00 2001 From: Patrick Jentsch Date: Thu, 7 Nov 2019 14:33:58 +0100 Subject: [PATCH 2/3] Add example for client management --- app/corpora/events.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/app/corpora/events.py b/app/corpora/events.py index 36f9f34d..8ad98fd3 100644 --- a/app/corpora/events.py +++ b/app/corpora/events.py @@ -8,12 +8,14 @@ import logging import time +analysis_clients = {} + + @socketio.on('init_corpus_analysis') @login_required def init_corpus_analysis(corpus_id): - logger = logging.getLogger(__name__) - logger.warning('init_corpus_analysis') ''' TODO: Check if current_user is allowed to subscribe to this ''' + time.sleep(5) # wait for IP of container only for dev socketio.start_background_task(observe_corpus_analysis_connection, current_app._get_current_object(), corpus_id, @@ -24,20 +26,20 @@ def init_corpus_analysis(corpus_id): def recv_query(message): logger = logging.getLogger(__name__) logger.warning(message) + analysis_client = analysis_clients[request.sid] def observe_corpus_analysis_connection(app, corpus_id, session_id): logger = logging.getLogger(__name__) with app.app_context(): - time.sleep(5) # wait for IP of container only for dev corpus = Corpus.query.filter_by(id=corpus_id).first() - logger.warning('IP adress is: {}'.format(corpus.analysis_container_ip)) - cqi_client = CQiClient(host=corpus.analysis_container_ip) - cqi_client.ctrl_connect('opaque', 'opaque') + logger.warning('MÖÖÖÖP') + analysis_client = CQiClient(host=corpus.analysis_container_ip) + analysis_client.ctrl_connect('opaque', 'opaque') + analysis_clients[session_id] = analysis_client while session_id in connected_sessions: - logger.warning(cqi_client.ctrl_ping()) - logger.warning('Run container, run!') + logger.warning(analysis_client.ctrl_ping()) socketio.sleep(3) + analysis_client.ctrl_bye() corpus.status = 'stop analysis' db.session.commit() - logger.warning('Stop container, stop!') From 03b956c45781465fa46a6b4513c6283f788e8e49 Mon Sep 17 00:00:00 2001 From: Patrick Jentsch Date: Thu, 7 Nov 2019 15:28:07 +0100 Subject: [PATCH 3/3] Remove workaround to wait for container --- app/corpora/events.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/app/corpora/events.py b/app/corpora/events.py index 8ad98fd3..28d3bad7 100644 --- a/app/corpora/events.py +++ b/app/corpora/events.py @@ -5,7 +5,6 @@ from flask import current_app, request from flask_login import login_required from .CQiClient.CQiClient import CQiClient import logging -import time analysis_clients = {} @@ -15,7 +14,6 @@ analysis_clients = {} @login_required def init_corpus_analysis(corpus_id): ''' TODO: Check if current_user is allowed to subscribe to this ''' - time.sleep(5) # wait for IP of container only for dev socketio.start_background_task(observe_corpus_analysis_connection, current_app._get_current_object(), corpus_id, @@ -33,13 +31,17 @@ def observe_corpus_analysis_connection(app, corpus_id, session_id): logger = logging.getLogger(__name__) with app.app_context(): corpus = Corpus.query.filter_by(id=corpus_id).first() - logger.warning('MÖÖÖÖP') - analysis_client = CQiClient(host=corpus.analysis_container_ip) - analysis_client.ctrl_connect('opaque', 'opaque') + while corpus.status != 'analysing': + db.session.refresh(corpus) + socketio.sleep(3) + analysis_server = '{}_analysis_container{}'.format(corpus.creator.username, corpus.id) + analysis_client = CQiClient(host=analysis_server) analysis_clients[session_id] = analysis_client + analysis_client.ctrl_connect('opaque', 'opaque') while session_id in connected_sessions: logger.warning(analysis_client.ctrl_ping()) socketio.sleep(3) analysis_client.ctrl_bye() + analysis_clients.pop(session_id, None) corpus.status = 'stop analysis' db.session.commit()