diff --git a/app/corpora/events.py b/app/corpora/events.py index 34e08ab6..7175cda7 100644 --- a/app/corpora/events.py +++ b/app/corpora/events.py @@ -13,58 +13,48 @@ import json ' A dictionary containing lists of, with corpus ids associated, Socket.IO ' session ids (sid). {: [, ...], ...} ''' -analysis_sessions = {} +corpus_analysis_sessions = {} ''' ' A dictionary containing Socket.IO session id - CQi client pairs. ' {: CQiClient, ...} ''' -analysis_clients = {} +corpus_analysis_clients = {} -@socketio.on('init_corpus_analysis') +@socketio.on('request_corpus_analysis') @login_required -def init_corpus_analysis(corpus_id): +def request_corpus_analysis(corpus_id): corpus = Corpus.query.get(corpus_id) if corpus is None: - socketio.emit('init_corpus_analysis', '[ERROR 404]: Not Found', + socketio.emit('init_corpus_analysis', '[404]: Not Found', room=request.sid) elif not (corpus.creator == current_user or current_user.is_administrator()): - socketio.emit('init_corpus_analysis', '[ERROR 403]: Forbidden', + socketio.emit('init_corpus_analysis', '[403]: Forbidden', room=request.sid) else: - if corpus_id not in analysis_sessions: - analysis_sessions[corpus_id] = [request.sid] - else: - analysis_sessions[corpus_id].append(request.sid) - while corpus.status != 'analysing': - db.session.refresh(corpus) - socketio.sleep(3) - analysis_clients[request.sid] = CQiWrapper( - host='analyse_corpus_{}'.format(corpus.id)) - analysis_clients[request.sid].connect() - socketio.emit('init_corpus_analysis', 'Ready', room=request.sid) - socketio.start_background_task(observe_corpus_analysis_connection, + socketio.start_background_task(corpus_analysis_session_handler, current_app._get_current_object(), corpus_id, request.sid) -@socketio.on('query') +@socketio.on('corpus_analysis') @login_required -def recv_query(message): - analysis_client = analysis_clients.get(request.sid) - if analysis_client is None: - socketio.emit('query', '[ERROR 424]: Failed Dependency', +def corpus_analysis(message): + client = corpus_analysis_clients.get(request.sid) + if client is None: + socketio.emit('query', '[424]: Failed Dependency', room=request.sid) return """ Prepare and execute a query """ logger.warning('Payload: {}'.format(message)) corpus_name = 'CORPUS' query = message['query'] - analysis_client.select_corpus(corpus_name) - analysis_client.query_subcorpus(query) - results = analysis_client.show_query_results(result_len=int(message['hits_per_page']), - context_len=int(message['context'])) + client.select_corpus(corpus_name) + client.query_subcorpus(query) + results = client.show_query_results( + result_len=int(message['hits_per_page']), + context_len=int(message['context'])) # logger.warning('RESULTS: {}'.format(results)) size_internal_dict = sys.getsizeof(results) / 1000000 size_dict_to_str = sys.getsizeof(str(results)) / 1000000 @@ -76,19 +66,35 @@ def recv_query(message): logger.warning('Size of dict as raw string for {} hits per page and context len {}: {} MB'.format(message['hits_per_page'], message['context'], size_dict_to_str)) logger.warning('Size of gzip compressed dict to string for {} hits per page and context len {}: {} MB'.format(message['hits_per_page'], message['context'], size_dict_to_str_compressed)) logger.warning('Size of zlib compressed and utf-8 encoded string for {} hits per page and context len {}: {} MB'.format(message['hits_per_page'], message['context'], size_zlib_compressed)) - socketio.emit('query', zlib_compressed, room=request.sid) + socketio.emit('corpus_analysis', zlib_compressed, room=request.sid) -def observe_corpus_analysis_connection(app, corpus_id, session_id): +def corpus_analysis_session_handler(app, corpus_id, session_id): with app.app_context(): - while session_id in connected_sessions: + ''' Setup analysis session ''' + logger.warning('[{}] Setup analysis session'.format(session_id)) + corpus = Corpus.query.get(corpus_id) + while corpus.status != 'analysing': + db.session.refresh(corpus) socketio.sleep(3) - analysis_client = analysis_clients.pop(session_id, None) - if analysis_client is not None: - analysis_client.disconnect() - analysis_sessions[corpus_id].remove(session_id) - if not analysis_sessions[corpus_id]: - analysis_sessions.pop(corpus_id, None) - corpus = Corpus.query.get(corpus_id) + client = CQiWrapper(host='analyse_corpus_{}'.format(corpus_id)) + client.connect() + corpus_analysis_clients[session_id] = client + if corpus_id not in corpus_analysis_sessions: + corpus_analysis_sessions[corpus_id] = [session_id] + else: + corpus_analysis_sessions[corpus_id].append(session_id) + socketio.emit('request_corpus_analysis', '[201]: Created', room=session_id) + ''' Observe analysis session ''' + while session_id in connected_sessions: + logger.warning('[{}] Observe analysis session'.format(session_id)) + socketio.sleep(3) + ''' Teardown analysis session ''' + logger.warning('[{}] Teardown analysis session'.format(session_id)) + client.disconnect() + corpus_analysis_clients.pop(session_id, None) + corpus_analysis_sessions[corpus_id].remove(session_id) + if not corpus_analysis_sessions[corpus_id]: + corpus_analysis_sessions.pop(corpus_id, None) corpus.status = 'stop analysis' db.session.commit() diff --git a/app/templates/corpora/analyse_corpus.html.j2 b/app/templates/corpora/analyse_corpus.html.j2 index abe800eb..a03c09cb 100644 --- a/app/templates/corpora/analyse_corpus.html.j2 +++ b/app/templates/corpora/analyse_corpus.html.j2 @@ -96,7 +96,10 @@ {"dismissible": false}); loadingModal.open(); }); - socket.emit('init_corpus_analysis', {{ corpus_id }}); + socket.emit('request_corpus_analysis', {{ corpus_id }}); + socket.on('request_corpus_analysis', function(msg) { + if (msg === '[201]: Created') {loadingModal.close();} + }); var queryFormElement = document.getElementById("query-form"); var queryFormSubmitElement = document.getElementById("query-form-submit"); @@ -108,13 +111,10 @@ let queryData = {'context': formData.get('context'), 'hits_per_page': formData.get('hits_per_page'), 'query': formData.get('query')}; - socket.emit('query', queryData); + socket.emit('corpus_analysis', queryData); M.toast({html: 'Query has been sent!'}); }); - socket.on('init_corpus_analysis', function(msg) { - if (msg === 'Ready') {loadingModal.close();} - }); function decodeResults(resultsByteArray) { console.log(resultsByteArray); var decompressedData = pako.inflate(resultsByteArray); // decompresses the recieved ArrayBuffer holding the compressed Byte data @@ -134,7 +134,7 @@ } return infos } - socket.on('query', function(results) { + socket.on('corpus_analysis', function(results) { console.log(results); var decodedJSONStr = decodeResults(results); var results = JSON.parse(decodedJSONStr);