bundesdata_markup_nlp_software/bundesdata_markup_nlp/markup/SpeakerNameMarkup.py

555 lines
28 KiB
Python
Raw Normal View History

2019-02-21 18:29:44 +00:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from markup.SpeakerMarkup import SpeakerMarkup
from xml.etree import ElementTree
from lxml import etree
from tqdm import tqdm
from itertools import combinations
import copy
import logging
import re
import os
class SpeakerNameMarkup(SpeakerMarkup):
    """
    Complex markup of the speakers in one given protocol.

    For every speaker string a dictionary of name features (first name,
    surname, title, fraction, party, ...) is built by cross referencing the
    string with the Stammdatenbank (MdB master data). The <redner>/<name>
    XML elements are created from these dictionaries afterwards.
    """

    # Cache shared by all instances: original speaker string -> feature
    # dictionary. Emptied by cross_reference_markup when the wahlperiode
    # changes.
    known_redner_dicts = dict()
    # Wahlperiode seen by the most recent cross_reference_markup call.
    last_wahlperiode = int()

    def __init__(self, file_path, element_name=".//redner"):
        """
        Store the protocol path and prepare the per-protocol state.

        Args:
            file_path: Path of the protocol file. The basename without its
                last four characters (the extension) is used as prefix for
                the speech ids.
            element_name: XPath of the elements this markup works on.
        """
        # NOTE(review): super(SpeakerNameMarkup) without an instance builds
        # an unbound super object, so SpeakerMarkup.__init__ is presumably
        # never executed here. Kept unchanged because the signature of the
        # parent initialiser is not visible from this file -- confirm and
        # replace with super().__init__(...) using the right arguments.
        super(SpeakerNameMarkup).__init__()
        self.file_path = file_path
        self.filename = os.path.basename(self.file_path)[:-4]
        self.element_name = element_name
        self.redner_dict = dict()
        self.all_speakers = []
        self.logger = logging.getLogger(__name__)

    def cross_reference_markup(self, strings, feature_set_dict,
                               MdB_etree):
        """
        Build one feature dictionary per speaker string.

        Every string is split into tokens which are checked against the sets
        of valid first names and surnames. The remaining features (academic
        title, name affix, fraction, ...) are searched in the whole string.
        In a second step add_missing_MdB_feature looks up features which are
        missing or have been identified wrongly in the MdB master data, and
        finally tries to assign the speaker id.

        A deep copy of every resulting dictionary is appended to
        self.all_speakers; the most recent one is also kept in
        self.redner_dict.

        Args:
            strings: Iterable of speaker strings (one per <redner> element).
            feature_set_dict: Maps a feature name to a sequence whose first
                item is the collection of valid values and whose second item
                is the tag name of that feature in the MdB data. The first
                two keys are expected to be vorname and nachname.
            MdB_etree: Parsed MdB master data offering an xpath() method.
        """
        def initiate_dict(keys, extra_keys):
            """
            Resets the speaker dictionary: all given keys are set to None
            and the bookkeeping keys get their start values.
            """
            # The dictionary object is reused for every speaker. Clearing it
            # keeps candidate id entries ("id0", "id1", ...) of a previous
            # speaker from leaking into the current one.
            redner_dict.clear()
            for key in keys:
                redner_dict[key] = None
            for key in extra_keys:
                redner_dict[key] = None
            redner_dict["feature_complete"] = False
            redner_dict["original_string"] = string
            redner_dict["identified"] = False
            redner_dict["damalige_fraktion"] = None

        def get_names(keys, _feature_sets, token):
            """
            Checks if token is a valid vorname or nachname and sets the
            dictionary values accordingly. A token which is a valid surname
            replaces a surname that has been set before.
            """
            for key in keys[0:2]:  # only vorname, nachname in written order
                if(token in feature_set_dict[key][0]
                   and redner_dict[key] is None):
                    redner_dict[key] = token
                elif(token in feature_set_dict["nachname"][0]
                     and redner_dict["nachname"] is not None):
                    redner_dict["nachname"] = token

        def get_feature(key, string, feature_set):
            """
            Searches the string for one of the valid values of a feature
            (academic title, namenszusatz, fraktion, ortszusatz, ...) and
            stores the first hit, or None if nothing matches. Does not check
            for names.
            """
            templates = {
                # could be "Dr." and "." is not a word boundary
                "titel": r"(\b{}\B)",
                # no "." in the word, so word boundary at start and end
                "namenszusatz": r"\b({})\b",
                # always surrounded by parentheses; they are matched too, to
                # avoid matching e.g. "CDU" in "CDU/CSU"
                "fraktion": r"\B(\({}\))\B",
                # always surrounded by parentheses
                "ortszusatz": r"\B{}\B",
            }
            template = templates.get(key, r"(\b{}\b)")
            for feature in feature_set:
                match = re.search(template.format(re.escape(feature)),
                                  string)
                if(match):
                    found = match.group()
                    if(key == "fraktion"):
                        found = found[1:-1]  # removes ()
                    redner_dict[key] = found
                    return
            redner_dict[key] = None

        def get_role(string):
            """
            Checks the redner string for a role like 'Bundesministerin für
            Familie, Senioren, Frauen und Jugend' and stores its long and
            short form. Staatssekretär takes precedence over Bundesminister.
            """
            for role in ("Staatssekretär", "Bundesminister"):
                if(role not in string):
                    continue
                # Two groups and maxsplit=1 result in
                # [text before, role word, "in" or None, text after].
                splits = re.split(r"({}(in)?)".format(role), string,
                                  maxsplit=1)
                role_long = splits[1] + splits[-1]
                initials = "".join(word[0] for word in role_long.split()
                                   if word[0].isupper())
                redner_dict["rolle_lang"] = role_long
                redner_dict["rolle_kurz"] = splits[1] + " " + initials
                break

        def check_name(redner_dict):
            """
            Sets vorname to None if it equals nachname. Vorname will be set
            later on with add_missing_MdB_feature.
            """
            if(redner_dict["nachname"] == redner_dict["vorname"]):
                redner_dict["vorname"] = None

        def get_party(redner_dict):
            """
            Derives the party from the fraction. CDU/CSU is the fraction in
            the bundestag but speakers belong to either the CDU or the CSU,
            so a fraction CDU or CSU becomes the party and the fraction is
            set to CDU/CSU. For CDU/CSU itself the party is left untouched.
            Every other fraction is also used as party. GRÜNE is expanded to
            the full fraction name afterwards.
            """
            fraktion = redner_dict["fraktion"]
            if(fraktion in ("CDU", "CSU")):
                redner_dict["partei"] = fraktion
                redner_dict["fraktion"] = "CDU/CSU"
            elif(fraktion != "CDU/CSU"):
                redner_dict["partei"] = fraktion
            if(redner_dict["fraktion"] == "GRÜNE"):
                redner_dict["fraktion"] = "BÜNDNIS 90/DIE GRÜNEN"

        def check_party_and_fraction():
            """
            Corrects party and fraction after add_missing_MdB_feature, mainly
            for CDU/CSU.
            """
            partei = redner_dict["partei"]
            # NOTE(review): the parentheses spell out the precedence of the
            # original "a and b or c" condition. The CSU case therefore does
            # not require a known fraction while the CDU case does -- confirm
            # whether this asymmetry is intended.
            if((redner_dict["fraktion"] is not None and partei == "CDU")
               or partei == "CSU"):
                redner_dict["fraktion"] = "CDU/CSU"
            if(redner_dict["partei"] is None
               and redner_dict["fraktion"] is not None
               and redner_dict["fraktion"] != "CDU"
               and redner_dict["fraktion"] != "CSU"):
                redner_dict["partei"] = redner_dict["fraktion"]

        def get_match_in_str(key, string, regex):
            """
            Stores the first match of regex in string under the given key,
            or None if there is no match.
            """
            match = re.search(regex, string)
            redner_dict[key] = match.group() if match else None

        def add_missing_MdB_feature(string, redner_dict, feature_set_dict,
                                    MdB_etree, conditions_key_list,
                                    feature_lookup, feature_to_add,
                                    logging_state=False, multi_ids=False):
            """
            Tries to get a missing feature for one speaker from the MdB data.

            The features named in conditions_key_list are turned into the
            predicates of an XPath selecting MDB elements. If exactly one
            element matches, the text of its first feature_lookup descendant
            is stored in redner_dict[feature_to_add].

            If several elements match nothing is extracted. With multi_ids
            set, the feature_lookup text of every candidate is stored under
            the keys "id0", "id1", ... and the list of matching elements is
            returned. In every other case None is returned.
            """
            conds = conditions_key_list
            predicates = [".//{}/text()='{}'".format(
                              feature_set_dict[condition][1],
                              redner_dict[condition])
                          for condition in conds]
            xpath = None
            if(predicates):
                xpath = ".//MDB[" + " and ".join(predicates) + "]"
                # A condition whose value is None has been formatted as the
                # text "None"; such an xpath can not be used.
                if("None" in xpath):
                    xpath = None
            matches = [] if xpath is None else MdB_etree.xpath(xpath)

            # Unique match: the looked up value becomes the new feature.
            if(len(matches) == 1):
                feature_lookup = ".//" + feature_lookup
                new_feature = matches[0].xpath(feature_lookup)[0].text
                self.logger.info(" There is one unique match "
                                 " for this speaker: %s"
                                 " Extracted feature %s: %s"
                                 " with: %s",
                                 redner_dict, feature_lookup, new_feature,
                                 conds)
                redner_dict[feature_to_add] = new_feature
                self.logger.info("New speaker features are: %s", redner_dict)
            # Matches that are not unique: logging and optional multi ids.
            elif(len(matches) > 1):
                self.logger.warning(" There are %s"
                                    " matches for this speaker: %s"
                                    " .Could not extract: %s"
                                    " Features used are: %s",
                                    len(matches), redner_dict,
                                    feature_lookup, conds)
                if(multi_ids is True):
                    lookup_path = ".//" + feature_lookup
                    for index, match in enumerate(matches):
                        found = match.xpath(lookup_path)
                        redner_dict["id" + str(index)] = (
                            found[0].text if found else None)
                    return matches
            return None

        def get_periode(MdB_etree):
            """
            Stores the wahlperiode of the current protocol (read from
            self.xml_tree) in the speaker dictionary and returns it. Returns
            None if the protocol has no wahlperiode element.
            """
            periode = self.xml_tree.xpath(".//wahlperiode")
            if(periode):
                redner_dict["wahlperiode"] = periode[0].text
                return periode[0].text
            return None

        ###
        # Start of main function cross_reference_markup
        ###
        # Initiates empty dict and gets keys for it
        redner_dict = dict()
        features = list(feature_set_dict.keys())
        # Counters to calculate how successful the identification of
        # speakers is
        identified_speakers = 0
        unidentified_speakers = 0
        multiple_identified_speakers = 0
        ###
        # Creates possible combinations of features which will be used in
        # add_missing_MdB_feature to identify missing features like vorname
        # or nachname. They only depend on the feature names, so they are
        # built once for all speakers.
        ###
        not_combinable = ("namenszusatz", "feature_complete", "id", "titel",
                          "rolle_kurz", "rolle_lang", "original_string",
                          "identified", "damalige_fraktion")
        combination_features = [feature for feature in features
                                if feature not in not_combinable]
        subsets = [list(subset)
                   for length in range(1, 5)
                   for subset in combinations(combination_features, length)]
        # wahlperiode and nachname are only used as lookup conditions, they
        # are never looked up themselves.
        combination_features = [feature for feature in combination_features
                                if feature not in ("wahlperiode",
                                                   "nachname")]

        # Cross references every <redner> string
        for string in tqdm(strings, desc="Cross reference name markup for speakers in strings"):
            self.logger.info("\nStarting name markup process for new speaker:")
            # Sets values in redner_dict to None or specific value
            initiate_dict(features, features)
            # Removes ":" and "," because some tokens would be "name:" and
            # some names would contain a ","
            tokens = string.replace(":", "").replace(",", "").split()
            for token in tokens:
                get_names(features, feature_set_dict, token)
            self.logger.info("nachname is: %s", redner_dict["nachname"])
            feature_keys = [key for key in features
                            if key not in ["vorname", "nachname"]]
            for f_key in feature_keys:
                get_feature(f_key, string, feature_set_dict[f_key][0])
            get_party(redner_dict)
            check_name(redner_dict)
            regex_p = r"^\w*(?:P|p)räsident\w*"
            get_match_in_str("präsident", string, regex_p)
            get_role(string)
            ###
            # Checks if script is still running for the same current periode.
            # If this is not the case the known_redner_dicts will be emptied.
            ###
            current_wahlperiode = get_periode(MdB_etree)
            if(current_wahlperiode != SpeakerNameMarkup.last_wahlperiode):
                SpeakerNameMarkup.known_redner_dicts = dict()
                SpeakerNameMarkup.last_wahlperiode = current_wahlperiode
            ###
            # First while loop trying to identify every feature for one
            # speaker. Uses the combinations from above. Before calling
            # add_missing_MdB_feature there is a check if the speaker has
            # already been identified before. If this is the case the
            # features are set to the already identified ones. This saves a
            # lot of time.
            ###
            counter_feats = 0
            while(redner_dict["feature_complete"] is False):
                redner_dict["damalige_fraktion"] = redner_dict["fraktion"]
                # Checks if speaker has been already identified before.
                if(string in SpeakerNameMarkup.known_redner_dicts):
                    known = SpeakerNameMarkup.known_redner_dicts[string]
                    redner_dict = known.copy()
                    redner_dict["identified"] = True
                    self.logger.info("Speaker has alreeady been identified "
                                     "once.")
                    self.logger.info("Speaker features are set to: %s", known)
                    if(known["feature_complete"] is not False):
                        identified_speakers += 1
                    break
                else:
                    for feature in combination_features:
                        for subset in subsets:
                            add_missing_MdB_feature(
                                string,
                                redner_dict,
                                feature_set_dict,
                                MdB_etree,
                                subset,
                                feature_set_dict[feature][1],
                                feature)
                    check_party_and_fraction()
                    if(redner_dict["vorname"] is not None
                       and redner_dict["nachname"] is not None
                       and redner_dict["fraktion"] is not None
                       and redner_dict["partei"] is not None):
                        redner_dict["feature_complete"] = True
                    counter_feats += 1
                    # Gives up after as many passes as there are features to
                    # look up.
                    if(counter_feats == len(combination_features)):
                        redner_dict["feature_complete"] = False
                        break
            ###
            # Second while loop uses four features to identify the unique ID
            # for one speaker with add_missing_MdB_feature. Also tries to
            # identify speakers with fewer known features. In this case there
            # can be multiple possible ids for one speaker, these are saved
            # in the entries "id0", "id1", ... Rare case.
            ###
            counter_ids = 0
            while(redner_dict["id"] is None):
                if(redner_dict["feature_complete"] is True):
                    add_missing_MdB_feature(string,
                                            redner_dict,
                                            feature_set_dict,
                                            MdB_etree,
                                            ["vorname", "nachname", "partei",
                                             "wahlperiode"],
                                            feature_set_dict["id"][1],
                                            "id")
                    key_original_string = redner_dict["original_string"]
                    SpeakerNameMarkup.known_redner_dicts.update(
                        {key_original_string: redner_dict.copy()})
                    redner_dict["identified"] = True
                    if(counter_ids == 1):
                        # Second try without an id: give up on this speaker.
                        redner_dict["id"] = None
                        redner_dict["feature_complete"] = False
                        redner_dict["identified"] = False
                        self.logger.warning("Unique ID could not be assigned. "
                                            "Feature complete: True "
                                            "Features are: %s", redner_dict)
                        SpeakerNameMarkup.known_redner_dicts.update(
                            {key_original_string: redner_dict.copy()})
                        unidentified_speakers += 1
                        # because identified_speakers was set before
                        identified_speakers -= 1
                        break
                    identified_speakers += 1
                elif(redner_dict["feature_complete"] is not True):
                    redner_dict["id"] = None
                    ids = add_missing_MdB_feature(string,
                                                  redner_dict,
                                                  feature_set_dict,
                                                  MdB_etree,
                                                  ["nachname", "partei",
                                                   "wahlperiode"],
                                                  feature_set_dict["id"][1],
                                                  "id", False, True)
                    if(ids is not None and len(ids) > 1):
                        redner_dict["identified"] = "Multiple"
                        multiple_identified_speakers += 1
                        # identified_speakers is not decremented here: it is
                        # never incremented for a speaker whose features are
                        # incomplete.
                        break
                    elif(ids is None):
                        self.logger.warning("Unique ID could not be assigned. "
                                            "Feature complete: False "
                                            "Features are: %s", redner_dict)
                        redner_dict["identified"] = False
                        unidentified_speakers += 1
                        break
                counter_ids += 1
            self.logger.info("Number of identified speakers with valid id and"
                             " name markup is: %s", identified_speakers)
            self.logger.info("Number of unidentified speakers without valid"
                             " id and name markup is: %s",
                             unidentified_speakers)
            self.logger.info("Number of speakers with possible multiple ids: "
                             "%s", multiple_identified_speakers)
            self.logger.info("Number of all speaker entitiys in current"
                             " protocoll is: %s", len(strings))
            # The working dictionary is reused for the next speaker, so an
            # independent copy is saved.
            redner_dict_final = copy.deepcopy(redner_dict)
            self.redner_dict = redner_dict_final
            self.all_speakers.append(self.redner_dict)
            for key in features:
                redner_dict[key] = None
        self.logger.info("Saved speakers (identfied and not identified): %s",
                         len(self.all_speakers))

    def create_speaker_elements(self):
        """
        Creates one redner XML element for every entry of self.all_speakers
        and collects them in self.all_speaker_elements.

        The children of the name element are appended one by one because
        they have to be in a specific order. titel, namenszusatz and
        ortszusatz are only appended if they are set; rolle_lang and
        rolle_kurz are only appended if rolle_lang is set. The original
        string is also stored as tail of the name element.
        """
        # (tag, key deciding if the child is appended); None means the child
        # is always appended. The order is the order of the children.
        name_children = (
            ("titel", "titel"),
            ("vorname", None),
            ("namenszusatz", "namenszusatz"),
            ("nachname", None),
            ("damalige_fraktion", None),
            ("fraktion", None),
            ("partei", None),
            ("ortszusatz", "ortszusatz"),
            ("rolle_lang", "rolle_lang"),
            ("rolle_kurz", "rolle_lang"),
            ("original_string", None),
        )
        self.all_speaker_elements = []
        for redner_entry in tqdm(self.all_speakers, desc="Creating speaker element"):
            redner_element = etree.Element("redner")
            redner_element.set("id", str(redner_entry["id"]))
            name_element = etree.Element("name")
            for tag, deciding_key in name_children:
                if(deciding_key is not None
                   and redner_entry[deciding_key] is None):
                    continue
                child = etree.SubElement(name_element, tag)
                child.text = redner_entry[tag]
            name_element.tail = redner_entry["original_string"]
            redner_element.append(name_element)
            self.all_speaker_elements.append(redner_element)
            self.logger.info(("Speaker element is: "
                              + ElementTree.tostring(redner_element).decode("utf-8")))

    def set_speech_ids(self):
        """
        Sets an id attribute on every sitzungsbeginn and rede element of the
        protocol.

        The id is the string ID followed by self.filename and a counter
        padded to five digits. The counter starts at 0 and grows by 100 per
        element, so the last two digits stay free for corrections.
        Example: ID1809901100 is the element with counter 1100 in the file
        18099 (wahlperiode 18, sitzungsnr 099).
        """
        speeches = self.xml_tree.xpath(".//sitzungsbeginn | .//rede")
        for position, speech in enumerate(tqdm(speeches,
                                               desc="Creating speech ids")):
            speech_id = "ID" + self.filename + str(position * 100).zfill(5)
            speech.set("id", speech_id)
            self.logger.info("Speech id is: %s", speech_id)
        # NOTE(review): self assignment kept from the original; it only has
        # an effect if xml_tree is a property with a setter -- confirm.
        self.xml_tree = self.xml_tree