nopaque/app/corpora/CQiWrapper/CQiWrapper.py
2019-11-27 09:41:21 +01:00

242 lines
10 KiB
Python

from CQiClient import CQiClient
from CQi import CONST_FIELD_MATCH, CONST_FIELD_MATCHEND
import re
# from app import logger # only works if imported into opaque web app
class CQiWrapper(CQiClient):
"""
CQIiWrapper object
High level wrapper that groups and renames some functions of CQiClient
for ease of use. Also structures recieved data into python dictionaries.
Keyword arguments:
host -- host IP adress or hostname wher the cqp server is running
port -- port of the cqp server
username -- username used to connect to the cqp server
password -- password of the user to connect to the cqp server
"""
SUBCORPUS_NAMES = []
def __init__(self, host='127.0.0.1', port=4877, username='opaque',
password='opaque'):
super(CQiWrapper, self).__init__(host=host, port=port)
self.username = username
self.password = password
def connect(self):
"""
Connect with CQP server
Connects via socket to the CQP server using the given username and
password from class initiation.
"""
self.ctrl_connect(self.username, self.password)
def __create_attribute_strings(self):
"""
Creates all needed attribute strings to query for word, lemma etc. in
the given corpus.
For example: CORPUS_NAME.word to query words
"""
p_attrs = self.corpus_positional_attributes(self.corpus_name)
struct_attrs = self.corpus_structural_attributes(self.corpus_name)
self.attr_strings = {}
self.attr_strings['positional_attrs'] = {}
self.attr_strings['struct_attrs'] = {}
for p_attr in p_attrs:
self.attr_strings['positional_attrs'][p_attr] = (self.corpus_name
+ '.'
+ p_attr)
for struct_attr in struct_attrs:
self.attr_strings['struct_attrs'][struct_attr] = (self.corpus_name
+ '.'
+ struct_attr)
# logger.warning(('All positional and '
# 'structural attributes: {}').format(self.attr_strings))
def select_corpus(self, corpus_name):
if corpus_name in self.corpus_list_coprora():
self.corpus_name = corpus_name
self.__create_attribute_strings()
# logger.warning('{} does exist.'.format(corpus_name))
else:
# logger.warning('{} does not exist.'.format(corpus_name))
pass
def disconnect(self):
"""
Disconnect from CQP server
Disconnects from the CQP server. Closes used socket after disconnect.
"""
self.ctrl_bye()
self.connection.close()
# logger.warning('Disconnected from cqp server.')
def query_subcorpus(self, query, result_subcorpus_name='Query-results'):
"""
Create subcorpus
Input query will be used to create a subcorpus holding all cpos match
positions for that query.
Keyword arguments:
result_subcorpus_name -- user set name of the subcorpus which holds all
cpos match positions, produced by the query
query -- query written in cqp query language
"""
self.cqp_query(self.corpus_name, result_subcorpus_name, query)
self.result_subcorpus = (self.corpus_name
+ ':'
+ result_subcorpus_name)
self.SUBCORPUS_NAMES.append(self.result_subcorpus)
self.nr_matches = self.cqp_subcorpus_size(self.result_subcorpus)
# logger.warning('Nr of all matches is: {}'.format(self.nr_matches))
def show_subcorpora(self):
"""
Show all subcorpora currently saved by the cqp server.
"""
return self.cqp_list_subcorpora(self.corpus_name)
def show_query_results(self,
context_len=10,
result_len=1000):
"""
Show query results
Shows the actual matched strings produce by the query. Uses the cpos
match indexes to grab those strings. saves them into an orderd
dictionary. Also saves coresponding tags, lemmas and context. Gets those
informations using the corresponding cpos.
Keyword arguments:
context_len -- defines how many words before and after a match will be
shown (default 10)
result_len -- defines how many results are actually grabbed
"""
self.context_len = context_len
self.corpus_max_len = self.cl_attribute_size(
self.attr_strings['positional_attrs']['word']
)
self.nr_matches = min(result_len, self.nr_matches)
if self.nr_matches == 0:
# logger.warning('Query resulted in 0 matches.')
return None
else:
# Get match cpos boundries
# match_boundries shows the start and end cpos of one match as a
# pair of cpositions
# [(1355, 1357), (1477, 1479)] Example for two boundry pairs
match_boundaries = zip(self.cqp_dump_subcorpus(self.result_subcorpus,
CONST_FIELD_MATCH,
0,
self.nr_matches - 1),
self.cqp_dump_subcorpus(self.result_subcorpus,
CONST_FIELD_MATCHEND,
0,
self.nr_matches - 1))
# Generate all cpos between match boundries including start and end boundries.
# Also generate cpos for left and right context.
# Save those cpos into dict as lists for the keys 'lc', 'hit' and 'rc'
# Also collect all cpos together in one list for the final request of
# all cpos informations
all_matches = []
all_cpos = []
for start, end in match_boundaries:
lc_cpos = list(range(max([0, start - self.context_len]), start))
lc = {'lc': lc_cpos}
match_cpos = list(range(start, end + 1))
match = {'hit': match_cpos}
rc_cpos = list(range(end + 1, min([self.corpus_max_len, end + self.context_len + 1])))
rc = {'rc': rc_cpos}
lc.update(match)
lc.update(rc)
all_cpos.extend(lc_cpos + match_cpos + rc_cpos)
all_matches.append(lc)
# print(all_matches)
# print(all_cpos)
# Get all sentences IDs for all above collected cpos in all_cpos
s_ids = self.cl_cpos2struc('UTOPIEN.s', all_cpos) # CHANGE to CORPUS.s will always be like this in nopaque
# Get all cpos for all sneteces boundries
s_lookup = {}
for s_id in set(s_ids):
s_start, s_end = self.cl_struc2cpos('UTOPIEN.s', s_id) # CHANGE to CORPUS.s will always be like this in nopaque
# print(s_start, s_end)
s_cpos = range(s_start, s_end)
s_lookup.update({s_id: list(s_cpos)})
# print(list(s_cpos))
all_cpos.extend(s_cpos)
all_cpos = list(set(all_cpos)) # get rid of cpos duplicates
# Get cpos informations like CORPUS_NAME.word or CORPUS_NAME.lemma for
# all cpos entries in all_cpos_list
# Also saves these informations into self.results dict
all_cpos_infos, text_lookup = self.get_cpos_infos(all_cpos)
self.results = {'matches': all_matches, 'cpos_lookup': all_cpos_infos,
's_lookup': s_lookup, 'text_lookup': text_lookup}
return self.results
# print(self.results)
def get_cpos_infos(self, all_cpos):
'''
Get cpos informations like CORPUS_NAME.word or CORPUS_NAME.lemma for
all cpos entries specified in the parameter all_cpos.
'''
cpos_infos = {}
for p_attr_key in self.attr_strings['positional_attrs'].keys():
match_strs = self.cl_cpos2str(self.attr_strings['positional_attrs'][p_attr_key], all_cpos)
cpos_infos[p_attr_key] = match_strs
tmp_s_info = []
tmp_text_info = []
text_lookup = {}
tmp_dict = {}
for struct_attr_key in self.attr_strings['struct_attrs'].keys():
check = self.attr_strings['struct_attrs'][struct_attr_key]
if check == 'UTOPIEN.s':
struct_ids = self.cl_cpos2struc(check, all_cpos)
for id in struct_ids:
tmp_s_info.append({struct_attr_key: id})
elif check == 'UTOPIEN.entry':
struct_ids = self.cl_cpos2struc(check, all_cpos)
for id in struct_ids:
tmp_text_info.append({struct_attr_key: id})
else:
struct_ids = struct_ids = self.cl_cpos2struc(check, all_cpos)
struct_values = self.cl_struc2str(self.attr_strings['struct_attrs'][struct_attr_key], struct_ids)
for value in struct_values:
for id in struct_ids:
tmp_dict.update({id: {struct_attr_key: value}})
print(tmp_dict)
print(text_lookup)
# struct_entry = self.cl_cpos2struc(self.attr_strings['struct_attrs'][struct_attr_key], all_cpos)
# has_value = self.corpus_structural_attribute_has_values(self.attr_strings['struct_attrs'][struct_attr_key])
# if has_value:
# match_strs = self.cl_struc2str(self.attr_strings['struct_attrs'][struct_attr_key], struct_entry)
# elif self.attr_strings['struct_attrs'][struct_attr_key] == 'CORPUS.s':
# pass
# else:
# match_strs = [None for i in struct_entry]
# cpos_infos[struct_attr_key] = zip(struct_entry, match_strs)
tmp_list = []
attr_key_list = []
for key in cpos_infos.keys():
tmp_list.append(cpos_infos[key])
attr_key_list.append(key)
joined_cpos_infos = zip(all_cpos, *tmp_list)
dict_cpos_infos = {}
for info in joined_cpos_infos:
dict_cpos_infos[info[0]] = dict(zip(attr_key_list, info[1:]))
for key, s_id, text_id in zip(dict_cpos_infos.keys(), tmp_s_info, tmp_text_info):
dict_cpos_infos[key].update(s_id)
dict_cpos_infos[key].update(text_id)
return dict_cpos_infos, text_lookup