mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2025-01-13 11:40:35 +00:00
253 lines
11 KiB
Python
253 lines
11 KiB
Python
from .CQiClient import CQiClient
|
|
from .CQi import CONST_FIELD_MATCH, CONST_FIELD_MATCHEND
|
|
import time
|
|
from app import logger # only works if imported into opaque web app
|
|
|
|
|
|
class CQiWrapper(CQiClient):
|
|
"""
|
|
CQIiWrapper object
|
|
|
|
High level wrapper that groups and renames some functions of CQiClient
|
|
for ease of use. Also structures recieved data into python dictionaries.
|
|
|
|
Keyword arguments:
|
|
host -- host IP adress or hostname wher the cqp server is running
|
|
port -- port of the cqp server
|
|
username -- username used to connect to the cqp server
|
|
password -- password of the user to connect to the cqp server
|
|
"""
|
|
|
|
SUBCORPUS_NAMES = []
|
|
|
|
def __init__(self, host='127.0.0.1', port=4877, username='opaque',
|
|
password='opaque'):
|
|
super(CQiWrapper, self).__init__(host=host, port=port)
|
|
self.username = username
|
|
self.password = password
|
|
|
|
def connect(self):
|
|
"""
|
|
Connect with CQP server
|
|
|
|
Connects via socket to the CQP server using the given username and
|
|
password from class initiation.
|
|
"""
|
|
self.ctrl_connect(self.username, self.password)
|
|
|
|
def __create_attribute_strings(self):
|
|
"""
|
|
Creates all needed attribute strings to query for word, lemma etc. in
|
|
the given corpus.
|
|
For example: CORPUS_NAME.word to query words
|
|
"""
|
|
p_attrs = self.corpus_positional_attributes(self.corpus_name)
|
|
struct_attrs = self.corpus_structural_attributes(self.corpus_name)
|
|
self.attr_strings = {}
|
|
self.attr_strings['positional_attrs'] = {}
|
|
self.attr_strings['struct_attrs'] = {}
|
|
for p_attr in p_attrs:
|
|
self.attr_strings['positional_attrs'][p_attr] = (self.corpus_name
|
|
+ '.'
|
|
+ p_attr)
|
|
for struct_attr in struct_attrs:
|
|
self.attr_strings['struct_attrs'][struct_attr] = (self.corpus_name
|
|
+ '.'
|
|
+ struct_attr)
|
|
# logger.warning(('All positional and '
|
|
# 'structural attributes: {}').format(self.attr_strings))
|
|
|
|
def select_corpus(self, corpus_name):
|
|
if corpus_name in self.corpus_list_coprora():
|
|
self.corpus_name = corpus_name
|
|
self.__create_attribute_strings()
|
|
# logger.warning('{} does exist.'.format(corpus_name))
|
|
else:
|
|
# logger.warning('{} does not exist.'.format(corpus_name))
|
|
pass
|
|
|
|
def disconnect(self):
|
|
"""
|
|
Disconnect from CQP server
|
|
|
|
Disconnects from the CQP server. Closes used socket after disconnect.
|
|
"""
|
|
self.ctrl_bye()
|
|
self.connection.close()
|
|
# logger.warning('Disconnected from cqp server.')
|
|
|
|
def query_subcorpus(self, query, result_subcorpus_name='Query-results'):
|
|
"""
|
|
Create subcorpus
|
|
|
|
Input query will be used to create a subcorpus holding all cpos match
|
|
positions for that query.
|
|
|
|
Keyword arguments:
|
|
result_subcorpus_name -- user set name of the subcorpus which holds all
|
|
cpos match positions, produced by the query
|
|
query -- query written in cqp query language
|
|
"""
|
|
self.cqp_query(self.corpus_name, result_subcorpus_name, query)
|
|
self.result_subcorpus = (self.corpus_name
|
|
+ ':'
|
|
+ result_subcorpus_name)
|
|
self.SUBCORPUS_NAMES.append(self.result_subcorpus)
|
|
self.nr_matches = self.cqp_subcorpus_size(self.result_subcorpus)
|
|
print('Nr of all matches is:', self.nr_matches)
|
|
# logger.warning('Nr of all matches is: {}'.format(self.nr_matches))
|
|
|
|
def show_subcorpora(self):
|
|
"""
|
|
Show all subcorpora currently saved by the cqp server.
|
|
"""
|
|
return self.cqp_list_subcorpora(self.corpus_name)
|
|
|
|
def show_query_results(self,
|
|
context_len=10,
|
|
result_len=1000,
|
|
result_offset=0):
|
|
"""
|
|
Show query results
|
|
|
|
Shows the actual matched strings produce by the query. Uses the cpos
|
|
match indexes to grab those strings. saves them into an orderd
|
|
dictionary. Also saves coresponding tags, lemmas and context. Gets those
|
|
informations using the corresponding cpos.
|
|
|
|
Keyword arguments:
|
|
context_len -- defines how many words before and after a match will be
|
|
shown (default 10)
|
|
result_len -- defines how many results are actually grabbed
|
|
"""
|
|
self.context_len = context_len
|
|
self.corpus_max_len = self.cl_attribute_size(
|
|
self.attr_strings['positional_attrs']['word']
|
|
)
|
|
self.nr_matches = min(result_len, self.nr_matches)
|
|
if self.nr_matches == 0:
|
|
# logger.warning('Query resulted in 0 matches.')
|
|
return None
|
|
else:
|
|
# Get match cpos boundries
|
|
# match_boundries shows the start and end cpos of one match as a
|
|
# pair of cpositions
|
|
# [(1355, 1357), (1477, 1479)] Example for two boundry pairs
|
|
offset_start = 0 if result_offset == 0 else result_offset
|
|
offset_end = self.nr_matches + result_offset - 1
|
|
match_boundaries = zip(self.cqp_dump_subcorpus(self.result_subcorpus,
|
|
CONST_FIELD_MATCH,
|
|
offset_start,
|
|
offset_end),
|
|
self.cqp_dump_subcorpus(self.result_subcorpus,
|
|
CONST_FIELD_MATCHEND,
|
|
offset_start,
|
|
offset_end))
|
|
|
|
# Generate all cpos between match boundries including start and end boundries.
|
|
# Also generate cpos for left and right context.
|
|
# Save those cpos into dict as lists for the keys 'lc', 'hit' and 'rc'
|
|
# Also collect all cpos together in one list for the final request of
|
|
# all cpos informations
|
|
all_matches = []
|
|
all_cpos = []
|
|
for start, end in match_boundaries:
|
|
end += 1
|
|
lc_cpos = list(range(max([0, start - self.context_len]), start))
|
|
lc = {'lc': lc_cpos}
|
|
match_cpos = list(range(start, end))
|
|
match = {'hit': match_cpos}
|
|
rc_cpos = list(range(end, min([self.corpus_max_len, end + self.context_len])))
|
|
rc = {'rc': rc_cpos}
|
|
lc.update(match)
|
|
lc.update(rc)
|
|
all_cpos.extend(lc_cpos + match_cpos + rc_cpos)
|
|
all_matches.append(lc)
|
|
# print(all_matches)
|
|
# print(all_cpos)
|
|
|
|
# Get all cpos for all sneteces boundries
|
|
# s_lookup = {}
|
|
# for s_id in set(s_ids):
|
|
# s_start, s_end = self.cl_struc2cpos('UTOPIEN.s', s_id)
|
|
# # CHANGE to UTOPIEN.s will always be like this in nopaque
|
|
# s_cpos = range(s_start, s_end)
|
|
# s_lookup.update({s_id: list(s_cpos)})
|
|
# # print(list(s_cpos))
|
|
# all_cpos.extend(s_cpos)
|
|
t0 = time.time()
|
|
all_cpos = list(set(all_cpos)) # get rid of cpos duplicates
|
|
t1 = time.time()
|
|
t_total = t1 - t0
|
|
print('TIME FOR ALL CPOS:', t_total)
|
|
print('CPOS SUM:', len(all_cpos))
|
|
|
|
# Get cpos informations like CORPUS_NAME.word or CORPUS_NAME.lemma for
|
|
# all cpos entries in all_cpos_list
|
|
# Also saves these informations into self.results dict
|
|
t6 = time.time()
|
|
all_cpos_infos, text_lookup = self.get_cpos_infos(all_cpos)
|
|
t7 = time.time()
|
|
t_final = t7 - t6
|
|
print('GOT ALL RESULTS IN:', t_final)
|
|
|
|
self.results = {'matches': all_matches, 'cpos_lookup': all_cpos_infos,
|
|
'text_lookup': text_lookup}
|
|
return self.results
|
|
|
|
def get_cpos_infos(self, all_cpos):
|
|
'''
|
|
Get cpos informations like CORPUS_NAME.word or CORPUS_NAME.lemma for
|
|
all cpos entries specified in the parameter all_cpos.
|
|
'''
|
|
# Get all positional attribute informations
|
|
cpos_infos = {}
|
|
for p_attr_key in self.attr_strings['positional_attrs'].keys():
|
|
match_strs = self.cl_cpos2str(self.attr_strings['positional_attrs'][p_attr_key], all_cpos)
|
|
cpos_infos[p_attr_key] = match_strs
|
|
|
|
# Get all strucutural attribute informations
|
|
tmp_info = {}
|
|
structs_to_check = []
|
|
for struct_attr_key in self.attr_strings['struct_attrs'].keys():
|
|
key = self.attr_strings['struct_attrs'][struct_attr_key]
|
|
has_value = self.corpus_structural_attribute_has_values(key)
|
|
struct_ids = self.cl_cpos2struc(key, all_cpos)
|
|
if has_value is False: # Get IDs of strucutural elements without values (this means get IDs of XML tags. Struct elements only have values if they are XML attributes)
|
|
tmp_info[struct_attr_key] = []
|
|
for id in struct_ids:
|
|
tmp_info[struct_attr_key].append(id)
|
|
else:
|
|
structs_to_check.append({key: struct_attr_key})
|
|
struct_attr_values = list(tmp_info.values())
|
|
struct_attr_keys = list(tmp_info.keys())
|
|
|
|
# Build textlookup dictionary
|
|
text_lookup_ids = list(set(struct_attr_values[0])) # First is always one text
|
|
text_lookup = {}
|
|
for d in structs_to_check:
|
|
s_key, s_value = zip(*d.items())
|
|
s_value = s_value[0].split('_', 1)[1]
|
|
struct_values = self.cl_struc2str(s_key[0], text_lookup_ids)
|
|
zipped = dict(zip(text_lookup_ids, struct_values))
|
|
for zip_key, zip_value in zipped.items():
|
|
check = text_lookup.get(zip_key)
|
|
if check is None:
|
|
text_lookup[zip_key] = {s_value: zip_value}
|
|
else:
|
|
text_lookup[zip_key].update({s_value: zip_value})
|
|
|
|
# zip keys and values together
|
|
attr_values_list = []
|
|
attr_keys_list = []
|
|
for key in cpos_infos.keys():
|
|
attr_values_list.append(cpos_infos[key])
|
|
attr_keys_list.append(key)
|
|
attr_keys_list.extend(struct_attr_keys)
|
|
attr_values_list.extend(struct_attr_values)
|
|
joined_cpos_infos = zip(all_cpos, *attr_values_list)
|
|
dict_cpos_infos = {}
|
|
for info in joined_cpos_infos:
|
|
dict_cpos_infos[info[0]] = dict(zip(attr_keys_list, info[1:]))
|
|
return dict_cpos_infos, text_lookup
|