mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2024-12-26 19:34:19 +00:00
234 lines
7.5 KiB
Python
234 lines
7.5 KiB
Python
from cqi import CQiClient
|
|
from cqi.errors import CQiException
|
|
from cqi.status import CQiStatus
|
|
from flask import current_app
|
|
from flask_login import current_user
|
|
from flask_socketio import Namespace
|
|
from inspect import signature
|
|
from threading import Lock
|
|
from app import db, docker_client, hashids, socketio
|
|
from app.decorators import socketio_login_required
|
|
from app.models import Corpus, CorpusStatus
|
|
from . import cqi_extension_functions
|
|
from .utils import SessionManager
|
|
|
|
|
|
'''
|
|
This package tunnels the Corpus Query interface (CQi) protocol through
|
|
Socket.IO (SIO) by tunneling CQi API calls through an event called "exec".
|
|
|
|
Basic concept:
|
|
1. A client connects to the namespace.
|
|
2. The client emits the "init" event and provides a corpus id for the corpus
|
|
that should be analysed in this session.
|
|
1.1 The analysis session counter of the corpus is incremented.
|
|
1.2 A CQiClient and a (Mutex) Lock belonging to it is created.
|
|
1.3 Wait until the CQP server is running.
|
|
1.4 Connect the CQiClient to the server.
|
|
1.5 Save the CQiClient, the Lock and the corpus id in the session for
|
|
subsequential use.
|
|
3. The client emits "exec" events, within which it provides the name of a CQi
|
|
API function and the corresponding arguments.
|
|
3.1 The "exec" event handler will execute the function, make sure that
|
|
the result is serializable and returns the result back to the client.
|
|
4. The client disconnects from the namespace
|
|
4.1 The analysis session counter of the corpus is decremented.
|
|
4.2 The CQiClient and (Mutex) Lock belonging to it are teared down.
|
|
'''
|
|
|
|
|
|
CQI_API_FUNCTION_NAMES = [
|
|
'ask_feature_cl_2_3',
|
|
'ask_feature_cqi_1_0',
|
|
'ask_feature_cqp_2_3',
|
|
'cl_alg2cpos',
|
|
'cl_attribute_size',
|
|
'cl_cpos2alg',
|
|
'cl_cpos2id',
|
|
'cl_cpos2lbound',
|
|
'cl_cpos2rbound',
|
|
'cl_cpos2str',
|
|
'cl_cpos2struc',
|
|
'cl_drop_attribute',
|
|
'cl_id2cpos',
|
|
'cl_id2freq',
|
|
'cl_id2str',
|
|
'cl_idlist2cpos',
|
|
'cl_lexicon_size',
|
|
'cl_regex2id',
|
|
'cl_str2id',
|
|
'cl_struc2cpos',
|
|
'cl_struc2str',
|
|
'corpus_alignment_attributes',
|
|
'corpus_charset',
|
|
'corpus_drop_corpus',
|
|
'corpus_full_name',
|
|
'corpus_info',
|
|
'corpus_list_corpora',
|
|
'corpus_positional_attributes',
|
|
'corpus_properties',
|
|
'corpus_structural_attribute_has_values',
|
|
'corpus_structural_attributes',
|
|
'cqp_drop_subcorpus',
|
|
'cqp_dump_subcorpus',
|
|
'cqp_fdist_1',
|
|
'cqp_fdist_2',
|
|
'cqp_list_subcorpora',
|
|
'cqp_query',
|
|
'cqp_subcorpus_has_field',
|
|
'cqp_subcorpus_size',
|
|
'ctrl_bye',
|
|
'ctrl_connect',
|
|
'ctrl_last_general_error',
|
|
'ctrl_ping',
|
|
'ctrl_user_abort'
|
|
]
|
|
|
|
|
|
CQI_EXTENSION_FUNCTION_NAMES = [
|
|
'ext_corpus_update_db',
|
|
'ext_corpus_static_data',
|
|
'ext_corpus_paginate_corpus',
|
|
'ext_cqp_paginate_subcorpus',
|
|
'ext_cqp_partial_export_subcorpus',
|
|
'ext_cqp_export_subcorpus',
|
|
]
|
|
|
|
|
|
class CQiOverSocketIONamespace(Namespace):
|
|
@socketio_login_required
|
|
def on_connect(self):
|
|
pass
|
|
|
|
@socketio_login_required
|
|
def on_init(self, corpus_hashid: str) -> dict:
|
|
corpus_id = hashids.decode(corpus_hashid)
|
|
|
|
if not isinstance(corpus_id, int):
|
|
return {'code': 400, 'msg': 'Bad Request'}
|
|
|
|
corpus = Corpus.query.get(corpus_id)
|
|
|
|
if corpus is None:
|
|
return {'code': 404, 'msg': 'Not Found'}
|
|
|
|
if not (
|
|
corpus.user == current_user
|
|
or current_user.is_following_corpus(corpus)
|
|
or current_user.is_administrator
|
|
):
|
|
return {'code': 403, 'msg': 'Forbidden'}
|
|
|
|
if corpus.status not in [
|
|
CorpusStatus.BUILT,
|
|
CorpusStatus.STARTING_ANALYSIS_SESSION,
|
|
CorpusStatus.RUNNING_ANALYSIS_SESSION,
|
|
CorpusStatus.CANCELING_ANALYSIS_SESSION
|
|
]:
|
|
return {'code': 424, 'msg': 'Failed Dependency'}
|
|
|
|
corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
|
|
db.session.commit()
|
|
retry_counter = 20
|
|
while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
|
|
if retry_counter == 0:
|
|
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
|
|
db.session.commit()
|
|
return {'code': 408, 'msg': 'Request Timeout'}
|
|
socketio.sleep(3)
|
|
retry_counter -= 1
|
|
db.session.refresh(corpus)
|
|
|
|
cqpserver_container_name = f'nopaque-cqpserver-{corpus_id}'
|
|
cqpserver_container = docker_client.containers.get(cqpserver_container_name)
|
|
cqpserver_ip_address = cqpserver_container.attrs['NetworkSettings']['Networks'][current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']]['IPAddress']
|
|
cqi_client = CQiClient(cqpserver_ip_address)
|
|
cqi_client_lock = Lock()
|
|
|
|
SessionManager.setup()
|
|
SessionManager.set_corpus_id(corpus_id)
|
|
SessionManager.set_cqi_client(cqi_client)
|
|
SessionManager.set_cqi_client_lock(cqi_client_lock)
|
|
|
|
return {'code': 200, 'msg': 'OK'}
|
|
|
|
@socketio_login_required
|
|
def on_exec(self, fn_name: str, fn_args: dict = {}) -> dict:
|
|
try:
|
|
cqi_client = SessionManager.get_cqi_client()
|
|
cqi_client_lock = SessionManager.get_cqi_client_lock()
|
|
except KeyError:
|
|
return {'code': 424, 'msg': 'Failed Dependency'}
|
|
|
|
if fn_name in CQI_API_FUNCTION_NAMES:
|
|
fn = getattr(cqi_client.api, fn_name)
|
|
elif fn_name in cqi_extension_functions.CQI_EXTENSION_FUNCTION_NAMES:
|
|
fn = getattr(cqi_extension_functions, fn_name)
|
|
else:
|
|
return {'code': 400, 'msg': 'Bad Request'}
|
|
|
|
for param in signature(fn).parameters.values():
|
|
# Check if the parameter is optional or required
|
|
if param.default is param.empty:
|
|
if param.name not in fn_args:
|
|
return {'code': 400, 'msg': 'Bad Request'}
|
|
else:
|
|
if param.name not in fn_args:
|
|
continue
|
|
if type(fn_args[param.name]) is not param.annotation:
|
|
return {'code': 400, 'msg': 'Bad Request'}
|
|
|
|
cqi_client_lock.acquire()
|
|
try:
|
|
fn_return_value = fn(**fn_args)
|
|
except BrokenPipeError as e:
|
|
return {'code': 500, 'msg': 'Internal Server Error'}
|
|
except CQiException as e:
|
|
return {
|
|
'code': 502,
|
|
'msg': 'Bad Gateway',
|
|
'payload': {
|
|
'code': e.code,
|
|
'desc': e.description,
|
|
'msg': e.__class__.__name__
|
|
}
|
|
}
|
|
finally:
|
|
cqi_client_lock.release()
|
|
|
|
if isinstance(fn_return_value, CQiStatus):
|
|
payload = {
|
|
'code': fn_return_value.code,
|
|
'msg': fn_return_value.__class__.__name__
|
|
}
|
|
else:
|
|
payload = fn_return_value
|
|
|
|
return {'code': 200, 'msg': 'OK', 'payload': payload}
|
|
|
|
def on_disconnect(self):
|
|
try:
|
|
corpus_id = SessionManager.get_corpus_id()
|
|
cqi_client = SessionManager.get_cqi_client()
|
|
cqi_client_lock = SessionManager.get_cqi_client_lock()
|
|
SessionManager.teardown()
|
|
except KeyError:
|
|
return
|
|
|
|
cqi_client_lock.acquire()
|
|
|
|
try:
|
|
cqi_client.api.ctrl_bye()
|
|
except (BrokenPipeError, CQiException):
|
|
pass
|
|
|
|
cqi_client_lock.release()
|
|
|
|
corpus = Corpus.query.get(corpus_id)
|
|
|
|
if corpus is None:
|
|
return
|
|
|
|
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
|
|
db.session.commit()
|