Initial commit

This commit is contained in:
Stephan Porada
2019-02-21 19:29:44 +01:00
commit 4263e5f41e
52 changed files with 3024 additions and 0 deletions

View File

@ -0,0 +1,225 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from markup.MetadataMarkup import MetadataMarkup
from lxml import etree
from xml.etree import ElementTree
from xml.sax.saxutils import escape
import logging
import os
import re
class EntityMarkup(MetadataMarkup):
"""Class for getting an XML node in which entities will be marked.
In practice this class and its mehtods can be used to get the text of a
given Node and marks every speaker in this text string.
Also passes methods and fields to the more specific
SimpleSpeakersMarkup."""
def __init__(self, file_path, element_name=".//sitzungsverlauf"):
super().__init__()
self.file_path = file_path
self.element_name = element_name
self.xml_tree = None
self.current_string = str()
self.filename = os.path.basename(file_path)
self.logger = logging.getLogger(__name__)
def get_element_text(self):
"""
Gets the strings of all elements matched by an element x-path. Element
name will be passed when the class is istanced. Distunguishes between
one string or several strings.
"""
self.all_elements = self.xml_tree.iterfind(self.element_name)
len_all_elements = len(list(self.all_elements))
self.current_strings = []
if(len_all_elements == 1):
self.all_elements = self.xml_tree.iterfind(self.element_name)
self.current_string = escape(list(self.all_elements)[0].text)
self.current_strings.append(self.current_string)
elif(len_all_elements > 1):
self.current_strings = []
self.all_elements = self.xml_tree.iterfind(self.element_name)
for element in self.all_elements:
string = escape(element.text)
self.current_strings.append(string)
self.all_elements = self.xml_tree.iterfind(self.element_name)
def replace_string(self, replacement_string, element_name):
"""
This function takes the newly manipulated xml string and overwrites
the old string with it.
"""
replacement_string = (
"<" + element_name + ">"
+ replacement_string
+ "</" + element_name + ">"
)
for element in self.xml_tree.xpath("//%s" % element_name):
element.getparent().remove(element)
replacement_element = etree.fromstring(replacement_string)
self.xml_tree.insert(1, replacement_element)
def simple_check_xml(self, xml_string, file_name, save_valid, node=True):
"""
Checks if a given xml element is well-formed xml. If it is checking a
partial string it adds a root element. If node is False it is checking a
document as a string.
"""
try:
if(node is True):
folder_path = "logs/well-formed_strings/"
file_path = os.path.join(folder_path, os.path.basename(file_name))
xml_string = "<root>" + xml_string + "</root>"
tree = etree.fromstring(xml_string)
self.logger.info(("The node string is well-formed. Simple markup is"
" correct. Node string can be found in "
+ folder_path))
self.logger.info(tree)
if(save_valid is True):
self.logger.info("Node string can be found in" + folder_path)
if not os.path.exists(folder_path):
os.mkdir(folder_path)
with open(file_path, "w") as text_file:
text_file.write(xml_string)
else:
folder_path = "logs/well-formed_files/"
file_path = os.path.join(folder_path, os.path.basename(file_name))
xml_string = xml_string
tree = etree.fromstring(xml_string)
self.logger.info("The XML file is well-formed.")
self.logger.info(tree)
if(save_valid is True):
self.logger.info("File can be found in" + folder_path)
if not os.path.exists(folder_path):
os.mkdir(folder_path)
with open(file_path, "w") as text_file:
text_file.write(xml_string.decode("utf-8"))
except Exception as e:
if(node is True):
folder_path = "logs/not_well-formed_strings/"
file_path = os.path.join(folder_path, os.path.basename(file_name))
if not os.path.exists(folder_path):
os.mkdir(folder_path)
with open(file_path, "w") as text_file:
text_file.write(xml_string)
self.logger.error(("XML node string is not well-formed. XML can be"
" found in " + folder_path))
self.logger.error(e)
else:
folder_path = "logs/not_well-formed_files/"
file_path = os.path.join(folder_path, os.path.basename(file_name))
if not os.path.exists(folder_path):
os.mkdir(folder_path)
with open(file_path, "w") as text_file:
text_file.write(xml_string.decode("utf-8"))
self.logger.error(("XML file is not well-formed. XML can be"
" found in " + folder_path))
self.logger.error(e)
return False
def inject_element(self, current_element, regex, tagname,
strip_newlines=False):
"""
Injects new xml elements into the selected element text. The new element
will be created by using a regular expression which matches a partial
string in the current_element text string. The match will be the
new_element text string. The tagname sets the tagname of the
new_element. Optionally Attributes can be set aswell.
"""
element_string = ElementTree.tostring(current_element, encoding="unicode", method="xml")
match = re.search(regex, element_string)
if(match):
index_shift = 0
if(strip_newlines is True):
counter = match.group().count("\n")
match_str = re.sub(r"\n", "", match.group())
else:
counter = 0
match_str = match.group()
index_start = match.start() + index_shift - counter
index_end = match.end() + index_shift - counter
new_element = etree.Element(tagname)
new_element.text = match_str
new_element_str = ElementTree.tostring(new_element, encoding="unicode", method="xml")
element_string = (element_string[:index_start]
+ new_element_str
+ element_string[index_end:])
index_shift += len(new_element_str) - len(match_str)
replacement_element = etree.fromstring(element_string.encode("utf8"))
current_element.getparent().replace(current_element, replacement_element)
def markup_speech_lines(self, current_element):
"""
Inserts markup in every speech that marks every line <p> with
attribute klasse="J". J is set for every line even if it is O. In the
early protocols (period 1. to 10.) One line is most of the time a
sentence. In the later periods one line is capped at around 80
characters.
"""
lines = current_element.xpath("text()")
if(len(lines) > 0):
lines = lines[0].splitlines()
current_element.xpath(".//redner")[0].tail = ""
for line in lines:
part_element = etree.Element("p")
part_element.set("klasse", "J")
part_element.text = line
current_element.append(part_element)
def get_multiline_entities(self, elements, start_of_str, end_of_str,
tagname):
"""
This function identifies multiline entities (i.e. Kommentare/Comments)
wich are split over multiple elements which have been marked with the
markup_speech_lines() function.
Gets the text of those and joins them together into one
string. The first elements text will be set to the newly created string
surrounded by new xml tags with tagname set to input tagname.
All other elements with the rest of the string will be deleted.
start_of_str should be a regex that describes the pattern how the start
of the supposed multiline entity looks like. end_of_str describes the
pattern how the end of the supposed multiline entity looks like.
"""
self.multiline_text = []
self.multiline_elements = []
start_found = False
end_found = False
for element in elements:
if(start_found is False and end_found is False
and element.text is not None):
start_match = re.search(start_of_str, element.text)
if(start_match is not None):
self.multiline_text.append(start_match.group())
self.multiline_elements.append(element)
start_found = True
continue
elif(start_found is True and end_found is False
and element.text is not None):
end_match = re.search(end_of_str, element.text)
if(end_match):
self.multiline_text.append(end_match.group())
self.multiline_elements.append(element)
end_found = True
continue
else:
self.multiline_text.append(element.text)
self.multiline_elements.append(element)
continue
elif(start_found is True and end_found is True):
new_element_text = re.sub(r"- ", "", " ".join(self.multiline_text)) # joins the sting parts and also removes hyphenation
part_element = etree.Element("p")
part_element.set("klasse", "J")
comment_element = etree.Element(tagname)
comment_element.text = new_element_text
part_element.append(comment_element)
self.multiline_elements[0].getparent().replace(self.multiline_elements[0], part_element)
for element in self.multiline_elements[1:]:
element.getparent().remove(element)
start_found = False
end_found = False
self.multiline_text = []
self.multiline_elements = []
continue

