Add a full featured cqi Javascript client for cqi_over_sio

This commit is contained in:
Patrick Jentsch
2023-06-29 12:09:28 +02:00
parent e816a2fb15
commit efa8712cd9
16 changed files with 1799 additions and 1 deletions

View File

@ -17,3 +17,4 @@ def before_request():
from . import cli, cqi_over_socketio, files, followers, routes, json_routes
from . import cqi_over_sio

View File

@ -0,0 +1,109 @@
from cqi import CQiClient
from cqi.errors import CQiException
from flask import session
from flask_login import current_user
from flask_socketio import ConnectionRefusedError
from threading import Lock
from app import db, hashids, socketio
from app.decorators import socketio_login_required
from app.models import Corpus, CorpusStatus
'''
This package tunnels the Corpus Query interface (CQi) protocol through
Socket.IO (SIO) by wrapping each CQi function in a seperate SIO event.
This module only handles the SIO connect/disconnect, which handles the setup
and teardown of necessary ressources for later use. Each CQi function has a
corresponding SIO event. The event handlers are spread across the different
modules within this package.
Basic concept:
1. A client connects to the SIO namespace and provides the id of a corpus to be
analysed.
1.1 The analysis session counter of the corpus is incremented.
1.2 A CQiClient and a (Mutex) Lock belonging to it is created.
1.3 Wait until the CQP server is running.
1.4 Connect the CQiClient to the server.
1.5 Save the CQiClient and the Lock in the session for subsequential use.
2. A client emits an event and may provide a single json object with necessary
arguments for the targeted CQi function.
3. A SIO event handler (decorated with cqi_over_socketio) gets executed.
- The event handler function defines all arguments. Hence the client
is sent as a single json object, the decorator decomposes it to fit
the functions signature. This also includes type checking and proper
use of the lock (acquire/release) mechanism.
4. Wait for more events
5. The client disconnects from the SIO namespace
1.1 The analysis session counter of the corpus is decremented.
1.2 The CQiClient and (Mutex) Lock belonging to it are teared down.
'''
NAMESPACE = '/cqi_over_sio'
from .cqi import * # noqa
@socketio.on('connect', namespace=NAMESPACE)
@socketio_login_required
def connect(auth):
# the auth variable is used in a hacky way. It contains the corpus id for
# which a corpus analysis session should be started.
corpus_id = hashids.decode(auth['corpus_id'])
corpus = Corpus.query.get(corpus_id)
if corpus is None:
# return {'code': 404, 'msg': 'Not Found'}
raise ConnectionRefusedError('Not Found')
if not (corpus.user == current_user
or current_user.is_following_corpus(corpus)
or current_user.is_administrator()):
# return {'code': 403, 'msg': 'Forbidden'}
raise ConnectionRefusedError('Forbidden')
if corpus.status not in [
CorpusStatus.BUILT,
CorpusStatus.STARTING_ANALYSIS_SESSION,
CorpusStatus.RUNNING_ANALYSIS_SESSION,
CorpusStatus.CANCELING_ANALYSIS_SESSION
]:
# return {'code': 424, 'msg': 'Failed Dependency'}
raise ConnectionRefusedError('Failed Dependency')
if corpus.num_analysis_sessions is None:
corpus.num_analysis_sessions = 0
db.session.commit()
corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
db.session.commit()
retry_counter = 20
while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
if retry_counter == 0:
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit()
return {'code': 408, 'msg': 'Request Timeout'}
socketio.sleep(3)
retry_counter -= 1
db.session.refresh(corpus)
cqi_client = CQiClient(f'cqpserver_{corpus_id}')
session['d'] = {
'corpus_id': corpus_id,
'cqi_client': cqi_client,
'cqi_client_lock': Lock(),
}
# return {'code': 200, 'msg': 'OK'}
@socketio.on('disconnect', namespace=NAMESPACE)
def disconnect():
if 'd' not in session:
return
session['d']['cqi_client_lock'].acquire()
try:
session['d']['cqi_client'].api.ctrl_bye()
except (BrokenPipeError, CQiException):
pass
session['d']['cqi_client_lock'].release()
corpus = Corpus.query.get(session['d']['corpus_id'])
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit()
session.pop('d')
# return {'code': 200, 'msg': 'OK'}

