"""
This package tunnels the Corpus Query interface (CQi) protocol through
Socket.IO (SIO) by tunneling CQi API calls through an event called "exec".

Basic concept:
1. A client connects to the namespace.
2. The client emits the "init" event and provides a corpus id for the corpus
   that should be analysed in this session.
   2.1 The analysis session counter of the corpus is incremented.
   2.2 Wait until the CQP server is running.
   2.3 A CQiClient for the CQP server and a (mutex) Lock belonging to it are
       created.
   2.4 The CQiClient, the Lock and the corpus id are saved in the session for
       subsequent use.
3. The client emits "exec" events, within which it provides the name of a CQi
   API function and the corresponding arguments.
   3.1 The "exec" event handler executes the function, makes sure that the
       result is serializable and returns the result to the client.
4. The client disconnects from the namespace.
   4.1 The analysis session counter of the corpus is decremented.
   4.2 The CQiClient and the (mutex) Lock belonging to it are torn down.
"""
from inspect import signature
from threading import Lock
from typing import Optional

from cqi import CQiClient
from cqi.errors import CQiException
from cqi.status import CQiStatus
from flask import current_app
from flask_login import current_user
from flask_socketio import Namespace

from app import db, docker_client, hashids, socketio
from app.decorators import socketio_login_required
from app.models import Corpus, CorpusStatus
from . import cqi_extensions
from .utils import CQiOverSocketIOSessionManager


# Names of CQiClient.api methods that clients may call via the "exec" event.
CQI_API_FUNCTION_NAMES = [
    'ask_feature_cl_2_3',
    'ask_feature_cqi_1_0',
    'ask_feature_cqp_2_3',
    'cl_alg2cpos',
    'cl_attribute_size',
    'cl_cpos2alg',
    'cl_cpos2id',
    'cl_cpos2lbound',
    'cl_cpos2rbound',
    'cl_cpos2str',
    'cl_cpos2struc',
    'cl_drop_attribute',
    'cl_id2cpos',
    'cl_id2freq',
    'cl_id2str',
    'cl_idlist2cpos',
    'cl_lexicon_size',
    'cl_regex2id',
    'cl_str2id',
    'cl_struc2cpos',
    'cl_struc2str',
    'corpus_alignment_attributes',
    'corpus_charset',
    'corpus_drop_corpus',
    'corpus_full_name',
    'corpus_info',
    'corpus_list_corpora',
    'corpus_positional_attributes',
    'corpus_properties',
    'corpus_structural_attribute_has_values',
    'corpus_structural_attributes',
    'cqp_drop_subcorpus',
    'cqp_dump_subcorpus',
    'cqp_fdist_1',
    'cqp_fdist_2',
    'cqp_list_subcorpora',
    'cqp_query',
    'cqp_subcorpus_has_field',
    'cqp_subcorpus_size',
    'ctrl_bye',
    'ctrl_connect',
    'ctrl_last_general_error',
    'ctrl_ping',
    'ctrl_user_abort'
]


