import json
import logging

from flask import current_app, request
from flask_login import current_user, login_required

from app import db, socketio
from app.events import connected_sessions
from app.models import Corpus
from .CQiWrapper.CQiWrapper import CQiWrapper


# Socket.IO session ids (sid) grouped by the corpus they are analysing.
# Keys are stringified corpus ids: {<corpus_id>: [<sid>, ...], ...}
analysis_sessions = {}

# Socket.IO session id -> connected CQi client: {<sid>: CQiWrapper, ...}
analysis_clients = {}


@socketio.on('init_corpus_analysis')
@login_required
def init_corpus_analysis(corpus_id):
    '''
    Register the calling Socket.IO session for analysis of a corpus.

    Emits '[ERROR 404]: Not Found' or '[ERROR 403]: Forbidden' on the
    'init_corpus_analysis' event (to the calling session only) and stops
    when the corpus does not exist or the user is neither its creator nor
    an administrator. Otherwise the session id is recorded in
    analysis_sessions and a background task is started that observes the
    connection.
    '''
    corpus = Corpus.query.filter_by(id=corpus_id).first()
    if corpus is None:
        socketio.emit('init_corpus_analysis', '[ERROR 404]: Not Found',
                      room=request.sid)
        # Nothing to analyse; continuing would dereference None below.
        return
    if not (corpus.creator == current_user
            or current_user.is_administrator()):
        socketio.emit('init_corpus_analysis', '[ERROR 403]: Forbidden',
                      room=request.sid)
        # Unauthorized users must not get a session or background task.
        return
    # Every session has to be tracked, not only the first one per corpus:
    # the observer removes its own sid on exit and only requests the
    # analysis to stop once the list is empty.
    sessions = analysis_sessions.setdefault(str(corpus_id), [])
    if request.sid not in sessions:
        sessions.append(request.sid)
    socketio.start_background_task(observe_corpus_analysis_connection,
                                   current_app._get_current_object(),
                                   corpus_id, request.sid)


@socketio.on('query_event')
@login_required
def recv_query(message):
    '''
    Run a query with the calling session's CQi client and send the results.

    message is the event payload; its 'query' entry is passed to the
    client. The results are JSON-encoded and emitted on 'query_results'
    to the calling session only.
    '''
    logger = logging.getLogger(__name__)
    logger.warning(message)
    analysis_client = analysis_clients.get(request.sid)
    if analysis_client is None:
        # The client is only registered once the observer task has
        # connected, so a query can arrive before it exists.
        logger.warning(
            'No analysis client for session {}'.format(request.sid)
        )
        return
    corpus_name = 'CORPUS'
    result_subcorpus_name = 'Query-results'
    query = message['query']
    analysis_client.set_corpus_name(corpus_name)
    analysis_client.create_attribute_strings()
    analysis_client.query_subcorpus(result_subcorpus_name, query)
    results = analysis_client.show_results()
    logger.warning('Query results: {}'.format(str(results)))
    json_results = json.dumps(results)
    logger.warning('JSON results are {}'.format(json_results))
    # Address the requesting session; without a room the results would be
    # broadcast to every connected client.
    socketio.emit('query_results', json_results, room=request.sid)


def observe_corpus_analysis_connection(app, corpus_id, session_id):
    '''
    Background task that keeps a CQi client alive for one session.

    Waits until the corpus status is 'analysing', connects a CQiWrapper,
    stores it in analysis_clients and emits 'Ready' on the
    'init_corpus_analysis' event. It then pings the client every three
    seconds while the session is listed in connected_sessions. On exit the
    session is removed from the registries; when no session is left for
    the corpus its status is set to 'stop analysis' and committed.
    '''
    logger = logging.getLogger(__name__)
    with app.app_context():
        corpus = Corpus.query.filter_by(id=corpus_id).first()
        # NOTE(review): this wait has no timeout and does not check whether
        # the session is still connected - confirm that whatever sets the
        # status always reaches 'analysing'.
        while corpus.status != 'analysing':
            db.session.refresh(corpus)
            socketio.sleep(3)
        analysis_client = CQiWrapper(
            host='{}_analysis_container{}'.format(corpus.creator.username,
                                                  corpus.id),
            password='opaque',
            port=4877,
            username='opaque'
        )
        try:
            analysis_client.connect()
            analysis_clients[session_id] = analysis_client
            socketio.emit('init_corpus_analysis', 'Ready', room=session_id)
            while session_id in connected_sessions:
                try:
                    analysis_client.ctrl_ping()
                except Exception as err:
                    logger.warning('[Exception]: {}'.format(err))
                    break
                else:
                    socketio.sleep(3)
            analysis_client.disconnect()
        finally:
            # Always release the session, even when connecting, emitting or
            # disconnecting failed; otherwise the registries keep stale
            # entries and the analysis is never asked to stop.
            analysis_clients.pop(session_id, None)
            sessions = analysis_sessions.get(str(corpus_id), [])
            if session_id in sessions:
                sessions.remove(session_id)
            if not sessions:
                analysis_sessions.pop(str(corpus_id), None)
                corpus.status = 'stop analysis'
                db.session.commit()