nopaque/app/corpora/CQiWrapper/CQiWrapper.py

245 lines
11 KiB
Python
Raw Normal View History

2019-11-18 14:24:13 +01:00
from CQiClient import CQiClient
from CQi import CONST_FIELD_MATCH, CONST_FIELD_MATCHEND
2019-11-07 15:48:47 +01:00
import collections
2019-11-18 14:24:13 +01:00
from app import logger # only works if imported into opaque web app
2019-11-07 15:48:47 +01:00
class CQiWrapper(CQiClient):
    """
    CQiWrapper object

    High level wrapper that groups and renames some functions of CQiClient
    for ease of use. Also structures received data into python dictionaries.

    Keyword arguments:
    host -- host IP address or hostname where the cqp server is running
    port -- port of the cqp server
    username -- username used to connect to the cqp server
    password -- password of the user to connect to the cqp server
    """

    # Kept so that code reading the class attribute keeps working. Every
    # instance gets its own list in __init__, otherwise all wrappers would
    # append their subcorpus names to this single shared list.
    SUBCORPUS_NAMES = []

    def __init__(self, host='127.0.0.1', port=4877, username='opaque',
                 password='opaque'):
        super(CQiWrapper, self).__init__(host=host, port=port)
        self.username = username
        self.password = password
        # Per-instance list of fully qualified subcorpus names
        # ('CORPUS:Subcorpus') created through query_subcorpus().
        self.SUBCORPUS_NAMES = []

    def connect(self):
        """
        Connect with CQP server

        Connects to the CQP server using the username and password given
        at class initiation.
        """
        self.ctrl_connect(self.username, self.password)

    def __create_attribute_strings(self):
        """
        Creates all needed attribute strings to query for word, lemma etc. in
        the given corpus.

        For example: CORPUS_NAME.word to query words

        Sets self.attr_strings to a dictionary of the form
        {'positional_attrs': {name: 'CORPUS.name', ...},
         'struct_attrs': {name: 'CORPUS.name', ...}}
        and self.meta_struct_element to the first structural attribute (or
        None if the corpus reports no structural attributes).
        """
        p_attrs = self.corpus_positional_attributes(self.corpus_name)
        struct_attrs = self.corpus_structural_attributes(self.corpus_name)
        # The first structural attribute is used by get_cpos_infos() to map
        # corpus positions to structure ids. Guarded so that a corpus without
        # structural attributes does not raise an IndexError here.
        self.meta_struct_element = struct_attrs[0] if struct_attrs else None
        prefix = self.corpus_name + '.'
        self.attr_strings = {
            'positional_attrs': {p_attr: prefix + p_attr
                                 for p_attr in p_attrs},
            # NOTE(review): the last structural attribute is skipped, exactly
            # as in the original implementation. The reason is not visible in
            # this file -- confirm that this is intended.
            'struct_attrs': {struct_attr: prefix + struct_attr
                             for struct_attr in struct_attrs[:-1]},
        }
        logger.warning(('All positional and '
                        'structural attributes: {}').format(self.attr_strings))

    def select_corpus(self, corpus_name):
        """
        Select the corpus all following queries are run against.

        If the server lists corpus_name, it is stored as self.corpus_name and
        the attribute strings for it are created. Otherwise the connection to
        the server is closed.

        Keyword arguments:
        corpus_name -- name of the corpus as known to the cqp server
        """
        # 'corpus_list_coprora' is the (misspelled) name of the method
        # inherited from CQiClient; it must not be "corrected" here.
        if corpus_name in self.corpus_list_coprora():
            self.corpus_name = corpus_name
            self.__create_attribute_strings()
            logger.warning('{} does exist.'.format(corpus_name))
        else:
            self.disconnect()
            logger.warning('{} does not exist.'.format(corpus_name))

    def disconnect(self):
        """
        Disconnect from CQP server

        Disconnects from the CQP server. Closes used socket after disconnect.
        The socket is closed even if sending the bye command fails; in that
        case the original exception is propagated.
        """
        try:
            self.ctrl_bye()
        finally:
            self.connection.close()
        logger.warning('Disconnected from cqp server.')

    def query_subcorpus(self, query, result_subcorpus_name='Query-results'):
        """
        Create subcorpus

        Input query will be used to create a subcorpus holding all cpos match
        positions for that query. The number of matches is stored in
        self.nr_matches, the fully qualified subcorpus name
        ('CORPUS:Subcorpus') in self.result_subcorpus.

        Keyword arguments:
        query -- query written in cqp query language
        result_subcorpus_name -- user set name of the subcorpus which holds all
        cpos match positions, produced by the query
        """
        self.cqp_query(self.corpus_name, result_subcorpus_name, query)
        self.result_subcorpus = (self.corpus_name
                                 + ':'
                                 + result_subcorpus_name)
        self.SUBCORPUS_NAMES.append(self.result_subcorpus)
        self.nr_matches = self.cqp_subcorpus_size(self.result_subcorpus)
        logger.warning('Nr of all matches is: {}'.format(self.nr_matches))

    def show_subcorpora(self):
        """
        Show all subcorpora currently saved by the cqp server.
        """
        return self.cqp_list_subcorpora(self.corpus_name)

    def show_query_results(self,
                           context_len=10,
                           result_len=1000):
        """
        Show query results

        Shows the actual matched strings produced by the query. Uses the cpos
        match indexes to grab those strings and saves them into an ordered
        dictionary. Also saves corresponding tags, lemmas and context. Gets
        this information using the corresponding cpos.

        Returns None if there are no matches. Otherwise returns an
        OrderedDict mapping the serial number of a match to a dictionary with
        the keys 'match_cpos_list', 'context_before_cpos_list' and
        'context_after_cpos_list'. Each of those holds a list of
        {cpos: {attribute name: value, ...}} dictionaries.

        Note that self.nr_matches is lowered to result_len if it was bigger.

        Keyword arguments:
        context_len -- defines how many words before and after a match will be
        shown (default 10)
        result_len -- defines how many results are actually grabbed
        (default 1000)
        """
        self.context_len = context_len
        self.corpus_max_len = self.cl_attribute_size(
            self.attr_strings['positional_attrs']['word']
        )
        self.nr_matches = min(result_len, self.nr_matches)
        if self.nr_matches == 0:
            logger.warning('Query resulted in 0 matches.')
            # NOTE(review): the original had a bare `self.disconnect` here
            # (missing call parentheses), which did nothing. The connection
            # is deliberately left open to keep the observable behaviour;
            # callers remain responsible for calling disconnect().
            return None

        # Get match cpos boundaries. Every match is described by the cpos of
        # its first and its last token, e. g. (1355, 1357).
        last_match_index = self.nr_matches - 1
        match_starts = self.cqp_dump_subcorpus(self.result_subcorpus,
                                               CONST_FIELD_MATCH,
                                               0,
                                               last_match_index)
        match_ends = self.cqp_dump_subcorpus(self.result_subcorpus,
                                             CONST_FIELD_MATCHEND,
                                             0,
                                             last_match_index)

        # self.corpus_max_len is the number of tokens of the corpus, so the
        # highest valid cpos is one less. Clamping to corpus_max_len itself
        # would request a cpos that lies behind the end of the corpus.
        last_valid_cpos = self.corpus_max_len - 1

        # Generate all cpos of every match and of its context and save them
        # into one entry per match at serial number 'i'.
        ordered_matches = collections.OrderedDict()
        needed_cpos = set()
        for i, (first_cpos, last_cpos) in enumerate(zip(match_starts,
                                                        match_ends)):
            context_start = max(0, first_cpos - self.context_len)
            context_end = min(last_valid_cpos, last_cpos + self.context_len)
            entry = {
                'match_cpos_list': list(range(first_cpos, last_cpos + 1)),
                'context_before_cpos_list': list(range(context_start,
                                                       first_cpos)),
                'context_after_cpos_list': list(range(last_cpos + 1,
                                                      context_end + 1)),
            }
            for cpos_list in entry.values():
                needed_cpos.update(cpos_list)
            ordered_matches[i] = entry

        # Get cpos information like CORPUS_NAME.word or CORPUS_NAME.lemma
        # once for every distinct cpos (matches and contexts overlap).
        all_cpos_infos = self.get_cpos_infos(sorted(needed_cpos))

        # Replace every plain cpos with {cpos: information about that cpos}.
        for entry in ordered_matches.values():
            for list_name in ('match_cpos_list',
                              'context_before_cpos_list',
                              'context_after_cpos_list'):
                entry[list_name] = [{cpos: all_cpos_infos.get(cpos)}
                                    for cpos in entry[list_name]]
        return ordered_matches

    def get_cpos_infos(self, all_cpos):
        """
        Get cpos information like CORPUS_NAME.word or CORPUS_NAME.lemma for
        all cpos entries specified in the parameter all_cpos.

        Returns a dictionary of the form
        {cpos: {attribute name: value, ...}, ...}.

        Keyword arguments:
        all_cpos -- list of corpus positions
        """
        cpos_infos = {}
        positional_attrs = self.attr_strings['positional_attrs']
        for p_attr_key, p_attr in positional_attrs.items():
            cpos_infos[p_attr_key] = self.cl_cpos2str(p_attr, all_cpos)

        struct_attrs = self.attr_strings['struct_attrs']
        if struct_attrs:
            # The mapping cpos -> structure id is the same for every
            # structural attribute, so it is requested from the server only
            # once instead of once per attribute.
            struct_ids = self.cl_cpos2struc(
                struct_attrs[self.meta_struct_element],
                all_cpos
            )
            for struct_attr_key, struct_attr in struct_attrs.items():
                cpos_infos[struct_attr_key] = self.cl_struc2str(struct_attr,
                                                                struct_ids)

        # Turn the per attribute value lists into one dictionary per cpos.
        attr_keys = list(cpos_infos)
        rows = zip(all_cpos, *cpos_infos.values())
        return {row[0]: dict(zip(attr_keys, row[1:])) for row in rows}