View File

@ -0,0 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utility.XMLProtocol import XMLProtocol
import logging
class MdBData(XMLProtocol):
"""Class to handel operations on the Stammdatenbank."""
def __init__(self):
super(XMLProtocol, self).__init__()
self.logger = logging.getLogger(__name__)
def get_set(self, element_path, element_tree):
"""
Creates Sets from input path on element_tree.
"""
tmp_list = [element.text for element in
element_tree.iterfind(element_path) if element is not None]
set_of_elements = set(tmp_list)
return set_of_elements

View File

@ -0,0 +1,267 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utility.XMLProtocol import XMLProtocol
from utility import update_config
from lxml import etree
from datetime import datetime
from babel.dates import format_date
import os
import re
import logging
import configparser
class MetadataMarkup(XMLProtocol):
"""
This class is for opening one XML-protocoll, extracting the included
metadata and creating a new valid metadata head.
"""
def __init__(self):
super().__init__()
self.plenarprotokoll_string = str() # will be extracted with extract_metadata()
self.wahlperiode = int() # will be extracted with extract_metadata()
self.sitzungsnr = int() # will be extracted with extract_metadata()
self.herausgeber = "Deutscher Bundestag" # Always the same in every protocoll
self.berichtart = "Steongrafischer Bericht" # Always the same in every protocoll
self.sitzungstitel_string = ". Sitzung" # Always the same in every protocoll
self.ort = "Berlin" # Always the same in every protocoll
self.datum_ger_non_iso = str() # will be extracted with extract_metadata()
self.datum_iso = str() # ISO-date will be built from self.datum_ger_non_iso
self.datum_string = str() # will be built from self.datum_iso
self.attachment = str() # will be extracted from a split. Will not work
# all the time. But will not break the XML.
self.logger = logging.getLogger(__name__)
def extract_metadata(self, etree_element_object):
"""
Extracts metadata from the given XML-tags and wirtes them into the
instance variables
"""
root = etree_element_object
metadata_list = []
for element in root.iter():
if(element.tag != "TEXT"):
metadata_list.append(element.text)
metadata_list = metadata_list[1:]
self.wahlperiode = metadata_list[0]
self.plenarprotokoll_string = metadata_list[1].lower().title()
self.sitzungsnr = metadata_list[2].split("/")[1]
self.datum_ger_non_iso = metadata_list[3]
self.logger.info("Metadata successfully extracted.")
self.logger.info("Wahlperiode is:" + self.wahlperiode)
self.logger.info("Plenarprotokoll is:" + self.plenarprotokoll_string)
self.logger.info("Sitzungsnummer is:" + self.sitzungsnr)
self.logger.info("German non ISO date is:" + self.datum_ger_non_iso)
def built_iso_date(self, ger_date):
"""
Gets the german date and converts it to an ISO standard date.
"""
self.datum_iso = datetime.strptime(ger_date, "%d.%m.%Y").date()
self.logger.info("ISO date created:" + str(self.datum_iso))
def built_date_string(self, iso_date):
"""
Gets the ISO date and creates from it an german full string date.
"""
date_string = format_date(iso_date, format="full", locale="de_DE")
date_string = re.sub(r",", ", den", date_string)
self.datum_string = date_string
self.logger.info("Date string created:" + self.datum_string)
def delete_old_metadata(self, etree_element_object):
"""
Deletes old metadata tags and text. Renames root tag.
"""
for element in etree_element_object.iter():
if(element.tag != "TEXT" and element.tag != "DOKUMENT"):
element.getparent().remove(element)
elif(element.tag == "DOKUMENT"):
element.tag = "dbtplenarprotokoll"
elif(element.tag == "TEXT"):
self.full_content = element.text
element.getparent().remove(element)
self.logger.info("Old metadata deleted.")
def insert_new_metadata(self, etree_element_object):
"""
Inserts the extracted metadata and splitted content into new created
and valid xml tags according to the official schema.
"""
vorspann_element = etree.Element("vorspann")
xml_string = """
<kopfdaten>
<plenarprotokoll-nummer>{} <wahlperiode>{}</wahlperiode>/<sitzungsnr>{}</sitzungsnr>
(neu)</plenarprotokoll-nummer>
<herausgeber>{}</herausgeber>
<berichtart>{}</berichtart>
<sitzungstitel><sitzungsnr>{}</sitzungsnr>. Sitzung</sitzungstitel>
<veranstaltungsdaten><ort>{}</ort>, <datum date="{}">{}</datum></veranstaltungsdaten>
</kopfdaten>"""\
.format(self.plenarprotokoll_string, self.wahlperiode,
self.sitzungsnr, self.herausgeber, self.berichtart,
self.sitzungsnr, self.ort, self.datum_ger_non_iso,
self.datum_string)
etree_from_str = etree.fromstring(xml_string)
etree_element_object.insert(0, vorspann_element)
vorspann_element.append(etree_from_str)
toc_element = etree.Element("inhaltsverzeichnis")
toc_element.text = self.toc
vorspann_element.append(toc_element)
content_element = etree.Element("sitzungsverlauf")
content_element.text = self.president + self.content
etree_element_object.insert(2, content_element)
anlagen_element = etree.Element("anlagen")
anlagen_element. text = self.attachment
etree_element_object.insert(3, anlagen_element)
rednerliste_element = etree.Element("rednerliste",
sitzungsdatum=self.datum_ger_non_iso)
etree_element_object.insert(4, rednerliste_element)
self.xml_tree = etree_element_object
self.logger.info("New metadata XML-head inserted." + xml_string)
def split_content(self, etree_element_object):
"""Splits the full content to: table of content, speeches and in some
cases attachments."""
config = configparser.ConfigParser()
config.read("config.ini")
session_start_split = config["Regular expressions splits"]["session_start_president_split"]
regex_start = re.compile(session_start_split)
tmp_list = regex_start.split(self.full_content, maxsplit=1)
self.toc = tmp_list[0]
self.president = tmp_list[1]
self.content = tmp_list[2]
attachment_split = config["Regular expressions splits"]["attachment_split"]
regex_att = re.compile(attachment_split)
tmp_list = regex_att.split(self.content)
tmp_list = [element for element in tmp_list if element is not None]
if(tmp_list[-1] == ""): # if the split does not match anything last item is empty string.
self.content = "".join(tmp_list[0:-1])
self.attachment = "Keine Anlage extrahiert."
self.logger.warning(("There is no attachment."))
else:
self.content = "".join(tmp_list[0:-1])
self.attachment = tmp_list[-1]
self.logger.info("Attachment found.")
self.logger.info("Contet splitted at:" + str(regex_start))
self.logger.info("Contet splitted at:" + str(regex_att))
def get_session_times(self):
"""This function looks into the entire protocoll content to extract the
last closing time and the starting time. If only one of both or none are
found, the missing time will be set to xx:xx."""
config = configparser.ConfigParser()
config.read("config.ini")
regex_conf_values = config.items("Regular expressions time extraction")
regex_conf_values = [regex[1] for regex in regex_conf_values]
tmp_list = []
identifier = 0
start_time_found = True
end_time_found = True
for regex in (regex_conf_values):
identifier += 1
regex = re.compile(regex)
if(identifier == 1):
# Always gets first start time.
matches = list(regex.finditer(self.full_content))
if(len(matches) > 1):
match = matches[-1]
elif(len(matches) == 0):
match = None
else:
match = matches[0]
elif(identifier == 2):
# Always gets last closing time
matches = list(regex.finditer(self.full_content))
if(len(matches) > 1):
match = matches[-1]
elif(len(matches) == 0):
match = None
else:
match = matches[0]
if(match is None and identifier == 1):
self.logger.warning("No start time found for " + str(regex))
start_time_found = False
elif(match is None and identifier == 2):
self.logger.warning("No end time found for " + str(regex))
end_time_found = False
elif(match):
session_time = [group for group in match.groups()
if group is not None]
session_time = ["0" + group if len(group) == 1 else group for
group in session_time] # Adds a 0 in front if digit len is 1
if(len(session_time) == 2):
tmp_list.append(":".join(session_time))
elif(len(session_time) == 1):
tmp_list.append(session_time[0] + ":00")
if(len(tmp_list) == 2):
self.session_start_time = tmp_list[0]
self.session_end_time = tmp_list[1]
self.logger.info("Start time found: " + self.session_start_time)
self.logger.info("End time found: " + self.session_end_time)
self.logger.info("Successfully matched start and end times.")
elif(len(tmp_list) == 1 and start_time_found is True and end_time_found
is False):
self.session_start_time = tmp_list[0]
self.session_end_time = "xx:xx"
self.logger.warning("Only start time found: "
+ self.session_start_time)
self.logger.warning("End time set to: "
+ self.session_end_time)
elif(len(tmp_list) == 1 and start_time_found is False and end_time_found
is True):
self.session_end_time = tmp_list[0]
self.session_start_time = "xx:xx"
self.logger.warning("Only end time found: "
+ self.session_end_time)
self.logger.warning("Start time set to: "
+ self.session_start_time)
def write_to_attr(self, element, attr_key, attr_value):
"""
Writes two strings as a an attribute key value pair to a given
element.
"""
elements = self.xml_tree.findall(element)
if(elements == []):
element = self.tree.getroot()
elements.append(element)
for element in elements:
element.set(attr_key, attr_value)
self.xml_tree = self.xml_tree
self.logger.info("Wrote attribute "
+ attr_key
+ "="
+ "\""
+ attr_value
+ "\"")
def save_to_file(self, output_path, file_path, subfolder, config_section,
config_key):
"""
Writes the new markup to a new xml file. Takes the output path and
creates a new folder there. Also updates the config file with the new
path.
"""
self.filename = os.path.basename(file_path)
save_path = os.path.join(output_path, subfolder)
if not os.path.exists(save_path):
os.mkdir(save_path)
tree = etree.ElementTree(self.xml_tree)
new_filename = self.filename
save_file_path = os.path.join(save_path, new_filename)
tree.write(save_file_path,
pretty_print=True,
xml_declaration=True,
encoding="utf8",
doctype="<!DOCTYPE dbtplenarprotokoll SYSTEM 'dbtplenarprotokoll_minimal.dtd\'>")
self.logger.info("New XML saved to:" + save_file_path)
update_config.update_config("config.ini", config_section, config_key,
save_path)

