nopaque/vre_nlp_node/nlp
Patrick Jentsch 86557443a2 Add prototype
2019-06-03 14:57:09 +02:00

131 lines
3.6 KiB
Python
Executable File

#!/usr/bin/env python2.7
# coding=utf-8
"""
nlp
Usage: For usage instructions run with option --help
Author: Patrick Jentsch <p.jentsch@uni-bielefeld.de>
"""
import argparse
import multiprocessing
import os
import sys
from pyflow import WorkflowRunner
def parse_arguments():
parser = argparse.ArgumentParser(
"Performs NLP of documents utilizing spaCy. \
Output is .vrt."
)
parser.add_argument("-i",
dest="inputDir",
help="Input directory.",
required=True)
parser.add_argument("-l",
dest='lang',
help="Language for NLP",
required=True)
parser.add_argument("-o",
dest="outputDir",
help="Output directory.",
required=True)
parser.add_argument("--nCores",
default=multiprocessing.cpu_count(),
dest="nCores",
help="Total number of cores available.",
required=False,
type=int)
return parser.parse_args()
class NLPWorkflow(WorkflowRunner):
def __init__(self, jobs, lang, nCores):
self.jobs = jobs
self.lang = lang
self.nCores = nCores
def workflow(self):
###
# Task "mkdir_job": create output directories
# Dependencies: None
###
mkdir_jobs = []
mkdir_job_number = 0
for job in self.jobs:
mkdir_job_number += 1
cmd = 'mkdir -p "%s"' % (
job["output_dir"]
)
mkdir_jobs.append(self.addTask(label="mkdir_job_-_%i" % (mkdir_job_number), command=cmd))
###
# Task "spacy_nlp_job": perform NLP
# Dependencies: mkdir_jobs
###
self.waitForTasks()
nlp_jobs = []
nlp_job_number = 0
for job in self.jobs:
nlp_job_number += 1
cmd = 'spacy_nlp -i "%s" -o "%s" -l "%s"' % (
job["path"],
os.path.join(job["output_dir"], os.path.basename(job["path"]).rsplit(".", 1)[0] + ".vrt"),
self.lang
)
nlp_jobs.append(self.addTask(label="nlp_job_-_%i" % (nlp_job_number), command=cmd, dependencies=mkdir_jobs))
###
# Task "zip_job": compress output
# Dependencies: nlp_jobs
###
zip_jobs = []
zip_job_number = 0
for job in self.jobs:
zip_job_number += 1
cmd = 'zip -jqr %s %s' % (
job["output_dir"] + "_-_nlp",
job["output_dir"]
)
zip_jobs.append(self.addTask(label="zip_job_-_%i" % (zip_job_number), command=cmd, dependencies=nlp_jobs))
def analyze_jobs(inputDir, outputDir, level=1):
jobs = []
if level > 2:
return jobs
for file in os.listdir(inputDir):
if os.path.isdir(os.path.join(inputDir, file)):
jobs += analyze_jobs(
os.path.join(inputDir, file),
os.path.join(outputDir, file),
level + 1
)
elif file.endswith(".txt"):
jobs.append({"path": os.path.join(inputDir, file), "output_dir": os.path.join(outputDir, file.rsplit(".", 1)[0])})
return jobs
def main():
args = parse_arguments()
wflow = NLPWorkflow(
analyze_jobs(args.inputDir, args.outputDir),
args.lang,
args.nCores
)
retval = wflow.run(nCores=args.nCores)
sys.exit(retval)
if __name__ == "__main__":
main()