nlp/nlp
2021-07-13 16:31:53 +02:00

206 lines
8.1 KiB
Python
Executable File

#!/usr/bin/env python2.7
# coding=utf-8
"""A NLP pipeline for text file processing."""
__author__ = 'Patrick Jentsch <p.jentsch@uni-bielefeld.de>,' \
'Stephan Porada <porada@posteo.de>'
__version__ = '1.0.0'
from argparse import ArgumentParser
from pyflow import WorkflowRunner
import multiprocessing
import os
import sys
# Language code -> model name. Within this file only the keys are used: they
# are the allowed values of the --language command line option.
# NOTE(review): the values look like spaCy model package names and are
# presumably consumed by the external 'spacy-nlp' tool -- confirm.
SPACY_MODELS = {'de': 'de_core_news_md',
                'en': 'en_core_web_md',
                'it': 'it_core_news_md',
                'nl': 'nl_core_news_md',
                'pl': 'pl_core_news_md',
                'zh': 'zh_core_web_md'}
class NLPPipelineJob:
    """An NLP pipeline job class

    Each input file of the pipeline is represented as an NLP pipeline job,
    which holds all necessary information for the pipeline to process it.

    Arguments:
    file -- Path to the file
    output_dir -- Path to a directory, where job results are stored

    Attributes:
    file -- The path given as `file`
    name -- Basename of `file` without its last extension
    output_dir -- The path given as `output_dir`
    catma_stand_off_data_file -- Path of '<name>.catma-stand-off.json' next to
        `file` if that file exists when the job is created, otherwise None
    """

    def __init__(self, file, output_dir):
        self.file = file
        self.output_dir = output_dir
        # Strip the extension from the basename only. Splitting the complete
        # path at its last dot would cut inside a parent directory name such
        # as 'v1.0' whenever the file itself has no extension.
        directory, basename = os.path.split(file)
        self.name = basename.rsplit('.', 1)[0]
        candidate = os.path.join(directory,
                                 self.name + '.catma-stand-off.json')
        if os.path.exists(candidate):
            self.catma_stand_off_data_file = candidate
        else:
            self.catma_stand_off_data_file = None
class NLPPipeline(WorkflowRunner):
    """Workflow that registers the per-file processing tasks of the pipeline.

    Arguments:
    input_dir -- Directory that is scanned for input files
    output_dir -- Directory below which the job results are written
    check_encoding -- If true, '--check-encoding' is passed to 'spacy-nlp'
    lang -- Language code handed to 'spacy-nlp' via '-l'
    zip -- Name of the result archive without '.zip', or None to create no
           archive
    """

    def __init__(self, input_dir, output_dir, check_encoding, lang, zip):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.check_encoding = check_encoding
        self.lang = lang
        self.zip = zip
        self.jobs = collect_jobs(self.input_dir, self.output_dir)

    def workflow(self):
        """Add the tasks for all collected jobs to the workflow.

        Per job three chained tasks are added: create the output directory,
        run 'spacy-nlp', run 'vrt-creator'. If an archive name was given, one
        final 'zip' task depending on every vrt creation task is added.
        Nothing is added when there are no jobs.
        """
        if not self.jobs:
            return

        # ------------------------------------------------------------------
        # setup output directory
        # ------------------------------------------------------------------
        for index, job in enumerate(self.jobs):
            self.addTask(
                command='mkdir -p "{}"'.format(job.output_dir),
                label='setup_output_directory_-_{}'.format(index)
            )

        # ------------------------------------------------------------------
        # nlp
        # ------------------------------------------------------------------
        # The available cores and memory are shared evenly between the jobs;
        # every nlp task gets at least one core and at most 2048 MB per core.
        n_cores = max(1, int(self.getNCores() / len(self.jobs)))
        mem_mb = min(n_cores * 2048, int(self.getMemMb() / len(self.jobs)))
        for index, job in enumerate(self.jobs):
            stand_off_file = os.path.join(
                job.output_dir,
                '{}.nopaque-stand-off.json'.format(job.name)
            )
            parts = ['spacy-nlp', '-l "{}"'.format(self.lang)]
            if self.check_encoding:
                parts.append('--check-encoding')
            parts.append('"{}"'.format(job.file))
            parts.append('"{}"'.format(stand_off_file))
            self.addTask(
                command=' '.join(parts),
                dependencies='setup_output_directory_-_{}'.format(index),
                label='nlp_-_{}'.format(index),
                memMb=mem_mb,
                nCores=n_cores
            )

        # ------------------------------------------------------------------
        # vrt creation
        # ------------------------------------------------------------------
        vrt_tasks = []
        for index, job in enumerate(self.jobs):
            vrt_file = os.path.join(job.output_dir,
                                    '{}.vrt'.format(job.name))
            stand_off_file = os.path.join(
                job.output_dir,
                '{}.nopaque-stand-off.json'.format(job.name)
            )
            parts = ['vrt-creator',
                     '"{}"'.format(job.file),
                     '"{}"'.format(stand_off_file)]
            if job.catma_stand_off_data_file is not None:
                parts.append('--catma-stand-off-data "{}"'.format(
                    job.catma_stand_off_data_file))
            parts.append('"{}"'.format(vrt_file))
            vrt_tasks.append(self.addTask(
                command=' '.join(parts),
                dependencies='nlp_-_{}'.format(index),
                label='vrt_creation_-_{}'.format(index)
            ))

        # ------------------------------------------------------------------
        # zip creation
        # ------------------------------------------------------------------
        if self.zip is None:
            return
        # Only *.vrt and *.json files are packed, pyflow's own data directory
        # is excluded.
        archive_cmd = (
            'cd "{}" && zip -r "{}.zip" .'
            ' -x "pyflow.data*" -i "*.vrt" "*.json" && cd -'
        ).format(self.output_dir, self.zip)
        self.addTask(command=archive_cmd, dependencies=vrt_tasks,
                     label='zip_creation')