View File

@ -0,0 +1,161 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from markup.EntityMarkup import EntityMarkup
import re
import logging
class SpeakerMarkup(EntityMarkup):
"""
Class for specific markup of different speakers identified by different
regular expressions included in the config file.
"""
def __init__(self, string, regex):
super(SpeakerMarkup).__init__()
self.string_to_search = string
self.regex_string = regex
self.logger = logging.getLogger(__name__)
def identify_speaker(self):
"""
Gets match objects from the speakers in the given text node. Also
calculates length of it and puts the matches in a list.
"""
self.matches = re.finditer(self.regex_compiled, self.string_to_search)
tmp_list = []
for match in self.matches:
tmp_list.append(match)
self.matches_count = len(tmp_list)
self.matches = tmp_list
def markup_speaker(self, case="middle"):
"""
This is where the first simple markup happens. It uses the matches
and replaces them with simple markup for further processing. The
'first' markup uses re.sub. The second and third one work on string
basis.
"""
def markup_logging():
"""Helper function for creating log file output."""
if(self.matches_count == 0):
self.logger.warning("0 matches for given expression:"
+ self.regex_string)
elif(self.matches_count > 0):
self.logger.info(str(self.matches_count)
+ " matches for given expression:"
+ self.regex_string)
elif(self.matches_count == 1):
self.logger.info(str(self.matches_count)
+ " match for given expression:"
+ self.regex_string)
if(case == "first"):
# Uses re.sub because it is only for one match.
start_tags = "<rede><redner>"
end_tags = "</redner>"
self.matches_count = 1 # sets count to 1 because it only marks the first match
markup_logging()
first_match = self.matches[0]
start_xml = start_tags + first_match.group() + end_tags
if(len(first_match.group().split()) <= 10):
self.string_to_search = self.regex_compiled.sub(start_xml,
self.string_to_search,
count=1)
self.markuped_string = self.string_to_search
elif(case == "middle"):
"""
Does not use re.sub because it is faster to work on the string.
Also it avoids looping two times to get the specific match.group()
which caused some errors.
"""
index_shift = 0
start_tags = "\n</rede><rede><redner>"
end_tags = "</redner>"
markup_logging()
for match in self.matches:
index_start = match.start() + index_shift
index_end = match.end() + index_shift
whole_match_len = len(match.group())
# Handels cases where lots of text before the actual speaker is # matched
linebrks_in_match = len(match.group().split("\n"))
if(linebrks_in_match >= 2):
last_part_match = "".join(match.group().split("\n")[1:])
first_line_of_match = match.group().split("\n")[0]
if(len(first_line_of_match.split()) <= 10):
match = first_line_of_match + last_part_match
else:
match = last_part_match
delta_start_index = whole_match_len - len(match)
index_start = index_start + delta_start_index
self.string_to_search = (self.string_to_search[:index_start]
+ start_tags
+ match
+ end_tags
+ self.string_to_search[index_end:]
)
index_shift += len(start_tags) + len(end_tags)
else:
self.string_to_search = (self.string_to_search[:index_start]
+ start_tags
+ match.group()
+ end_tags
+ self.string_to_search[index_end:]
)
index_shift += len(start_tags) + len(end_tags)
self.markuped_string = self.string_to_search
elif(case == "last"):
index_shift = 0
"""
Matches the end of the session to add the last closing <rede> tag
to the last speech for well-formed xml. Uses re.sub because it is
only one operation.
"""
end_tag = "</rede>"
session_close_time_tag = ('<sitzungsende/>')
# Created end tags will be inserted into the protocol
if(len(self.matches) == 1):
self.logger.info("Last speech successfully tagged.")
markup_logging()
for match in self.matches:
end_xml = end_tag + match.group() + session_close_time_tag
if(len(match.group().split()) <= 15):
self.string_to_search = self.regex_compiled.sub(end_xml,
self.string_to_search,
count=1)
self.markuped_string = self.string_to_search
elif(len(self.matches) == 0):
self.logger.warning(("No end of session found! Last tag " + end_tag
+ " will be added to the end of the protocol."
" This might add some unrelated text to the "
"last speech."))
markup_logging()
self.markuped_string = self.string_to_search + end_tag
else:
markup_logging()
self.logger.warning(("There are " + str(len(self.matches))
+ " session endings. Ignoring the endings"
+ " before the last final ending of the "
+ " session."))
match = self.matches[-1]
end_xml = end_tag + match.group() + session_close_time_tag
whole_match_len = len(match.group())
index_start = match.start() + index_shift
index_end = match.end() + index_shift
last_line = match.group().split("\n")[-1] # Always takes the last line of a match avoiding lots of text before the actual speaker.
delta_start_index = whole_match_len - len(last_line)
index_start = index_start + delta_start_index
self.string_to_search = (self.string_to_search[:index_start]
+ end_xml
+ self.string_to_search[index_end:])
index_shift += len(end_tag)
self.markuped_string = self.string_to_search

View File

@ -0,0 +1,554 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from markup.SpeakerMarkup import SpeakerMarkup
from xml.etree import ElementTree
from lxml import etree
from tqdm import tqdm
from itertools import combinations
import copy
import logging
import re
import os
class SpeakerNameMarkup(SpeakerMarkup):
"""
This class is for the complex markup of the speakers in one given protocol.
Creates the name tag with all needed inforamtion from the Stammdatenbank.
Has to cross reference the speaker with said Stammdatenbank.
"""
known_redner_dicts = dict()
last_wahlperiode = int()
def __init__(self, file_path, element_name=".//redner"):
super(SpeakerNameMarkup).__init__()
self.file_path = file_path
self.filename = os.path.basename(self.file_path)[:-4]
self.element_name = element_name
self.redner_dict = dict()
self.all_speakers = []
self.logger = logging.getLogger(__name__)
def cross_reference_markup(self, strings, feature_set_dict,
MdB_etree):
"""
Checks if features like name, surename academic title and city are
present in the input string. Consists of main function and helper
functions. First the string will be split in tokens. Every token will
be checked a gainst sets of valid names, surnames, academic titles and
fractions. If there is a match a dictionary entriy will be set
accordingly.
Also uses the add_missing_MdB_feature helper function in a second step
to add features which are not present in the string or have been
identified wrongly.
The function crates a dictionary containing all features of one speaker
to crate a valid XML element from it later on.
"""
def initiate_dict(keys, extra_keys):
"""
Creates a dictionarie with a set of keys and sets them to None.
Some specific key values will be set to specific values.
"""
for key in keys:
redner_dict[key] = None
for key in extra_keys:
redner_dict[key] = None
redner_dict["feature_complete"] = False
redner_dict["original_string"] = string
redner_dict["identified"] = False
redner_dict["damalige_fraktion"] = None
def get_names(keys, dict, token):
"""
Checks if token is in set vorname or nachname. If it is dictionary
values will be set accordingly. Avoids that surname will be
overwirtten by a name wich is also a valid surname.
"""
for key in keys[0:2]: # Only for vorname, nachname in written order
if(token in feature_set_dict[key][0] and redner_dict[key]
is None):
redner_dict[key] = token
elif(token in feature_set_dict["nachname"][0]
and redner_dict["nachname"] is not None):
redner_dict["nachname"] = token
else:
continue
def get_feature(key, string, set):
"""
Checks if a token is a valid feature (like name affix or academic
title, ortszusatz or namenszusatz) and adds it to the dictionary.
Does not check for names.
"""
for feature in set:
if(key == "titel"):
regex = r"(\b{}\B)".format(re.escape(feature)) # could be Dr. and . is not a word boundary.
elif(key is "namenszusatz"):
regex = r"\b({})\b".format(re.escape(feature)) # No . in word so word boundary at start and end of regex.
elif(key is "fraktion"):
regex = r"\B(\({}\))\B".format(re.escape(feature)) # always surrounded by parentheses, but also has to match them to avoid matching i. e. "CDU" in "CDU/CSU"
elif(key is "ortszusatz"):
regex = r"\B{}\B".format(re.escape(feature)) # always surrounded by parentheses
else:
regex = r"(\b{}\b)".format(re.escape(feature))
match = re.search(regex, string)
if(match):
if(key == "fraktion"):
redner_dict[key] = match.group()[1:-1] # removes ()
break
else:
redner_dict[key] = match.group()
break
else:
redner_dict[key] = None
def get_role(string):
"""Checks redner string for role. Identifies 'Bundesministerin für
Familie, Senioren, Frauen und Jugend' etc."""
if("Staatssekretär" in string or "Staatssekretärin" in string):
regex = r"(Staatssekretär(in)?)"
splits = re.split(regex, string, maxsplit=1)
role_long = splits[1] + splits[-1]
redner_dict["rolle_lang"] = role_long
role_short = [word[0] for word in role_long.split()
if word[0].isupper()]
role_short = splits[1] + " " + "".join(role_short)
redner_dict["rolle_kurz"] = role_short
elif("Bundesminister" in string or "Bundesministerin" in string):
regex = r"(Bundesminister(in)?)"
splits = re.split(regex, string, maxsplit=1)
role_long = splits[1] + splits[-1]
redner_dict["rolle_lang"] = role_long
role_short = [word[0] for word in role_long.split()
if word[0].isupper()]
role_short = splits[1] + " " + "".join(role_short)
redner_dict["rolle_kurz"] = role_short
def check_name(redner_dict):
"""
Checks if vorname and nachname are the same. Sets vorname to None if
True. Vorname will be set later on with add_missing_MdB_feature.
"""
if(redner_dict["nachname"] == redner_dict["vorname"]):
redner_dict["vorname"] = None
def get_party(redner_dict):
"""
Creates a party key in the dictionary containing the party of the
speaker. Party is not the same as fraction. This is mainly done
because CDU/CSU is the fraction in the bundestag but speakers can
belong to either the CDU or CSU. If the fraction is not CDU/CSU
party will be set to fraction. Also handels problems with GRÜNE.
"""
if(redner_dict["fraktion"] != "CDU/CSU"
and redner_dict["fraktion"] != "CDU"
and redner_dict["fraktion"] != "CSU"):
redner_dict["partei"] = redner_dict["fraktion"]
elif(redner_dict["fraktion"] == "CDU"
or redner_dict["fraktion"] == "CSU"):
redner_dict["partei"] = redner_dict["fraktion"]
redner_dict["fraktion"] = "CDU/CSU"
if(redner_dict["fraktion"] == "GRÜNE"):
redner_dict["fraktion"] = "BÜNDNIS 90/DIE GRÜNEN"
def check_party_and_fraction():
"""
Checks if party and fraction have been set correctly. Will be used
after add_missing_MdB_feature. To correct some errors with CDU/CSU.
"""
if(redner_dict["fraktion"] is not None
and redner_dict["partei"] == "CDU"
or redner_dict["partei"] == "CSU"):
redner_dict["fraktion"] = "CDU/CSU"
if(redner_dict["partei"] is None
and redner_dict["fraktion"] is not None
and redner_dict["fraktion"] != "CDU"
and redner_dict["fraktion"] != "CSU"):
redner_dict["partei"] = redner_dict["fraktion"]
def get_match_in_str(key, string, regex):
"""
Matches a regex in the current string and adds it as a value to the
given key into the dictionary.
"""
match = re.search(regex, string)
if(match):
redner_dict[key] = match.group()
else:
redner_dict[key] = None
def add_missing_MdB_feature(string, redner_dict, feature_set_dict,
MdB_etree, conditions_key_list,
feature_lookup, feature_to_add,
logging_state=False, multi_ids=False):
"""
This function trys to get missing features for on speaker. Input is
a list of features(conditions_key_list) which are used as parameters
in an xpath expression. The Xpath is built dynamically from the
list.
If the Xpath matches one unique entry the feature(feature_to_add)
will be set to the match of feature_lookup in the matched element.
"""
###
# Xpath creation from conditions_key_list
###
xpath_parts = []
conds = conditions_key_list
len_conds = len(conds)
if(len_conds == 1):
for condition in conds:
xpath_part = ".//MDB[.//{}/text()='{}']" \
.format(feature_set_dict[condition][1],
redner_dict[condition])
xpath_parts.append(xpath_part)
xpath = "".join(xpath_parts)
if("None" in xpath):
xpath = None
elif(len_conds == 2):
xpath_first_part = ".//MDB[.//{}/text()='{}'" \
.format(feature_set_dict[conds[0]][1],
redner_dict[conds[0]])
xpath_parts.insert(0, xpath_first_part)
xpath_last_part = ".//{}/text()='{}']" \
.format(feature_set_dict[conds[-1]][1],
redner_dict[conds[-1]])
xpath_parts.append(xpath_last_part)
xpath = " and ".join(xpath_parts)
if("None" in xpath):
xpath = None
elif(len_conds > 2):
xpath_first_part = ".//MDB[.//{}/text()='{}'" \
.format(feature_set_dict[conds[0]][1],
redner_dict[conds[0]])
xpath_parts.insert(0, xpath_first_part)
for condition in conds[1:-1]:
xpath_inner_part = ".//{}/text()='{}'" \
.format(feature_set_dict[condition][1],
redner_dict[condition])
xpath_parts.append(xpath_inner_part)
xpath_last_part = ".//{}/text()='{}']" \
.format(feature_set_dict[conds[-1]][1],
redner_dict[conds[-1]])
xpath_parts.append(xpath_last_part)
xpath = " and ".join(xpath_parts)
if("None" in xpath): # sets xpaths to None if it uses a feature which is None
xpath = None
xpath_parts = [] # empties xpath_parts list
try: # tries every xpath
matches = MdB_etree.xpath(xpath)
except TypeError: # handles xpaths that are None
matches = []
# If xpath has unique match new feature value will be set to given feature
if(len(matches) == 1):
matches = matches[0]
feature_lookup = ".//" + feature_lookup
new_feature = matches.xpath(feature_lookup)[0].text
self.logger.info((" There is one unique match "
+ " for this speaker: "
+ str(redner_dict)
+ " Extracted feature "
+ feature_lookup + ": "
+ str(new_feature)
+ " with: "
+ str(conds)))
redner_dict[feature_to_add] = new_feature
self.logger.info(("New speaker features are: "
+ str(redner_dict)))
# Handels mathches tha are not unique for logging and mutli id
elif(len(matches) > 1):
self.logger.warning((" There are "
+ str(len(matches))
+ " matches for this speaker: "
+ str(redner_dict)
+ " .Could not extract: "
+ feature_lookup
+ " Features used are: "
+ str(conds)))
elif(len(matches) > 1 and multi_ids is True):
ids = matches
for id, i in ids, enumerate(ids):
key = "id" + i
redner_dict[key] = id
return matches
def get_periode(MdB_etree):
periode = self.xml_tree.xpath(".//wahlperiode")
if(periode):
redner_dict["wahlperiode"] = periode[0].text
return periode[0].text
###
# Start of main function cross_reference_markup
###
# Initiates empty dict and gets keys for it
redner_dict = dict()
features = list(feature_set_dict.keys())
# Counters to calculate how successful the identification of speakers is
identified_speakers = 0
unidentified_speakers = 0
multiple_identified_speakers = 0
# Cross references every <redner> string
for string in tqdm(strings, desc="Cross reference name markup for speakers in strings"):
self.logger.info("\nStarting name markup process for new speaker:")
# Sets values in redner_dict to None or specific value
initiate_dict(features, [feature for feature in features])
tokens = string.replace(":", "").replace(",", "").split() # replaces ":" and "," with nothing because some names would be "name:" and some names would contain a ","
for token in tokens:
get_names(features, feature_set_dict, token)
self.logger.info("nachname is: " + str(redner_dict["nachname"]))
feature_keys = [key for key in features if key not in ["vorname",
"nachname"]]
for f_key in feature_keys:
get_feature(f_key, string, feature_set_dict[f_key][0])
get_party(redner_dict)
check_name(redner_dict)
regex_p = r"^\w*(?:P|p)räsident\w*"
get_match_in_str("präsident", string, regex_p)
get_role(string)
###
# Checks if script is still running for the same current periode.
# If this is not the case the known_redner_dicts will be emptied.
###
current_wahlperiode = get_periode(MdB_etree)
if(current_wahlperiode != SpeakerNameMarkup.last_wahlperiode):
SpeakerNameMarkup.known_redner_dicts = dict()
SpeakerNameMarkup.last_wahlperiode = current_wahlperiode
###
# Creates possible combinations of features which will be used in
# add_missing_MdB_feature to identify missing features like vorname or
# nachname.
###
combination_features = [feature for feature in features if feature
not in ["namenszusatz",
"feature_complete",
"id",
"titel",
"rolle_kurz",
"rolle_lang",
"original_string",
"identified",
"damalige_fraktion"]]
subsets = []
for length in range(0, 5):
for subset in combinations(combination_features, length):
subsets.append(list(subset))
subsets = subsets[1:]
combination_features.remove("wahlperiode")
combination_features.remove("nachname")
###
# First while loop trying to identify every feature for one speaker.
# Uses combinations from above. Before calling the function
# add_missing_MdB_feature there is a check if the speaker has alreeady
# been identified before. If this is the case features will be set to
# the already identfied features. This saves a lot of time.
###
counter_feats = 0
while(redner_dict["feature_complete"] is False):
redner_dict["damalige_fraktion"] = redner_dict["fraktion"]
# print("Doing name markup for:", redner_dict)
# Checks if speaker has been already identified before.
if(string in SpeakerNameMarkup.known_redner_dicts):
# print("Speaker has already been identified once.")
redner_dict = SpeakerNameMarkup.known_redner_dicts[string].copy()
# print("Speaker features are set to:",
# SpeakerNameMarkup.known_redner_dicts[string])
redner_dict["identified"] = True
self.logger.info(("Speaker has alreeady been identified "
+ "once."))
self.logger.info(("Speaker features are set to: "
+ str(SpeakerNameMarkup.known_redner_dicts[string])))
if(SpeakerNameMarkup.known_redner_dicts[string]["feature_complete"] is not False):
identified_speakers += 1
break
else:
for feature in combination_features:
for subset in subsets:
add_missing_MdB_feature(string,
redner_dict,
feature_set_dict,
MdB_etree,
subset,
feature_set_dict[feature][1],
feature)
check_party_and_fraction()
if(redner_dict["vorname"] is not None
and redner_dict["nachname"] is not None
and redner_dict["fraktion"] is not None
and redner_dict["partei"] is not None):
redner_dict["feature_complete"] = True
counter_feats += 1
if(counter_feats == len(combination_features)):
redner_dict["feature_complete"] = False
break
###
# Second while loop uses four features to identfie the unique ID for one
# speaker with add_missing_MdB_feature. Also tries to identfie speakers
# with lesser known features. In this case there can be multiple possile
# ids for one speaker these will be saved in a special dictionary entry.
# Rare case.
###
counter_ids = 0
while(redner_dict["id"] is None):
if(redner_dict["feature_complete"] is True):
add_missing_MdB_feature(string,
redner_dict,
feature_set_dict,
MdB_etree,
["vorname", "nachname", "partei",
"wahlperiode"],
feature_set_dict["id"][1],
"id")
key_original_string = redner_dict["original_string"]
SpeakerNameMarkup.known_redner_dicts.update(
{key_original_string: redner_dict.copy()})
redner_dict["identified"] = True
if(counter_ids == 1):
redner_dict["id"] = None
redner_dict["feature_complete"] = False
redner_dict["identified"] = False
self.logger.warning(("Unique ID could not be assigned. "
+ "Feature complete: True "
+ "Features are: "
+ str(redner_dict)))
SpeakerNameMarkup.known_redner_dicts.update(
{key_original_string: redner_dict.copy()})
unidentified_speakers += 1
identified_speakers -= 1 # because identified_speakers was set before
break
identified_speakers += 1
elif(redner_dict["feature_complete"] is not True):
redner_dict["id"] = None
ids = add_missing_MdB_feature(string,
redner_dict,
feature_set_dict,
MdB_etree,
["nachname", "partei",
"wahlperiode"],
feature_set_dict["id"][1],
"id", False, True)
if(ids is not None and len(ids) > 1):
redner_dict["identified"] = "Multiple"
multiple_identified_speakers += 1
identified_speakers -= 1
break
elif(ids is None):
self.logger.warning(("Unique ID could not be assigned. "
+ "Feature complete: False "
+ "Features are: "
+ str(redner_dict)))
redner_dict["identified"] = False
unidentified_speakers += 1
break
counter_ids += 1
self.logger.info(("Number of identified speakers with valid id and"
+ " name markup is: "
+ str(identified_speakers)))
self.logger.info(("Number of unidentified speakers without valid"
+ " id and name markup is: "
+ str(unidentified_speakers)))
self.logger.info(("Number of speakers with possible multiple ids: "
+ str(multiple_identified_speakers)))
self.logger.info(("Number of all speaker entitiys in current"
+ " protocoll is: "
+ str(len(strings))))
redner_dict_final = copy.deepcopy(redner_dict)
self.redner_dict = redner_dict_final
self.all_speakers.append(self.redner_dict)
for key in features:
redner_dict[key] = None
# print("Speaker features after whole cross reference markup:",
# redner_dict_final)
self.logger.info(("Saved speakers (identfied and not identified): "
+ str(len(self.all_speakers))))
def create_speaker_elements(self):
"""
Creates a valid redner XML element for one redner_dict entry from the
list self.all_speakers. Has to be done step by step becuase dictionary
is not sorted and name sub elements have to be in specific order.
"""
self.all_speaker_elements = []
for redner_entry in tqdm(self.all_speakers, desc="Creating speaker element"):
redner_element = etree.Element("redner")
redner_element.set("id", str(redner_entry["id"]))
name_element = etree.Element("name")
titel_element = etree.Element("titel")
titel_element.text = redner_entry["titel"]
vorname_element = etree.Element("vorname")
vorname_element.text = redner_entry["vorname"]
namenszusatz_element = etree.Element("namenszusatz")
namenszusatz_element.text = redner_entry["namenszusatz"]
nachname_element = etree.Element("nachname")
nachname_element.text = redner_entry["nachname"]
damalige_fraktion_element = etree.Element("damalige_fraktion")
damalige_fraktion_element.text = redner_entry["damalige_fraktion"]
fraktion_element = etree.Element("fraktion")
fraktion_element.text = redner_entry["fraktion"]
partei_element = etree.Element("partei")
partei_element.text = redner_entry["partei"]
ortszusatz_element = etree.Element("ortszusatz")
ortszusatz_element.text = redner_entry["ortszusatz"]
rolle_lang_element = etree.Element("rolle_lang")
rolle_lang_element.text = redner_entry["rolle_lang"]
rolle_kurz_element = etree.Element("rolle_kurz")
rolle_kurz_element.text = redner_entry["rolle_kurz"]
original_string_element = etree.Element("original_string")
original_string_element.text = redner_entry["original_string"]
if(redner_entry["titel"] is not None):
name_element.append(titel_element)
name_element.append(vorname_element)
if(redner_entry["namenszusatz"] is not None):
name_element.append(namenszusatz_element)
name_element.append(nachname_element)
name_element.append(damalige_fraktion_element)
name_element.append(fraktion_element)
name_element.append(partei_element)
if(redner_entry["ortszusatz"] is not None):
name_element.append(ortszusatz_element)
if(redner_entry["rolle_lang"] is not None):
name_element.append(rolle_lang_element)
name_element.append(rolle_kurz_element)
name_element.append(original_string_element)
name_element.tail = original_string_element.text
redner_element.append(name_element)
self.all_speaker_elements.append(redner_element)
self.logger.info(("Speaker element is: "
+ ElementTree.tostring(redner_element).decode("utf-8")))
def set_speech_ids(self):
"""
This functions sets a unique rede id for every rede element in one
protocoll. Id is a ten digit integer preceded by the string ID.
Example: ID1809900000
First two digits are the wahlperiode the followinf three digits are the
sitzungsnr (session number). The remaining digits are for counting the
speeches. First speech is 00100, second is 00200, eleventh is 01100 and so on.
Example: ID1809901100 --> eleventh speech
Last tow digits are for corrections.
"""
id_counter = 000
speeches = self.xml_tree.xpath(".//sitzungsbeginn | .//rede")
for speech in tqdm(speeches, desc="Creating speech ids"):
id_counter_str = str(id_counter).zfill(5)
id = "ID" + self.filename + id_counter_str
speech.set("id", id)
id_counter += 100
self.logger.info(("Speech id is: " + id))
self.xml_tree = self.xml_tree

