From a9973e9c8e75ecee69901d3da56fdba73f7924a8 Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Thu, 6 Jul 2023 13:02:22 +0200
Subject: [PATCH] Add compression to static corpus data, use chunked
 computation, hide real corpus ids in corpus analysis
---
app/corpora/cli.py | 3 +
app/corpora/cqi_over_sio/__init__.py | 3 +-
app/corpora/cqi_over_sio/extensions.py | 194 +++++++++++++++----------
app/daemon/corpus_utils.py | 12 +-
app/models.py | 9 +-
app/static/js/cqi/models/corpora.js | 10 +-
app/templates/_scripts.html.j2 | 1 +
7 files changed, 148 insertions(+), 84 deletions(-)
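
Note: the extensions.py changes below apply one recurring pattern: large id/cpos
ranges are split into fixed-size chunks before each CQi request, so memory use
stays bounded on large corpora. A minimal sketch of that pattern, with a
hypothetical `chunked` helper that is not part of the patch:

    import itertools
    from typing import Iterable, Iterator, List

    def chunked(ids: Iterable[int], chunk_size: int = 5000) -> Iterator[List[int]]:
        # Yield consecutive runs of at most chunk_size ids so that each
        # CQi request stays small.
        iterator = iter(ids)
        while True:
            chunk = list(itertools.islice(iterator, chunk_size))
            if not chunk:
                return
            yield chunk

    # Usage, assuming a positional attribute object like those in the patch:
    # freqs = {}
    # for chunk in chunked(range(p_attr.lexicon_size)):
    #     freqs.update(zip(chunk, p_attr.freqs_by_ids(chunk)))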
diff --git a/app/corpora/cli.py b/app/corpora/cli.py
index 67658825..8c1a0970 100644
--- a/app/corpora/cli.py
+++ b/app/corpora/cli.py
@@ -19,6 +19,9 @@ def reset():
for corpus in [x for x in Corpus.query.all() if x.status in status]:
print(f'Resetting corpus {corpus}')
shutil.rmtree(os.path.join(corpus.path, 'cwb'), ignore_errors=True)
+ os.mkdir(os.path.join(corpus.path, 'cwb'))
+ os.mkdir(os.path.join(corpus.path, 'cwb', 'data'))
+ os.mkdir(os.path.join(corpus.path, 'cwb', 'registry'))
corpus.status = CorpusStatus.UNPREPARED
corpus.num_analysis_sessions = 0
db.session.commit()
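
Note: the reset command recreates the empty cwb layout right after deleting it,
because later steps expect cwb/data and cwb/registry to exist. A sketch of an
equivalent, assuming nothing else touches the path concurrently:

    import os

    def recreate_cwb_layout(corpus_path: str) -> None:
        # os.makedirs tolerates pre-existing directories, unlike the
        # three plain os.mkdir calls used in the patch.
        cwb_dir = os.path.join(corpus_path, 'cwb')
        os.makedirs(os.path.join(cwb_dir, 'data'), exist_ok=True)
        os.makedirs(os.path.join(cwb_dir, 'registry'), exist_ok=True)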
diff --git a/app/corpora/cqi_over_sio/__init__.py b/app/corpora/cqi_over_sio/__init__.py
index 717f6015..8afcf502 100644
--- a/app/corpora/cqi_over_sio/__init__.py
+++ b/app/corpora/cqi_over_sio/__init__.py
@@ -7,6 +7,7 @@ from threading import Lock
from app import db, hashids, socketio
from app.decorators import socketio_login_required
from app.models import Corpus, CorpusStatus
+import math
'''
@@ -83,7 +84,7 @@ def connect(auth):
socketio.sleep(3)
retry_counter -= 1
db.session.refresh(corpus)
- cqi_client = CQiClient(f'cqpserver_{corpus_id}')
+ cqi_client = CQiClient(f'cqpserver_{corpus_id}', timeout=math.inf)
session['cqi_over_sio'] = {
'corpus_id': corpus_id,
'cqi_client': cqi_client,
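
Note: passing timeout=math.inf disables the client-side socket timeout, which
matters because the static-data extraction below can run for minutes on large
corpora. A sketch of the construction, with a placeholder service name:

    import math
    from cqi import CQiClient

    # Assumption: 'cqpserver_example' stands in for the real
    # f'cqpserver_{corpus_id}' Docker service name used above.
    cqi_client = CQiClient('cqpserver_example', timeout=math.inf)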
diff --git a/app/corpora/cqi_over_sio/extensions.py b/app/corpora/cqi_over_sio/extensions.py
index 0ff166bf..9f457c9b 100644
--- a/app/corpora/cqi_over_sio/extensions.py
+++ b/app/corpora/cqi_over_sio/extensions.py
@@ -1,12 +1,18 @@
from collections import Counter
from cqi import CQiClient
-from cqi.models.corpora import Corpus
-from cqi.status import StatusOk
+from cqi.models.corpora import Corpus as CQiCorpus
+from cqi.models.attributes import (
+ PositionalAttribute as CQiPositionalAttribute,
+ StructuralAttribute as CQiStructuralAttribute
+)
+from cqi.status import StatusOk as CQiStatusOk
from flask import session
from typing import Dict, List
+import gzip
import json
import math
import os
+import shutil
from app import db
from app.models import Corpus
from .utils import lookups_by_cpos, partial_export_subcorpus, export_subcorpus
@@ -22,43 +28,28 @@ CQI_EXTENSION_FUNCTION_NAMES: List[str] = [
]
-def ext_corpus_update_db(corpus: str):
+def ext_corpus_update_db(corpus: str) -> CQiStatusOk:
db_corpus = Corpus.query.get(session['cqi_over_sio']['corpus_id'])
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
- cqi_corpus = cqi_client.corpora.get(corpus)
+ cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus)
db_corpus.num_tokens = cqi_corpus.size
db.session.commit()
- return StatusOk()
+ return CQiStatusOk()
-def ext_corpus_static_data(corpus: str) -> Dict:
+def ext_corpus_static_data(corpus: str) -> bytes:
db_corpus = Corpus.query.get(session['cqi_over_sio']['corpus_id'])
- static_corpus_data_file = os.path.join(db_corpus.path, 'cwb', 'static.json')
+ static_corpus_data_file = os.path.join(db_corpus.path, 'cwb', 'static.json.gz')
if os.path.exists(static_corpus_data_file):
- with open(static_corpus_data_file, 'r') as f:
- return json.load(f)
+ with open(static_corpus_data_file, 'rb') as f:
+ return f.read()
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
- cqi_corpus = cqi_client.corpora.get(corpus)
- ##########################################################################
- # A faster way to get cpos boundaries for smaller s_attrs #
- ##########################################################################
- # cqi_corpus.query('Last', '<s> []* </s>;')
- # cqi_subcorpus = cqi_corpus.subcorpora.get('Last')
- # print(cqi_subcorpus.size)
- # first_match = 0
- # last_match = cqi_subcorpus.attrs['size'] - 1
- # match_boundaries = zip(
- # list(range(first_match, last_match + 1)),
- # cqi_subcorpus.dump(cqi_subcorpus.attrs['fields']['match'], first_match, last_match),
- # cqi_subcorpus.dump(cqi_subcorpus.attrs['fields']['matchend'], first_match, last_match)
- # )
- # for x in match_boundaries:
- # print(x)
- cqi_p_attrs = {
+ cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus)
+ cqi_p_attrs: Dict[str, CQiPositionalAttribute] = {
p_attr.name: p_attr
for p_attr in cqi_corpus.positional_attributes.list()
}
- cqi_s_attrs = {
+ cqi_s_attrs: Dict[str, CQiStructuralAttribute] = {
s_attr.name: s_attr
for s_attr in cqi_corpus.structural_attributes.list()
}
@@ -75,73 +66,121 @@ def ext_corpus_static_data(corpus: str) -> Dict:
'values': {'p_attrs': {}, 's_attrs': {}}
}
for p_attr in cqi_p_attrs.values():
- static_corpus_data['corpus']['freqs'][p_attr.name] = dict(
- zip(
- range(0, p_attr.lexicon_size),
- p_attr.freqs_by_ids(list(range(0, p_attr.lexicon_size)))
+ static_corpus_data['corpus']['freqs'][p_attr.name] = {}
+ chunk_size = 5000
+ p_attr_id_list = list(range(p_attr.lexicon_size))
+ chunks = [p_attr_id_list[i:i+chunk_size] for i in range(0, len(p_attr_id_list), chunk_size)]
+ del p_attr_id_list
+ for chunk in chunks:
+ print(f'corpus.freqs.{p_attr.name}: {chunk[0]} - {chunk[-1]}')
+ static_corpus_data['corpus']['freqs'][p_attr.name].update(
+ dict(zip(chunk, p_attr.freqs_by_ids(chunk)))
)
- )
- static_corpus_data['p_attrs'][p_attr.name] = dict(
- zip(
- range(0, cqi_corpus.size),
- p_attr.ids_by_cpos(list(range(0, cqi_corpus.size)))
+ del chunks
+ static_corpus_data['p_attrs'][p_attr.name] = {}
+ cpos_list = list(range(cqi_corpus.size))
+ chunks = [cpos_list[i:i+chunk_size] for i in range(0, len(cpos_list), chunk_size)]
+ del cpos_list
+ for chunk in chunks:
+ print(f'p_attrs.{p_attr.name}: {chunk[0]} - {chunk[-1]}')
+ static_corpus_data['p_attrs'][p_attr.name].update(
+ dict(zip(chunk, p_attr.ids_by_cpos(chunk)))
)
- )
- static_corpus_data['values']['p_attrs'][p_attr.name] = dict(
- zip(
- range(0, p_attr.lexicon_size),
- p_attr.values_by_ids(list(range(0, p_attr.lexicon_size)))
+ del chunks
+ static_corpus_data['values']['p_attrs'][p_attr.name] = {}
+ p_attr_id_list = list(range(p_attr.lexicon_size))
+ chunks = [p_attr_id_list[i:i+chunk_size] for i in range(0, len(p_attr_id_list), chunk_size)]
+ del p_attr_id_list
+ for chunk in chunks:
+ print(f'values.p_attrs.{p_attr.name}: {chunk[0]} - {chunk[-1]}')
+ static_corpus_data['values']['p_attrs'][p_attr.name].update(
+ dict(zip(chunk, p_attr.values_by_ids(chunk)))
)
- )
+ del chunks
for s_attr in cqi_s_attrs.values():
if s_attr.has_values:
continue
static_corpus_data['corpus']['counts'][s_attr.name] = s_attr.size
static_corpus_data['s_attrs'][s_attr.name] = {'lexicon': {}, 'values': None}
static_corpus_data['values']['s_attrs'][s_attr.name] = {}
- if s_attr.name in ['s', 'ent']:
- cqi_corpus.query('Last', f'<{s_attr.name}> []* </{s_attr.name}>;')
- cqi_subcorpus = cqi_corpus.subcorpora.get('Last')
- first_match = 0
- last_match = cqi_subcorpus.size - 1
- match_boundaries = zip(
- range(first_match, last_match + 1),
- cqi_subcorpus.dump(cqi_subcorpus.fields['match'], first_match, last_match),
- cqi_subcorpus.dump(cqi_subcorpus.fields['matchend'], first_match, last_match)
- )
- for id, lbound, rbound in match_boundaries:
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {}
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
- cqi_subcorpus.drop()
+ ##########################################################################
+ # A faster way to get cpos boundaries for smaller s_attrs #
+ ##########################################################################
+ # if s_attr.name in ['s', 'ent']:
+ # cqi_corpus.query('Last', f'<{s_attr.name}> []* </{s_attr.name}>;')
+ # cqi_subcorpus = cqi_corpus.subcorpora.get('Last')
+ # first_match = 0
+ # last_match = cqi_subcorpus.size - 1
+ # match_boundaries = zip(
+ # range(first_match, last_match + 1),
+ # cqi_subcorpus.dump(cqi_subcorpus.fields['match'], first_match, last_match),
+ # cqi_subcorpus.dump(cqi_subcorpus.fields['matchend'], first_match, last_match)
+ # )
+ # for id, lbound, rbound in match_boundaries:
+ # static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {}
+ # static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
+ # static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
+ # static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
+ # cqi_subcorpus.drop()
for id in range(0, s_attr.size):
- if s_attr.name not in ['s', 'ent']:
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {}
- lbound, rbound = s_attr.cpos_by_id(id)
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
- if s_attr.name not in ['text', 's']:
- continue
- cpos_range = range(lbound, rbound + 1)
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['ent'] = len({x for x in cqi_s_attrs['ent'].ids_by_cpos(list(cpos_range)) if x != -1})
+ print(f's_attrs.{s_attr.name}.lexicon.{id}')
+ static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {
+ 'bounds': None,
+ 'counts': None,
+ 'freqs': None
+ }
if s_attr.name != 'text':
continue
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['s'] = len({x for x in cqi_s_attrs['s'].ids_by_cpos(list(cpos_range)) if x != -1})
+ lbound, rbound = s_attr.cpos_by_id(id)
+ print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds')
+ static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
+ print(f's_attrs.{s_attr.name}.lexicon.{id}.counts')
+ static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
+ static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
+ cpos_list = list(range(lbound, rbound + 1))
+ chunks = [cpos_list[i:i+chunk_size] for i in range(0, len(cpos_list), chunk_size)]
+ del cpos_list
+ ent_ids = set()
+ for chunk in chunks:
+ print(f'Gather ent_ids from cpos: {chunk[0]} - {chunk[-1]}')
+ ent_ids.update({x for x in cqi_s_attrs['ent'].ids_by_cpos(chunk) if x != -1})
+ static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['ent'] = len(ent_ids)
+ del ent_ids
+ s_ids = set()
+ for chunk in chunks:
+ print(f'Gather s_ids from cpos: {chunk[0]} - {chunk[-1]}')
+ s_ids.update({x for x in cqi_s_attrs['s'].ids_by_cpos(chunk) if x != -1})
+ static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['s'] = len(s_ids)
+ del s_ids
+ print(f's_attrs.{s_attr.name}.lexicon.{id}.freqs')
static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'] = {}
for p_attr in cqi_p_attrs.values():
- static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr.ids_by_cpos(list(cpos_range))))
+ p_attr_ids = []
+ for chunk in chunks:
+ print(f'Gather p_attr_ids from cpos: {chunk[0]} - {chunk[-1]}')
+ p_attr_ids.extend(p_attr.ids_by_cpos(chunk))
+ static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr_ids))
+ del p_attr_ids
+ del chunks
sub_s_attrs = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr})
- s_attr_value_names = [
+ s_attr_value_names: List[str] = [
sub_s_attr.name[(len(s_attr.name) + 1):]
for sub_s_attr in sub_s_attrs
]
- sub_s_attr_values = [
- sub_s_attr.values_by_ids(list(range(0, s_attr.size)))
- for sub_s_attr in sub_s_attrs
- ]
+ s_attr_id_list = list(range(s_attr.size))
+ chunks = [s_attr_id_list[i:i+chunk_size] for i in range(0, len(s_attr_id_list), chunk_size)]
+ del s_attr_id_list
+ sub_s_attr_values = []
+ for sub_s_attr in sub_s_attrs:
+ tmp = []
+ for chunk in chunks:
+ tmp.extend(sub_s_attr.values_by_ids(chunk))
+ sub_s_attr_values.append(tmp)
+ del tmp
+ del chunks
+ print(f's_attrs.{s_attr.name}.values')
static_corpus_data['s_attrs'][s_attr.name]['values'] = s_attr_value_names
+ print(f'values.s_attrs.{s_attr.name}')
static_corpus_data['values']['s_attrs'][s_attr.name] = {
s_attr_id: {
s_attr_value_name: sub_s_attr_values[s_attr_value_name_idx][s_attr_id_idx]
@@ -150,9 +189,12 @@ def ext_corpus_static_data(corpus: str) -> Dict:
)
} for s_attr_id_idx, s_attr_id in enumerate(range(0, s_attr.size))
}
- with open(static_corpus_data_file, 'w') as f:
+ del sub_s_attr_values
+ with gzip.open(static_corpus_data_file, 'wt') as f:
json.dump(static_corpus_data, f)
- return static_corpus_data
+ del static_corpus_data
+ with open(static_corpus_data_file, 'rb') as f:
+ return f.read()
def ext_corpus_paginate_corpus(
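
Note: ext_corpus_static_data now writes its result as gzip-compressed JSON and
always returns the raw compressed bytes, whether freshly built or cached. A
minimal sketch of that cache-then-serve pattern, with hypothetical names:

    import gzip
    import json
    import os

    def cached_gzip_json(path: str, build) -> bytes:
        # Build and compress once, then serve the compressed bytes on
        # every later call.
        if not os.path.exists(path):
            with gzip.open(path, 'wt') as f:
                json.dump(build(), f)
        with open(path, 'rb') as f:
            return f.read()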
diff --git a/app/daemon/corpus_utils.py b/app/daemon/corpus_utils.py
index 4d807c14..5b885db7 100644
--- a/app/daemon/corpus_utils.py
+++ b/app/daemon/corpus_utils.py
@@ -28,19 +28,19 @@ def _create_build_corpus_service(corpus):
''' ## Command ## '''
command = ['bash', '-c']
command.append(
- f'mkdir /corpora/data/nopaque_{corpus.id}'
+ f'mkdir /corpora/data/nopaque-{corpus.hashid.lower()}'
' && '
'cwb-encode'
' -c utf8'
- f' -d /corpora/data/nopaque_{corpus.id}'
+ f' -d /corpora/data/nopaque-{corpus.hashid.lower()}'
' -f /root/files/corpus.vrt'
- f' -R /usr/local/share/cwb/registry/nopaque_{corpus.id}'
+ f' -R /usr/local/share/cwb/registry/nopaque-{corpus.hashid.lower()}'
' -P pos -P lemma -P simple_pos'
' -S ent:0+type -S s:0'
' -S text:0+address+author+booktitle+chapter+editor+institution+journal+pages+publisher+publishing_year+school+title'
' -xsB -9'
' && '
- f'cwb-make -V NOPAQUE_{corpus.id}'
+ f'cwb-make -V NOPAQUE-{corpus.hashid.upper()}'
)
''' ## Constraints ## '''
constraints = ['node.role==worker']
@@ -149,11 +149,15 @@ def _create_cqpserver_container(corpus):
''' ### Corpus data volume ### '''
data_volume_source = os.path.join(corpus.path, 'cwb', 'data')
data_volume_target = '/corpora/data'
+ # data_volume_source = os.path.join(corpus.path, 'cwb', 'data', f'nopaque_{corpus.id}')
+ # data_volume_target = f'/corpora/data/nopaque_{corpus.hashid.lower()}'
data_volume = f'{data_volume_source}:{data_volume_target}:rw'
volumes.append(data_volume)
''' ### Corpus registry volume ### '''
registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry')
registry_volume_target = '/usr/local/share/cwb/registry'
+ # registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry', f'nopaque_{corpus.id}')
+ # registry_volume_target = f'/usr/local/share/cwb/registry/nopaque_{corpus.hashid.lower()}'
registry_volume = f'{registry_volume_source}:{registry_volume_target}:rw'
volumes.append(registry_volume)
# Check if a cqpserver container already exists. If this is the case,
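
Note: corpus data and registry entries are now named after the public hashid
instead of the raw database id, so real ids no longer leak into CWB paths. A
sketch of the naming scheme, with an illustrative hashid value:

    # Assumption: corpus.hashid is an opaque public identifier such as 'mNax2'.
    hashid = 'mNax2'
    data_dir = f'/corpora/data/nopaque-{hashid.lower()}'
    registry_path = f'/usr/local/share/cwb/registry/nopaque-{hashid.lower()}'
    cwb_name = f'NOPAQUE-{hashid.upper()}'  # the name passed to cwb-make -V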
diff --git a/app/models.py b/app/models.py
index a7cc77e9..8121f7a9 100644
--- a/app/models.py
+++ b/app/models.py
@@ -1607,9 +1607,14 @@ class Corpus(HashidMixin, db.Model):
return corpus
def build(self):
+ build_dir = os.path.join(self.path, 'cwb')
+ shutil.rmtree(build_dir, ignore_errors=True)
+ os.mkdir(build_dir)
+ os.mkdir(os.path.join(build_dir, 'data'))
+ os.mkdir(os.path.join(build_dir, 'registry'))
corpus_element = ET.fromstring('<corpus>\n</corpus>')
for corpus_file in self.files:
- normalized_vrt_path = os.path.join(self.path, 'cwb', f'{corpus_file.id}.norm.vrt')
+ normalized_vrt_path = os.path.join(build_dir, f'{corpus_file.id}.norm.vrt')
try:
normalize_vrt_file(corpus_file.path, normalized_vrt_path)
except:
@@ -1636,7 +1641,7 @@ class Corpus(HashidMixin, db.Model):
# corpus_element.insert(1, text_element)
corpus_element.append(text_element)
ET.ElementTree(corpus_element).write(
- os.path.join(self.path, 'cwb', 'corpus.vrt'),
+ os.path.join(build_dir, 'corpus.vrt'),
encoding='utf-8'
)
self.status = CorpusStatus.SUBMITTED
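
Note: Corpus.build now starts from a clean cwb/ directory before assembling
corpus.vrt. A condensed sketch of the flow, assuming ET is
xml.etree.ElementTree and text_elements is an iterable of prepared <text>
elements:

    import os
    import shutil
    import xml.etree.ElementTree as ET

    def rebuild_corpus_vrt(corpus_path, text_elements):
        # Wipe any stale build artifacts, recreate the expected layout,
        # then write the merged corpus.vrt.
        build_dir = os.path.join(corpus_path, 'cwb')
        shutil.rmtree(build_dir, ignore_errors=True)
        os.mkdir(build_dir)
        os.mkdir(os.path.join(build_dir, 'data'))
        os.mkdir(os.path.join(build_dir, 'registry'))
        corpus_element = ET.fromstring('<corpus>\n</corpus>')
        for text_element in text_elements:
            corpus_element.append(text_element)
        ET.ElementTree(corpus_element).write(
            os.path.join(build_dir, 'corpus.vrt'), encoding='utf-8'
        )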
diff --git a/app/static/js/cqi/models/corpora.js b/app/static/js/cqi/models/corpora.js
index 8128c47f..2b271811 100644
--- a/app/static/js/cqi/models/corpora.js
+++ b/app/static/js/cqi/models/corpora.js
@@ -138,7 +138,15 @@ cqi.models.corpora.CorpusCollection = class CorpusCollection extends cqi.models.
/************************************************************************
* Custom additions for nopaque *
************************************************************************/
- returnValue.static_data = await this.client.api.ext_corpus_static_data(corpusName);
+ // returnValue.static_data = await this.client.api.ext_corpus_static_data(corpusName);
+ let tmp = await this.client.api.ext_corpus_static_data(corpusName);
+ console.log(tmp);
+ let inflated = pako.inflate(tmp);
+ console.log(inflated);
+ let decoder = new TextDecoder('utf-8');
+ console.log(decoder);
+ let decoded = decoder.decode(inflated);
+ returnValue.static_data = JSON.parse(decoded);
return returnValue;
}
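
Note: on the client, pako.inflate reverses the server-side gzip compression
before JSON.parse. The same roundtrip can be checked in Python, since gzip
output is exactly what the endpoint returns (payload contents are illustrative):

    import gzip
    import json

    payload = gzip.compress(json.dumps({'corpus': {'counts': {}}}).encode('utf-8'))
    static_data = json.loads(gzip.decompress(payload).decode('utf-8'))
    assert static_data == {'corpus': {'counts': {}}}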
diff --git a/app/templates/_scripts.html.j2 b/app/templates/_scripts.html.j2
index 9f7f6396..d8dfa6d1 100644
--- a/app/templates/_scripts.html.j2
+++ b/app/templates/_scripts.html.j2
@@ -2,6 +2,7 @@
+<script src="{{ url_for('static', filename='js/pako.min.js') }}"></script>
{%- assets
filters='rjsmin',