View File

@ -0,0 +1,111 @@
from cqi import APIClient
from cqi.errors import CQiException
from cqi.status import CQiStatus
from flask import session
from inspect import signature
from typing import Callable, Dict, List
from app import socketio
from app.decorators import socketio_login_required
from . import NAMESPACE as ns
CQI_API_FUNCTIONS: List[str] = [
'ask_feature_cl_2_3',
'ask_feature_cqi_1_0',
'ask_feature_cqp_2_3',
'cl_alg2cpos',
'cl_attribute_size',
'cl_cpos2alg',
'cl_cpos2id',
'cl_cpos2lbound',
'cl_cpos2rbound',
'cl_cpos2str',
'cl_cpos2struc',
'cl_drop_attribute',
'cl_id2cpos',
'cl_id2freq',
'cl_id2str',
'cl_idlist2cpos',
'cl_lexicon_size',
'cl_regex2id',
'cl_str2id',
'cl_struc2cpos',
'cl_struc2str',
'corpus_alignment_attributes',
'corpus_charset',
'corpus_drop_corpus',
'corpus_full_name',
'corpus_info',
'corpus_list_corpora',
'corpus_positional_attributes',
'corpus_properties',
'corpus_structural_attribute_has_values',
'corpus_structural_attributes',
'cqp_drop_subcorpus',
'cqp_dump_subcorpus',
'cqp_fdist_1',
'cqp_fdist_2',
'cqp_list_subcorpora',
'cqp_query',
'cqp_subcorpus_has_field',
'cqp_subcorpus_size',
'ctrl_bye',
'ctrl_connect',
'ctrl_last_general_error',
'ctrl_ping',
'ctrl_user_abort'
]
@socketio.on('cqi_client.api', namespace=ns)
@socketio_login_required
def cqi_over_sio(fn_data):
fn_name: str = fn_data['fn_name']
fn_args: Dict = fn_data.get('fn_args', {})
print(f'{fn_name}({fn_args})')
if 'd' not in session:
return {'code': 424, 'msg': 'Failed Dependency'}
api_client: APIClient = session['d']['cqi_client'].api
if fn_name not in CQI_API_FUNCTIONS:
return {'code': 400, 'msg': 'Bad Request'}
try:
fn: Callable = getattr(api_client, fn_name)
except AttributeError:
return {'code': 400, 'msg': 'Bad Request'}
for param in signature(fn).parameters.values():
if param.default is param.empty:
if param.name not in fn_args:
return {'code': 400, 'msg': 'Bad Request'}
else:
if param.name not in fn_args:
continue
if type(fn_args[param.name]) is not param.annotation:
return {'code': 400, 'msg': 'Bad Request'}
session['d']['cqi_client_lock'].acquire()
try:
return_value = fn(**fn_args)
except BrokenPipeError:
return_value = {
'code': 500,
'msg': 'Internal Server Error'
}
except CQiException as e:
return_value = {
'code': 502,
'msg': 'Bad Gateway',
'payload': {
'code': e.code,
'desc': e.description,
'msg': e.__class__.__name__
}
}
finally:
session['d']['cqi_client_lock'].release()
if isinstance(return_value, CQiStatus):
payload = {
'code': return_value.code,
'msg': return_value.__class__.__name__
}
else:
payload = return_value
return {'code': 200, 'msg': 'OK', 'payload': payload}

View File

@ -49,7 +49,7 @@ def cqi_over_socketio(f):
'payload': {
'code': e.code,
'desc': e.description,
'msg': e.name
'msg': e.__class__.__name__
}
}
finally: