Add compression to static corpus data, use chunked computation, hide real corpus ids in corpus analysis

Patrick Jentsch 2023-07-06 13:02:22 +02:00
parent 413b6111df
commit a9973e9c8e
7 changed files with 148 additions and 84 deletions
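
The central pattern in this commit: the expensive static corpus data is computed once, written to disk gzip-compressed, and from then on served to the browser as raw bytes. A minimal sketch of that pattern, assuming an invented cache path and a stand-in payload:

import gzip
import json
import os

def get_static_data(cache_file: str) -> bytes:
    # Serve the compressed cache if it already exists ...
    if os.path.exists(cache_file):
        with open(cache_file, 'rb') as f:
            return f.read()
    # ... otherwise compute the payload, compress it once, and return the bytes.
    data = {'corpus': {'counts': {}, 'freqs': {}}}  # stand-in for the real payload
    with gzip.open(cache_file, 'wt') as f:
        json.dump(data, f)
    with open(cache_file, 'rb') as f:
        return f.read()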

View File

@@ -19,6 +19,9 @@ def reset():
     for corpus in [x for x in Corpus.query.all() if x.status in status]:
         print(f'Resetting corpus {corpus}')
         shutil.rmtree(os.path.join(corpus.path, 'cwb'), ignore_errors=True)
+        os.mkdir(os.path.join(corpus.path, 'cwb'))
+        os.mkdir(os.path.join(corpus.path, 'cwb', 'data'))
+        os.mkdir(os.path.join(corpus.path, 'cwb', 'registry'))
         corpus.status = CorpusStatus.UNPREPARED
         corpus.num_analysis_sessions = 0
     db.session.commit()
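
An equivalent way to re-create the empty cwb scaffold wiped by the rmtree above, sketched with os.makedirs and an invented corpus path:

import os
import shutil

cwb_dir = os.path.join('/tmp/example-corpus', 'cwb')  # hypothetical corpus.path
shutil.rmtree(cwb_dir, ignore_errors=True)
for subdir in ('data', 'registry'):
    os.makedirs(os.path.join(cwb_dir, subdir))  # creates cwb_dir itself on the first pass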

View File

@@ -7,6 +7,7 @@ from threading import Lock
 from app import db, hashids, socketio
 from app.decorators import socketio_login_required
 from app.models import Corpus, CorpusStatus
+import math
 '''
@@ -83,7 +84,7 @@ def connect(auth):
         socketio.sleep(3)
         retry_counter -= 1
     db.session.refresh(corpus)
-    cqi_client = CQiClient(f'cqpserver_{corpus_id}')
+    cqi_client = CQiClient(f'cqpserver_{corpus_id}', timeout=math.inf)
     session['cqi_over_sio'] = {
         'corpus_id': corpus_id,
         'cqi_client': cqi_client,
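
Since ext_corpus_static_data can now run for minutes on a large corpus inside a single CQi request, timeout=math.inf disables the client-side timeout entirely. A sketch of the call in isolation, with an invented server name:

import math
from cqi import CQiClient

# Wait indefinitely for replies from the per-corpus CQP server.
cqi_client = CQiClient('cqpserver_1', timeout=math.inf)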

View File

@@ -1,12 +1,18 @@
 from collections import Counter
 from cqi import CQiClient
-from cqi.models.corpora import Corpus
-from cqi.status import StatusOk
+from cqi.models.corpora import Corpus as CQiCorpus
+from cqi.models.attributes import (
+    PositionalAttribute as CQiPositionalAttribute,
+    StructuralAttribute as CQiStructuralAttribute
+)
+from cqi.status import StatusOk as CQiStatusOk
 from flask import session
 from typing import Dict, List
+import gzip
 import json
 import math
 import os
+import shutil
 from app import db
 from app.models import Corpus
 from .utils import lookups_by_cpos, partial_export_subcorpus, export_subcorpus
@@ -22,43 +28,28 @@ CQI_EXTENSION_FUNCTION_NAMES: List[str] = [
 ]
 
 
-def ext_corpus_update_db(corpus: str):
+def ext_corpus_update_db(corpus: str) -> CQiStatusOk:
     db_corpus = Corpus.query.get(session['cqi_over_sio']['corpus_id'])
     cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
-    cqi_corpus = cqi_client.corpora.get(corpus)
+    cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus)
     db_corpus.num_tokens = cqi_corpus.size
     db.session.commit()
-    return StatusOk()
+    return CQiStatusOk()
 
 
 def ext_corpus_static_data(corpus: str) -> Dict:
     db_corpus = Corpus.query.get(session['cqi_over_sio']['corpus_id'])
-    static_corpus_data_file = os.path.join(db_corpus.path, 'cwb', 'static.json')
+    static_corpus_data_file = os.path.join(db_corpus.path, 'cwb', 'static.json.gz')
     if os.path.exists(static_corpus_data_file):
-        with open(static_corpus_data_file, 'r') as f:
-            return json.load(f)
+        with open(static_corpus_data_file, 'rb') as f:
+            return f.read()
     cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
-    cqi_corpus = cqi_client.corpora.get(corpus)
-    ##########################################################################
-    # A faster way to get cpos boundaries for smaller s_attrs                #
-    ##########################################################################
-    # cqi_corpus.query('Last', '<s> []* </s>;')
-    # cqi_subcorpus = cqi_corpus.subcorpora.get('Last')
-    # print(cqi_subcorpus.size)
-    # first_match = 0
-    # last_match = cqi_subcorpus.attrs['size'] - 1
-    # match_boundaries = zip(
-    #     list(range(first_match, last_match + 1)),
-    #     cqi_subcorpus.dump(cqi_subcorpus.attrs['fields']['match'], first_match, last_match),
-    #     cqi_subcorpus.dump(cqi_subcorpus.attrs['fields']['matchend'], first_match, last_match)
-    # )
-    # for x in match_boundaries:
-    #     print(x)
-    cqi_p_attrs = {
+    cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus)
+    cqi_p_attrs: Dict[str, CQiPositionalAttribute] = {
         p_attr.name: p_attr
         for p_attr in cqi_corpus.positional_attributes.list()
     }
-    cqi_s_attrs = {
+    cqi_s_attrs: Dict[str, CQiStructuralAttribute] = {
         s_attr.name: s_attr
         for s_attr in cqi_corpus.structural_attributes.list()
     }
@@ -75,73 +66,121 @@ def ext_corpus_static_data(corpus: str) -> Dict:
         'values': {'p_attrs': {}, 's_attrs': {}}
     }
     for p_attr in cqi_p_attrs.values():
-        static_corpus_data['corpus']['freqs'][p_attr.name] = dict(
-            zip(
-                range(0, p_attr.lexicon_size),
-                p_attr.freqs_by_ids(list(range(0, p_attr.lexicon_size)))
-            )
-        )
+        static_corpus_data['corpus']['freqs'][p_attr.name] = {}
+        chunk_size = 5000
+        p_attr_id_list = list(range(p_attr.lexicon_size))
+        chunks = [p_attr_id_list[i:i+chunk_size] for i in range(0, len(p_attr_id_list), chunk_size)]
+        del p_attr_id_list
+        for chunk in chunks:
+            print(f'corpus.freqs.{p_attr.name}: {chunk[0]} - {chunk[-1]}')
+            static_corpus_data['corpus']['freqs'][p_attr.name].update(
+                dict(zip(chunk, p_attr.freqs_by_ids(chunk)))
+            )
+        del chunks
-        static_corpus_data['p_attrs'][p_attr.name] = dict(
-            zip(
-                range(0, cqi_corpus.size),
-                p_attr.ids_by_cpos(list(range(0, cqi_corpus.size)))
-            )
-        )
+        static_corpus_data['p_attrs'][p_attr.name] = {}
+        cpos_list = list(range(cqi_corpus.size))
+        chunks = [cpos_list[i:i+chunk_size] for i in range(0, len(cpos_list), chunk_size)]
+        del cpos_list
+        for chunk in chunks:
+            print(f'p_attrs.{p_attr.name}: {chunk[0]} - {chunk[-1]}')
+            static_corpus_data['p_attrs'][p_attr.name].update(
+                dict(zip(chunk, p_attr.ids_by_cpos(chunk)))
+            )
+        del chunks
-        static_corpus_data['values']['p_attrs'][p_attr.name] = dict(
-            zip(
-                range(0, p_attr.lexicon_size),
-                p_attr.values_by_ids(list(range(0, p_attr.lexicon_size)))
-            )
-        )
+        static_corpus_data['values']['p_attrs'][p_attr.name] = {}
+        p_attr_id_list = list(range(p_attr.lexicon_size))
+        chunks = [p_attr_id_list[i:i+chunk_size] for i in range(0, len(p_attr_id_list), chunk_size)]
+        del p_attr_id_list
+        for chunk in chunks:
+            print(f'values.p_attrs.{p_attr.name}: {chunk[0]} - {chunk[-1]}')
+            static_corpus_data['values']['p_attrs'][p_attr.name].update(
+                dict(zip(chunk, p_attr.values_by_ids(chunk)))
+            )
+        del chunks
     for s_attr in cqi_s_attrs.values():
         if s_attr.has_values:
             continue
         static_corpus_data['corpus']['counts'][s_attr.name] = s_attr.size
         static_corpus_data['s_attrs'][s_attr.name] = {'lexicon': {}, 'values': None}
         static_corpus_data['values']['s_attrs'][s_attr.name] = {}
-        if s_attr.name in ['s', 'ent']:
-            cqi_corpus.query('Last', f'<{s_attr.name}> []* </{s_attr.name}>;')
-            cqi_subcorpus = cqi_corpus.subcorpora.get('Last')
-            first_match = 0
-            last_match = cqi_subcorpus.size - 1
-            match_boundaries = zip(
-                range(first_match, last_match + 1),
-                cqi_subcorpus.dump(cqi_subcorpus.fields['match'], first_match, last_match),
-                cqi_subcorpus.dump(cqi_subcorpus.fields['matchend'], first_match, last_match)
-            )
-            for id, lbound, rbound in match_boundaries:
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {}
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
-            cqi_subcorpus.drop()
+        ##########################################################################
+        # A faster way to get cpos boundaries for smaller s_attrs                #
+        ##########################################################################
+        # if s_attr.name in ['s', 'ent']:
+        #     cqi_corpus.query('Last', f'<{s_attr.name}> []* </{s_attr.name}>;')
+        #     cqi_subcorpus = cqi_corpus.subcorpora.get('Last')
+        #     first_match = 0
+        #     last_match = cqi_subcorpus.size - 1
+        #     match_boundaries = zip(
+        #         range(first_match, last_match + 1),
+        #         cqi_subcorpus.dump(cqi_subcorpus.fields['match'], first_match, last_match),
+        #         cqi_subcorpus.dump(cqi_subcorpus.fields['matchend'], first_match, last_match)
+        #     )
+        #     for id, lbound, rbound in match_boundaries:
+        #         static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {}
+        #         static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
+        #         static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
+        #         static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
+        #     cqi_subcorpus.drop()
         for id in range(0, s_attr.size):
-            if s_attr.name not in ['s', 'ent']:
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {}
-                lbound, rbound = s_attr.cpos_by_id(id)
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
-            if s_attr.name not in ['text', 's']:
-                continue
-            cpos_range = range(lbound, rbound + 1)
-            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['ent'] = len({x for x in cqi_s_attrs['ent'].ids_by_cpos(list(cpos_range)) if x != -1})
+            print(f's_attrs.{s_attr.name}.lexicon.{id}')
+            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id] = {
+                'bounds': None,
+                'counts': None,
+                'freqs': None
+            }
             if s_attr.name != 'text':
                 continue
-            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['s'] = len({x for x in cqi_s_attrs['s'].ids_by_cpos(list(cpos_range)) if x != -1})
+            lbound, rbound = s_attr.cpos_by_id(id)
+            print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds')
+            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
+            print(f's_attrs.{s_attr.name}.lexicon.{id}.counts')
+            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts'] = {}
+            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['token'] = rbound - lbound + 1
+            cpos_list = list(range(lbound, rbound + 1))
+            chunks = [cpos_list[i:i+chunk_size] for i in range(0, len(cpos_list), chunk_size)]
+            del cpos_list
+            ent_ids = set()
+            for chunk in chunks:
+                print(f'Gather ent_ids from cpos: {chunk[0]} - {chunk[-1]}')
+                ent_ids.update({x for x in cqi_s_attrs['ent'].ids_by_cpos(chunk) if x != -1})
+            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['ent'] = len(ent_ids)
+            del ent_ids
+            s_ids = set()
+            for chunk in chunks:
+                print(f'Gather s_ids from cpos: {chunk[0]} - {chunk[-1]}')
+                s_ids.update({x for x in cqi_s_attrs['s'].ids_by_cpos(chunk) if x != -1})
+            static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['counts']['s'] = len(s_ids)
+            del s_ids
+            print(f's_attrs.{s_attr.name}.lexicon.{id}.freqs')
             static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'] = {}
             for p_attr in cqi_p_attrs.values():
-                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr.ids_by_cpos(list(cpos_range))))
+                p_attr_ids = []
+                for chunk in chunks:
+                    print(f'Gather p_attr_ids from cpos: {chunk[0]} - {chunk[-1]}')
+                    p_attr_ids.extend(p_attr.ids_by_cpos(chunk))
+                static_corpus_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr_ids))
+                del p_attr_ids
+            del chunks
         sub_s_attrs = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr})
-        s_attr_value_names = [
+        s_attr_value_names: List[str] = [
            sub_s_attr.name[(len(s_attr.name) + 1):]
            for sub_s_attr in sub_s_attrs
         ]
-        sub_s_attr_values = [
-            sub_s_attr.values_by_ids(list(range(0, s_attr.size)))
-            for sub_s_attr in sub_s_attrs
-        ]
+        s_attr_id_list = list(range(s_attr.size))
+        chunks = [s_attr_id_list[i:i+chunk_size] for i in range(0, len(s_attr_id_list), chunk_size)]
+        del s_attr_id_list
+        sub_s_attr_values = []
+        for sub_s_attr in sub_s_attrs:
+            tmp = []
+            for chunk in chunks:
+                tmp.extend(sub_s_attr.values_by_ids(chunk))
+            sub_s_attr_values.append(tmp)
+            del tmp
+        del chunks
+        print(f's_attrs.{s_attr.name}.values')
         static_corpus_data['s_attrs'][s_attr.name]['values'] = s_attr_value_names
+        print(f'values.s_attrs.{s_attr.name}')
         static_corpus_data['values']['s_attrs'][s_attr.name] = {
             s_attr_id: {
                 s_attr_value_name: sub_s_attr_values[s_attr_value_name_idx][s_attr_id_idx]
@@ -150,9 +189,12 @@ def ext_corpus_static_data(corpus: str) -> Dict:
                 )
             } for s_attr_id_idx, s_attr_id in enumerate(range(0, s_attr.size))
         }
-    with open(static_corpus_data_file, 'w') as f:
+    del sub_s_attr_values
+    with gzip.open(static_corpus_data_file, 'wt') as f:
         json.dump(static_corpus_data, f)
-    return static_corpus_data
+    del static_corpus_data
+    with open(static_corpus_data_file, 'rb') as f:
+        return f.read()
 
 
 def ext_corpus_paginate_corpus(
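
All of the slicing above is one idiom: split a full id range into fixed-size chunks, issue one CQi call per chunk, and merge the partial results, so no single request or response grows with the corpus. A self-contained sketch of the idiom, with a stand-in for bulk lookups such as p_attr.freqs_by_ids:

from typing import Dict, List

def chunked(ids: List[int], chunk_size: int = 5000) -> List[List[int]]:
    # Same slicing as in the diff above.
    return [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]

def lookup(ids: List[int]) -> List[int]:
    # Stand-in for a CQi bulk call like p_attr.freqs_by_ids(ids).
    return [i * 2 for i in ids]

merged: Dict[int, int] = {}
for chunk in chunked(list(range(12000))):
    merged.update(dict(zip(chunk, lookup(chunk))))
assert len(merged) == 12000

Keeping each request at 5000 ids bounds both the CQi message size and the peak memory of any single call.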

View File

@@ -28,19 +28,19 @@ def _create_build_corpus_service(corpus):
     ''' ## Command ## '''
     command = ['bash', '-c']
     command.append(
-        f'mkdir /corpora/data/nopaque_{corpus.id}'
+        f'mkdir /corpora/data/nopaque-{corpus.hashid.lower()}'
         ' && '
         'cwb-encode'
         ' -c utf8'
-        f' -d /corpora/data/nopaque_{corpus.id}'
+        f' -d /corpora/data/nopaque-{corpus.hashid.lower()}'
         ' -f /root/files/corpus.vrt'
-        f' -R /usr/local/share/cwb/registry/nopaque_{corpus.id}'
+        f' -R /usr/local/share/cwb/registry/nopaque-{corpus.hashid.lower()}'
         ' -P pos -P lemma -P simple_pos'
         ' -S ent:0+type -S s:0'
         ' -S text:0+address+author+booktitle+chapter+editor+institution+journal+pages+publisher+publishing_year+school+title'
         ' -xsB -9'
         ' && '
-        f'cwb-make -V NOPAQUE_{corpus.id}'
+        f'cwb-make -V NOPAQUE-{corpus.hashid.upper()}'
     )
     ''' ## Constraints ## '''
     constraints = ['node.role==worker']
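
The switch from nopaque_{corpus.id} to nopaque-{corpus.hashid.lower()} is what hides the raw database ids named in the commit message: only the public hashid appears in data directories, registry entries, and CWB corpus names. A sketch of the difference, using the hashids library that backs corpus.hashid (salt and parameters invented here):

from hashids import Hashids

hashids = Hashids(salt='example-salt', min_length=4)  # nopaque's real parameters differ
corpus_id = 42
corpus_hashid = hashids.encode(corpus_id)       # output depends on the salt
old_name = f'nopaque_{corpus_id}'               # leaks the sequential database id
new_name = f'nopaque-{corpus_hashid.lower()}'   # exposes only the public hashid
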
@@ -149,11 +149,15 @@ def _create_cqpserver_container(corpus):
     ''' ### Corpus data volume ### '''
     data_volume_source = os.path.join(corpus.path, 'cwb', 'data')
     data_volume_target = '/corpora/data'
+    # data_volume_source = os.path.join(corpus.path, 'cwb', 'data', f'nopaque_{corpus.id}')
+    # data_volume_target = f'/corpora/data/nopaque_{corpus.hashid.lower()}'
     data_volume = f'{data_volume_source}:{data_volume_target}:rw'
     volumes.append(data_volume)
     ''' ### Corpus registry volume ### '''
     registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry')
     registry_volume_target = '/usr/local/share/cwb/registry'
+    # registry_volume_source = os.path.join(corpus.path, 'cwb', 'registry', f'nopaque_{corpus.id}')
+    # registry_volume_target = f'/usr/local/share/cwb/registry/nopaque_{corpus.hashid.lower()}'
     registry_volume = f'{registry_volume_source}:{registry_volume_target}:rw'
     volumes.append(registry_volume)
     # Check if a cqpserver container already exists. If this is the case,

View File

@@ -1607,9 +1607,14 @@ class Corpus(HashidMixin, db.Model):
         return corpus
 
     def build(self):
+        build_dir = os.path.join(self.path, 'cwb')
+        shutil.rmtree(build_dir, ignore_errors=True)
+        os.mkdir(build_dir)
+        os.mkdir(os.path.join(build_dir, 'data'))
+        os.mkdir(os.path.join(build_dir, 'registry'))
         corpus_element = ET.fromstring('<corpus>\n</corpus>')
         for corpus_file in self.files:
-            normalized_vrt_path = os.path.join(self.path, 'cwb', f'{corpus_file.id}.norm.vrt')
+            normalized_vrt_path = os.path.join(build_dir, f'{corpus_file.id}.norm.vrt')
             try:
                 normalize_vrt_file(corpus_file.path, normalized_vrt_path)
             except:
@@ -1636,7 +1641,7 @@ class Corpus(HashidMixin, db.Model):
             # corpus_element.insert(1, text_element)
             corpus_element.append(text_element)
         ET.ElementTree(corpus_element).write(
-            os.path.join(self.path, 'cwb', 'corpus.vrt'),
+            os.path.join(build_dir, 'corpus.vrt'),
             encoding='utf-8'
         )
         self.status = CorpusStatus.SUBMITTED
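
For context, build() wraps each normalized per-file VRT document under a single <corpus> root before writing corpus.vrt. A minimal sketch with invented content:

import xml.etree.ElementTree as ET

corpus_element = ET.fromstring('<corpus>\n</corpus>')
text_element = ET.fromstring('<text id="1">token\n</text>')  # stand-in for a parsed .norm.vrt
corpus_element.append(text_element)
ET.ElementTree(corpus_element).write('/tmp/corpus.vrt', encoding='utf-8')  # path invented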

View File

@@ -138,7 +138,15 @@ cqi.models.corpora.CorpusCollection = class CorpusCollection extends cqi.models.
     /************************************************************************
      *                   Custom additions for nopaque                       *
      ************************************************************************/
-    returnValue.static_data = await this.client.api.ext_corpus_static_data(corpusName);
+    // returnValue.static_data = await this.client.api.ext_corpus_static_data(corpusName);
+    let tmp = await this.client.api.ext_corpus_static_data(corpusName);
+    console.log(tmp);
+    let inflated = pako.inflate(tmp);
+    console.log(inflated);
+    let decoder = new TextDecoder('utf-8');
+    console.log(decoder);
+    let decoded = decoder.decode(inflated);
+    returnValue.static_data = JSON.parse(decoded);
     return returnValue;
   }
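
The browser steps above (pako.inflate, TextDecoder, JSON.parse) are the mirror image of the server's gzip.open(..., 'wt') plus json.dump; pako's inflate auto-detects the gzip header. A Python sketch of the round trip the payload has to survive, with stand-in data:

import gzip
import json

payload = {'corpus': {'counts': {'token': 3}}}            # stand-in static data
raw = gzip.compress(json.dumps(payload).encode('utf-8'))  # server side: bytes on the wire
assert json.loads(gzip.decompress(raw).decode('utf-8')) == payload  # client side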

View File

@@ -2,6 +2,7 @@
 <script src="https://cdnjs.cloudflare.com/ajax/libs/list.js/2.3.1/list.min.js" integrity="sha512-93wYgwrIFL+b+P3RvYxi/WUFRXXUDSLCT2JQk9zhVGXuS2mHl2axj6d+R6pP+gcU5isMHRj1u0oYE/mWyt/RjA==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
 <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.5.4/socket.io.min.js" integrity="sha512-HTENHrkQ/P0NGDFd5nk6ibVtCkcM7jhr2c7GyvXp5O+4X6O5cQO9AhqFzM+MdeBivsX7Hoys2J7pp2wdgMpCvw==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
 <script src="https://cdnjs.cloudflare.com/ajax/libs/plotly.js/2.24.2/plotly.min.js" integrity="sha512-dAXqGCq94D0kgLSPnfvd/pZpCMoJQpGj2S2XQmFQ9Ay1+96kbjss02ISEh+TBNXMggGg/1qoMcOHcxg+Op/Jmw==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
+<script src="https://cdnjs.cloudflare.com/ajax/libs/pako/2.1.0/pako_inflate.min.js" integrity="sha512-mlnC6JeOvg9V4vBpWMxGKscsCdScB6yvGVCeFF2plnQMRmwH69s9F8SHPbC0oirqfePmRBhqx2s3Bx7WIvHfWg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
 {%- assets
     filters='rjsmin',