nlp/nlp

155 lines
4.9 KiB
Python
Executable File

#!/usr/bin/env python2.7
# coding=utf-8
"""
nlp
Usage: For usage instructions run with option --help
Author: Patrick Jentsch <p.jentsch@uni-bielefeld.de>
"""
import argparse
import multiprocessing
import os
import sys
from pyflow import WorkflowRunner
def parse_arguments():
parser = argparse.ArgumentParser(
description=('Performs NLP of documents utilizing spaCy. The results '
'are served as verticalized text files.')
)
parser.add_argument('-i', dest='input_dir', required=True)
parser.add_argument(
'-l',
choices=['de', 'el', 'en', 'es', 'fr', 'it', 'nl', 'pt'],
dest='lang',
required=True
)
parser.add_argument('-o', dest='output_dir', required=True)
parser.add_argument('--nCores',
default=min(4, multiprocessing.cpu_count()),
dest='n_cores',
help='total number of cores available',
required=False,
type=int)
parser.add_argument('--zip',
action='store_true',
default=False,
dest='zip',
help='package result files in zip bundles',
required=False)
parser.add_argument('--check-encoding',
action='store_true',
default=False,
dest="check_encoding",
help='''if used the nlp process will know hat the encoding of
the input files is unkown and thus != utf-8. The process will
try to determine the encoding of the input files and use this.
encoding.'''
)
return parser.parse_args()
class NLPWorkflow(WorkflowRunner):
def __init__(self, args):
self.jobs = analyze_jobs(args.input_dir, args.output_dir)
self.lang = args.lang
self.n_cores = args.n_cores
self.output_dir = args.output_dir
self.zip = args.zip
self.check_encoding
def workflow(self):
if len(self.jobs) == 0:
return
'''
' ##################################################
' # Create output directories #
' ##################################################
'''
create_output_directories_jobs = []
for index, job in enumerate(self.jobs):
cmd = 'mkdir -p "{}"'.format(job['output_dir'])
create_output_directories_jobs.append(
self.addTask(
command=cmd,
label='create_output_directories_job_-_{}'.format(index)
)
)
'''
' ##################################################
' # Natural language processing #
' ##################################################
'''
nlp_jobs = []
nlp_job_n_cores = min(
self.n_cores,
max(1, int(self.n_cores / len(self.jobs)))
)
for index, job in enumerate(self.jobs):
cmd = 'spacy_nlp -l "{}" "{}" "{}" "{}"'.format(
self.lang,
job['path'],
os.path.join(job['output_dir'], job['name'] + '.vrt',
if self.check_encoding "--check-encoding" else "")
)
nlp_jobs.append(
self.addTask(
command=cmd,
dependencies='create_output_directories_job_-_{}'.format(
index
),
label='nlp_job_-_{}'.format(index),
nCores=nlp_job_n_cores
)
)
if zip:
vrt_zip_jobs = []
vrt_zip_job_dependencies = nlp_jobs
cmd = 'cd "%s" && zip -m vrt.zip */*.vrt -x "pyflow.data*" && cd -' % (
self.output_dir
)
vrt_zip_jobs.append(
self.addTask(
command=cmd,
dependencies=vrt_zip_job_dependencies,
label='vrt_zip_job'
)
)
def analyze_jobs(input_dir, output_dir):
jobs = []
for file in os.listdir(input_dir):
if os.path.isdir(os.path.join(input_dir, file)):
jobs += analyze_jobs(os.path.join(input_dir, file),
os.path.join(output_dir, file))
elif file.endswith('.txt'):
jobs.append({'filename': file,
'name': file.rsplit('.', 1)[0],
'output_dir': os.path.join(output_dir, file),
'path': os.path.join(input_dir, file)})
return jobs
def main():
args = parse_arguments()
wflow = NLPWorkflow(args)
retval = wflow.run(dataDirRoot=args.output_dir, nCores=args.n_cores)
sys.exit(retval)
if __name__ == '__main__':
main()