nopaque/app/namespaces/cqi_over_sio/__init__.py
2024-12-09 16:12:49 +01:00

235 lines
7.6 KiB
Python

from cqi import CQiClient
from cqi.errors import CQiException
from cqi.status import CQiStatus
from flask import current_app
from flask_login import current_user
from flask_socketio import Namespace
from inspect import signature
from threading import Lock
from app import db, docker_client, hashids, socketio
from app.decorators import socketio_login_required
from app.models import Corpus, CorpusStatus
from . import cqi_extension_functions
from .utils import SessionManager
'''
This package tunnels the Corpus Query interface (CQi) protocol through
Socket.IO (SIO) by tunneling CQi API calls through an event called "exec".
Basic concept:
1. A client connects to the namespace.
2. The client emits the "init" event and provides a corpus id for the corpus
that should be analysed in this session.
2.1 The analysis session counter of the corpus is incremented.
2.2 A CQiClient and a (Mutex) Lock belonging to it are created.
2.3 Wait until the CQP server is running.
2.4 Connect the CQiClient to the server.
2.5 Save the CQiClient, the Lock and the corpus id in the session for
subsequent use.
3. The client emits "exec" events, within which it provides the name of a CQi
API function and the corresponding arguments.
3.1 The "exec" event handler will execute the function, make sure that
the result is serializable and returns the result back to the client.
4. The client disconnects from the namespace.
4.1 The analysis session counter of the corpus is decremented.
4.2 The CQiClient and (Mutex) Lock belonging to it are torn down.
'''
# Whitelist of CQi API function names that a client may invoke through the
# "exec" event. Names are looked up as attributes of the CQiClient's ``api``
# object; anything not listed here (or in the extension list) is rejected.
CQI_API_FUNCTION_NAMES = [
    # ask_feature_*
    'ask_feature_cl_2_3',
    'ask_feature_cqi_1_0',
    'ask_feature_cqp_2_3',

    # cl_*
    'cl_alg2cpos',
    'cl_attribute_size',
    'cl_cpos2alg',
    'cl_cpos2id',
    'cl_cpos2lbound',
    'cl_cpos2rbound',
    'cl_cpos2str',
    'cl_cpos2struc',
    'cl_drop_attribute',
    'cl_id2cpos',
    'cl_id2freq',
    'cl_id2str',
    'cl_idlist2cpos',
    'cl_lexicon_size',
    'cl_regex2id',
    'cl_str2id',
    'cl_struc2cpos',
    'cl_struc2str',

    # corpus_*
    'corpus_alignment_attributes',
    'corpus_charset',
    'corpus_drop_corpus',
    'corpus_full_name',
    'corpus_info',
    'corpus_list_corpora',
    'corpus_positional_attributes',
    'corpus_properties',
    'corpus_structural_attribute_has_values',
    'corpus_structural_attributes',

    # cqp_*
    'cqp_drop_subcorpus',
    'cqp_dump_subcorpus',
    'cqp_fdist_1',
    'cqp_fdist_2',
    'cqp_list_subcorpora',
    'cqp_query',
    'cqp_subcorpus_has_field',
    'cqp_subcorpus_size',

    # ctrl_*
    'ctrl_bye',
    'ctrl_connect',
    'ctrl_last_general_error',
    'ctrl_ping',
    'ctrl_user_abort',
]
# Whitelist of extension function names that a client may invoke through the
# "exec" event. Names are looked up as attributes of the
# ``cqi_extension_functions`` module.
CQI_EXTENSION_FUNCTION_NAMES = [
    # ext_corpus_*
    'ext_corpus_update_db',
    'ext_corpus_static_data',
    'ext_corpus_paginate_corpus',

    # ext_cqp_*
    'ext_cqp_paginate_subcorpus',
    'ext_cqp_partial_export_subcorpus',
    'ext_cqp_export_subcorpus',
]
class CQiOverSocketIONamespace(Namespace):
    """Socket.IO namespace that tunnels CQi calls to a corpus' CQP server.

    Event handlers:

    * ``connect``: accepts the connection (login required).
    * ``init``: binds this Socket.IO session to a corpus, increments the
      corpus' analysis session counter and stores a CQiClient together
      with its Lock via ``SessionManager``.
    * ``exec``: runs one whitelisted CQi API or extension function.
    * ``disconnect``: says goodbye to the CQP server and decrements the
      analysis session counter again.

    ``init`` and ``exec`` answer with a dict holding an HTTP-like ``code``
    and ``msg`` and, for ``exec``, an optional ``payload``.
    """

    @staticmethod
    def _decrement_session_counter(corpus) -> None:
        """Decrement the analysis session counter of *corpus* and commit."""
        # The new value is expressed relative to the mapped column instead of
        # a value read in Python (presumably so the database performs the
        # arithmetic; verify against the Corpus model).
        corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
        db.session.commit()

    @socketio_login_required
    def on_connect(self):
        """Accept the connection; all work is deferred to the "init" event."""
        pass

    @socketio_login_required
    def on_init(self, corpus_hashid: str) -> dict:
        """Start an analysis session for the corpus behind *corpus_hashid*.

        Returns a status dict:

        * 400 -- the hashid does not decode to a single int
        * 404 -- no corpus with that id
        * 403 -- the current user is neither owner, follower nor admin
        * 424 -- the corpus is not in a status that allows analysis
        * 408 -- the corpus did not reach RUNNING_ANALYSIS_SESSION in time
        * 200 -- the session has been set up

        Any exception raised while setting up the session is propagated,
        but the analysis session counter is rolled back first.
        """
        corpus_id = hashids.decode(corpus_hashid)
        if not isinstance(corpus_id, int):
            return {'code': 400, 'msg': 'Bad Request'}
        corpus = Corpus.query.get(corpus_id)
        if corpus is None:
            return {'code': 404, 'msg': 'Not Found'}
        if not (
            corpus.user == current_user
            or current_user.is_following_corpus(corpus)
            or current_user.is_administrator
        ):
            return {'code': 403, 'msg': 'Forbidden'}
        if corpus.status not in [
            CorpusStatus.BUILT,
            CorpusStatus.STARTING_ANALYSIS_SESSION,
            CorpusStatus.RUNNING_ANALYSIS_SESSION,
            CorpusStatus.CANCELING_ANALYSIS_SESSION
        ]:
            return {'code': 424, 'msg': 'Failed Dependency'}

        corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
        db.session.commit()

        # From here on the counter has been incremented. on_disconnect only
        # decrements it when the session data is complete, so every exit
        # that does not finish the setup must undo the increment here.
        session_established = False
        try:
            # Poll the corpus status: at most 20 retries, 3 seconds apart.
            retry_counter = 20
            while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
                if retry_counter == 0:
                    return {'code': 408, 'msg': 'Request Timeout'}
                socketio.sleep(3)
                retry_counter -= 1
                db.session.refresh(corpus)

            # Resolve the IP address of the corpus' CQP server container
            # inside the configured Docker network.
            cqpserver_container_name = f'nopaque-cqpserver-{corpus_id}'
            cqpserver_container = docker_client.containers.get(
                cqpserver_container_name
            )
            network_name = current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']
            networks = cqpserver_container.attrs['NetworkSettings']['Networks']
            cqpserver_ip_address = networks[network_name]['IPAddress']

            cqi_client = CQiClient(cqpserver_ip_address)
            cqi_client_lock = Lock()
            SessionManager.setup()
            SessionManager.set_corpus_id(corpus_id)
            SessionManager.set_cqi_client(cqi_client)
            SessionManager.set_cqi_client_lock(cqi_client_lock)
            session_established = True
        finally:
            if not session_established:
                self._decrement_session_counter(corpus)
        return {'code': 200, 'msg': 'OK'}

    @socketio_login_required
    def on_exec(self, fn_name: str, fn_args: dict = None) -> dict:
        """Execute the whitelisted function *fn_name* with *fn_args*.

        *fn_args* maps parameter names to values; omitting it is the same
        as passing an empty dict.

        Returns a status dict:

        * 424 -- "init" has not been completed for this session
        * 400 -- unknown function, *fn_args* is not a dict, it contains
          unknown or missing parameter names, or an argument has the wrong
          type
        * 500 -- the connection to the CQP server is broken
        * 502 -- the CQP server answered with a CQi error (see ``payload``)
        * 200 -- success; ``payload`` holds the (serializable) result
        """
        try:
            cqi_client = SessionManager.get_cqi_client()
            cqi_client_lock = SessionManager.get_cqi_client_lock()
        except KeyError:
            return {'code': 424, 'msg': 'Failed Dependency'}

        if fn_name in CQI_API_FUNCTION_NAMES:
            fn = getattr(cqi_client.api, fn_name)
        elif fn_name in CQI_EXTENSION_FUNCTION_NAMES:
            fn = getattr(cqi_extension_functions, fn_name)
        else:
            return {'code': 400, 'msg': 'Bad Request'}

        # fn_args is supplied by the client, so validate it before it is
        # unpacked into the call below.
        if fn_args is None:
            fn_args = {}
        elif not isinstance(fn_args, dict):
            return {'code': 400, 'msg': 'Bad Request'}

        fn_parameters = signature(fn).parameters
        # Unknown argument names would make fn(**fn_args) raise a TypeError.
        if any(arg_name not in fn_parameters for arg_name in fn_args):
            return {'code': 400, 'msg': 'Bad Request'}
        for param in fn_parameters.values():
            if param.name not in fn_args:
                if param.default is param.empty:
                    # A required parameter is missing.
                    return {'code': 400, 'msg': 'Bad Request'}
                # An optional parameter was omitted; its default applies.
                continue
            # NOTE(review): this is an exact type match against the raw
            # annotation, so parameters without an annotation or with a
            # non-class annotation can never pass -- confirm that all
            # exposed functions use plain classes as annotations.
            if type(fn_args[param.name]) is not param.annotation:
                return {'code': 400, 'msg': 'Bad Request'}

        try:
            # The lock serializes calls on this session's CQiClient.
            with cqi_client_lock:
                fn_return_value = fn(**fn_args)
        except BrokenPipeError:
            return {'code': 500, 'msg': 'Internal Server Error'}
        except CQiException as e:
            return {
                'code': 502,
                'msg': 'Bad Gateway',
                'payload': {
                    'code': e.code,
                    'desc': e.description,
                    'msg': e.__class__.__name__
                }
            }

        # CQiStatus objects are reduced to a plain dict; every other return
        # value is passed through unchanged.
        if isinstance(fn_return_value, CQiStatus):
            payload = {
                'code': fn_return_value.code,
                'msg': fn_return_value.__class__.__name__
            }
        else:
            payload = fn_return_value
        return {'code': 200, 'msg': 'OK', 'payload': payload}

    def on_disconnect(self):
        """Tear down the analysis session that belongs to this connection.

        Does nothing when "init" was never completed. Otherwise the CQP
        server is told goodbye on a best-effort basis and the analysis
        session counter of the corpus is decremented.
        """
        try:
            corpus_id = SessionManager.get_corpus_id()
            cqi_client = SessionManager.get_cqi_client()
            cqi_client_lock = SessionManager.get_cqi_client_lock()
            SessionManager.teardown()
        except KeyError:
            return

        try:
            with cqi_client_lock:
                try:
                    cqi_client.api.ctrl_bye()
                except (BrokenPipeError, CQiException):
                    # The server is already gone or refused; nothing to do.
                    pass
        finally:
            # The session is over regardless of how the goodbye went, so
            # the counter must be decremented even if ctrl_bye() raised an
            # unexpected exception.
            corpus = Corpus.query.get(corpus_id)
            if corpus is not None:
                self._decrement_session_counter(corpus)