Cleanup in cqi over socketio

This commit is contained in:
Patrick Jentsch 2023-07-13 12:42:47 +02:00
parent b7483af8e9
commit 4ae4b88a44
9 changed files with 244 additions and 279 deletions

View File

@ -74,6 +74,8 @@ def create_app(config: Config = Config) -> Flask:
app.register_blueprint(contributions_blueprint, url_prefix='/contributions')
from .corpora import bp as corpora_blueprint
from .corpora.cqi_over_sio import CQiNamespace
socketio.on_namespace(CQiNamespace('/cqi_over_sio'))
default_breadcrumb_root(corpora_blueprint, '.corpora')
app.register_blueprint(corpora_blueprint, cli_group='corpus', url_prefix='/corpora')

View File

@ -16,4 +16,4 @@ def before_request():
pass
from . import cli, cqi_over_sio, files, followers, routes, json_routes
from . import cli, files, followers, routes, json_routes

View File

@ -1,113 +1,200 @@
from cqi import CQiClient
from cqi.errors import CQiException
from cqi.status import CQiStatus
from flask import session
from flask_login import current_user
from flask_socketio import ConnectionRefusedError
from flask_socketio import Namespace
from inspect import signature
from threading import Lock
from typing import Callable, Dict, List
import math
from app import db, hashids, socketio
from app.decorators import socketio_login_required
from app.models import Corpus, CorpusStatus
import math
from . import extensions
'''
This package tunnels the Corpus Query interface (CQi) protocol through
Socket.IO (SIO) by wrapping each CQi function in a seperate SIO event.
This module only handles the SIO connect/disconnect, which handles the setup
and teardown of necessary ressources for later use. Each CQi function has a
corresponding SIO event. The event handlers are spread across the different
modules within this package.
Socket.IO (SIO) by tunneling CQi API calls through an event called "exec".
Basic concept:
1. A client connects to the SIO namespace and provides the id of a corpus to be
analysed.
1. A client connects to the "/cqi_over_sio" namespace.
2. The client emits the "init" event and provides a corpus id for the corpus
that should be analysed in this session.
1.1 The analysis session counter of the corpus is incremented.
1.2 A CQiClient and a (Mutex) Lock belonging to it is created.
1.3 Wait until the CQP server is running.
1.4 Connect the CQiClient to the server.
1.5 Save the CQiClient and the Lock in the session for subsequential use.
2. A client emits an event and may provide a single json object with necessary
arguments for the targeted CQi function.
3. A SIO event handler (decorated with cqi_over_socketio) gets executed.
- The event handler function defines all arguments. Hence the client
is sent as a single json object, the decorator decomposes it to fit
the functions signature. This also includes type checking and proper
use of the lock (acquire/release) mechanism.
1.5 Save the CQiClient, the Lock and the corpus id in the session for
subsequential use.
2. The client emits the "exec" event provides the name of a CQi API function
arguments (optional).
- The event "exec" handler will execute the function, make sure that the
result is serializable and returns the result back to the client.
4. Wait for more events
5. The client disconnects from the SIO namespace
5. The client disconnects from the "/cqi_over_sio" namespace
1.1 The analysis session counter of the corpus is decremented.
1.2 The CQiClient and (Mutex) Lock belonging to it are teared down.
'''
NAMESPACE = '/cqi_over_sio'
CQI_API_FUNCTION_NAMES: List[str] = [
'ask_feature_cl_2_3',
'ask_feature_cqi_1_0',
'ask_feature_cqp_2_3',
'cl_alg2cpos',
'cl_attribute_size',
'cl_cpos2alg',
'cl_cpos2id',
'cl_cpos2lbound',
'cl_cpos2rbound',
'cl_cpos2str',
'cl_cpos2struc',
'cl_drop_attribute',
'cl_id2cpos',
'cl_id2freq',
'cl_id2str',
'cl_idlist2cpos',
'cl_lexicon_size',
'cl_regex2id',
'cl_str2id',
'cl_struc2cpos',
'cl_struc2str',
'corpus_alignment_attributes',
'corpus_charset',
'corpus_drop_corpus',
'corpus_full_name',
'corpus_info',
'corpus_list_corpora',
'corpus_positional_attributes',
'corpus_properties',
'corpus_structural_attribute_has_values',
'corpus_structural_attributes',
'cqp_drop_subcorpus',
'cqp_dump_subcorpus',
'cqp_fdist_1',
'cqp_fdist_2',
'cqp_list_subcorpora',
'cqp_query',
'cqp_subcorpus_has_field',
'cqp_subcorpus_size',
'ctrl_bye',
'ctrl_connect',
'ctrl_last_general_error',
'ctrl_ping',
'ctrl_user_abort'
]
from .cqi import * # noqa
@socketio.on('connect', namespace=NAMESPACE)
@socketio_login_required
def connect(auth):
# the auth variable is used in a hacky way. It contains the corpus id for
# which a corpus analysis session should be started.
corpus_id = hashids.decode(auth['corpus_id'])
corpus = Corpus.query.get(corpus_id)
if corpus is None:
# return {'code': 404, 'msg': 'Not Found'}
raise ConnectionRefusedError('Not Found')
if not (corpus.user == current_user
or current_user.is_following_corpus(corpus)
or current_user.is_administrator()):
# return {'code': 403, 'msg': 'Forbidden'}
raise ConnectionRefusedError('Forbidden')
if corpus.status not in [
CorpusStatus.BUILT,
CorpusStatus.STARTING_ANALYSIS_SESSION,
CorpusStatus.RUNNING_ANALYSIS_SESSION,
CorpusStatus.CANCELING_ANALYSIS_SESSION
]:
# return {'code': 424, 'msg': 'Failed Dependency'}
raise ConnectionRefusedError('Failed Dependency')
if corpus.num_analysis_sessions is None:
corpus.num_analysis_sessions = 0
db.session.commit()
corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
db.session.commit()
retry_counter = 20
while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
if retry_counter == 0:
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit()
return {'code': 408, 'msg': 'Request Timeout'}
socketio.sleep(3)
retry_counter -= 1
db.session.refresh(corpus)
cqi_client = CQiClient(f'cqpserver_{corpus_id}', timeout=math.inf)
session['cqi_over_sio'] = {
'corpus_id': corpus_id,
'cqi_client': cqi_client,
'cqi_client_lock': Lock(),
}
# return {'code': 200, 'msg': 'OK'}
@socketio.on('disconnect', namespace=NAMESPACE)
def disconnect():
try:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock']
except KeyError:
return
cqi_client_lock.acquire()
try:
cqi_client.api.ctrl_bye()
except (BrokenPipeError, CQiException):
class CQiNamespace(Namespace):
@socketio_login_required
def on_connect(self):
pass
cqi_client_lock.release()
corpus = Corpus.query.get(session['cqi_over_sio']['corpus_id'])
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit()
session.pop('cqi_over_sio')
# return {'code': 200, 'msg': 'OK'}
@socketio_login_required
def on_init(self, db_corpus_hashid: str):
db_corpus_id = hashids.decode(db_corpus_hashid)
db_corpus = Corpus.query.get(db_corpus_id)
if db_corpus is None:
return {'code': 404, 'msg': 'Not Found'}
if not (db_corpus.user == current_user
or current_user.is_following_corpus(db_corpus)
or current_user.is_administrator()):
return {'code': 403, 'msg': 'Forbidden'}
if db_corpus.status not in [
CorpusStatus.BUILT,
CorpusStatus.STARTING_ANALYSIS_SESSION,
CorpusStatus.RUNNING_ANALYSIS_SESSION,
CorpusStatus.CANCELING_ANALYSIS_SESSION
]:
return {'code': 424, 'msg': 'Failed Dependency'}
if db_corpus.num_analysis_sessions is None:
db_corpus.num_analysis_sessions = 0
db.session.commit()
db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
db.session.commit()
retry_counter = 20
while db_corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
if retry_counter == 0:
db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit()
return {'code': 408, 'msg': 'Request Timeout'}
socketio.sleep(3)
retry_counter -= 1
db.session.refresh(db_corpus)
cqi_client = CQiClient(f'cqpserver_{db_corpus_id}', timeout=math.inf)
session['cqi_over_sio'] = {}
session['cqi_over_sio']['cqi_client'] = cqi_client
session['cqi_over_sio']['cqi_client_lock'] = Lock()
session['cqi_over_sio']['db_corpus_id'] = db_corpus_id
return {'code': 200, 'msg': 'OK'}
@socketio_login_required
def on_exec(self, fn_name: str, fn_args: Dict = {}):
try:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock']
except KeyError:
return {'code': 424, 'msg': 'Failed Dependency'}
if fn_name in CQI_API_FUNCTION_NAMES:
fn: Callable = getattr(cqi_client.api, fn_name)
elif fn_name in extensions.CQI_EXTENSION_FUNCTION_NAMES:
fn: Callable = getattr(extensions, fn_name)
else:
return {'code': 400, 'msg': 'Bad Request'}
for param in signature(fn).parameters.values():
if param.default is param.empty:
if param.name not in fn_args:
return {'code': 400, 'msg': 'Bad Request'}
else:
if param.name not in fn_args:
continue
if type(fn_args[param.name]) is not param.annotation:
return {'code': 400, 'msg': 'Bad Request'}
cqi_client_lock.acquire()
try:
fn_return_value = fn(**fn_args)
except BrokenPipeError as e:
return {'code': 500, 'msg': 'Internal Server Error'}
except CQiException as e:
return {
'code': 502,
'msg': 'Bad Gateway',
'payload': {
'code': e.code,
'desc': e.description,
'msg': e.__class__.__name__
}
}
finally:
cqi_client_lock.release()
if isinstance(fn_return_value, CQiStatus):
payload = {
'code': fn_return_value.code,
'msg': fn_return_value.__class__.__name__
}
else:
payload = fn_return_value
return {'code': 200, 'msg': 'OK', 'payload': payload}
def on_disconnect(self):
try:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock']
db_corpus_id: int = session['cqi_over_sio']['db_corpus_id']
except KeyError:
return
cqi_client_lock.acquire()
try:
session.pop('cqi_over_sio')
except KeyError:
pass
try:
cqi_client.api.ctrl_bye()
except (BrokenPipeError, CQiException):
pass
cqi_client_lock.release()
db_corpus = Corpus.query.get(db_corpus_id)
if db_corpus is not None:
db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit()

