from app import db, logger, socketio
from app.events import connected_sessions
from app.models import Corpus
from flask import current_app, request
from flask_login import current_user, login_required
from .CQiWrapper.CQiWrapper import CQiWrapper
import gzip
import json
import sys
import zlib


'''
' A dictionary that maps a corpus id to the list of Socket.IO session ids
' (sids) currently analysing that corpus: {<corpus_id>: [<sid>, ...], ...}
'''
analysis_sessions = {}
'''
' A dictionary that maps a Socket.IO session id to its CQi client:
' {<sid>: CQiClient, ...}
'''
analysis_clients = {}


@socketio.on('init_corpus_analysis')
@login_required
def init_corpus_analysis(corpus_id):
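    '''
    ' Checks whether the current user may analyse the requested corpus,
    ' registers the Socket.IO session for it, waits until the corpus status
    ' switches to 'analysing' and then connects a CQi client for this
    ' session.
    '''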
    corpus = Corpus.query.get(corpus_id)
    if corpus is None:
        socketio.emit('init_corpus_analysis', '[ERROR 404]: Not Found',
                      room=request.sid)
    elif not (corpus.creator == current_user
              or current_user.is_administrator()):
        socketio.emit('init_corpus_analysis', '[ERROR 403]: Forbidden',
                      room=request.sid)
    else:
        if corpus_id not in analysis_sessions:
            analysis_sessions[corpus_id] = [request.sid]
        else:
            analysis_sessions[corpus_id].append(request.sid)
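        # Block until the corpus status switches to 'analysing', which is
        # presumably set once the analysis service for this corpus is up.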
        while corpus.status != 'analysing':
            db.session.refresh(corpus)
            socketio.sleep(3)
        analysis_clients[request.sid] = CQiWrapper(
            host='analyse_corpus_{}'.format(corpus.id))
        analysis_clients[request.sid].connect()
        socketio.emit('init_corpus_analysis', 'Ready', room=request.sid)
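        # Watch the Socket.IO connection in a background task so the CQi
        # client and the session bookkeeping are cleaned up on disconnect.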
        socketio.start_background_task(observe_corpus_analysis_connection,
                                       current_app._get_current_object(),
                                       corpus_id, request.sid)


@socketio.on('query')
@login_required
def recv_query(message):
    """Prepare and execute a query and send the results to the client."""
    analysis_client = analysis_clients.get(request.sid)
    if analysis_client is None:
        socketio.emit('query', '[ERROR 424]: Failed Dependency',
                      room=request.sid)
        return
    logger.warning('Payload: {}'.format(message))
    corpus_name = 'CORPUS'
    query = message['query']
    analysis_client.select_corpus(corpus_name)
    analysis_client.query_subcorpus(query)
    results = analysis_client.show_query_results(
        result_len=int(message['hits_per_page']),
        context_len=int(message['context']))
    # logger.warning('RESULTS: {}'.format(results))
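    # Log the payload size of the raw result dict, its string representation,
    # a gzip-compressed string and a zlib-compressed JSON string, to compare
    # which representation is cheapest to send to the client.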
    size_internal_dict = sys.getsizeof(results) / 1000000
    size_dict_to_str = sys.getsizeof(str(results)) / 1000000
    compressed_str = gzip.compress(str(results).encode())
    size_dict_to_str_compressed = sys.getsizeof(compressed_str) / 1000000
    zlib_compressed = zlib.compress(json.dumps(results).encode('utf-8'))
    size_zlib_compressed = sys.getsizeof(zlib_compressed) / 1000000
    logger.warning('Internal size of dict for {} hits per page and context '
                   'len {}: {} MB'.format(message['hits_per_page'],
                                          message['context'],
                                          size_internal_dict))
    logger.warning('Size of dict as raw string for {} hits per page and '
                   'context len {}: {} MB'.format(message['hits_per_page'],
                                                  message['context'],
                                                  size_dict_to_str))
    logger.warning('Size of gzip compressed dict to string for {} hits per '
                   'page and context len {}: {} MB'
                   .format(message['hits_per_page'], message['context'],
                           size_dict_to_str_compressed))
    logger.warning('Size of zlib compressed and utf-8 encoded string for {} '
                   'hits per page and context len {}: {} MB'
                   .format(message['hits_per_page'], message['context'],
                           size_zlib_compressed))
    socketio.emit('query', zlib_compressed, room=request.sid)


def observe_corpus_analysis_connection(app, corpus_id, session_id):
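    '''
    ' Background task that waits until the given Socket.IO session
    ' disconnects, then disconnects its CQi client, removes the session from
    ' the analysis bookkeeping and sets the corpus status to 'stop analysis'.
    '''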
    with app.app_context():
        while session_id in connected_sessions:
            socketio.sleep(3)
        analysis_client = analysis_clients.pop(session_id, None)
        if analysis_client is not None:
            analysis_client.disconnect()
        analysis_sessions[corpus_id].remove(session_id)
        if not analysis_sessions[corpus_id]:
            analysis_sessions.pop(corpus_id, None)
        corpus = Corpus.query.get(corpus_id)
        corpus.status = 'stop analysis'
        db.session.commit()