From 2a28f19660d22d75337d436489c7841291aa1590 Mon Sep 17 00:00:00 2001 From: Patrick Jentsch Date: Wed, 6 Nov 2024 12:27:49 +0100 Subject: [PATCH] Move Socket.IO Namespaces to dedicated directory --- app/__init__.py | 4 +- app/blueprints/corpora/cqi_over_sio/utils.py | 130 ---------- .../cqi_over_sio/__init__.py | 109 +++++---- .../cqi_over_sio/extensions.py | 222 ++++++++++++++---- app/namespaces/cqi_over_sio/utils.py | 37 +++ 5 files changed, 273 insertions(+), 229 deletions(-) delete mode 100644 app/blueprints/corpora/cqi_over_sio/utils.py rename app/{blueprints/corpora => namespaces}/cqi_over_sio/__init__.py (68%) rename app/{blueprints/corpora => namespaces}/cqi_over_sio/extensions.py (53%) create mode 100644 app/namespaces/cqi_over_sio/utils.py diff --git a/app/__init__.py b/app/__init__.py index c3e8a0c2..eb4f7bd6 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -130,8 +130,8 @@ def create_app(config: Config = Config) -> Flask: # endregion Blueprints # region SocketIO Namespaces - from .blueprints.corpora.cqi_over_sio import CQiOverSocketIO - socketio.on_namespace(CQiOverSocketIO('/cqi_over_sio')) + from .namespaces.cqi_over_sio import CQiOverSocketIONamespace + socketio.on_namespace(CQiOverSocketIONamespace('/cqi_over_sio')) # endregion SocketIO Namespaces # region Database event Listeners diff --git a/app/blueprints/corpora/cqi_over_sio/utils.py b/app/blueprints/corpora/cqi_over_sio/utils.py deleted file mode 100644 index 76695b54..00000000 --- a/app/blueprints/corpora/cqi_over_sio/utils.py +++ /dev/null @@ -1,130 +0,0 @@ -from cqi.models.corpora import Corpus as CQiCorpus -from cqi.models.subcorpora import Subcorpus as CQiSubcorpus - - -def lookups_by_cpos(corpus: CQiCorpus, cpos_list: list[int]) -> dict: - lookups = {} - lookups['cpos_lookup'] = {cpos: {} for cpos in cpos_list} - for attr in corpus.positional_attributes.list(): - cpos_attr_values: list[str] = attr.values_by_cpos(cpos_list) - for i, cpos in enumerate(cpos_list): - lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_values[i] - for attr in corpus.structural_attributes.list(): - # We only want to iterate over non subattributes, identifiable by - # attr.has_values == False - if attr.has_values: - continue - cpos_attr_ids: list[int] = attr.ids_by_cpos(cpos_list) - for i, cpos in enumerate(cpos_list): - if cpos_attr_ids[i] == -1: - continue - lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_ids[i] - occured_attr_ids = [x for x in set(cpos_attr_ids) if x != -1] - if len(occured_attr_ids) == 0: - continue - subattrs = corpus.structural_attributes.list(filters={'part_of': attr}) - if len(subattrs) == 0: - continue - lookup_name: str = f'{attr.name}_lookup' - lookups[lookup_name] = {} - for attr_id in occured_attr_ids: - lookups[lookup_name][attr_id] = {} - for subattr in subattrs: - subattr_name = subattr.name[(len(attr.name) + 1):] # noqa - for i, subattr_value in enumerate(subattr.values_by_ids(occured_attr_ids)): # noqa - lookups[lookup_name][occured_attr_ids[i]][subattr_name] = subattr_value # noqa - return lookups - - -def partial_export_subcorpus( - subcorpus: CQiSubcorpus, - match_id_list: list[int], - context: int = 25 -) -> dict: - if subcorpus.size == 0: - return {"matches": []} - match_boundaries = [] - for match_id in match_id_list: - if match_id < 0 or match_id >= subcorpus.size: - continue - match_boundaries.append( - ( - match_id, - subcorpus.dump(subcorpus.fields['match'], match_id, match_id)[0], - subcorpus.dump(subcorpus.fields['matchend'], match_id, match_id)[0] - ) - ) - cpos_set = set() - 
matches = [] - for match_boundary in match_boundaries: - match_num, match_start, match_end = match_boundary - c = (match_start, match_end) - if match_start == 0 or context == 0: - lc = None - cpos_list_lbound = match_start - else: - lc_lbound = max(0, (match_start - context)) - lc_rbound = match_start - 1 - lc = (lc_lbound, lc_rbound) - cpos_list_lbound = lc_lbound - if match_end == (subcorpus.collection.corpus.size - 1) or context == 0: - rc = None - cpos_list_rbound = match_end - else: - rc_lbound = match_end + 1 - rc_rbound = min( - (match_end + context), - (subcorpus.collection.corpus.size - 1) - ) - rc = (rc_lbound, rc_rbound) - cpos_list_rbound = rc_rbound - match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc} - matches.append(match) - cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1)) - lookups = lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set)) - return {'matches': matches, **lookups} - - -def export_subcorpus( - subcorpus: CQiSubcorpus, - context: int = 25, - cutoff: float = float('inf'), - offset: int = 0 -) -> dict: - if subcorpus.size == 0: - return {"matches": []} - first_match = max(0, offset) - last_match = min((offset + cutoff - 1), (subcorpus.size - 1)) - match_boundaries = zip( - range(first_match, last_match + 1), - subcorpus.dump(subcorpus.fields['match'], first_match, last_match), - subcorpus.dump(subcorpus.fields['matchend'], first_match, last_match) - ) - cpos_set = set() - matches = [] - for match_num, match_start, match_end in match_boundaries: - c = (match_start, match_end) - if match_start == 0 or context == 0: - lc = None - cpos_list_lbound = match_start - else: - lc_lbound = max(0, (match_start - context)) - lc_rbound = match_start - 1 - lc = (lc_lbound, lc_rbound) - cpos_list_lbound = lc_lbound - if match_end == (subcorpus.collection.corpus.size - 1) or context == 0: - rc = None - cpos_list_rbound = match_end - else: - rc_lbound = match_end + 1 - rc_rbound = min( - (match_end + context), - (subcorpus.collection.corpus.size - 1) - ) - rc = (rc_lbound, rc_rbound) - cpos_list_rbound = rc_rbound - match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc} - matches.append(match) - cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1)) - lookups = lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set)) - return {'matches': matches, **lookups} diff --git a/app/blueprints/corpora/cqi_over_sio/__init__.py b/app/namespaces/cqi_over_sio/__init__.py similarity index 68% rename from app/blueprints/corpora/cqi_over_sio/__init__.py rename to app/namespaces/cqi_over_sio/__init__.py index a6c038ff..e690fb74 100644 --- a/app/blueprints/corpora/cqi_over_sio/__init__.py +++ b/app/namespaces/cqi_over_sio/__init__.py @@ -1,17 +1,16 @@ from cqi import CQiClient from cqi.errors import CQiException from cqi.status import CQiStatus -from docker.models.containers import Container -from flask import current_app, session +from flask import current_app from flask_login import current_user from flask_socketio import Namespace from inspect import signature from threading import Lock -from typing import Callable from app import db, docker_client, hashids, socketio from app.decorators import socketio_login_required from app.models import Corpus, CorpusStatus from . 
import extensions +from .utils import CQiOverSocketIOSessionManager ''' @@ -38,7 +37,7 @@ Basic concept: ''' -CQI_API_FUNCTION_NAMES: list[str] = [ +CQI_API_FUNCTION_NAMES = [ 'ask_feature_cl_2_3', 'ask_feature_cqi_1_0', 'ask_feature_cqp_2_3', @@ -86,68 +85,80 @@ CQI_API_FUNCTION_NAMES: list[str] = [ ] -class CQiOverSocketIO(Namespace): +class CQiOverSocketIONamespace(Namespace): @socketio_login_required def on_connect(self): pass @socketio_login_required - def on_init(self, db_corpus_hashid: str): - db_corpus_id: int = hashids.decode(db_corpus_hashid) - db_corpus: Corpus | None = Corpus.query.get(db_corpus_id) - if db_corpus is None: + def on_init(self, corpus_hashid: str) -> dict: + corpus_id = hashids.decode(corpus_hashid) + + if not isinstance(corpus_id, int): + return {'code': 400, 'msg': 'Bad Request'} + + corpus = Corpus.query.get(corpus_id) + + if corpus is None: return {'code': 404, 'msg': 'Not Found'} - if not (db_corpus.user == current_user - or current_user.is_following_corpus(db_corpus) - or current_user.is_administrator): + + if not ( + corpus.user == current_user + or current_user.is_following_corpus(corpus) + or current_user.is_administrator + ): return {'code': 403, 'msg': 'Forbidden'} - if db_corpus.status not in [ + + if corpus.status not in [ CorpusStatus.BUILT, CorpusStatus.STARTING_ANALYSIS_SESSION, CorpusStatus.RUNNING_ANALYSIS_SESSION, CorpusStatus.CANCELING_ANALYSIS_SESSION ]: return {'code': 424, 'msg': 'Failed Dependency'} - if db_corpus.num_analysis_sessions is None: - db_corpus.num_analysis_sessions = 0 - db.session.commit() - db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1 + + corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1 db.session.commit() - retry_counter: int = 20 - while db_corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION: + retry_counter = 20 + while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION: if retry_counter == 0: - db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1 + corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1 db.session.commit() return {'code': 408, 'msg': 'Request Timeout'} socketio.sleep(3) retry_counter -= 1 - db.session.refresh(db_corpus) - # cqi_client: CQiClient = CQiClient(f'cqpserver_{db_corpus_id}') - cqpserver_container_name: str = f'cqpserver_{db_corpus_id}' - cqpserver_container: Container = docker_client.containers.get(cqpserver_container_name) - cqpserver_host: str = cqpserver_container.attrs['NetworkSettings']['Networks'][current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']]['IPAddress'] - cqi_client: CQiClient = CQiClient(cqpserver_host) - session['cqi_over_sio'] = { - 'cqi_client': cqi_client, - 'cqi_client_lock': Lock(), - 'db_corpus_id': db_corpus_id - } + db.session.refresh(corpus) + + cqpserver_container_name = f'nopaque-cqpserver-{corpus_id}' + cqpserver_container = docker_client.containers.get(cqpserver_container_name) + cqpserver_ip_address = cqpserver_container.attrs['NetworkSettings']['Networks'][current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']]['IPAddress'] + cqi_client = CQiClient(cqpserver_ip_address) + cqi_client_lock = Lock() + + CQiOverSocketIOSessionManager.setup() + CQiOverSocketIOSessionManager.set_corpus_id(corpus_id) + CQiOverSocketIOSessionManager.set_cqi_client(cqi_client) + CQiOverSocketIOSessionManager.set_cqi_client_lock(cqi_client_lock) + return {'code': 200, 'msg': 'OK'} @socketio_login_required - def on_exec(self, fn_name: str, fn_args: dict = {}): + def on_exec(self, fn_name: str, fn_args: dict = {}) -> dict: try: - 
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] - cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock'] + cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() + cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock() except KeyError: return {'code': 424, 'msg': 'Failed Dependency'} + if fn_name in CQI_API_FUNCTION_NAMES: - fn: Callable = getattr(cqi_client.api, fn_name) + fn = getattr(cqi_client.api, fn_name) elif fn_name in extensions.CQI_EXTENSION_FUNCTION_NAMES: - fn: Callable = getattr(extensions, fn_name) + fn = getattr(extensions, fn_name) else: return {'code': 400, 'msg': 'Bad Request'} + for param in signature(fn).parameters.values(): + # Check if the parameter is optional or required if param.default is param.empty: if param.name not in fn_args: return {'code': 400, 'msg': 'Bad Request'} @@ -156,6 +167,7 @@ class CQiOverSocketIO(Namespace): continue if type(fn_args[param.name]) is not param.annotation: return {'code': 400, 'msg': 'Bad Request'} + cqi_client_lock.acquire() try: fn_return_value = fn(**fn_args) @@ -173,6 +185,7 @@ class CQiOverSocketIO(Namespace): } finally: cqi_client_lock.release() + if isinstance(fn_return_value, CQiStatus): payload = { 'code': fn_return_value.code, @@ -180,27 +193,31 @@ class CQiOverSocketIO(Namespace): } else: payload = fn_return_value + return {'code': 200, 'msg': 'OK', 'payload': payload} def on_disconnect(self): try: - cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] - cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock'] - db_corpus_id: int = session['cqi_over_sio']['db_corpus_id'] + corpus_id = CQiOverSocketIOSessionManager.get_corpus_id() + cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() + cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock() + CQiOverSocketIOSessionManager.teardown() except KeyError: return + cqi_client_lock.acquire() - try: - session.pop('cqi_over_sio') - except KeyError: - pass + try: cqi_client.api.ctrl_bye() except (BrokenPipeError, CQiException): pass + cqi_client_lock.release() - db_corpus: Corpus | None = Corpus.query.get(db_corpus_id) - if db_corpus is None: + + corpus = Corpus.query.get(corpus_id) + + if corpus is None: return - db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1 + + corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1 db.session.commit() diff --git a/app/blueprints/corpora/cqi_over_sio/extensions.py b/app/namespaces/cqi_over_sio/extensions.py similarity index 53% rename from app/blueprints/corpora/cqi_over_sio/extensions.py rename to app/namespaces/cqi_over_sio/extensions.py index c255c558..e0322672 100644 --- a/app/blueprints/corpora/cqi_over_sio/extensions.py +++ b/app/namespaces/cqi_over_sio/extensions.py @@ -1,22 +1,17 @@ from collections import Counter -from cqi import CQiClient from cqi.models.corpora import Corpus as CQiCorpus from cqi.models.subcorpora import Subcorpus as CQiSubcorpus -from cqi.models.attributes import ( - PositionalAttribute as CQiPositionalAttribute, - StructuralAttribute as CQiStructuralAttribute -) from cqi.status import StatusOk as CQiStatusOk -from flask import session +from flask import current_app import gzip import json import math from app import db from app.models import Corpus -from .utils import lookups_by_cpos, partial_export_subcorpus, export_subcorpus +from .utils import CQiOverSocketIOSessionManager -CQI_EXTENSION_FUNCTION_NAMES: list[str] = [ +CQI_EXTENSION_FUNCTION_NAMES = [ 'ext_corpus_update_db', 'ext_corpus_static_data', 
'ext_corpus_paginate_corpus', @@ -27,28 +22,28 @@ CQI_EXTENSION_FUNCTION_NAMES: list[str] = [ def ext_corpus_update_db(corpus: str) -> CQiStatusOk: - cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] - db_corpus_id: int = session['cqi_over_sio']['db_corpus_id'] - db_corpus: Corpus = Corpus.query.get(db_corpus_id) - cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus) + corpus_id = CQiOverSocketIOSessionManager.get_corpus_id() + cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() + db_corpus = Corpus.query.get(corpus_id) + cqi_corpus = cqi_client.corpora.get(corpus) db_corpus.num_tokens = cqi_corpus.size db.session.commit() return CQiStatusOk() def ext_corpus_static_data(corpus: str) -> dict: - db_corpus_id: int = session['cqi_over_sio']['db_corpus_id'] - db_corpus: Corpus = Corpus.query.get(db_corpus_id) + corpus_id = CQiOverSocketIOSessionManager.get_corpus_id() + db_corpus = Corpus.query.get(corpus_id) static_data_file_path = db_corpus.path / 'cwb' / 'static.json.gz' if static_data_file_path.exists(): with static_data_file_path.open('rb') as f: return f.read() - cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] - cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus) - cqi_p_attrs: list[CQiPositionalAttribute] = cqi_corpus.positional_attributes.list() - cqi_s_attrs: list[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list() + cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() + cqi_corpus = cqi_client.corpora.get(corpus) + cqi_p_attrs = cqi_corpus.positional_attributes.list() + cqi_s_attrs = cqi_corpus.structural_attributes.list() static_data = { 'corpus': { @@ -61,21 +56,21 @@ def ext_corpus_static_data(corpus: str) -> dict: } for p_attr in cqi_p_attrs: - print(f'corpus.freqs.{p_attr.name}') + current_app.logger.info(f'corpus.freqs.{p_attr.name}') static_data['corpus']['freqs'][p_attr.name] = [] - p_attr_id_list: list[int] = list(range(p_attr.lexicon_size)) + p_attr_id_list = list(range(p_attr.lexicon_size)) static_data['corpus']['freqs'][p_attr.name].extend(p_attr.freqs_by_ids(p_attr_id_list)) del p_attr_id_list - print(f'p_attrs.{p_attr.name}') + current_app.logger.info(f'p_attrs.{p_attr.name}') static_data['p_attrs'][p_attr.name] = [] - cpos_list: list[int] = list(range(cqi_corpus.size)) + cpos_list = list(range(cqi_corpus.size)) static_data['p_attrs'][p_attr.name].extend(p_attr.ids_by_cpos(cpos_list)) del cpos_list - print(f'values.p_attrs.{p_attr.name}') + current_app.logger.info(f'values.p_attrs.{p_attr.name}') static_data['values']['p_attrs'][p_attr.name] = [] - p_attr_id_list: list[int] = list(range(p_attr.lexicon_size)) + p_attr_id_list = list(range(p_attr.lexicon_size)) static_data['values']['p_attrs'][p_attr.name].extend(p_attr.values_by_ids(p_attr_id_list)) del p_attr_id_list @@ -91,9 +86,9 @@ def ext_corpus_static_data(corpus: str) -> dict: # Note: Needs more testing, don't use it in production # ############################################################## cqi_corpus.query('Last', f'<{s_attr.name}> []* ;') - cqi_subcorpus: CQiSubcorpus = cqi_corpus.subcorpora.get('Last') - first_match: int = 0 - last_match: int = cqi_subcorpus.size - 1 + cqi_subcorpus = cqi_corpus.subcorpora.get('Last') + first_match = 0 + last_match = cqi_subcorpus.size - 1 match_boundaries = zip( range(first_match, last_match + 1), cqi_subcorpus.dump( @@ -111,7 +106,7 @@ def ext_corpus_static_data(corpus: str) -> dict: del cqi_subcorpus, first_match, last_match for id, lbound, rbound in match_boundaries: 
static_data['s_attrs'][s_attr.name]['lexicon'].append({}) - print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds') + current_app.logger.info(f's_attrs.{s_attr.name}.lexicon.{id}.bounds') static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound] del match_boundaries @@ -123,33 +118,33 @@ def ext_corpus_static_data(corpus: str) -> dict: # This is a very slow operation, thats why we only use it for # the text attribute lbound, rbound = s_attr.cpos_by_id(id) - print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds') + current_app.logger.info(f's_attrs.{s_attr.name}.lexicon.{id}.bounds') static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound] static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'] = {} - cpos_list: list[int] = list(range(lbound, rbound + 1)) + cpos_list = list(range(lbound, rbound + 1)) for p_attr in cqi_p_attrs: - p_attr_ids: list[int] = [] + p_attr_ids = [] p_attr_ids.extend(p_attr.ids_by_cpos(cpos_list)) - print(f's_attrs.{s_attr.name}.lexicon.{id}.freqs.{p_attr.name}') + current_app.logger.info(f's_attrs.{s_attr.name}.lexicon.{id}.freqs.{p_attr.name}') static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr_ids)) del p_attr_ids del cpos_list - sub_s_attrs: list[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr}) - print(f's_attrs.{s_attr.name}.values') + sub_s_attrs = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr}) + current_app.logger.info(f's_attrs.{s_attr.name}.values') static_data['s_attrs'][s_attr.name]['values'] = [ sub_s_attr.name[(len(s_attr.name) + 1):] for sub_s_attr in sub_s_attrs ] - s_attr_id_list: list[int] = list(range(s_attr.size)) - sub_s_attr_values: list[str] = [] + s_attr_id_list = list(range(s_attr.size)) + sub_s_attr_values = [] for sub_s_attr in sub_s_attrs: tmp = [] tmp.extend(sub_s_attr.values_by_ids(s_attr_id_list)) sub_s_attr_values.append(tmp) del tmp del s_attr_id_list - print(f'values.s_attrs.{s_attr.name}') + current_app.logger.info(f'values.s_attrs.{s_attr.name}') static_data['values']['s_attrs'][s_attr.name] = [ { s_attr_value_name: sub_s_attr_values[s_attr_value_name_idx][s_attr_id] @@ -159,11 +154,11 @@ def ext_corpus_static_data(corpus: str) -> dict: } for s_attr_id in range(0, s_attr.size) ] del sub_s_attr_values - print('Saving static data to file') + current_app.logger.info('Saving static data to file') with gzip.open(static_data_file_path, 'wt') as f: json.dump(static_data, f) del static_data - print('Sending static data to client') + current_app.logger.info('Sending static data to client') with open(static_data_file_path, 'rb') as f: return f.read() @@ -173,7 +168,7 @@ def ext_corpus_paginate_corpus( page: int = 1, per_page: int = 20 ) -> dict: - cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] + cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() cqi_corpus = cqi_client.corpora.get(corpus) # Sanity checks if ( @@ -188,7 +183,7 @@ def ext_corpus_paginate_corpus( first_cpos = (page - 1) * per_page last_cpos = min(cqi_corpus.size, first_cpos + per_page) cpos_list = [*range(first_cpos, last_cpos)] - lookups = lookups_by_cpos(cqi_corpus, cpos_list) + lookups = _lookups_by_cpos(cqi_corpus, cpos_list) payload = {} # the items for the current page payload['items'] = [cpos_list] @@ -220,7 +215,7 @@ def ext_cqp_paginate_subcorpus( per_page: int = 20 ) -> dict: corpus_name, subcorpus_name = subcorpus.split(':', 1) - cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] + 
cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name) # Sanity checks @@ -235,7 +230,7 @@ def ext_cqp_paginate_subcorpus( return {'code': 416, 'msg': 'Range Not Satisfiable'} offset = (page - 1) * per_page cutoff = per_page - cqi_results_export = export_subcorpus( + cqi_results_export = _export_subcorpus( cqi_subcorpus, context=context, cutoff=cutoff, offset=offset) payload = {} # the items for the current page @@ -267,20 +262,145 @@ def ext_cqp_partial_export_subcorpus( context: int = 50 ) -> dict: corpus_name, subcorpus_name = subcorpus.split(':', 1) - cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] + cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name) - cqi_subcorpus_partial_export = partial_export_subcorpus(cqi_subcorpus, match_id_list, context=context) + cqi_subcorpus_partial_export = _partial_export_subcorpus(cqi_subcorpus, match_id_list, context=context) return cqi_subcorpus_partial_export -def ext_cqp_export_subcorpus( - subcorpus: str, - context: int = 50 -) -> dict: +def ext_cqp_export_subcorpus(subcorpus: str, context: int = 50) -> dict: corpus_name, subcorpus_name = subcorpus.split(':', 1) - cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] + cqi_client = CQiOverSocketIOSessionManager.get_cqi_client() cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name) - cqi_subcorpus_export = export_subcorpus(cqi_subcorpus, context=context) + cqi_subcorpus_export = _export_subcorpus(cqi_subcorpus, context=context) return cqi_subcorpus_export + + +def _lookups_by_cpos(corpus: CQiCorpus, cpos_list: list[int]) -> dict: + lookups = {} + lookups['cpos_lookup'] = {cpos: {} for cpos in cpos_list} + for attr in corpus.positional_attributes.list(): + cpos_attr_values = attr.values_by_cpos(cpos_list) + for i, cpos in enumerate(cpos_list): + lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_values[i] + for attr in corpus.structural_attributes.list(): + # We only want to iterate over non subattributes, identifiable by + # attr.has_values == False + if attr.has_values: + continue + cpos_attr_ids = attr.ids_by_cpos(cpos_list) + for i, cpos in enumerate(cpos_list): + if cpos_attr_ids[i] == -1: + continue + lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_ids[i] + occured_attr_ids = [x for x in set(cpos_attr_ids) if x != -1] + if len(occured_attr_ids) == 0: + continue + subattrs = corpus.structural_attributes.list(filters={'part_of': attr}) + if len(subattrs) == 0: + continue + lookup_name = f'{attr.name}_lookup' + lookups[lookup_name] = {} + for attr_id in occured_attr_ids: + lookups[lookup_name][attr_id] = {} + for subattr in subattrs: + subattr_name = subattr.name[(len(attr.name) + 1):] # noqa + for i, subattr_value in enumerate(subattr.values_by_ids(occured_attr_ids)): # noqa + lookups[lookup_name][occured_attr_ids[i]][subattr_name] = subattr_value # noqa + return lookups + + +def _partial_export_subcorpus( + subcorpus: CQiSubcorpus, + match_id_list: list[int], + context: int = 25 +) -> dict: + if subcorpus.size == 0: + return {'matches': []} + match_boundaries = [] + for match_id in match_id_list: + if match_id < 0 or match_id >= subcorpus.size: + continue + match_boundaries.append( + ( + match_id, + subcorpus.dump(subcorpus.fields['match'], match_id, match_id)[0], + 
subcorpus.dump(subcorpus.fields['matchend'], match_id, match_id)[0] + ) + ) + cpos_set = set() + matches = [] + for match_boundary in match_boundaries: + match_num, match_start, match_end = match_boundary + c = (match_start, match_end) + if match_start == 0 or context == 0: + lc = None + cpos_list_lbound = match_start + else: + lc_lbound = max(0, (match_start - context)) + lc_rbound = match_start - 1 + lc = (lc_lbound, lc_rbound) + cpos_list_lbound = lc_lbound + if match_end == (subcorpus.collection.corpus.size - 1) or context == 0: + rc = None + cpos_list_rbound = match_end + else: + rc_lbound = match_end + 1 + rc_rbound = min( + (match_end + context), + (subcorpus.collection.corpus.size - 1) + ) + rc = (rc_lbound, rc_rbound) + cpos_list_rbound = rc_rbound + match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc} + matches.append(match) + cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1)) + lookups = _lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set)) + return {'matches': matches, **lookups} + + +def _export_subcorpus( + subcorpus: CQiSubcorpus, + context: int = 25, + cutoff: float = float('inf'), + offset: int = 0 +) -> dict: + if subcorpus.size == 0: + return {'matches': []} + first_match = max(0, offset) + last_match = min((offset + cutoff - 1), (subcorpus.size - 1)) + match_boundaries = zip( + range(first_match, last_match + 1), + subcorpus.dump(subcorpus.fields['match'], first_match, last_match), + subcorpus.dump(subcorpus.fields['matchend'], first_match, last_match) + ) + cpos_set = set() + matches = [] + for match_num, match_start, match_end in match_boundaries: + c = (match_start, match_end) + if match_start == 0 or context == 0: + lc = None + cpos_list_lbound = match_start + else: + lc_lbound = max(0, (match_start - context)) + lc_rbound = match_start - 1 + lc = (lc_lbound, lc_rbound) + cpos_list_lbound = lc_lbound + if match_end == (subcorpus.collection.corpus.size - 1) or context == 0: + rc = None + cpos_list_rbound = match_end + else: + rc_lbound = match_end + 1 + rc_rbound = min( + (match_end + context), + (subcorpus.collection.corpus.size - 1) + ) + rc = (rc_lbound, rc_rbound) + cpos_list_rbound = rc_rbound + match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc} + matches.append(match) + cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1)) + lookups = _lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set)) + return {'matches': matches, **lookups} diff --git a/app/namespaces/cqi_over_sio/utils.py b/app/namespaces/cqi_over_sio/utils.py new file mode 100644 index 00000000..5bef9f82 --- /dev/null +++ b/app/namespaces/cqi_over_sio/utils.py @@ -0,0 +1,37 @@ +from cqi import CQiClient +from threading import Lock +from flask import session + + +class CQiOverSocketIOSessionManager: + @staticmethod + def setup(): + session['cqi_over_sio'] = {} + + @staticmethod + def teardown(): + session.pop('cqi_over_sio') + + @staticmethod + def set_corpus_id(corpus_id: int): + session['cqi_over_sio']['corpus_id'] = corpus_id + + @staticmethod + def get_corpus_id() -> int: + return session['cqi_over_sio']['corpus_id'] + + @staticmethod + def set_cqi_client(cqi_client: CQiClient): + session['cqi_over_sio']['cqi_client'] = cqi_client + + @staticmethod + def get_cqi_client() -> CQiClient: + return session['cqi_over_sio']['cqi_client'] + + @staticmethod + def set_cqi_client_lock(cqi_client_lock: Lock): + session['cqi_over_sio']['cqi_client_lock'] = cqi_client_lock + + @staticmethod + def get_cqi_client_lock() -> Lock: + return 
session['cqi_over_sio']['cqi_client_lock']
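
Illustrative usage sketch, not part of the patch above: a minimal python-socketio
client driving the relocated '/cqi_over_sio' namespace. It assumes an already
authenticated nopaque session cookie, a corpus hashid taken from the corpus URL,
and that the CWB corpus is registered under the name 'CORPUS'; every concrete
value below is a placeholder, not something defined by this patch.

import socketio

NOPAQUE_URL = 'https://nopaque.example.org'  # assumption: deployment URL
SESSION_COOKIE = 'session=<authenticated-flask-session>'  # assumption: login cookie
CORPUS_HASHID = '<corpus-hashid>'  # assumption: hashid from the corpus URL

sio = socketio.Client()
sio.connect(
    NOPAQUE_URL,
    namespaces=['/cqi_over_sio'],
    headers={'Cookie': SESSION_COOKIE}
)

# 'init' decodes the hashid, waits until the corpus reaches
# RUNNING_ANALYSIS_SESSION and stores the CQiClient, its lock and the corpus id
# via CQiOverSocketIOSessionManager. The handler's return value arrives as the ack.
ack = sio.call('init', CORPUS_HASHID, namespace='/cqi_over_sio')
assert ack['code'] == 200, ack['msg']

# 'exec' dispatches by name to either the plain CQi API or one of the ext_*
# helpers; keyword arguments are passed as a dict and validated against the
# function signature. 'CORPUS' is assumed to be the CWB registry name.
ack = sio.call(
    'exec',
    ('ext_corpus_paginate_corpus', {'corpus': 'CORPUS', 'page': 1, 'per_page': 10}),
    namespace='/cqi_over_sio'
)
print(ack['payload'])

sio.disconnect()

On success the 'exec' ack mirrors the handler's return value, i.e.
{'code': 200, 'msg': 'OK', 'payload': ...}; disconnecting triggers ctrl_bye()
on the CQP server and decrements num_analysis_sessions.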