View File

@ -1,110 +0,0 @@
from cqi import CQiClient
from cqi.errors import CQiException
from cqi.status import CQiStatus
from flask import session
from inspect import signature
from threading import Lock
from typing import Callable, Dict, List
from app import socketio
from app.decorators import socketio_login_required
from . import NAMESPACE as ns
from .extensions import CQI_EXTENSION_FUNCTION_NAMES
from . import extensions as extensions_module
CQI_FUNCTION_NAMES: List[str] = [
'ask_feature_cl_2_3',
'ask_feature_cqi_1_0',
'ask_feature_cqp_2_3',
'cl_alg2cpos',
'cl_attribute_size',
'cl_cpos2alg',
'cl_cpos2id',
'cl_cpos2lbound',
'cl_cpos2rbound',
'cl_cpos2str',
'cl_cpos2struc',
'cl_drop_attribute',
'cl_id2cpos',
'cl_id2freq',
'cl_id2str',
'cl_idlist2cpos',
'cl_lexicon_size',
'cl_regex2id',
'cl_str2id',
'cl_struc2cpos',
'cl_struc2str',
'corpus_alignment_attributes',
'corpus_charset',
'corpus_drop_corpus',
'corpus_full_name',
'corpus_info',
'corpus_list_corpora',
'corpus_positional_attributes',
'corpus_properties',
'corpus_structural_attribute_has_values',
'corpus_structural_attributes',
'cqp_drop_subcorpus',
'cqp_dump_subcorpus',
'cqp_fdist_1',
'cqp_fdist_2',
'cqp_list_subcorpora',
'cqp_query',
'cqp_subcorpus_has_field',
'cqp_subcorpus_size',
'ctrl_bye',
'ctrl_connect',
'ctrl_last_general_error',
'ctrl_ping',
'ctrl_user_abort'
]
@socketio.on('cqi', namespace=ns)
@socketio_login_required
def cqi_over_sio(fn_name: str, fn_args: Dict = {}):
try:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock']
except KeyError:
return {'code': 424, 'msg': 'Failed Dependency'}
if fn_name in CQI_FUNCTION_NAMES:
fn: Callable = getattr(cqi_client.api, fn_name)
elif fn_name in CQI_EXTENSION_FUNCTION_NAMES:
fn: Callable = getattr(extensions_module, fn_name)
else:
return {'code': 400, 'msg': 'Bad Request'}
for param in signature(fn).parameters.values():
if param.default is param.empty:
if param.name not in fn_args:
return {'code': 400, 'msg': 'Bad Request'}
else:
if param.name not in fn_args:
continue
if type(fn_args[param.name]) is not param.annotation:
return {'code': 400, 'msg': 'Bad Request'}
cqi_client_lock.acquire()
try:
fn_return_value = fn(**fn_args)
except BrokenPipeError as e:
return {'code': 500, 'msg': 'Internal Server Error'}
except CQiException as e:
return {
'code': 502,
'msg': 'Bad Gateway',
'payload': {
'code': e.code,
'desc': e.description,
'msg': e.__class__.__name__
}
}
finally:
cqi_client_lock.release()
if isinstance(fn_return_value, CQiStatus):
payload = {
'code': fn_return_value.code,
'msg': fn_return_value.__class__.__name__
}
else:
payload = fn_return_value
return {'code': 200, 'msg': 'OK', 'payload': payload}