def collect_jobs(input_dir, output_dir):
    """Build one NLPPipelineJob per '.txt' file found directly in input_dir.

    Arguments:
    input_dir -- Directory whose entries are inspected (not recursive)
    output_dir -- Directory below which each job gets its own output
                  directory, named after the complete input file name

    Returns a list of NLPPipelineJob objects in os.listdir() order. Entries
    that are directories or whose name does not end in '.txt' (compared case
    insensitively) are ignored.
    """
    text_files = (
        entry for entry in os.listdir(input_dir)
        if entry.lower().endswith('.txt')
        and not os.path.isdir(os.path.join(input_dir, entry))
    )
    return [
        NLPPipelineJob(os.path.join(input_dir, entry),
                       os.path.join(output_dir, entry))
        for entry in text_files
    ]
def parse_args():
    """Parse and validate the command line arguments of the pipeline.

    Defaults that depend on other options are filled in after parsing:
    --log-dir falls back to --output-dir, and --mem-mb falls back to
    min(--n-cores * 2048, total memory reported by 'free -t -m').
    A '.zip' suffix on the --zip value is removed; if nothing is left the
    name 'output' is used.

    Returns the argparse namespace with the attributes input_dir, output_dir,
    language, check_encoding, log_dir, mem_mb, n_cores and zip.

    Raises Exception if --n-cores is lower than 1 or if the (given or
    computed) memory amount is lower than 2048 MB.
    """
    parser = ArgumentParser(description='NLP pipeline for TXT file processing',
                            prog='NLP pipeline')
    parser.add_argument('-i', '--input-dir',
                        help='Input directory',
                        required=True)
    parser.add_argument('-o', '--output-dir',
                        help='Output directory',
                        required=True)
    parser.add_argument('-l', '--language',
                        choices=SPACY_MODELS.keys(),
                        help='Language of the input (2-character ISO 639-1 language codes)',  # noqa
                        required=True)
    parser.add_argument('--check-encoding',
                        action='store_true',
                        help='Check encoding of the input file, UTF-8 is used instead')  # noqa
    parser.add_argument('--log-dir',
                        help='Logging directory')
    parser.add_argument('--mem-mb',
                        help='Amount of system memory to be used (Default: min(--n-cores * 2048, available system memory))',  # noqa
                        type=int)
    parser.add_argument('--n-cores',
                        default=min(4, multiprocessing.cpu_count()),
                        help='Number of CPU threads to be used (Default: min(4, number of CPUs))',  # noqa
                        type=int)
    parser.add_argument('--zip',
                        help='Create one zip file per filetype')
    parser.add_argument('-v', '--version',
                        action='version',
                        help='Returns the current version of the NLP pipeline',
                        version='%(prog)s {}'.format(__version__))
    args = parser.parse_args()

    # Set some tricky default values and check for insufficient input
    if args.log_dir is None:
        args.log_dir = args.output_dir
    if args.n_cores < 1:
        raise Exception('--n-cores must be greater or equal 1')
    if args.mem_mb is None:
        # The second column of the last line printed by 'free -t -m' is read
        # as the total memory in MB. The pipe is closed explicitly so that no
        # file descriptor or child process is left behind.
        with os.popen('free -t -m') as free_output:
            total_line = free_output.readlines()[-1]
        max_mem_mb = int(total_line.split()[1])
        args.mem_mb = min(args.n_cores * 2048, max_mem_mb)
    if args.mem_mb < 2048:
        raise Exception('--mem-mb must be greater or equal 2048')
    if args.zip is not None and args.zip.lower().endswith('.zip'):
        # Remove .zip file extension if provided; a bare '.zip' would leave
        # an empty name, in which case 'output' is used.
        args.zip = args.zip[:-4]
        args.zip = args.zip if args.zip else 'output'
    return args
def main():
    """Run the pipeline as configured on the command line.

    The process exits with the value returned by the workflow's run() call.
    """
    options = parse_args()
    pipeline = NLPPipeline(
        input_dir=options.input_dir,
        output_dir=options.output_dir,
        check_encoding=options.check_encoding,
        lang=options.language,
        zip=options.zip
    )
    exit_code = pipeline.run(
        dataDirRoot=options.log_dir,
        memMb=options.mem_mb,
        nCores=options.n_cores
    )
    sys.exit(exit_code)
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()