bundesdata_web_app/app/ngram_viewer/ngram_search.py
2019-03-01 20:55:41 +01:00

285 lines
13 KiB
Python
Executable File

from datetime import datetime
from ngram_viewer.models import *
from speakers.models import Speaker
from watson import search as watson
from collections import defaultdict, OrderedDict
import logging
class NgramSearch(object):
    """
    Search n-gram counts per year for a comma separated user query.

    The user query is split at commas and every part is searched on its own.
    Each part yields one QuerySet whose rows are summed per key and converted
    into chart.js style data sets. Search options (case sensitivity, enhanced
    regex syntax, handling of missing years, corpus) are taken from the
    cleaned form data passed to the constructor.

    Typical call order: get_sub_querys() -> enhanced_search() ->
    query_sets_to_data() -> convert_to_data_set().
    """

    # Years that receive a zero count when ``ignore_missing`` is false.
    YEARS = range(1949, 2018)

    # Number of words in a sub query -> name fragment of the n-gram model.
    GRAM_CLASSES = {1: "One", 2: "Two", 3: "Three", 4: "Four", 5: "Five"}

    def __init__(self, clean_data):
        """
        Store the search options.

        clean_data must provide the keys "query", "case_sensitive",
        "search_plus", "ignore_missing" and "corpus_choice".
        """
        super().__init__()
        self.cs_query = clean_data["query"]
        self.case_sensitive = clean_data["case_sensitive"]
        self.search_plus = clean_data["search_plus"]
        self.ignore_missing = clean_data["ignore_missing"]
        self.corpus_choice = clean_data["corpus_choice"]
        self.sub_querys_dict = defaultdict(list)
        self.filtered_sets_dict = defaultdict(list)
        self.raw_data = []
        # Initialised here so the attribute exists before
        # convert_to_data_set() has been called.
        self.data_set = []

    def get_time_from_year_str(self, query_data, date_format="%Y"):
        """
        Replace the "x" value of every data point with an ISO like timestamp.

        query_data is a list of dicts mapping a label to a list of data
        points (dicts with an "x" entry). Each "x" string is parsed with
        date_format and rewritten as "%Y-%m-%dT%H:%M:%S". The data is changed
        in place and also returned.
        """
        for ngram_dict in query_data:
            for data_series in ngram_dict.values():
                for value_pair in data_series:
                    valid_time = datetime.strptime(value_pair["x"],
                                                   date_format)
                    value_pair["x"] = valid_time.strftime("%Y-%m-%dT%H:%M:%S")
        return query_data

    def get_sub_querys(self):
        """
        Split the comma separated query and group the parts by n-gram model.

        Every non empty part is stripped of surrounding whitespace. Parts
        with one to five words are stored under the model class named
        "Key{first character}_{One..Five}Gram_{corpus_choice}", which is
        looked up in this module's globals. Parts with more than five words
        are stored under the key "invalid". The result is written to
        self.sub_querys_dict.

        Raises KeyError if no model with the computed name exists.
        """
        logger = logging.getLogger(__name__)
        # Drop leading and trailing commas on both sides of the query.
        self.cs_query = self.cs_query.strip(",")
        stripped = (part.strip() for part in self.cs_query.split(","))
        # Empty parts ("a,,b" or "a, ") carry no query and would otherwise
        # fail when their first character is inspected.
        sub_querys = [part for part in stripped if part]
        logger.info(sub_querys)

        sub_querys_dict = defaultdict(list)
        for sub_query in sub_querys:
            main_class = self.GRAM_CLASSES.get(len(sub_query.split()))
            if main_class is None:
                sub_querys_dict["invalid"].append(sub_query)
                continue
            # Umlauts, other non ASCII characters and special characters
            # like "§$%&" all share the "_Non_ASCII" models.
            sort_key = sub_query[0].upper()
            if not (sort_key.isascii() and sort_key.isalnum()):
                sort_key = "_Non_ASCII"
            model_name = "Key{}_{}Gram_{}".format(sort_key,
                                                  main_class,
                                                  self.corpus_choice)
            model = globals()[model_name]
            sub_querys_dict[model].append(sub_query)
        self.sub_querys_dict = sub_querys_dict

    def _filter_ngrams(self, model, value):
        """Return the QuerySet of model matching one sub query value."""
        exact_lookup = "ngram__exact" if self.case_sensitive else "ngram__iexact"
        if self.search_plus:
            # Enhanced syntax: a trailing "__" requests an exact match of
            # the text before it, everything else is treated as a regex.
            if value.endswith("__"):
                return model.objects.filter(**{exact_lookup: value[:-2]})
            regex_lookup = ("ngram__regex" if self.case_sensitive
                            else "ngram__iregex")
            return model.objects.filter(**{regex_lookup: value})
        # Plain syntax: broad case-insensitive containment filter first
        # (LIKE style), then narrowed down to the exact match.
        broad = model.objects.filter(ngram__icontains=value)
        return broad.filter(**{exact_lookup: value})

    def enhanced_search(self):
        """
        Build one filtered QuerySet per sub query.

        Reads self.sub_querys_dict (entries under "invalid" are skipped) and
        writes self.filtered_sets_dict, mapping every model to a list of
        (QuerySet, sub query) tuples. Falsy values of search_plus and
        case_sensitive select the plain, case-insensitive search.
        """
        filtered_sets_dict = defaultdict(list)
        for model, values in self.sub_querys_dict.items():
            if model == "invalid":
                continue
            for value in values:
                filtered_sets_dict[model].append(
                    (self._filter_ngrams(model, value), value))
        self.filtered_sets_dict = filtered_sets_dict

    def query_sets_to_data(self):
        """
        Convert the QuerySets into one {sub query: {key: count}} dict each.

        Counts of rows sharing the same key are summed, which merges upper
        and lower case variants of an n-gram. Unless ignore_missing is set,
        every year of YEARS that is missing gets a zero count. The inner
        dicts are sorted by key. The result is written to self.raw_data.
        """
        data = []
        for query_sets in self.filtered_sets_dict.values():
            for query_set, label in query_sets:
                counts = defaultdict(int)
                for ngram in query_set:
                    counts[ngram.key] += ngram.count
                if not self.ignore_missing:
                    for year in self.YEARS:
                        counts.setdefault(str(year), 0)
                data.append({label: dict(sorted(counts.items()))})
        self.raw_data = data

    def convert_to_data_set(self):
        """
        Convert self.raw_data into chart.js like data sets.

        Every {label: {x: y}} entry becomes {label: [{"y": y, "x": x}, ...]}.
        The result is written to self.data_set.
        """
        data_set = []
        for data_line in self.raw_data:
            data_set_line = defaultdict(list)
            for label, values in data_line.items():
                for year, count in values.items():
                    data_set_line[label].append({"y": count, "x": year})
            data_set.append(data_set_line)
        self.data_set = data_set
