from flask import current_app, request
from flask_login import current_user
from socket import gaierror
from . import cqi
from .. import db, logger, socketio
from ..decorators import socketio_login_required
from ..events import connected_sessions
from ..models import Corpus, User
import math


# Maps a corpus id to the list of Socket.IO session ids (sid) that currently
# run an analysis session on that corpus.
# {<corpus_id>: [<sid>, ...], ...}
pj_corpus_analysis_sessions = {}

# Maps a Socket.IO session id (sid) to the CQi client serving that session.
# {<sid>: CQiClient, ...}
pj_corpus_analysis_clients = {}


def _cqi_error_response(error):
    """Build the 500 response that wraps a CQiException for the client."""
    return {'code': 500, 'desc': None, 'msg': 'Internal Server Error',
            'payload': {'code': error.code, 'desc': error.description,
                        'msg': error.name}}


@socketio.on('pj_corpus_analysis_init')
@socketio_login_required
def pj_init_corpus_analysis(corpus_id):
    """Start the analysis session handler for the requesting session.

    The handler runs as a Socket.IO background task, so the values that are
    only available inside the request context (application object, user id
    and session id) are resolved here and handed over explicitly.
    """
    socketio.start_background_task(pj_corpus_analysis_session_handler,
                                   current_app._get_current_object(),
                                   corpus_id, current_user.id, request.sid)


@socketio.on('pj_corpus_analysis_query')
@socketio_login_required
def pj_corpus_analysis_query(query):
    """Run ``query`` on the session's corpus and stream the matches.

    A status response is emitted as 'pj_corpus_analysis_query'; the matches
    follow in chunks as 'pj_corpus_analysis_query_results' events, each with
    a progress value between 0 and 100. A query that is still streaming for
    the same session is aborted first.
    """
    session_id = request.sid
    client = pj_corpus_analysis_clients.get(session_id)
    if client is None:
        response = {'code': 404, 'desc': 'No client found for this session',
                    'msg': 'Failed Dependency'}
        socketio.emit('pj_corpus_analysis_query', response, room=session_id)
        return
    if client.status == 'running':
        # Ask the streaming loop of the previous query to stop and wait
        # until it has handed the client back.
        client.status = 'abort'
        while client.status != 'ready':
            socketio.sleep(0.1)
    try:
        corpus = client.corpora.get('CORPUS')
        query_status = corpus.query(query)
        results = corpus.subcorpora.get('Results')
    except cqi.errors.CQiException as e:
        socketio.emit('pj_corpus_analysis_query', _cqi_error_response(e),
                      room=session_id)
        return
    match_count = results.size
    response = {'code': 200, 'desc': None, 'msg': 'OK',
                'payload': {**query_status, 'match_count': match_count}}
    socketio.emit('pj_corpus_analysis_query', response, room=session_id)
    chunk_size = 100
    context = 100
    chunk_start = 0
    client.status = 'running'
    try:
        # The loop body runs at least once so that an empty result still
        # produces a single (empty) chunk with a progress of 100.
        while True:
            logger.debug('Corpus analysis client status: %s', client.status)
            if client.status == 'abort':
                break
            chunk = results.export(context=context, cutoff=chunk_size,
                                   expand_lists=False, offset=chunk_start)
            chunk['cpos_ranges'] = True
            chunk_start += chunk_size
            if match_count == 0:
                progress = 100
            else:
                progress = (chunk_start / match_count) * 100
                progress = min(100, int(math.ceil(progress)))
            response = {'code': 200, 'desc': None, 'msg': 'OK',
                        'payload': {'chunk': chunk, 'progress': progress}}
            socketio.emit('pj_corpus_analysis_query_results', response,
                          room=session_id)
            if chunk_start >= match_count:
                # Every match has been sent; do not request a chunk that
                # starts behind the last match.
                break
    finally:
        # Always hand the client back, even if exporting or emitting failed.
        # Otherwise the next query and the session teardown would wait
        # forever for the status to become 'ready' again.
        client.status = 'ready'


def pj_corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
    """Set up, observe and tear down one corpus analysis session.

    Runs as a background task. Emits the outcome of the setup as
    'pj_corpus_analysis_init' to ``session_id``, keeps the CQi client
    registered while the session is listed in ``connected_sessions`` and
    finally disconnects the client and sets the corpus status to
    'stop analysis'.

    :param app: application object providing the application context
    :param corpus_id: id of the corpus to analyse
    :param user_id: id of the user who requested the session
    :param session_id: Socket.IO session id (sid) of the requesting client
    """
    with app.app_context():
        # Setup analysis session
        corpus = Corpus.query.get(corpus_id)
        user = User.query.get(user_id)
        if corpus is None:
            response = {'code': 404, 'desc': None, 'msg': 'Not Found'}
            socketio.emit('pj_corpus_analysis_init', response,
                          room=session_id)
            return
        if not (corpus.creator == user or user.is_administrator()):
            response = {'code': 403, 'desc': None, 'msg': 'Forbidden'}
            socketio.emit('pj_corpus_analysis_init', response,
                          room=session_id)
            return
        # Poll the database until the corpus is reported as 'analysing'.
        while corpus.status != 'analysing':
            db.session.refresh(corpus)
            socketio.sleep(3)
        # NOTE(review): the argument is presumably the host name of the
        # corpus' CQi server (a gaierror is handled below) - confirm.
        client = cqi.CQiClient('corpus_{}_analysis'.format(corpus_id))
        try:
            connect_status = client.connect()
        except cqi.errors.CQiException as e:
            socketio.emit('pj_corpus_analysis_init', _cqi_error_response(e),
                          room=session_id)
            return
        except gaierror:
            response = {'code': 500, 'desc': None,
                        'msg': 'Internal Server Error'}
            socketio.emit('pj_corpus_analysis_init', response,
                          room=session_id)
            return
        # Set the status before publishing the client, so that a query
        # handler never sees a registered client without a status.
        client.status = 'ready'
        pj_corpus_analysis_clients[session_id] = client
        pj_corpus_analysis_sessions.setdefault(corpus_id, []).append(
            session_id)
        response = {'code': 200, 'desc': None, 'msg': 'OK',
                    'payload': {**connect_status}}
        socketio.emit('pj_corpus_analysis_init', response, room=session_id)
        # Observe analysis session
        while session_id in connected_sessions:
            socketio.sleep(3)
        # Teardown analysis session
        if client.status == 'running':
            # Stop a query that is still streaming before disconnecting.
            client.status = 'abort'
            while client.status != 'ready':
                socketio.sleep(0.1)
        try:
            client.disconnect()
        except cqi.errors.CQiException:
            # Best effort: the session is torn down regardless.
            pass
        pj_corpus_analysis_clients.pop(session_id, None)
        pj_corpus_analysis_sessions[corpus_id].remove(session_id)
        if not pj_corpus_analysis_sessions[corpus_id]:
            # Last session on this corpus is gone: request the analysis to
            # be stopped.
            pj_corpus_analysis_sessions.pop(corpus_id, None)
            corpus.status = 'stop analysis'
            db.session.commit()