Compare commits

...

2 Commits

Author SHA1 Message Date
Patrick Jentsch
289a551122 Create dedicated '/users' Socket.IO Namespace 2024-11-06 13:04:30 +01:00
Patrick Jentsch
2a28f19660 Move Socket.IO Namespaces to dedicated directory 2024-11-06 12:27:49 +01:00
9 changed files with 370 additions and 374 deletions

View File

@ -130,8 +130,11 @@ def create_app(config: Config = Config) -> Flask:
# endregion Blueprints # endregion Blueprints
# region SocketIO Namespaces # region SocketIO Namespaces
from .blueprints.corpora.cqi_over_sio import CQiOverSocketIO from .namespaces.cqi_over_sio import CQiOverSocketIONamespace
socketio.on_namespace(CQiOverSocketIO('/cqi_over_sio')) socketio.on_namespace(CQiOverSocketIONamespace('/cqi_over_sio'))
from .namespaces.users import UsersNamespace
socketio.on_namespace(UsersNamespace('/users'))
# endregion SocketIO Namespaces # endregion SocketIO Namespaces
# region Database event Listeners # region Database event Listeners

View File

@ -1,130 +0,0 @@
from cqi.models.corpora import Corpus as CQiCorpus
from cqi.models.subcorpora import Subcorpus as CQiSubcorpus
def lookups_by_cpos(corpus: CQiCorpus, cpos_list: list[int]) -> dict:
lookups = {}
lookups['cpos_lookup'] = {cpos: {} for cpos in cpos_list}
for attr in corpus.positional_attributes.list():
cpos_attr_values: list[str] = attr.values_by_cpos(cpos_list)
for i, cpos in enumerate(cpos_list):
lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_values[i]
for attr in corpus.structural_attributes.list():
# We only want to iterate over non subattributes, identifiable by
# attr.has_values == False
if attr.has_values:
continue
cpos_attr_ids: list[int] = attr.ids_by_cpos(cpos_list)
for i, cpos in enumerate(cpos_list):
if cpos_attr_ids[i] == -1:
continue
lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_ids[i]
occured_attr_ids = [x for x in set(cpos_attr_ids) if x != -1]
if len(occured_attr_ids) == 0:
continue
subattrs = corpus.structural_attributes.list(filters={'part_of': attr})
if len(subattrs) == 0:
continue
lookup_name: str = f'{attr.name}_lookup'
lookups[lookup_name] = {}
for attr_id in occured_attr_ids:
lookups[lookup_name][attr_id] = {}
for subattr in subattrs:
subattr_name = subattr.name[(len(attr.name) + 1):] # noqa
for i, subattr_value in enumerate(subattr.values_by_ids(occured_attr_ids)): # noqa
lookups[lookup_name][occured_attr_ids[i]][subattr_name] = subattr_value # noqa
return lookups
def partial_export_subcorpus(
subcorpus: CQiSubcorpus,
match_id_list: list[int],
context: int = 25
) -> dict:
if subcorpus.size == 0:
return {"matches": []}
match_boundaries = []
for match_id in match_id_list:
if match_id < 0 or match_id >= subcorpus.size:
continue
match_boundaries.append(
(
match_id,
subcorpus.dump(subcorpus.fields['match'], match_id, match_id)[0],
subcorpus.dump(subcorpus.fields['matchend'], match_id, match_id)[0]
)
)
cpos_set = set()
matches = []
for match_boundary in match_boundaries:
match_num, match_start, match_end = match_boundary
c = (match_start, match_end)
if match_start == 0 or context == 0:
lc = None
cpos_list_lbound = match_start
else:
lc_lbound = max(0, (match_start - context))
lc_rbound = match_start - 1
lc = (lc_lbound, lc_rbound)
cpos_list_lbound = lc_lbound
if match_end == (subcorpus.collection.corpus.size - 1) or context == 0:
rc = None
cpos_list_rbound = match_end
else:
rc_lbound = match_end + 1
rc_rbound = min(
(match_end + context),
(subcorpus.collection.corpus.size - 1)
)
rc = (rc_lbound, rc_rbound)
cpos_list_rbound = rc_rbound
match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc}
matches.append(match)
cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1))
lookups = lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set))
return {'matches': matches, **lookups}
def export_subcorpus(
subcorpus: CQiSubcorpus,
context: int = 25,
cutoff: float = float('inf'),
offset: int = 0
) -> dict:
if subcorpus.size == 0:
return {"matches": []}
first_match = max(0, offset)
last_match = min((offset + cutoff - 1), (subcorpus.size - 1))
match_boundaries = zip(
range(first_match, last_match + 1),
subcorpus.dump(subcorpus.fields['match'], first_match, last_match),
subcorpus.dump(subcorpus.fields['matchend'], first_match, last_match)
)
cpos_set = set()
matches = []
for match_num, match_start, match_end in match_boundaries:
c = (match_start, match_end)
if match_start == 0 or context == 0:
lc = None
cpos_list_lbound = match_start
else:
lc_lbound = max(0, (match_start - context))
lc_rbound = match_start - 1
lc = (lc_lbound, lc_rbound)
cpos_list_lbound = lc_lbound
if match_end == (subcorpus.collection.corpus.size - 1) or context == 0:
rc = None
cpos_list_rbound = match_end
else:
rc_lbound = match_end + 1
rc_rbound = min(
(match_end + context),
(subcorpus.collection.corpus.size - 1)
)
rc = (rc_lbound, rc_rbound)
cpos_list_rbound = rc_rbound
match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc}
matches.append(match)
cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1))
lookups = lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set))
return {'matches': matches, **lookups}

View File

@ -1,138 +0,0 @@
from flask import current_app
from flask_login import current_user
from flask_socketio import Namespace
from app import db, hashids, socketio
from app.extensions.flask_socketio import admin_required, login_required
from app.models import Job, JobStatus
class JobsNamespace(Namespace):
@login_required
def on_delete(self, job_hashid: str):
# Decode the job hashid
job_id = hashids.decode(job_hashid)
# Validate job_id
if not isinstance(job_id, int):
return {
'code': 400,
'body': 'job_id is invalid'
}
# Load job from database
job = Job.query.get(job_id)
if job is None:
return {
'code': 404,
'body': 'Job not found'
}
# Check if the current user is allowed to delete the job
if not (job.user == current_user or current_user.is_administrator):
return {
'code': 403,
'body': 'Forbidden'
}
# TODO: This should be a method in the Job model
def _delete_job(app, job_id):
with app.app_context():
job = Job.query.get(job_id)
job.delete()
db.session.commit()
# Delete the job in a background task
socketio.start_background_task(
target=_delete_job,
app=current_app._get_current_object(),
job_id=job_id
)
return {
'code': 202,
'body': f'Job "{job.title}" marked for deletion'
}
@admin_required
def on_get_log(self, job_hashid: str):
# Decode the job hashid
job_id = hashids.decode(job_hashid)
# Validate job_id
if not isinstance(job_id, int):
return {
'code': 400,
'body': 'job_id is invalid'
}
# Load job from database
job = Job.query.get(job_id)
if job is None:
return {
'code': 404,
'body': 'Job not found'
}
# Check if the job is already processed
if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:
return {
'code': 409,
'body': 'Job is not done processing'
}
# Read the log file
with open(job.path / 'pipeline_data' / 'logs' / 'pyflow_log.txt') as log_file:
job_log = log_file.read()
return {
'code': 200,
'body': job_log
}
@login_required
def on_restart(self, job_hashid: str):
# Decode the job hashid
job_id = hashids.decode(job_hashid)
# Validate job_id
if not isinstance(job_id, int):
return {
'code': 400,
'body': 'job_id is invalid'
}
# Load job from database
job = Job.query.get(job_id)
if job is None:
return {
'code': 404,
'body': 'Job not found'
}
# Check if the current user is allowed to restart the job
if not (job.user == current_user or current_user.is_administrator):
return {
'code': 403,
'body': 'Forbidden'
}
# TODO: This should be a method in the Job model
def _restart_job(app, job_id):
with app.app_context():
job = Job.query.get(job_id)
job.restart()
db.session.commit()
# Restart the job in a background task
socketio.start_background_task(
target=_restart_job,
app=current_app._get_current_object(),
job_id=job_id
)
return {
'code': 202,
'body': f'Job "{job.title}" restarted'
}

View File

@ -15,4 +15,4 @@ def before_request():
pass pass
from . import cli, events, json_routes, routes, settings from . import cli, json_routes, routes, settings

View File

@ -1,17 +1,16 @@
from cqi import CQiClient from cqi import CQiClient
from cqi.errors import CQiException from cqi.errors import CQiException
from cqi.status import CQiStatus from cqi.status import CQiStatus
from docker.models.containers import Container from flask import current_app
from flask import current_app, session
from flask_login import current_user from flask_login import current_user
from flask_socketio import Namespace from flask_socketio import Namespace
from inspect import signature from inspect import signature
from threading import Lock from threading import Lock
from typing import Callable
from app import db, docker_client, hashids, socketio from app import db, docker_client, hashids, socketio
from app.decorators import socketio_login_required from app.decorators import socketio_login_required
from app.models import Corpus, CorpusStatus from app.models import Corpus, CorpusStatus
from . import extensions from . import extensions
from .utils import CQiOverSocketIOSessionManager
''' '''
@ -38,7 +37,7 @@ Basic concept:
''' '''
CQI_API_FUNCTION_NAMES: list[str] = [ CQI_API_FUNCTION_NAMES = [
'ask_feature_cl_2_3', 'ask_feature_cl_2_3',
'ask_feature_cqi_1_0', 'ask_feature_cqi_1_0',
'ask_feature_cqp_2_3', 'ask_feature_cqp_2_3',
@ -86,68 +85,80 @@ CQI_API_FUNCTION_NAMES: list[str] = [
] ]
class CQiOverSocketIO(Namespace): class CQiOverSocketIONamespace(Namespace):
@socketio_login_required @socketio_login_required
def on_connect(self): def on_connect(self):
pass pass
@socketio_login_required @socketio_login_required
def on_init(self, db_corpus_hashid: str): def on_init(self, corpus_hashid: str) -> dict:
db_corpus_id: int = hashids.decode(db_corpus_hashid) corpus_id = hashids.decode(corpus_hashid)
db_corpus: Corpus | None = Corpus.query.get(db_corpus_id)
if db_corpus is None: if not isinstance(corpus_id, int):
return {'code': 400, 'msg': 'Bad Request'}
corpus = Corpus.query.get(corpus_id)
if corpus is None:
return {'code': 404, 'msg': 'Not Found'} return {'code': 404, 'msg': 'Not Found'}
if not (db_corpus.user == current_user
or current_user.is_following_corpus(db_corpus) if not (
or current_user.is_administrator): corpus.user == current_user
or current_user.is_following_corpus(corpus)
or current_user.is_administrator
):
return {'code': 403, 'msg': 'Forbidden'} return {'code': 403, 'msg': 'Forbidden'}
if db_corpus.status not in [
if corpus.status not in [
CorpusStatus.BUILT, CorpusStatus.BUILT,
CorpusStatus.STARTING_ANALYSIS_SESSION, CorpusStatus.STARTING_ANALYSIS_SESSION,
CorpusStatus.RUNNING_ANALYSIS_SESSION, CorpusStatus.RUNNING_ANALYSIS_SESSION,
CorpusStatus.CANCELING_ANALYSIS_SESSION CorpusStatus.CANCELING_ANALYSIS_SESSION
]: ]:
return {'code': 424, 'msg': 'Failed Dependency'} return {'code': 424, 'msg': 'Failed Dependency'}
if db_corpus.num_analysis_sessions is None:
db_corpus.num_analysis_sessions = 0 corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
db.session.commit()
db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions + 1
db.session.commit() db.session.commit()
retry_counter: int = 20 retry_counter = 20
while db_corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION: while corpus.status != CorpusStatus.RUNNING_ANALYSIS_SESSION:
if retry_counter == 0: if retry_counter == 0:
db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1 corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit() db.session.commit()
return {'code': 408, 'msg': 'Request Timeout'} return {'code': 408, 'msg': 'Request Timeout'}
socketio.sleep(3) socketio.sleep(3)
retry_counter -= 1 retry_counter -= 1
db.session.refresh(db_corpus) db.session.refresh(corpus)
# cqi_client: CQiClient = CQiClient(f'cqpserver_{db_corpus_id}')
cqpserver_container_name: str = f'cqpserver_{db_corpus_id}' cqpserver_container_name = f'nopaque-cqpserver-{corpus_id}'
cqpserver_container: Container = docker_client.containers.get(cqpserver_container_name) cqpserver_container = docker_client.containers.get(cqpserver_container_name)
cqpserver_host: str = cqpserver_container.attrs['NetworkSettings']['Networks'][current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']]['IPAddress'] cqpserver_ip_address = cqpserver_container.attrs['NetworkSettings']['Networks'][current_app.config['NOPAQUE_DOCKER_NETWORK_NAME']]['IPAddress']
cqi_client: CQiClient = CQiClient(cqpserver_host) cqi_client = CQiClient(cqpserver_ip_address)
session['cqi_over_sio'] = { cqi_client_lock = Lock()
'cqi_client': cqi_client,
'cqi_client_lock': Lock(), CQiOverSocketIOSessionManager.setup()
'db_corpus_id': db_corpus_id CQiOverSocketIOSessionManager.set_corpus_id(corpus_id)
} CQiOverSocketIOSessionManager.set_cqi_client(cqi_client)
CQiOverSocketIOSessionManager.set_cqi_client_lock(cqi_client_lock)
return {'code': 200, 'msg': 'OK'} return {'code': 200, 'msg': 'OK'}
@socketio_login_required @socketio_login_required
def on_exec(self, fn_name: str, fn_args: dict = {}): def on_exec(self, fn_name: str, fn_args: dict = {}) -> dict:
try: try:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock'] cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock()
except KeyError: except KeyError:
return {'code': 424, 'msg': 'Failed Dependency'} return {'code': 424, 'msg': 'Failed Dependency'}
if fn_name in CQI_API_FUNCTION_NAMES: if fn_name in CQI_API_FUNCTION_NAMES:
fn: Callable = getattr(cqi_client.api, fn_name) fn = getattr(cqi_client.api, fn_name)
elif fn_name in extensions.CQI_EXTENSION_FUNCTION_NAMES: elif fn_name in extensions.CQI_EXTENSION_FUNCTION_NAMES:
fn: Callable = getattr(extensions, fn_name) fn = getattr(extensions, fn_name)
else: else:
return {'code': 400, 'msg': 'Bad Request'} return {'code': 400, 'msg': 'Bad Request'}
for param in signature(fn).parameters.values(): for param in signature(fn).parameters.values():
# Check if the parameter is optional or required
if param.default is param.empty: if param.default is param.empty:
if param.name not in fn_args: if param.name not in fn_args:
return {'code': 400, 'msg': 'Bad Request'} return {'code': 400, 'msg': 'Bad Request'}
@ -156,6 +167,7 @@ class CQiOverSocketIO(Namespace):
continue continue
if type(fn_args[param.name]) is not param.annotation: if type(fn_args[param.name]) is not param.annotation:
return {'code': 400, 'msg': 'Bad Request'} return {'code': 400, 'msg': 'Bad Request'}
cqi_client_lock.acquire() cqi_client_lock.acquire()
try: try:
fn_return_value = fn(**fn_args) fn_return_value = fn(**fn_args)
@ -173,6 +185,7 @@ class CQiOverSocketIO(Namespace):
} }
finally: finally:
cqi_client_lock.release() cqi_client_lock.release()
if isinstance(fn_return_value, CQiStatus): if isinstance(fn_return_value, CQiStatus):
payload = { payload = {
'code': fn_return_value.code, 'code': fn_return_value.code,
@ -180,27 +193,31 @@ class CQiOverSocketIO(Namespace):
} }
else: else:
payload = fn_return_value payload = fn_return_value
return {'code': 200, 'msg': 'OK', 'payload': payload} return {'code': 200, 'msg': 'OK', 'payload': payload}
def on_disconnect(self): def on_disconnect(self):
try: try:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] corpus_id = CQiOverSocketIOSessionManager.get_corpus_id()
cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
db_corpus_id: int = session['cqi_over_sio']['db_corpus_id'] cqi_client_lock = CQiOverSocketIOSessionManager.get_cqi_client_lock()
CQiOverSocketIOSessionManager.teardown()
except KeyError: except KeyError:
return return
cqi_client_lock.acquire() cqi_client_lock.acquire()
try:
session.pop('cqi_over_sio')
except KeyError:
pass
try: try:
cqi_client.api.ctrl_bye() cqi_client.api.ctrl_bye()
except (BrokenPipeError, CQiException): except (BrokenPipeError, CQiException):
pass pass
cqi_client_lock.release() cqi_client_lock.release()
db_corpus: Corpus | None = Corpus.query.get(db_corpus_id)
if db_corpus is None: corpus = Corpus.query.get(corpus_id)
if corpus is None:
return return
db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1
db.session.commit() db.session.commit()

View File

@ -1,22 +1,17 @@
from collections import Counter from collections import Counter
from cqi import CQiClient
from cqi.models.corpora import Corpus as CQiCorpus from cqi.models.corpora import Corpus as CQiCorpus
from cqi.models.subcorpora import Subcorpus as CQiSubcorpus from cqi.models.subcorpora import Subcorpus as CQiSubcorpus
from cqi.models.attributes import (
PositionalAttribute as CQiPositionalAttribute,
StructuralAttribute as CQiStructuralAttribute
)
from cqi.status import StatusOk as CQiStatusOk from cqi.status import StatusOk as CQiStatusOk
from flask import session from flask import current_app
import gzip import gzip
import json import json
import math import math
from app import db from app import db
from app.models import Corpus from app.models import Corpus
from .utils import lookups_by_cpos, partial_export_subcorpus, export_subcorpus from .utils import CQiOverSocketIOSessionManager
CQI_EXTENSION_FUNCTION_NAMES: list[str] = [ CQI_EXTENSION_FUNCTION_NAMES = [
'ext_corpus_update_db', 'ext_corpus_update_db',
'ext_corpus_static_data', 'ext_corpus_static_data',
'ext_corpus_paginate_corpus', 'ext_corpus_paginate_corpus',
@ -27,28 +22,28 @@ CQI_EXTENSION_FUNCTION_NAMES: list[str] = [
def ext_corpus_update_db(corpus: str) -> CQiStatusOk: def ext_corpus_update_db(corpus: str) -> CQiStatusOk:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] corpus_id = CQiOverSocketIOSessionManager.get_corpus_id()
db_corpus_id: int = session['cqi_over_sio']['db_corpus_id'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
db_corpus: Corpus = Corpus.query.get(db_corpus_id) db_corpus = Corpus.query.get(corpus_id)
cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus) cqi_corpus = cqi_client.corpora.get(corpus)
db_corpus.num_tokens = cqi_corpus.size db_corpus.num_tokens = cqi_corpus.size
db.session.commit() db.session.commit()
return CQiStatusOk() return CQiStatusOk()
def ext_corpus_static_data(corpus: str) -> dict: def ext_corpus_static_data(corpus: str) -> dict:
db_corpus_id: int = session['cqi_over_sio']['db_corpus_id'] corpus_id = CQiOverSocketIOSessionManager.get_corpus_id()
db_corpus: Corpus = Corpus.query.get(db_corpus_id) db_corpus = Corpus.query.get(corpus_id)
static_data_file_path = db_corpus.path / 'cwb' / 'static.json.gz' static_data_file_path = db_corpus.path / 'cwb' / 'static.json.gz'
if static_data_file_path.exists(): if static_data_file_path.exists():
with static_data_file_path.open('rb') as f: with static_data_file_path.open('rb') as f:
return f.read() return f.read()
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus) cqi_corpus = cqi_client.corpora.get(corpus)
cqi_p_attrs: list[CQiPositionalAttribute] = cqi_corpus.positional_attributes.list() cqi_p_attrs = cqi_corpus.positional_attributes.list()
cqi_s_attrs: list[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list() cqi_s_attrs = cqi_corpus.structural_attributes.list()
static_data = { static_data = {
'corpus': { 'corpus': {
@ -61,21 +56,21 @@ def ext_corpus_static_data(corpus: str) -> dict:
} }
for p_attr in cqi_p_attrs: for p_attr in cqi_p_attrs:
print(f'corpus.freqs.{p_attr.name}') current_app.logger.info(f'corpus.freqs.{p_attr.name}')
static_data['corpus']['freqs'][p_attr.name] = [] static_data['corpus']['freqs'][p_attr.name] = []
p_attr_id_list: list[int] = list(range(p_attr.lexicon_size)) p_attr_id_list = list(range(p_attr.lexicon_size))
static_data['corpus']['freqs'][p_attr.name].extend(p_attr.freqs_by_ids(p_attr_id_list)) static_data['corpus']['freqs'][p_attr.name].extend(p_attr.freqs_by_ids(p_attr_id_list))
del p_attr_id_list del p_attr_id_list
print(f'p_attrs.{p_attr.name}') current_app.logger.info(f'p_attrs.{p_attr.name}')
static_data['p_attrs'][p_attr.name] = [] static_data['p_attrs'][p_attr.name] = []
cpos_list: list[int] = list(range(cqi_corpus.size)) cpos_list = list(range(cqi_corpus.size))
static_data['p_attrs'][p_attr.name].extend(p_attr.ids_by_cpos(cpos_list)) static_data['p_attrs'][p_attr.name].extend(p_attr.ids_by_cpos(cpos_list))
del cpos_list del cpos_list
print(f'values.p_attrs.{p_attr.name}') current_app.logger.info(f'values.p_attrs.{p_attr.name}')
static_data['values']['p_attrs'][p_attr.name] = [] static_data['values']['p_attrs'][p_attr.name] = []
p_attr_id_list: list[int] = list(range(p_attr.lexicon_size)) p_attr_id_list = list(range(p_attr.lexicon_size))
static_data['values']['p_attrs'][p_attr.name].extend(p_attr.values_by_ids(p_attr_id_list)) static_data['values']['p_attrs'][p_attr.name].extend(p_attr.values_by_ids(p_attr_id_list))
del p_attr_id_list del p_attr_id_list
@ -91,9 +86,9 @@ def ext_corpus_static_data(corpus: str) -> dict:
# Note: Needs more testing, don't use it in production # # Note: Needs more testing, don't use it in production #
############################################################## ##############################################################
cqi_corpus.query('Last', f'<{s_attr.name}> []* </{s_attr.name}>;') cqi_corpus.query('Last', f'<{s_attr.name}> []* </{s_attr.name}>;')
cqi_subcorpus: CQiSubcorpus = cqi_corpus.subcorpora.get('Last') cqi_subcorpus = cqi_corpus.subcorpora.get('Last')
first_match: int = 0 first_match = 0
last_match: int = cqi_subcorpus.size - 1 last_match = cqi_subcorpus.size - 1
match_boundaries = zip( match_boundaries = zip(
range(first_match, last_match + 1), range(first_match, last_match + 1),
cqi_subcorpus.dump( cqi_subcorpus.dump(
@ -111,7 +106,7 @@ def ext_corpus_static_data(corpus: str) -> dict:
del cqi_subcorpus, first_match, last_match del cqi_subcorpus, first_match, last_match
for id, lbound, rbound in match_boundaries: for id, lbound, rbound in match_boundaries:
static_data['s_attrs'][s_attr.name]['lexicon'].append({}) static_data['s_attrs'][s_attr.name]['lexicon'].append({})
print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds') current_app.logger.info(f's_attrs.{s_attr.name}.lexicon.{id}.bounds')
static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound] static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
del match_boundaries del match_boundaries
@ -123,33 +118,33 @@ def ext_corpus_static_data(corpus: str) -> dict:
# This is a very slow operation, thats why we only use it for # This is a very slow operation, thats why we only use it for
# the text attribute # the text attribute
lbound, rbound = s_attr.cpos_by_id(id) lbound, rbound = s_attr.cpos_by_id(id)
print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds') current_app.logger.info(f's_attrs.{s_attr.name}.lexicon.{id}.bounds')
static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound] static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'] = {} static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'] = {}
cpos_list: list[int] = list(range(lbound, rbound + 1)) cpos_list = list(range(lbound, rbound + 1))
for p_attr in cqi_p_attrs: for p_attr in cqi_p_attrs:
p_attr_ids: list[int] = [] p_attr_ids = []
p_attr_ids.extend(p_attr.ids_by_cpos(cpos_list)) p_attr_ids.extend(p_attr.ids_by_cpos(cpos_list))
print(f's_attrs.{s_attr.name}.lexicon.{id}.freqs.{p_attr.name}') current_app.logger.info(f's_attrs.{s_attr.name}.lexicon.{id}.freqs.{p_attr.name}')
static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr_ids)) static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr_ids))
del p_attr_ids del p_attr_ids
del cpos_list del cpos_list
sub_s_attrs: list[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr}) sub_s_attrs = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr})
print(f's_attrs.{s_attr.name}.values') current_app.logger.info(f's_attrs.{s_attr.name}.values')
static_data['s_attrs'][s_attr.name]['values'] = [ static_data['s_attrs'][s_attr.name]['values'] = [
sub_s_attr.name[(len(s_attr.name) + 1):] sub_s_attr.name[(len(s_attr.name) + 1):]
for sub_s_attr in sub_s_attrs for sub_s_attr in sub_s_attrs
] ]
s_attr_id_list: list[int] = list(range(s_attr.size)) s_attr_id_list = list(range(s_attr.size))
sub_s_attr_values: list[str] = [] sub_s_attr_values = []
for sub_s_attr in sub_s_attrs: for sub_s_attr in sub_s_attrs:
tmp = [] tmp = []
tmp.extend(sub_s_attr.values_by_ids(s_attr_id_list)) tmp.extend(sub_s_attr.values_by_ids(s_attr_id_list))
sub_s_attr_values.append(tmp) sub_s_attr_values.append(tmp)
del tmp del tmp
del s_attr_id_list del s_attr_id_list
print(f'values.s_attrs.{s_attr.name}') current_app.logger.info(f'values.s_attrs.{s_attr.name}')
static_data['values']['s_attrs'][s_attr.name] = [ static_data['values']['s_attrs'][s_attr.name] = [
{ {
s_attr_value_name: sub_s_attr_values[s_attr_value_name_idx][s_attr_id] s_attr_value_name: sub_s_attr_values[s_attr_value_name_idx][s_attr_id]
@ -159,11 +154,11 @@ def ext_corpus_static_data(corpus: str) -> dict:
} for s_attr_id in range(0, s_attr.size) } for s_attr_id in range(0, s_attr.size)
] ]
del sub_s_attr_values del sub_s_attr_values
print('Saving static data to file') current_app.logger.info('Saving static data to file')
with gzip.open(static_data_file_path, 'wt') as f: with gzip.open(static_data_file_path, 'wt') as f:
json.dump(static_data, f) json.dump(static_data, f)
del static_data del static_data
print('Sending static data to client') current_app.logger.info('Sending static data to client')
with open(static_data_file_path, 'rb') as f: with open(static_data_file_path, 'rb') as f:
return f.read() return f.read()
@ -173,7 +168,7 @@ def ext_corpus_paginate_corpus(
page: int = 1, page: int = 1,
per_page: int = 20 per_page: int = 20
) -> dict: ) -> dict:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
cqi_corpus = cqi_client.corpora.get(corpus) cqi_corpus = cqi_client.corpora.get(corpus)
# Sanity checks # Sanity checks
if ( if (
@ -188,7 +183,7 @@ def ext_corpus_paginate_corpus(
first_cpos = (page - 1) * per_page first_cpos = (page - 1) * per_page
last_cpos = min(cqi_corpus.size, first_cpos + per_page) last_cpos = min(cqi_corpus.size, first_cpos + per_page)
cpos_list = [*range(first_cpos, last_cpos)] cpos_list = [*range(first_cpos, last_cpos)]
lookups = lookups_by_cpos(cqi_corpus, cpos_list) lookups = _lookups_by_cpos(cqi_corpus, cpos_list)
payload = {} payload = {}
# the items for the current page # the items for the current page
payload['items'] = [cpos_list] payload['items'] = [cpos_list]
@ -220,7 +215,7 @@ def ext_cqp_paginate_subcorpus(
per_page: int = 20 per_page: int = 20
) -> dict: ) -> dict:
corpus_name, subcorpus_name = subcorpus.split(':', 1) corpus_name, subcorpus_name = subcorpus.split(':', 1)
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_corpus = cqi_client.corpora.get(corpus_name)
cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name) cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name)
# Sanity checks # Sanity checks
@ -235,7 +230,7 @@ def ext_cqp_paginate_subcorpus(
return {'code': 416, 'msg': 'Range Not Satisfiable'} return {'code': 416, 'msg': 'Range Not Satisfiable'}
offset = (page - 1) * per_page offset = (page - 1) * per_page
cutoff = per_page cutoff = per_page
cqi_results_export = export_subcorpus( cqi_results_export = _export_subcorpus(
cqi_subcorpus, context=context, cutoff=cutoff, offset=offset) cqi_subcorpus, context=context, cutoff=cutoff, offset=offset)
payload = {} payload = {}
# the items for the current page # the items for the current page
@ -267,20 +262,145 @@ def ext_cqp_partial_export_subcorpus(
context: int = 50 context: int = 50
) -> dict: ) -> dict:
corpus_name, subcorpus_name = subcorpus.split(':', 1) corpus_name, subcorpus_name = subcorpus.split(':', 1)
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_corpus = cqi_client.corpora.get(corpus_name)
cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name) cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name)
cqi_subcorpus_partial_export = partial_export_subcorpus(cqi_subcorpus, match_id_list, context=context) cqi_subcorpus_partial_export = _partial_export_subcorpus(cqi_subcorpus, match_id_list, context=context)
return cqi_subcorpus_partial_export return cqi_subcorpus_partial_export
def ext_cqp_export_subcorpus( def ext_cqp_export_subcorpus(subcorpus: str, context: int = 50) -> dict:
subcorpus: str,
context: int = 50
) -> dict:
corpus_name, subcorpus_name = subcorpus.split(':', 1) corpus_name, subcorpus_name = subcorpus.split(':', 1)
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client = CQiOverSocketIOSessionManager.get_cqi_client()
cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_corpus = cqi_client.corpora.get(corpus_name)
cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name) cqi_subcorpus = cqi_corpus.subcorpora.get(subcorpus_name)
cqi_subcorpus_export = export_subcorpus(cqi_subcorpus, context=context) cqi_subcorpus_export = _export_subcorpus(cqi_subcorpus, context=context)
return cqi_subcorpus_export return cqi_subcorpus_export
def _lookups_by_cpos(corpus: CQiCorpus, cpos_list: list[int]) -> dict:
lookups = {}
lookups['cpos_lookup'] = {cpos: {} for cpos in cpos_list}
for attr in corpus.positional_attributes.list():
cpos_attr_values = attr.values_by_cpos(cpos_list)
for i, cpos in enumerate(cpos_list):
lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_values[i]
for attr in corpus.structural_attributes.list():
# We only want to iterate over non subattributes, identifiable by
# attr.has_values == False
if attr.has_values:
continue
cpos_attr_ids = attr.ids_by_cpos(cpos_list)
for i, cpos in enumerate(cpos_list):
if cpos_attr_ids[i] == -1:
continue
lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_ids[i]
occured_attr_ids = [x for x in set(cpos_attr_ids) if x != -1]
if len(occured_attr_ids) == 0:
continue
subattrs = corpus.structural_attributes.list(filters={'part_of': attr})
if len(subattrs) == 0:
continue
lookup_name = f'{attr.name}_lookup'
lookups[lookup_name] = {}
for attr_id in occured_attr_ids:
lookups[lookup_name][attr_id] = {}
for subattr in subattrs:
subattr_name = subattr.name[(len(attr.name) + 1):] # noqa
for i, subattr_value in enumerate(subattr.values_by_ids(occured_attr_ids)): # noqa
lookups[lookup_name][occured_attr_ids[i]][subattr_name] = subattr_value # noqa
return lookups
def _partial_export_subcorpus(
subcorpus: CQiSubcorpus,
match_id_list: list[int],
context: int = 25
) -> dict:
if subcorpus.size == 0:
return {'matches': []}
match_boundaries = []
for match_id in match_id_list:
if match_id < 0 or match_id >= subcorpus.size:
continue
match_boundaries.append(
(
match_id,
subcorpus.dump(subcorpus.fields['match'], match_id, match_id)[0],
subcorpus.dump(subcorpus.fields['matchend'], match_id, match_id)[0]
)
)
cpos_set = set()
matches = []
for match_boundary in match_boundaries:
match_num, match_start, match_end = match_boundary
c = (match_start, match_end)
if match_start == 0 or context == 0:
lc = None
cpos_list_lbound = match_start
else:
lc_lbound = max(0, (match_start - context))
lc_rbound = match_start - 1
lc = (lc_lbound, lc_rbound)
cpos_list_lbound = lc_lbound
if match_end == (subcorpus.collection.corpus.size - 1) or context == 0:
rc = None
cpos_list_rbound = match_end
else:
rc_lbound = match_end + 1
rc_rbound = min(
(match_end + context),
(subcorpus.collection.corpus.size - 1)
)
rc = (rc_lbound, rc_rbound)
cpos_list_rbound = rc_rbound
match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc}
matches.append(match)
cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1))
lookups = _lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set))
return {'matches': matches, **lookups}
def _export_subcorpus(
subcorpus: CQiSubcorpus,
context: int = 25,
cutoff: float = float('inf'),
offset: int = 0
) -> dict:
if subcorpus.size == 0:
return {'matches': []}
first_match = max(0, offset)
last_match = min((offset + cutoff - 1), (subcorpus.size - 1))
match_boundaries = zip(
range(first_match, last_match + 1),
subcorpus.dump(subcorpus.fields['match'], first_match, last_match),
subcorpus.dump(subcorpus.fields['matchend'], first_match, last_match)
)
cpos_set = set()
matches = []
for match_num, match_start, match_end in match_boundaries:
c = (match_start, match_end)
if match_start == 0 or context == 0:
lc = None
cpos_list_lbound = match_start
else:
lc_lbound = max(0, (match_start - context))
lc_rbound = match_start - 1
lc = (lc_lbound, lc_rbound)
cpos_list_lbound = lc_lbound
if match_end == (subcorpus.collection.corpus.size - 1) or context == 0:
rc = None
cpos_list_rbound = match_end
else:
rc_lbound = match_end + 1
rc_rbound = min(
(match_end + context),
(subcorpus.collection.corpus.size - 1)
)
rc = (rc_lbound, rc_rbound)
cpos_list_rbound = rc_rbound
match = {'num': match_num, 'lc': lc, 'c': c, 'rc': rc}
matches.append(match)
cpos_set.update(range(cpos_list_lbound, cpos_list_rbound + 1))
lookups = _lookups_by_cpos(subcorpus.collection.corpus, list(cpos_set))
return {'matches': matches, **lookups}

View File

@ -0,0 +1,37 @@
from cqi import CQiClient
from threading import Lock
from flask import session
class CQiOverSocketIOSessionManager:
@staticmethod
def setup():
session['cqi_over_sio'] = {}
@staticmethod
def teardown():
session.pop('cqi_over_sio')
@staticmethod
def set_corpus_id(corpus_id: int):
session['cqi_over_sio']['corpus_id'] = corpus_id
@staticmethod
def get_corpus_id() -> int:
return session['cqi_over_sio']['corpus_id']
@staticmethod
def set_cqi_client(cqi_client: CQiClient):
session['cqi_over_sio']['cqi_client'] = cqi_client
@staticmethod
def get_cqi_client() -> CQiClient:
return session['cqi_over_sio']['cqi_client']
@staticmethod
def set_cqi_client_lock(cqi_client_lock: Lock):
session['cqi_over_sio']['cqi_client_lock'] = cqi_client_lock
@staticmethod
def get_cqi_client_lock() -> Lock:
return session['cqi_over_sio']['cqi_client_lock']

View File

@ -0,0 +1,78 @@
from flask_login import current_user
from flask_socketio import join_room, leave_room, Namespace
from app import hashids
from app.decorators import socketio_login_required
from app.models import User
class UsersNamespace(Namespace):
@socketio_login_required
def on_get_user(self, user_hashid: str) -> dict:
user_id = hashids.decode(user_hashid)
if not isinstance(user_id, int):
return {'code': 400, 'msg': 'Bad Request'}
user = User.query.get(user_id)
if user is None:
return {'status': 404, 'statusText': 'Not found'}
if not (
user == current_user
or current_user.is_administrator
):
return {'status': 403, 'statusText': 'Forbidden'}
return {
'body': user.to_json_serializeable(
backrefs=True,
relationships=True
),
'status': 200,
'statusText': 'OK'
}
@socketio_login_required
def on_subscribe_user(self, user_hashid: str) -> dict:
user_id = hashids.decode(user_hashid)
if not isinstance(user_id, int):
return {'code': 400, 'msg': 'Bad Request'}
user = User.query.get(user_id)
if user is None:
return {'status': 404, 'statusText': 'Not found'}
if not (
user == current_user
or current_user.is_administrator
):
return {'status': 403, 'statusText': 'Forbidden'}
join_room(f'/users/{user.hashid}')
return {'status': 200, 'statusText': 'OK'}
@socketio_login_required
def on_unsubscribe_user(self, user_hashid: str) -> dict:
user_id = hashids.decode(user_hashid)
if not isinstance(user_id, int):
return {'code': 400, 'msg': 'Bad Request'}
user = User.query.get(user_id)
if user is None:
return {'status': 404, 'statusText': 'Not found'}
if not (
user == current_user
or current_user.is_administrator
):
return {'status': 403, 'statusText': 'Forbidden'}
leave_room(f'/users/{user.hashid}')
return {'status': 200, 'statusText': 'OK'}

View File

@ -4,7 +4,16 @@ nopaque.App = class App {
promises: {getUser: {}, subscribeUser: {}}, promises: {getUser: {}, subscribeUser: {}},
users: {}, users: {},
}; };
this.socket = io({transports: ['websocket'], upgrade: false}); this.sockets = {};
this.sockets['/users'] = io(
'/users',
{
transports: ['websocket'],
upgrade: false
}
);
// this.socket = io({transports: ['websocket'], upgrade: false});
this.socket = this.sockets['/users'];
this.socket.on('PATCH', (patch) => {this.onPatch(patch);}); this.socket.on('PATCH', (patch) => {this.onPatch(patch);});
} }
@ -14,7 +23,7 @@ nopaque.App = class App {
} }
this.data.promises.getUser[userId] = new Promise((resolve, reject) => { this.data.promises.getUser[userId] = new Promise((resolve, reject) => {
this.socket.emit('GET /users/<user_id>', userId, (response) => { this.socket.emit('get_user', userId, (response) => {
if (response.status === 200) { if (response.status === 200) {
this.data.users[userId] = response.body; this.data.users[userId] = response.body;
resolve(this.data.users[userId]); resolve(this.data.users[userId]);
@ -33,7 +42,7 @@ nopaque.App = class App {
} }
this.data.promises.subscribeUser[userId] = new Promise((resolve, reject) => { this.data.promises.subscribeUser[userId] = new Promise((resolve, reject) => {
this.socket.emit('SUBSCRIBE /users/<user_id>', userId, (response) => { this.socket.emit('subscribe_user', userId, (response) => {
if (response.status !== 200) { if (response.status !== 200) {
reject(response); reject(response);
return; return;