bundesdata_markup_nlp_software/bundesdata_markup_nlp/markup/MetadataMarkup.py
2019-02-21 19:29:44 +01:00

268 lines
12 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utility.XMLProtocol import XMLProtocol
from utility import update_config
from lxml import etree
from datetime import datetime
from babel.dates import format_date
import os
import re
import logging
import configparser
class MetadataMarkup(XMLProtocol):
    """
    Opens one XML protocol, extracts the metadata it contains and builds a
    new, valid metadata head for it.

    The methods communicate through instance attributes:
    delete_old_metadata() stores the protocol text in self.full_content,
    split_content() derives self.toc, self.president, self.content and
    self.attachment from it, and insert_new_metadata() writes all of them
    (plus the values gathered by extract_metadata() and the date helpers)
    into the new XML structure and stores the result in self.xml_tree.
    """

    def __init__(self):
        """Initialise the metadata fields and the logger."""
        super().__init__()
        # Filled by extract_metadata().
        self.plenarprotokoll_string = str()
        self.wahlperiode = int()
        self.sitzungsnr = int()
        # Constant values, identical in every protocol.
        self.herausgeber = "Deutscher Bundestag"
        self.berichtart = "Stenografischer Bericht"
        self.sitzungstitel_string = ". Sitzung"
        self.ort = "Berlin"
        # Filled by extract_metadata().
        self.datum_ger_non_iso = str()
        # Built from self.datum_ger_non_iso by built_iso_date().
        self.datum_iso = str()
        # Built from self.datum_iso by built_date_string().
        self.datum_string = str()
        # Extracted by split_content(). The split does not work for every
        # protocol, but a failed split does not break the XML.
        self.attachment = str()
        self.logger = logging.getLogger(__name__)

    def extract_metadata(self, etree_element_object):
        """
        Extracts metadata from the given XML tags and writes it into the
        instance variables.

        The text of every element except TEXT is collected in document
        order; the values are then read by position (electoral term,
        document type, "<term>/<session>" number, German date).

        Raises IndexError if fewer metadata elements than expected are
        present or the number does not contain a "/".
        """
        root = etree_element_object
        # iter() yields the root element itself first; its text is not
        # metadata, so the first collected entry is dropped.
        metadata_list = [element.text for element in root.iter()
                         if element.tag != "TEXT"][1:]
        self.wahlperiode = metadata_list[0]
        self.plenarprotokoll_string = metadata_list[1].lower().title()
        self.sitzungsnr = metadata_list[2].split("/")[1]
        self.datum_ger_non_iso = metadata_list[3]
        # Lazy %-style arguments: a None value is logged instead of raising
        # a TypeError on string concatenation.
        self.logger.info("Metadata successfully extracted.")
        self.logger.info("Wahlperiode is:%s", self.wahlperiode)
        self.logger.info("Plenarprotokoll is:%s", self.plenarprotokoll_string)
        self.logger.info("Sitzungsnummer is:%s", self.sitzungsnr)
        self.logger.info("German non ISO date is:%s", self.datum_ger_non_iso)

    def built_iso_date(self, ger_date):
        """
        Converts a German date string ("dd.mm.YYYY") to a datetime.date and
        stores it in self.datum_iso.

        Raises ValueError if ger_date does not match the expected format.
        """
        self.datum_iso = datetime.strptime(ger_date, "%d.%m.%Y").date()
        self.logger.info("ISO date created:%s", self.datum_iso)

    def built_date_string(self, iso_date):
        """
        Builds the full German date string from the given date and stores
        it in self.datum_string.

        Every comma in the locale-formatted "full" date is followed by
        " den" (e.g. "<weekday>, den <day>. <month> <year>").
        """
        date_string = format_date(iso_date, format="full", locale="de_DE")
        self.datum_string = date_string.replace(",", ", den")
        self.logger.info("Date string created:%s", self.datum_string)

    def delete_old_metadata(self, etree_element_object):
        """
        Deletes the old metadata tags and their text and renames the root
        tag.

        DOKUMENT is renamed to dbtplenarprotokoll. The text of TEXT is kept
        in self.full_content before the element is removed. Every other
        element is removed from its parent.
        """
        # Iterate over a snapshot: removing elements while walking the live
        # iterator can end the walk early (e.g. when a removed element has
        # children), which would skip the TEXT element.
        for element in list(etree_element_object.iter()):
            if element.tag == "DOKUMENT":
                element.tag = "dbtplenarprotokoll"
                continue
            if element.tag == "TEXT":
                self.full_content = element.text
            element.getparent().remove(element)
        self.logger.info("Old metadata deleted.")

    def insert_new_metadata(self, etree_element_object):
        """
        Inserts the extracted metadata and the split content into newly
        created XML tags and stores the resulting element in self.xml_tree.

        Reads self.toc, self.president, self.content and self.attachment,
        so split_content() has to run first.
        """
        vorspann_element = etree.Element("vorspann")
        # The template text is kept exactly as is; its whitespace ends up
        # as text/tail content of the parsed elements.
        xml_string = """
<kopfdaten>
<plenarprotokoll-nummer>{} <wahlperiode>{}</wahlperiode>/<sitzungsnr>{}</sitzungsnr>
(neu)</plenarprotokoll-nummer>
<herausgeber>{}</herausgeber>
<berichtart>{}</berichtart>
<sitzungstitel><sitzungsnr>{}</sitzungsnr>. Sitzung</sitzungstitel>
<veranstaltungsdaten><ort>{}</ort>, <datum date="{}">{}</datum></veranstaltungsdaten>
</kopfdaten>""".format(self.plenarprotokoll_string, self.wahlperiode,
                       self.sitzungsnr, self.herausgeber, self.berichtart,
                       self.sitzungsnr, self.ort, self.datum_ger_non_iso,
                       self.datum_string)
        kopfdaten_element = etree.fromstring(xml_string)
        etree_element_object.insert(0, vorspann_element)
        vorspann_element.append(kopfdaten_element)

        toc_element = etree.Element("inhaltsverzeichnis")
        toc_element.text = self.toc
        vorspann_element.append(toc_element)

        content_element = etree.Element("sitzungsverlauf")
        content_element.text = self.president + self.content
        etree_element_object.insert(2, content_element)

        anlagen_element = etree.Element("anlagen")
        anlagen_element.text = self.attachment
        etree_element_object.insert(3, anlagen_element)

        rednerliste_element = etree.Element(
            "rednerliste", sitzungsdatum=self.datum_ger_non_iso)
        etree_element_object.insert(4, rednerliste_element)

        self.xml_tree = etree_element_object
        self.logger.info("New metadata XML-head inserted.%s", xml_string)

    def split_content(self, etree_element_object):
        """
        Splits self.full_content into table of contents, speeches and, in
        some cases, attachments.

        Both split patterns are read from the section "Regular expressions
        splits" of config.ini in the current working directory. The first
        three items of the session-start split become self.toc,
        self.president and self.content (presumably the pattern contains
        one capturing group -- verify against config.ini).

        Raises KeyError if the config entries are missing and IndexError if
        the session-start pattern does not yield three parts.
        """
        config = configparser.ConfigParser()
        config.read("config.ini")
        split_config = config["Regular expressions splits"]

        regex_start = re.compile(split_config["session_start_president_split"])
        start_parts = regex_start.split(self.full_content, maxsplit=1)
        self.toc = start_parts[0]
        self.president = start_parts[1]
        self.content = start_parts[2]

        regex_att = re.compile(split_config["attachment_split"])
        # Groups that did not take part in a match are returned as None.
        att_parts = [part for part in regex_att.split(self.content)
                     if part is not None]
        if len(att_parts) == 1:
            # The pattern matched nowhere: the single item is the whole
            # content. Keep it as content instead of moving it into the
            # attachment.
            has_attachment = False
        else:
            self.content = "".join(att_parts[:-1])
            # An empty last item means nothing follows the split point.
            has_attachment = att_parts[-1] != ""

        if has_attachment:
            self.attachment = att_parts[-1]
            self.logger.info("Attachment found.")
        else:
            self.attachment = "Keine Anlage extrahiert."
            self.logger.warning("There is no attachment.")
        self.logger.info("Contet splitted at:%s", regex_start)
        self.logger.info("Contet splitted at:%s", regex_att)

    def _find_session_time(self, pattern, use_last, label):
        """
        Returns the time matched by pattern in self.full_content as
        "HH:MM", or None if no usable match exists.

        use_last selects the last match instead of the first one; label
        ("start" or "end") is only used for the warning message.
        """
        regex = re.compile(pattern)
        matches = list(regex.finditer(self.full_content))
        if not matches:
            self.logger.warning("No %s time found for %s", label, regex)
            return None
        match = matches[-1] if use_last else matches[0]
        parts = [group for group in match.groups() if group is not None]
        # Adds a 0 in front if the digit length is 1.
        parts = ["0" + part if len(part) == 1 else part for part in parts]
        if len(parts) == 2:
            return ":".join(parts)
        if len(parts) == 1:
            return parts[0] + ":00"
        self.logger.warning("Unusable %s time match for %s", label, regex)
        return None

    def get_session_times(self):
        """
        Looks into the entire protocol content to extract the starting time
        and the last closing time and stores them in
        self.session_start_time and self.session_end_time.

        The first two values of the config.ini section "Regular expressions
        time extraction" are used as start and end pattern. A time that
        cannot be found is set to "xx:xx".
        """
        config = configparser.ConfigParser()
        config.read("config.ini")
        patterns = [value for _, value in
                    config.items("Regular expressions time extraction")]

        start_time = None
        end_time = None
        if patterns:
            # The session starts at the first announced start time.
            start_time = self._find_session_time(
                patterns[0], use_last=False, label="start")
        if len(patterns) > 1:
            # The session ends at the last announced closing time.
            end_time = self._find_session_time(
                patterns[1], use_last=True, label="end")

        self.session_start_time = "xx:xx" if start_time is None else start_time
        self.session_end_time = "xx:xx" if end_time is None else end_time

        if start_time is not None and end_time is not None:
            self.logger.info("Start time found: %s", self.session_start_time)
            self.logger.info("End time found: %s", self.session_end_time)
            self.logger.info("Successfully matched start and end times.")
        elif start_time is not None:
            self.logger.warning("Only start time found: %s",
                                self.session_start_time)
            self.logger.warning("End time set to: %s", self.session_end_time)
        elif end_time is not None:
            self.logger.warning("Only end time found: %s",
                                self.session_end_time)
            self.logger.warning("Start time set to: %s",
                                self.session_start_time)
        else:
            self.logger.warning("Neither start nor end time found. "
                                "Both set to: %s", self.session_start_time)

    def write_to_attr(self, element, attr_key, attr_value):
        """
        Writes two strings as an attribute key value pair to every element
        of self.xml_tree found for the given path.

        If nothing is found, the attribute is written to the root element
        of self.tree instead.
        """
        targets = self.xml_tree.findall(element)
        if not targets:
            targets = [self.tree.getroot()]
        for target in targets:
            target.set(attr_key, attr_value)
        self.logger.info('Wrote attribute %s="%s"', attr_key, attr_value)

    def save_to_file(self, output_path, file_path, subfolder, config_section,
                     config_key):
        """
        Writes the new markup to a new XML file.

        The file keeps the base name of file_path and is saved in
        output_path/subfolder, which is created if necessary. Afterwards
        config.ini is updated with the new folder under
        config_section/config_key.
        """
        self.filename = os.path.basename(file_path)
        save_path = os.path.join(output_path, subfolder)
        # Also creates missing parent folders and does not fail if the
        # folder already exists.
        os.makedirs(save_path, exist_ok=True)
        save_file_path = os.path.join(save_path, self.filename)
        tree = etree.ElementTree(self.xml_tree)
        tree.write(save_file_path,
                   pretty_print=True,
                   xml_declaration=True,
                   encoding="utf8",
                   doctype="<!DOCTYPE dbtplenarprotokoll SYSTEM "
                           "'dbtplenarprotokoll_minimal.dtd'>")
        self.logger.info("New XML saved to:%s", save_file_path)
        update_config.update_config("config.ini", config_section, config_key,
                                    save_path)