class CQiOverSocketIONamespace(Namespace):
    """Socket.IO namespace that proxies CQi API calls to a corpus' CQP server.

    Every handler answers with a dict of the form
    ``{'code': <HTTP-like status>, 'msg': <reason>[, 'payload': ...]}``.
    """

    @socketio_login_required
    def on_connect(self):
        pass

    @socketio_login_required
    def on_init(self, corpus_hashid: str) -> dict:
        """Set up a CQi session for the corpus identified by *corpus_hashid*.

        Returns 200 on success. Error codes:

        - 400: the hashid does not decode to a single integer id.
        - 404: no such corpus.
        - 403: the user may not access the corpus.
        - 424: the corpus is not in a state that allows analysis.
        - 408: the CQP server did not reach the running state within the
          retry budget (20 polls, 3 s apart).
        - 500: the CQP server container could not be reached.
        """
        corpus_id = hashids.decode(corpus_hashid)
        # Anything other than a single int (e.g. an undecodable hashid) is
        # rejected.
        if not isinstance(corpus_id, int):
            return {'code': 400, 'msg': 'Bad Request'}
        corpus = Corpus.query.get(corpus_id)
        if corpus is None:
            return {'code': 404, 'msg': 'Not Found'}
        # NOTE(review): confirm `is_administrator` is a property; if it is a
        # plain method, this attribute access is always truthy.
        if not (
            corpus.user == current_user
            or current_user.is_following_corpus(corpus)
            or current_user.is_administrator
        ):
            return {'code': 403, 'msg': 'Forbidden'}
        if corpus.status not in [
            CorpusStatus.BUILT,
            CorpusStatus.STARTING_ANALYSIS_SESSION,
            CorpusStatus.RUNNING_ANALYSIS_SESSION,
            CorpusStatus.CANCELING_ANALYSIS_SESSION
        ]:
            return {'code': 424, 'msg': 'Failed Dependency'}
        # Assigning a column expression makes SQLAlchemy emit
        # "SET n = n + 1", so concurrent increments are not lost.
        corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
        db.session.commit()
        # From here on, every failure path must undo the increment above.
        retry_counter = 20
        while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
            if retry_counter == 0:
                self._decrement_num_analysis_sessions(corpus)
                return {'code': 408, 'msg': 'Request Timeout'}
            socketio.sleep(3)
            retry_counter -= 1
            db.session.refresh(corpus)
        try:
            cqpserver_ip_address = self._get_cqpserver_ip_address(corpus_id)
            cqi_client = CQiClient(cqpserver_ip_address)
        except Exception:
            # Handler boundary: log the cause, release the session slot and
            # report the failure instead of leaking the incremented counter.
            current_app.logger.exception(
                'Could not set up CQi client for corpus %s', corpus_id
            )
            self._decrement_num_analysis_sessions(corpus)
            return {'code': 500, 'msg': 'Internal Server Error'}
        cqi_client_lock = Lock()
        CQiOverSocketIOSessionManager.setup()
        CQiOverSocketIOSessionManager.set_corpus_id(corpus_id)
        CQiOverSocketIOSessionManager.set_cqi_client(cqi_client)
        CQiOverSocketIOSessionManager.set_cqi_client_lock(cqi_client_lock)
        return {'code': 200, 'msg': 'OK'}

    @socketio_login_required
    def on_exec(self, fn_name: str, fn_args: Optional[dict] = None) -> dict:
        """Execute the CQi (extension) function *fn_name* with *fn_args*.

        *fn_args* maps parameter names to values; each value's type must be
        exactly the parameter's annotation. Returns 200 with the result as
        ``payload``. Error codes:

        - 424: "init" has not been completed for this session.
        - 400: unknown function, non-dict arguments, unknown, missing or
          mistyped arguments.
        - 500: the connection to the CQP server is broken.
        - 502: the CQP server reported a CQi error; its details are in
          ``payload``.
        """
        if fn_args is None:
            fn_args = {}
        try:
            cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
            cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock()
        except KeyError:
            return {'code': 424, 'msg': 'Failed Dependency'}
        if fn_name in CQI_API_FUNCTION_NAMES:
            fn = getattr(cqi_client.api, fn_name)
        elif fn_name in cqi_extensions.CQI_EXTENSION_FUNCTION_NAMES:
            fn = getattr(cqi_extensions, fn_name)
        else:
            return {'code': 400, 'msg': 'Bad Request'}
        # Client-supplied input: reject anything fn(**fn_args) could not bind,
        # instead of letting a TypeError escape the handler.
        if not isinstance(fn_args, dict):
            return {'code': 400, 'msg': 'Bad Request'}
        params = signature(fn).parameters
        if any(arg_name not in params for arg_name in fn_args):
            return {'code': 400, 'msg': 'Bad Request'}
        for param in params.values():
            if param.name not in fn_args:
                if param.default is param.empty:
                    # Required argument missing.
                    return {'code': 400, 'msg': 'Bad Request'}
                # Optional argument omitted; fn falls back to its default.
                continue
            # Exact type match on purpose (e.g. a bool is not accepted where
            # an int is annotated).
            if type(fn_args[param.name]) is not param.annotation:
                return {'code': 400, 'msg': 'Bad Request'}
        # Serialize access to the shared CQi connection.
        with cqi_client_lock:
            try:
                fn_return_value = fn(**fn_args)
            except BrokenPipeError:
                return {'code': 500, 'msg': 'Internal Server Error'}
            except CQiException as e:
                return {
                    'code': 502,
                    'msg': 'Bad Gateway',
                    'payload': {
                        'code': e.code,
                        'desc': e.description,
                        'msg': e.__class__.__name__
                    }
                }
        # CQiStatus objects are converted to a plain dict for transport.
        if isinstance(fn_return_value, CQiStatus):
            payload = {
                'code': fn_return_value.code,
                'msg': fn_return_value.__class__.__name__
            }
        else:
            payload = fn_return_value
        return {'code': 200, 'msg': 'OK', 'payload': payload}

    def on_disconnect(self):
        """Tear down the CQi session and release the corpus' session slot.

        Does nothing if "init" was never completed for this session.
        """
        try:
            corpus_id = CQiOverSocketIOSessionManager.get_corpus_id()
            cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
            cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock()
            CQiOverSocketIOSessionManager.teardown()
        except KeyError:
            return
        with cqi_client_lock:
            try:
                cqi_client.api.ctrl_bye()
            except (OSError, CQiException):
                # Best effort: the connection may already be gone (broken
                # pipe, reset, timeout). The session counter below must be
                # decremented regardless.
                pass
        corpus = Corpus.query.get(corpus_id)
        if corpus is None:
            return
        self._decrement_num_analysis_sessions(corpus)

    @staticmethod
    def _decrement_num_analysis_sessions(corpus):
        """Atomically decrement the corpus' analysis session counter."""
        corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
        db.session.commit()

    @staticmethod
    def _get_cqpserver_ip_address(corpus_id):
        """Look up the IP address of the corpus' CQP server container."""
        container = docker_client.containers.get(
            f'nopaque-cqpserver-{corpus_id}'
        )
        network_name = current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']
        networks = container.attrs['NetworkSettings']['Networks']
        return networks[network_name]['IPAddress']