View File

View File

@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utility.FileGetter import FileGetter
from utility.XMLProtocol import XMLProtocol
import configparser
from tqdm import tqdm
def beautify_xml(case, alter_lines=False, line_width=0):
"""
Beautifies the xml protocols so that they are easily readable by humans.
Uses .beautify_xml_part() and .beautify_xml() to be able to format lines for
specific parts of an xml. Alter lines can be set to Flase or True. Line
width that will be used if alter_lines is True can be set to any value
between 0 and 160.
"""
config = configparser.ConfigParser()
config.read("config.ini")
if(case == "markup"):
output_path = config["File paths"]["output_folder"]
input_path = config["File paths"]["clear_speech_markup"]
key_name = "beautiful_xml"
elif(case == "nlp"):
output_path = config["File paths"]["nlp_output"]
input_path = config["File paths"]["nlp_lemmatized_tokenized"]
key_name = "nlp_beuatiful_xml"
files = FileGetter(input_path, "*.xml")
files = files.get_files()
for file_path in tqdm(sorted(files), desc="First beautification steps"):
xml = XMLProtocol()
xml.read_xml(file_path)
xml.beautify_xml_part(file_path, ".//vorspann")
xml.replace_elements(".//vorspann", [xml.beautified_part])
xml.beautify_xml_part(file_path, ".//sitzungsverlauf", alter_lines,
line_width)
xml.replace_elements(".//sitzungsverlauf", [xml.beautified_part])
xml.save_to_file(output_path, file_path, key_name,
"File paths", key_name)
config.read("config.ini")
beautiful_xmls_path = config["File paths"][key_name]
files = FileGetter(beautiful_xmls_path, "*.xml")
files = files.get_files()
for file_path in tqdm(files, desc="Second beautification steps"):
xml.beautify_xml(file_path, False)
if __name__ == '__main__':
beautify_xml()

View File

@ -0,0 +1,57 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utility.FileGetter import FileGetter
from markup.MetadataMarkup import MetadataMarkup
from tqdm import tqdm
import os
import configparser
import logging
def get_metadata():
"""
This script creates a valid metadata head and first level xml tag strucutre
for all files in one directory with subdirs. It needs all filepaths for all
files to consider. File paths will be extracted by using the FileGetter
class.
After that it extracts the given metadata for one file each and writes it as
valid XML according to the new offical schema into a new file at the given
output path.
"""
logger = logging.getLogger(__name__)
print("Running metadata creation for original XML-protocolls.")
config = configparser.ConfigParser()
config.read("config.ini")
input_path = config["File paths"]["input_folder_xmls"]
output_path = config["File paths"]["output_folder"]
Files = FileGetter(input_path, "*.xml")
file_list = Files.get_files()
metadata = MetadataMarkup()
for file in tqdm(sorted(file_list), desc="Metadata status:"):
logger.info("\nCreating metadata for: " + str(os.path.basename(file)))
root = metadata.read_protcol(file)
metadata.extract_metadata(root)
metadata.built_iso_date(metadata.datum_ger_non_iso)
metadata.built_date_string(metadata.datum_iso)
metadata.delete_old_metadata(root)
metadata.split_content(root)
metadata.insert_new_metadata(root)
metadata.get_session_times()
metadata.write_to_attr("dbtplenarprotokoll", "sitzung-datum",
metadata.datum_ger_non_iso)
metadata.write_to_attr("dbtplenarprotokoll", "sitzung-start-uhrzeit",
metadata.session_start_time)
metadata.write_to_attr("dbtplenarprotokol", "sitzung-ende-uhrzeit",
metadata.session_end_time)
metadata.write_to_attr("dbtplenarprotokoll", "sitzungs-nr",
metadata.sitzungsnr)
metadata.write_to_attr("dbtplenarprotokol", "wahlperiode",
metadata.wahlperiode)
metadata.save_to_file(output_path, file, "new_metadata", "File paths", "new_metadata")
logger.info("New metadata created for: " + str(os.path.basename(file)))
print("Succesfully extracted and wrote new metadata to XML-protocolls.")
if __name__ == '__main__':
get_metadata()