View File

@ -28,8 +28,9 @@ CQI_EXTENSION_FUNCTION_NAMES: List[str] = [
def ext_corpus_update_db(corpus: str) -> CQiStatusOk:
db_corpus = Corpus.query.get(session['cqi_over_sio']['corpus_id'])
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
db_corpus_id: int = session['cqi_over_sio']['db_corpus_id']
db_corpus: Corpus = Corpus.query.get(db_corpus_id)
cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus)
db_corpus.num_tokens = cqi_corpus.size
db.session.commit()
@ -37,10 +38,11 @@ def ext_corpus_update_db(corpus: str) -> CQiStatusOk:
def ext_corpus_static_data(corpus: str) -> Dict:
db_corpus = Corpus.query.get(session['cqi_over_sio']['corpus_id'])
static_corpus_data_file = os.path.join(db_corpus.path, 'cwb', 'static.json.gz')
if os.path.exists(static_corpus_data_file):
with open(static_corpus_data_file, 'rb') as f:
db_corpus_id: int = session['cqi_over_sio']['db_corpus_id']
db_corpus: Corpus = Corpus.query.get(db_corpus_id)
cache_file_path: str = os.path.join(db_corpus.path, 'cwb', 'static.json.gz')
if os.path.exists(cache_file_path):
with open(cache_file_path, 'rb') as f:
return f.read()
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus)
@ -189,10 +191,10 @@ def ext_corpus_static_data(corpus: str) -> Dict:
} for s_attr_id_idx, s_attr_id in enumerate(range(0, s_attr.size))
}
del sub_s_attr_values
with gzip.open(static_corpus_data_file, 'wt') as f:
with gzip.open(cache_file_path, 'wt') as f:
json.dump(static_corpus_data, f)
del static_corpus_data
with open(static_corpus_data_file, 'rb') as f:
with open(cache_file_path, 'rb') as f:
return f.read()