class NgramSearchSpeaker(NgramSearch):
    """
    Search n-gram counts per speaker for a single n-gram.

    Takes the same cleaned form data as NgramSearch, but only the part of
    the query before the first comma is used. The resulting QuerySet is
    converted into a chart.js style data set whose entries are ordered by
    count. Inherits the query splitting and filtering from NgramSearch.
    """

    def __init__(self, clean_data):
        """
        Store the search options and keep only the first n-gram of the query.

        clean_data must provide the keys "query", "case_sensitive",
        "search_plus", "ignore_missing" and "corpus_choice".
        """
        # Let the parent set up all shared attributes instead of repeating
        # them here.
        super().__init__(clean_data)
        self.cs_query = clean_data["query"].split(",")[0]

    def get_speaker_name(self, query_data):
        """
        Replace every speaker ID in query_data with a readable label.

        query_data is a list of dicts mapping a label to a list of data
        points whose "x" entry holds a speaker ID. Each ID is replaced by
        "<id>: <first name> <last name> (<party>)", looked up through the
        Speaker model. The ID "None" (or a missing ID) is replaced by a fixed
        "speaker not identified" text. The data is changed in place and also
        returned.
        """
        for ngram_dict in query_data:
            for data_series in ngram_dict.values():
                for value_pair in data_series:
                    speaker_id = value_pair["x"]
                    if speaker_id is None or speaker_id == "None":
                        value_pair["x"] = "Redner nicht identifiziert."
                        continue
                    speaker_details = Speaker.objects.get(pk=speaker_id)
                    # format() also copes with name parts that are not
                    # strings, where "+" concatenation would raise.
                    value_pair["x"] = "{}: {} {} ({})".format(
                        speaker_id,
                        speaker_details.first_name,
                        speaker_details.last_name,
                        speaker_details.party)
        return query_data

    def query_sets_to_data(self):
        """
        Convert the QuerySets into one {sub query: {key: count}} dict each.

        Counts of rows sharing the same key are summed, which merges upper
        and lower case variants of an n-gram. The inner dicts are ordered by
        count, highest first. The result is written to self.raw_data.
        """
        data = []
        for query_sets in self.filtered_sets_dict.values():
            for query_set, label in query_sets:
                counts = defaultdict(int)
                for ngram in query_set:
                    counts[ngram.key] += ngram.count
                ranked = sorted(counts.items(),
                                key=lambda item: item[1],
                                reverse=True)
                data.append({label: dict(ranked)})
        self.raw_data = data