View File

@ -0,0 +1,122 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from markup.SpeakerNameMarkup import SpeakerNameMarkup
from markup.MdBData import MdBData
from utility.FileGetter import FileGetter
from xml.etree import ElementTree
from tqdm import tqdm
import os
import configparser
import logging
def get_names():
"""
This script gets the identified speaker elements. It will analyse the text
of those to determine <vorname>, <nachname>, @id etc. for every speaker.
Also creates a speech id for every speech.
"""
###
# Setting paths in config and start logging
###
logger = logging.getLogger(__name__)
config = configparser.ConfigParser()
config.read("config.ini")
xml_path = config["File paths"]["new_simple_markup"]
output_path = config["File paths"]["output_folder"]
parent_path = os.path.dirname(os.getcwd())
stammdatenbank_full_path = os.path.join(parent_path,
"data/MdB_data/MdB_Stammdaten.xml")
###
# opens and reads Stammdatenbank
###
stammdatenbank = MdBData()
stammdatenbank.read_xml(stammdatenbank_full_path)
###
# Getting sets of different name name/MdB features
###
# getting first names
first_names = stammdatenbank.get_set(".//VORNAME", stammdatenbank.xml_tree)
first_names.discard(None)
# getting las names
last_names = stammdatenbank.get_set(".//NACHNAME", stammdatenbank.xml_tree)
last_names.discard(None)
# getting academic titles
academic_titles = stammdatenbank.get_set(".//AKAD_TITEL",
stammdatenbank.xml_tree)
academic_titles_short = stammdatenbank.get_set(".//ANREDE_TITEL",
stammdatenbank.xml_tree)
additional_academic_titles = [title for title in config["Additional name features"]["academic_titles"].split()]
for title in additional_academic_titles:
academic_titles.add(title)
academic_titles = academic_titles.union(academic_titles_short)
academic_titles.discard(None)
# getting parties
parties = stammdatenbank.get_set(".//PARTEI_KURZ", stammdatenbank.xml_tree)
additional_parties = [party for party in config["Additional name features"]["parties"].split()]
for party in additional_parties:
parties.add(party)
parties.discard(None)
# getting name affixes
name_affixes = stammdatenbank.get_set(".//PRAEFIX", stammdatenbank.xml_tree)
name_affixes.discard(None)
# getting cities
cities = stammdatenbank.get_set(".//ORTSZUSATZ", stammdatenbank.xml_tree)
cities.discard(None)
# setting empty sets to later combine them with XML node names for XPaths
party = set() #
periode = set() #
feature_complete = set() #
speaker_id = set() #
role_long = set()
role_short = set()
###
# creating dict with tuples of sets and corresponding XML node name
###
sets = [(first_names, "VORNAME"), (last_names, "NACHNAME"),
(academic_titles, "AKAD_TITEL"), (parties, "PARTEI_KURZ"),
(name_affixes, "PRAEFIX"), (cities, "ORTSZUSATZ"),
(party, "PARTEI_KURZ"), (periode, "WP"), (feature_complete, "None"),
(speaker_id, "ID"), (role_long, "None"), (role_short, "None")]
features = ["vorname", "nachname", "titel", "fraktion", "namenszusatz",
"ortszusatz", "partei", "wahlperiode", "feature_complete",
"id", "rolle_lang", "rolle_kurz"]
feature_set_dict = dict(zip(features, sets))
###
# opening XML protocolls
# starting speaker markup for features
###
files = FileGetter(xml_path, "*.xml")
files = files.get_files()
for file_path in tqdm(sorted(files),
desc="File status"):
complex_speaker = SpeakerNameMarkup(file_path, ".//redner")
complex_speaker.read_xml(file_path)
complex_speaker.get_element_text()
logger.info(("Doing cross reference markup for names to get redner ids."
+ " For file: "
+ os.path.basename(file_path)))
complex_speaker.cross_reference_markup(complex_speaker.current_strings,
feature_set_dict,
stammdatenbank.xml_tree)
complex_speaker.create_speaker_elements()
complex_speaker.replace_elements(".//redner",
complex_speaker.all_speaker_elements,
True)
xml_string = ElementTree.tostring(complex_speaker.xml_tree)
bool = complex_speaker.simple_check_xml(xml_string, file_path, False,
False)
if(bool is False):
logger.error(("This XML file is not well-formed. Program stopped."
" Fix or remove this file an run the program again."
))
print("Program has stopped. See logs for more info.")
break
complex_speaker.set_speech_ids()
complex_speaker.save_to_file(output_path, file_path, "complex_markup",
"File paths", "complex_markup")
if __name__ == '__main__':
get_names()

View File

