mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2025-07-07 21:23:17 +00:00
Move Socket.IO Namespaces to dedicated directory
This commit is contained in:
223
app/namespaces/cqi_over_sio/__init__.py
Normal file
223
app/namespaces/cqi_over_sio/__init__.py
Normal file
@ -0,0 +1,223 @@
|
||||
from cqi import CQiClient
|
||||
from cqi.errors import CQiException
|
||||
from cqi.status import CQiStatus
|
||||
from flask import current_app
|
||||
from flask_login import current_user
|
||||
from flask_socketio import Namespace
|
||||
from inspect import signature
|
||||
from threading import Lock
|
||||
from app import db, docker_client, hashids, socketio
|
||||
from app.decorators import socketio_login_required
|
||||
from app.models import Corpus, CorpusStatus
|
||||
from . import extensions
|
||||
from .utils import CQiOverSocketIOSessionManager
|
||||
|
||||
|
||||
'''
|
||||
This package tunnels the Corpus Query interface (CQi) protocol through
|
||||
Socket.IO (SIO) by tunneling CQi API calls through an event called "exec".
|
||||
|
||||
Basic concept:
|
||||
1. A client connects to the namespace.
|
||||
2. The client emits the "init" event and provides a corpus id for the corpus
|
||||
that should be analysed in this session.
|
||||
1.1 The analysis session counter of the corpus is incremented.
|
||||
1.2 A CQiClient and a (Mutex) Lock belonging to it is created.
|
||||
1.3 Wait until the CQP server is running.
|
||||
1.4 Connect the CQiClient to the server.
|
||||
1.5 Save the CQiClient, the Lock and the corpus id in the session for
|
||||
subsequential use.
|
||||
3. The client emits "exec" events, within which it provides the name of a CQi
|
||||
API function and the corresponding arguments.
|
||||
3.1 The "exec" event handler will execute the function, make sure that
|
||||
the result is serializable and returns the result back to the client.
|
||||
4. The client disconnects from the namespace
|
||||
4.1 The analysis session counter of the corpus is decremented.
|
||||
4.2 The CQiClient and (Mutex) Lock belonging to it are teared down.
|
||||
'''
|
||||
|
||||
|
||||
CQI_API_FUNCTION_NAMES = [
|
||||
'ask_feature_cl_2_3',
|
||||
'ask_feature_cqi_1_0',
|
||||
'ask_feature_cqp_2_3',
|
||||
'cl_alg2cpos',
|
||||
'cl_attribute_size',
|
||||
'cl_cpos2alg',
|
||||
'cl_cpos2id',
|
||||
'cl_cpos2lbound',
|
||||
'cl_cpos2rbound',
|
||||
'cl_cpos2str',
|
||||
'cl_cpos2struc',
|
||||
'cl_drop_attribute',
|
||||
'cl_id2cpos',
|
||||
'cl_id2freq',
|
||||
'cl_id2str',
|
||||
'cl_idlist2cpos',
|
||||
'cl_lexicon_size',
|
||||
'cl_regex2id',
|
||||
'cl_str2id',
|
||||
'cl_struc2cpos',
|
||||
'cl_struc2str',
|
||||
'corpus_alignment_attributes',
|
||||
'corpus_charset',
|
||||
'corpus_drop_corpus',
|
||||
'corpus_full_name',
|
||||
'corpus_info',
|
||||
'corpus_list_corpora',
|
||||
'corpus_positional_attributes',
|
||||
'corpus_properties',
|
||||
'corpus_structural_attribute_has_values',
|
||||
'corpus_structural_attributes',
|
||||
'cqp_drop_subcorpus',
|
||||
'cqp_dump_subcorpus',
|
||||
'cqp_fdist_1',
|
||||
'cqp_fdist_2',
|
||||
'cqp_list_subcorpora',
|
||||
'cqp_query',
|
||||
'cqp_subcorpus_has_field',
|
||||
'cqp_subcorpus_size',
|
||||
'ctrl_bye',
|
||||
'ctrl_connect',
|
||||
'ctrl_last_general_error',
|
||||
'ctrl_ping',
|
||||
'ctrl_user_abort'
|
||||
]
|
||||
|
||||
|
||||
class CQiOverSocketIONamespace(Namespace):
|
||||
@socketio_login_required
|
||||
def on_connect(self):
|
||||
pass
|
||||
|
||||
@socketio_login_required
|
||||
def on_init(self, corpus_hashid: str) -> dict:
|
||||
corpus_id = hashids.decode(corpus_hashid)
|
||||
|
||||
if not isinstance(corpus_id, int):
|
||||
return {'code': 400, 'msg': 'Bad Request'}
|
||||
|
||||
corpus = Corpus.query.get(corpus_id)
|
||||
|
||||
if corpus is None:
|
||||
return {'code': 404, 'msg': 'Not Found'}
|
||||
|
||||
if not (
|
||||
corpus.user == current_user
|
||||
or current_user.is_following_corpus(corpus)
|
||||
or current_user.is_administrator
|
||||
):
|
||||
return {'code': 403, 'msg': 'Forbidden'}
|
||||
|
||||
if corpus.status not in [
|
||||
CorpusStatus.BUILT,
|
||||
CorpusStatus.STARTING_ANALYSIS_SESSION,
|
||||
CorpusStatus.RUNNING_ANALYSIS_SESSION,
|
||||
CorpusStatus.CANCELING_ANALYSIS_SESSION
|
||||
]:
|
||||
return {'code': 424, 'msg': 'Failed Dependency'}
|
||||
|
||||
corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
|
||||
db.session.commit()
|
||||
retry_counter = 20
|
||||
while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
|
||||
if retry_counter == 0:
|
||||
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
|
||||
db.session.commit()
|
||||
return {'code': 408, 'msg': 'Request Timeout'}
|
||||
socketio.sleep(3)
|
||||
retry_counter -= 1
|
||||
db.session.refresh(corpus)
|
||||
|
||||
cqpserver_container_name = f'nopaque-cqpserver-{corpus_id}'
|
||||
cqpserver_container = docker_client.containers.get(cqpserver_container_name)
|
||||
cqpserver_ip_address = cqpserver_container.attrs['NetworkSettings']['Networks'][current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']]['IPAddress']
|
||||
cqi_client = CQiClient(cqpserver_ip_address)
|
||||
cqi_client_lock = Lock()
|
||||
|
||||
CQiOverSocketIOSessionManager.setup()
|
||||
CQiOverSocketIOSessionManager.set_corpus_id(corpus_id)
|
||||
CQiOverSocketIOSessionManager.set_cqi_client(cqi_client)
|
||||
CQiOverSocketIOSessionManager.set_cqi_client_lock(cqi_client_lock)
|
||||
|
||||
return {'code': 200, 'msg': 'OK'}
|
||||
|
||||
@socketio_login_required
|
||||
def on_exec(self, fn_name: str, fn_args: dict = {}) -> dict:
|
||||
try:
|
||||
cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
|
||||
cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock()
|
||||
except KeyError:
|
||||
return {'code': 424, 'msg': 'Failed Dependency'}
|
||||
|
||||
if fn_name in CQI_API_FUNCTION_NAMES:
|
||||
fn = getattr(cqi_client.api, fn_name)
|
||||
elif fn_name in extensions.CQI_EXTENSION_FUNCTION_NAMES:
|
||||
fn = getattr(extensions, fn_name)
|
||||
else:
|
||||
return {'code': 400, 'msg': 'Bad Request'}
|
||||
|
||||
for param in signature(fn).parameters.values():
|
||||
# Check if the parameter is optional or required
|
||||
if param.default is param.empty:
|
||||
if param.name not in fn_args:
|
||||
return {'code': 400, 'msg': 'Bad Request'}
|
||||
else:
|
||||
if param.name not in fn_args:
|
||||
continue
|
||||
if type(fn_args[param.name]) is not param.annotation:
|
||||
return {'code': 400, 'msg': 'Bad Request'}
|
||||
|
||||
cqi_client_lock.acquire()
|
||||
try:
|
||||
fn_return_value = fn(**fn_args)
|
||||
except BrokenPipeError as e:
|
||||
return {'code': 500, 'msg': 'Internal Server Error'}
|
||||
except CQiException as e:
|
||||
return {
|
||||
'code': 502,
|
||||
'msg': 'Bad Gateway',
|
||||
'payload': {
|
||||
'code': e.code,
|
||||
'desc': e.description,
|
||||
'msg': e.__class__.__name__
|
||||
}
|
||||
}
|
||||
finally:
|
||||
cqi_client_lock.release()
|
||||
|
||||
if isinstance(fn_return_value, CQiStatus):
|
||||
payload = {
|
||||
'code': fn_return_value.code,
|
||||
'msg': fn_return_value.__class__.__name__
|
||||
}
|
||||
else:
|
||||
payload = fn_return_value
|
||||
|
||||
return {'code': 200, 'msg': 'OK', 'payload': payload}
|
||||
|
||||
def on_disconnect(self):
|
||||
try:
|
||||
corpus_id = CQiOverSocketIOSessionManager.get_corpus_id()
|
||||
cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
|
||||
cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock()
|
||||
CQiOverSocketIOSessionManager.teardown()
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
cqi_client_lock.acquire()
|
||||
|
||||
try:
|
||||
cqi_client.api.ctrl_bye()
|
||||
except (BrokenPipeError, CQiException):
|
||||
pass
|
||||
|
||||
cqi_client_lock.release()
|
||||
|
||||
corpus = Corpus.query.get(corpus_id)
|
||||
|
||||
if corpus is None:
|
||||
return
|
||||
|
||||
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
|
||||
db.session.commit()
|
Reference in New Issue
Block a user