View File

@ -1,13 +1,15 @@
class CorpusAnalysisApp {
constructor(corpusId) {
this.corpusId = corpusId;
this.data = {};
// HTML elements
this.elements = {
container: document.querySelector('#corpus-analysis-app-container'),
extensionCards: document.querySelector('#corpus-analysis-app-extension-cards'),
extensionTabs: document.querySelector('#corpus-analysis-app-extension-tabs'),
initModal: document.querySelector('#corpus-analysis-app-init-modal'),
overview: document.querySelector('#corpus-analysis-app-overview')
initModal: document.querySelector('#corpus-analysis-app-init-modal')
};
// Materialize elements
this.elements.m = {
@ -17,52 +19,49 @@ class CorpusAnalysisApp {
this.extensions = {};
this.settings = {
corpusId: corpusId
};
this.settings = {};
}
init() {
async init() {
this.disableActionElements();
this.elements.m.initModal.open();
// Init data
this.data.cqiClient = new cqi.CQiClient('/cqi_over_sio', this.settings.corpusId);
this.data.cqiClient.connect('anonymous', '')
.then((cqiStatus) => {
return this.data.cqiClient.corpora.list();
})
.then((cqiCorpora) => {
this.data.corpus = {o: cqiCorpora[0]};
console.log(this.data.corpus.o.staticData);
// this.renderGeneralCorpusInfo();
// this.renderTextInfoList();
// this.renderTextProportionsGraphic()
// this.renderFrequenciesGraphic();
// this.renderBoundsGraphic();
// TODO: Don't do this hgere
this.data.corpus.o.updateDb();
this.enableActionElements();
for (let extension of Object.values(this.extensions)) {extension.init();}
this.elements.m.initModal.close();
},
(cqiError) => {
let errorString = `${cqiError.code}: ${cqiError.constructor.name}`;
let errorsElement = this.elements.initModal.querySelector('.errors');
let progressElement = this.elements.initModal.querySelector('.progress');
errorsElement.innerText = errorString;
errorsElement.classList.remove('hide');
app.flash(errorString, 'error');
progressElement.classList.add('hide');
}
);
// Add event listeners
for (let extensionSelectorElement of this.elements.overview.querySelectorAll('.extension-selector')) {
// Setup CQi over SocketIO connection and gather data from the CQPServer
let cqiClient;
let cqiCorpus;
try {
cqiClient = new cqi.CQiClient('/cqi_over_sio');
let response = await cqiClient.api.socket.emitWithAck('init', this.corpusId);
if (response.code !== 200) {throw new Error();}
await cqiClient.connect('anonymous', '');
cqiCorpus = await cqiClient.corpora.get(`NOPAQUE-${this.corpusId.toUpperCase()}`);
// TODO: Don't do this hgere
await cqiCorpus.updateDb();
} catch (error) {
// TODO: Currently we can only handle CQiErrors here,
// but we should also handle other errors.
let errorString = `${error.code}: ${error.constructor.name}`;
let errorsElement = this.elements.initModal.querySelector('.errors');
let progressElement = this.elements.initModal.querySelector('.progress');
errorsElement.innerText = errorString;
errorsElement.classList.remove('hide');
progressElement.classList.add('hide');
return;
}
this.data.cqiClient = cqiClient;
this.data.cqiCorpus = cqiCorpus;
this.data.corpus = {o: cqiCorpus}; // legacy
// Initialize extensions
for (let extension of Object.values(this.extensions)) {extension.init();}
for (let extensionSelectorElement of this.elements.extensionCards.querySelectorAll('.extension-selector')) {
extensionSelectorElement.addEventListener('click', () => {
this.elements.m.extensionTabs.select(extensionSelectorElement.dataset.target);
});
}
this.enableActionElements();
this.elements.m.initModal.close();
}
registerExtension(extension) {
@ -71,7 +70,6 @@ class CorpusAnalysisApp {
return;
}
this.extensions[extension.name] = extension;
if ('cQiClient' in this.data && this.data.cQiClient.connected) {extension.init();}
}
disableActionElements() {

View File

@ -1,18 +1,16 @@
cqi.api.APIClient = class APIClient {
/**
* @param {string} host
* @param {string} corpusId
* @param {number} [timeout=60] timeout
* @param {string} [version=0.1] version
*/
constructor(host, corpus_id, timeout = 60, version = '0.1') {
constructor(host, timeout = 60, version = '0.1') {
this.host = host;
this.timeout = timeout * 1000; // convert seconds to milliseconds
this.version = version;
this.socket = io(
this.host,
{
auth: {corpus_id: corpus_id},
transports: ['websocket'],
upgrade: false
}
@ -24,22 +22,16 @@ cqi.api.APIClient = class APIClient {
* @param {object} [fn_args={}]
* @returns {Promise}
*/
#request(fn_name, fn_args = {}) {
return new Promise((resolve, reject) => {
// this.socket.timeout(this.timeout).emit('cqi', {fn_name: fn_name, fn_args: fn_args}, (timeoutError, response) => {
// if (timeoutError) {
// reject(timeoutError);
// }
this.socket.emit('cqi', fn_name, fn_args, (response) => {
if (response.code === 200) {
resolve(response.payload);
} else if (response.code === 500) {
reject(new Error(`[${response.code}] ${response.msg}`));
} else if (response.code === 502) {
reject(new cqi.errors.lookup[response.payload.code]());
}
});
});
async #request(fn_name, fn_args = {}) {
// TODO: implement timeout
let response = await this.socket.emitWithAck('exec', fn_name, fn_args);
if (response.code === 200) {
return response.payload;
} else if (response.code === 500) {
throw new Error(`[${response.code}] ${response.msg}`);
} else if (response.code === 502) {
throw new cqi.errors.lookup[response.payload.code]();
}
}
/**

View File

@ -1,13 +1,12 @@
cqi.CQiClient = class CQiClient {
/**
* @param {string} host
* @param {string} corpusId
* @param {number} [timeout=60] timeout
* @param {string} [version=0.1] version
*/
constructor(host, corpusId, timeout = 60, version = '0.1') {
constructor(host, timeout = 60, version = '0.1') {
/** @type {cqi.api.APIClient} */
this.api = new cqi.api.APIClient(host, corpusId, timeout, version);
this.api = new cqi.api.APIClient(host, timeout, version);
}
/**

View File

@ -11,20 +11,17 @@
{% block page_content %}
<ul class="row tabs no-autoinit" id="corpus-analysis-app-extension-tabs">
<li class="tab col s3"><a class="active" href="#corpus-analysis-app-overview"><i class="nopaque-icons service-icons left" data-service="corpus-analysis"></i>Corpus analysis</a></li>
{% for extension in extensions %}
{% if extension.name != 'Static Visualization' %}
<li class="tab col s3"><a class="active" href="#corpus-analysis-app-home-container"><i class="nopaque-icons service-icons left" data-service="corpus-analysis"></i>Corpus analysis</a></li>
{% for extension in extensions if extension.name != 'Static Visualization' %}
<li class="tab col s3"><a href="#{{ extension.id_prefix }}-container">{{ extension.tab_content }}</a></li>
{% endif %}
{% endfor %}
</ul>
<div class="row" id="corpus-analysis-app-overview">
<div class="col s12">
<h1>{{ title }}</h1>
<div id="corpus-analysis-app-home-container">
<h1>{{ title }}</h1>
{% for extension in extensions %}
{% if extension.name != 'Static Visualization' %}
<div class="row" id="corpus-analysis-app-extension-cards">
{% for extension in extensions if extension.name != 'Static Visualization' %}
<div class="col s3">
<div class="card extension-selector hoverable" data-target="{{ extension.id_prefix }}-container">
<div class="card-content">
@ -33,9 +30,7 @@
</div>
</div>
</div>
{% endif %}
{% endfor %}
</div>
</div>