@ -0,0 +1,114 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utility.FileGetter import FileGetter
from utility.XMLProtocol import XMLProtocol
from markup.EntityMarkup import EntityMarkup
from markup.SpeakerMarkup import SpeakerMarkup
from tqdm import tqdm
import configparser
import logging
import os
def get_speakers():
"""
This script identifies speakers in one xml with the new metadata structure
created by metastructure.py and applies well-formed XML markup to them and their
speeches. The markup trys to follow the official guideline from the Deutsche
Bundesregierung but is more simplistic and deviates from it when it comes down
to apply markup to the presiden of a session. This decision was made to
guarantee that every speakers speech only contains what he or she is saying.
Thus the markup follows the own minimal markup defined in the DTD
'minimal_markup.dtd' which trys to mimic the official one as close as
possible. The full offical markup cannot be applied to the XML protocolls
automatically. Script uses classes and subclasses from EntityMarkup.py.
"""
logger = logging.getLogger(__name__)
print("Running simple markup for first speaker identification.")
config = configparser.ConfigParser()
config.read("config.ini")
regex_conf_triples = config.items("Regular expressions speakers")
regex_conf_triples = [regex[1].split(" ; ") for regex in regex_conf_triples]
input_path = config["File paths"]["new_metadata"]
output_path = config["File paths"]["output_folder"]
files = FileGetter(input_path, "*.xml")
file_list = files.get_files()
sum_matches = 0
for file_path in tqdm(sorted(file_list), desc="Speaker markup status"):
identified = EntityMarkup(file_path)
logger.info("Doing simple markup for: " + str(os.path.basename(file_path)))
logger.info("\nMarkup status for: " + str(os.path.basename(file_path)))
with open(file_path, 'r') as f:
xml_as_string = f.read()
xml_as_bytes = xml_as_string.encode("utf-8")
bool = identified.simple_check_xml(xml_as_bytes, file_path, False,
False)
if(bool is False):
logger.error(("This XML file is not well-formed. Program stopped."
" Fix or remove this file an run the program again."
))
print("Program has stopped. See logs for more info.")
break
identified.read_xml(file_path)
identified.get_element_text()
string_for_markup = identified.current_string
# Start of simple markup
for regex_conf_triplet in regex_conf_triples:
regex = regex_conf_triplet[0]
case = regex_conf_triplet[1]
speaker = SpeakerMarkup(string_for_markup, regex)
speaker.compile_regex(regex)
speaker.identify_speaker()
speaker.markup_speaker(case)
string_for_markup = speaker.markuped_string
sum_matches += speaker.matches_count
logger.info(str(sum_matches) + " total matches in the protocol.")
sum_matches = 0
speaker.simple_check_xml(string_for_markup, file_path, False)
# Saving simple markuped string to xml
speaker.read_xml(file_path)
speaker.replace_string(string_for_markup, "sitzungsverlauf")
speaker.save_to_file(output_path, file_path, "simple_xml", "File paths",
"new_simple_markup")
print("Simple markup finished.")
config.read("config.ini")
new_simple_xml_path = config["File paths"]["new_simple_markup"]
# Start of president Replacer
new_files = FileGetter(new_simple_xml_path, "*.xml")
new_file_list = new_files.get_files()
print("Replacing some XML-elements in the protocolls.")
for file_path in tqdm(sorted(new_file_list), desc="Files replacement status"):
logger.info("Replacing some xml elements for: " + str(os.path.basename(file_path)))
for regex_conf_triplet in regex_conf_triples:
if(regex_conf_triplet[1] != "first"
or regex_conf_triplet[1] != "last"):
regex = regex_conf_triplet[0]
speaker_rolle_value = regex_conf_triplet[2]
replacements = XMLProtocol()
replacements.read_xml(file_path)
replacements.compile_regex(regex)
replacements.expand_element(".//rede", "typ",
speaker_rolle_value)
replacements.save_to_file(output_path, file_path, "simple_xml",
"File paths", "new_simple_markup")
start_time_attr_value = replacements.xml_tree.get("sitzung-start-uhrzeit")
replacements.replace_tag_attr(".//sitzungsverlauf/rede[1]",
"sitzungsbeginn",
"sitzung-start-uhrzeit",
start_time_attr_value,
False)
end_time_attr_value = replacements.xml_tree.get("sitzung-ende-uhrzeit")
replacements.expand_element(".//sitzungsende", "sitzung-ende-uhrzeit",
end_time_attr_value, False)
replacements.save_to_file(output_path, file_path, "simple_xml",
"File paths", "new_simple_markup")
if __name__ == '__main__':
get_speakers()

View File

@ -0,0 +1,76 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utility.FileGetter import FileGetter
from markup.EntityMarkup import EntityMarkup
import configparser
from tqdm import tqdm
import logging
def markup_speeches():
"""
Marks up different entitys in the speech strings. For example comments.
First it marks speech parts (<p>) line by line.
"""
logger = logging.getLogger(__name__)
config = configparser.ConfigParser()
config.read("config.ini")
complex_xmls = config["File paths"]["complex_markup"]
output_path = config["File paths"]["output_folder"]
regex_conf_pairs = config.items("Regular expressions speeches")
regex_conf_pairs = [regex[1].split(" ; ") for regex in regex_conf_pairs]
multiline_entities = config.items("Multiline entities")
multiline_entities = [regex[1].split(" ; ") for regex in multiline_entities]
files = FileGetter(complex_xmls, "*.xml")
file_list = files.get_files()
for file_path in tqdm(sorted(file_list), desc="File status speech markup"):
entity = EntityMarkup(file_path)
entity.read_xml(file_path)
speeches = entity.xml_tree.xpath(".//rede")
session_start = entity.xml_tree.xpath(".//sitzungsbeginn")[0]
for speech in speeches:
entity.markup_speech_lines(speech)
entity.markup_speech_lines(session_start)
session_lines = entity.xml_tree.xpath(".//p")
for line in tqdm(session_lines, desc="Marking single line entities"):
for pair in regex_conf_pairs:
entity.inject_element(line, pair[0], pair[1])
session_lines = entity.xml_tree.xpath(".//p") # gets new altered session lines (<p>)
for pair in tqdm(multiline_entities, desc="Marking multiline entities:"):
entity.get_multiline_entities(session_lines, pair[0], pair[1], pair[2])
# For logging
all_entities = 0
only_single_line_entities = 0
for pair in regex_conf_pairs:
element_path = ".//" + pair[1]
nr_entities = len(entity.xml_tree.xpath(element_path))
logger.info(("Number of identified " + pair[1] + " elements is: "
+ str(nr_entities)
+ " (single line)"))
all_entities += nr_entities
only_single_line_entities += nr_entities
for pair in multiline_entities:
element_path = ".//" + pair[2]
nr_entities = len(entity.xml_tree.xpath(element_path))
logger.info(("Number of identified " + pair[2] + " elements is: "
+ str(nr_entities)
+ " (multi line)"))
all_entities += nr_entities
logger.info(("Number of all identified single line entities: "
+ str(only_single_line_entities)))
logger.info(("Number of all identified entities is: " + str(all_entities)
+ " Also includes multiline matches. Number could be higher"
+ " than it is if multiline matches are matching the same"
+ " like the single line entitie regexes."))
entity.save_to_file(output_path, file_path, "clear_speech_markup",
"File paths", "clear_speech_markup")
if __name__ == '__main__':
markup_speeches()