ocr/ocr
2020-09-22 17:44:32 +02:00

475 lines
20 KiB
Python
Executable File

#!/usr/bin/env python2.7
# coding=utf-8
"""
ocr
Usage: For usage instructions run with option --help
Authors: Patrick Jentsch <p.jentsch@uni-bielefeld.de>
Stephan Porada <sporada@uni-bielefeld.de>
"""
from argparse import ArgumentParser
from natsort import natsorted
from pyflow import WorkflowRunner
import multiprocessing
import os
import sys
import tempfile
# Tesseract language models accepted by the -l/--language command line
# option (passed straight through to tesseract's own -l flag).
TESSERACT_MODELS = ['deu', 'eng', 'enm', 'fra', 'frk', 'frm', 'ita', 'por',
                    'spa']
def parse_args():
    """Define and parse the pipeline's command line arguments.

    Returns the argparse namespace with the attributes input_directory,
    output_directory, language, binarize, log_dir, n_cores,
    intermediate_directory and zip.
    """
    arg_parser = ArgumentParser(description='OCR Pipeline utilizing tesseract.')
    # Required arguments: where to read from, where to write to, and
    # which tesseract language model to use.
    arg_parser.add_argument('-i', '--input-directory',
                            help='Input directory (only PDF files get processed)',
                            required=True)
    arg_parser.add_argument('-o', '--output-directory',
                            help='Output directory',
                            required=True)
    arg_parser.add_argument('-l', '--language',
                            choices=TESSERACT_MODELS,
                            required=True)
    # Optional tuning and output switches.
    arg_parser.add_argument('--binarize',
                            action='store_true',
                            help='Use ocropy binarisation as preprocessing step.')
    arg_parser.add_argument('--log-dir')
    arg_parser.add_argument('--n-cores',
                            default=min(4, multiprocessing.cpu_count()),
                            help='Total number of cores available.',
                            type=int)
    arg_parser.add_argument('--intermediate-directory')
    arg_parser.add_argument('--zip',
                            help='Zips all results in different archives depending'
                                 ' on result types. Also zips everything into one '
                                 'archive.')
    return arg_parser.parse_args()
class OCRPipelineJob:
    """A single unit of work for the OCR pipeline.

    Every input file is wrapped in one of these objects, which bundles
    the paths the workflow needs in order to process it.

    Arguments:
    file -- Path to the file
    output_dir -- Path to a directory, where job results a stored
    intermediate_dir -- Path to a directory, where intermediate files are
                        stored.
    """

    def __init__(self, file, output_dir, intermediate_dir):
        basename = os.path.basename(file)
        self.file = file
        # Job name is the input's basename with its last extension
        # stripped (e.g. "scan.pdf" -> "scan").
        self.name = basename.rsplit('.', 1)[0]
        self.output_dir = output_dir
        self.intermediate_dir = intermediate_dir
class OCRPipeline(WorkflowRunner):
    """pyflow WorkflowRunner that OCRs a directory tree of PDF files.

    The workflow splits every input PDF into per-page TIFF images
    (Ghostscript), optionally binarizes them (ocropus-nlbin), runs
    tesseract on every page, and merges the per-page results into a
    combined PDF, a combined TXT, a TEI P5 XML file and a PoCo bundle
    per input file. Intermediate files are removed afterwards and the
    results are optionally packed into zip archives.

    Arguments:
    input_dir -- Path to the directory containing the input PDF files
    lang -- Tesseract language model to use for OCR
    output_dir -- Path to the directory where results are stored
    binarize -- If True, binarize page images before OCR
    intermediate_dir -- Base directory for intermediate files (a fresh
                        temporary directory is created inside it); if
                        None, "<output_dir>/tmp" is used instead
    n_cores -- Total number of cores available to the workflow
    zip -- Base name for the result zip archives, or None to skip
           archive creation
    """

    def __init__(self, input_dir, lang, output_dir, binarize, intermediate_dir,
                 n_cores, zip):
        self.input_dir = input_dir
        self.lang = lang
        self.output_dir = output_dir
        self.binarize = binarize
        if intermediate_dir is None:
            self.intermediate_dir = os.path.join(output_dir, 'tmp')
        else:
            self.intermediate_dir = tempfile.mkdtemp(dir=intermediate_dir)
        self.n_cores = n_cores
        if zip is None:
            self.zip = zip
        else:
            if zip.lower().endswith('.zip'):
                # Remove .zip file extension if provided
                self.zip = zip[:-4]
                self.zip = self.zip if self.zip else 'output'
            else:
                self.zip = zip
        self.jobs = collect_jobs(self.input_dir,
                                 self.output_dir,
                                 self.intermediate_dir)

    def workflow(self):
        """Build and schedule the pyflow task graph for all jobs."""
        if not self.jobs:
            return

        '''
        ' ##################################################
        ' # setup output directory                         #
        ' ##################################################
        '''
        setup_output_directory_tasks = []
        for i, job in enumerate(self.jobs):
            cmd = 'mkdir'
            cmd += ' -p'
            cmd += ' "{}"'.format(job.intermediate_dir)
            cmd += ' "{}"'.format(os.path.join(job.output_dir, 'poco'))
            lbl = 'setup_output_directory_-_{}'.format(i)
            task = self.addTask(command=cmd, label=lbl)
            setup_output_directory_tasks.append(task)

        '''
        ' ##################################################
        ' # split input                                    #
        ' ##################################################
        '''
        split_input_tasks = []
        # Distribute the available cores evenly over the jobs, but give
        # every job at least one core.
        n_cores = min(self.n_cores, max(1, int(self.n_cores / len(self.jobs))))
        for i, job in enumerate(self.jobs):
            input_file = job.file
            # Ghostscript expands %d to the page number.
            output_file = '{}/page-%d.tif'.format(job.intermediate_dir)
            cmd = 'gs'
            cmd += ' -dBATCH'
            cmd += ' -dNOPAUSE'
            cmd += ' -dNumRenderingThreads={}'.format(n_cores)
            cmd += ' -dQUIET'
            cmd += ' -r300'
            cmd += ' -sDEVICE=tiff24nc'
            cmd += ' -sCompression=lzw'
            cmd += ' "-sOutputFile={}"'.format(output_file)
            cmd += ' "{}"'.format(input_file)
            deps = 'setup_output_directory_-_{}'.format(i)
            lbl = 'split_input_-_{}'.format(i)
            task = self.addTask(command=cmd, dependencies=deps, label=lbl, nCores=n_cores)  # noqa
            split_input_tasks.append(task)

        if self.binarize:
            '''
            ' The binarization_tasks list is created based on the output files
            ' of the split_tasks. So wait until they are finished.
            '''
            self.waitForTasks()

            '''
            ' ##################################################
            ' # binarization                                   #
            ' ##################################################
            '''
            binarization_tasks = []
            '''
            ' We run ocropus-nlbin with either four or, if there are less then
            ' four cores available for this workflow, the available core
            ' number.
            '''
            n_cores = min(4, self.n_cores)
            for i, job in enumerate(self.jobs):
                input_dir = job.intermediate_dir
                output_dir = job.intermediate_dir
                files = filter(lambda x: x.endswith('.tif'), os.listdir(input_dir))  # noqa
                files = natsorted(files)
                files = map(lambda x: os.path.join(input_dir, x), files)
                cmd = 'ocropus-nlbin "{}"'.format('" "'.join(files))
                cmd += ' --nocheck'
                cmd += ' --output "{}"'.format(output_dir)
                cmd += ' --parallel "{}"'.format(n_cores)
                deps = 'split_input_-_{}'.format(i)
                lbl = 'binarization_-_{}'.format(i)
                task = self.addTask(command=cmd, dependencies=deps, label=lbl, nCores=n_cores)  # noqa
                binarization_tasks.append(task)
            self.waitForTasks()

            '''
            ' ##################################################
            ' # Renaming of binarization output files          #
            ' ##################################################
            '''
            # ocropus-nlbin numbers its outputs "0001.bin.png", ...;
            # rename them to the "page-<n>.bin.png" scheme used by the
            # rest of the pipeline.
            for i, job in enumerate(self.jobs):
                input_dir = job.intermediate_dir
                output_dir = job.intermediate_dir
                files = filter(lambda x: x.endswith('.bin.png'), os.listdir(input_dir))  # noqa
                for file in files:
                    # int conversion is done in order to trim leading zeros
                    page_number = int(file.split('.', 1)[0])
                    output_file = 'page-{}.bin.png'.format(page_number)
                    os.rename(os.path.join(output_dir, file),
                              os.path.join(output_dir, output_file))

        '''
        ' The ocr_tasks are created based of the output files of either the
        ' split_tasks or binarization_tasks. So wait until they are
        ' finished.
        '''
        self.waitForTasks()

        '''
        ' ##################################################
        ' # ocr                                            #
        ' ##################################################
        '''
        ocr_tasks = []
        '''
        ' Tesseract runs fastest with four cores. So we run it with either four
        ' or, if there are less then four cores available for this workflow,
        ' the available core number.
        '''
        n_cores = min(4, self.n_cores)
        for i, job in enumerate(self.jobs):
            input_dir = job.intermediate_dir
            output_dir = job.intermediate_dir
            files = os.listdir(input_dir)
            if self.binarize:
                deps = 'binarization_-_{}'.format(i)
                files = filter(lambda x: x.endswith('.bin.png'), files)
            else:
                deps = 'split_input_-_{}'.format(i)
                files = filter(lambda x: x.endswith('.tif'), files)
            files = natsorted(files)
            files = map(lambda x: os.path.join(input_dir, x), files)
            for j, file in enumerate(files):
                if self.binarize:
                    output_file_base = os.path.join(output_dir, file.rsplit('.', 2)[0])  # noqa
                else:
                    output_file_base = os.path.join(output_dir, file.rsplit('.', 1)[0])  # noqa
                cmd = 'tesseract "{}" "{}"'.format(file, output_file_base)
                cmd += ' -l "{}"'.format(self.lang)
                cmd += ' hocr pdf txt'
                cmd += ' && '
                # Strip the intermediate directory prefix from image
                # references inside the hocr output.
                cmd += 'sed -i \'s+{}/++g\' "{}".hocr'.format(input_dir, output_file_base)  # noqa
                lbl = 'ocr_-_{}-{}'.format(i, j)
                task = self.addTask(command=cmd, dependencies=deps, label=lbl, nCores=n_cores)  # noqa
                ocr_tasks.append(task)

        '''
        ' The following jobs are created based of the output files of the
        ' ocr_tasks. So wait until they are finished.
        '''
        self.waitForTasks()

        '''
        ' ##################################################
        ' # combined pdf creation                          #
        ' ##################################################
        '''
        combined_pdf_creation_tasks = []
        n_cores = min(self.n_cores, max(1, int(self.n_cores / len(self.jobs))))
        for i, job in enumerate(self.jobs):
            input_dir = job.intermediate_dir
            output_file = os.path.join(job.output_dir, '{}.pdf'.format(job.name))  # noqa
            files = filter(lambda x: x.endswith('.pdf'), os.listdir(input_dir))
            files = natsorted(files)
            files = map(lambda x: os.path.join(input_dir, x), files)
            cmd = 'gs'
            cmd += ' -dBATCH'
            cmd += ' -dNOPAUSE'
            cmd += ' -dNumRenderingThreads={}'.format(n_cores)
            cmd += ' -dPDFSETTINGS=/ebook'
            cmd += ' -dQUIET'
            cmd += ' -sDEVICE=pdfwrite'
            cmd += ' "-sOutputFile={}"'.format(output_file)
            cmd += ' "{}"'.format('" "'.join(files))
            # Match only this job's ocr tasks. The trailing "-" is
            # required: labels are "ocr_-_<i>-<j>", so the bare prefix
            # "ocr_-_1" would also match "ocr_-_10-0", "ocr_-_11-0", ...
            deps = filter(lambda x: x.startswith('ocr_-_{}-'.format(i)), ocr_tasks)  # noqa
            lbl = 'combined_pdf_creation_-_{}'.format(i)
            task = self.addTask(command=cmd, dependencies=deps, label=lbl, nCores=n_cores)  # noqa
            combined_pdf_creation_tasks.append(task)

        '''
        ' ##################################################
        ' # combined txt creation                          #
        ' ##################################################
        '''
        combined_txt_creation_tasks = []
        for i, job in enumerate(self.jobs):
            input_dir = job.intermediate_dir
            output_file = os.path.join(job.output_dir, '{}.txt'.format(job.name))  # noqa
            files = filter(lambda x: x.endswith('.txt'), os.listdir(input_dir))
            files = natsorted(files)
            files = map(lambda x: os.path.join(input_dir, x), files)
            cmd = 'cat "{}" > "{}"'.format('" "'.join(files), output_file)
            # Trailing "-" avoids prefix collisions (see combined pdf
            # creation above).
            deps = filter(lambda x: x.startswith('ocr_-_{}-'.format(i)), ocr_tasks)  # noqa
            lbl = 'combined_txt_creation_-_{}'.format(i)
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            combined_txt_creation_tasks.append(task)

        '''
        ' ##################################################
        ' # tei p5 creation                                #
        ' ##################################################
        '''
        tei_p5_creation_tasks = []
        for i, job in enumerate(self.jobs):
            input_dir = job.intermediate_dir
            output_file = os.path.join(job.output_dir, '{}.xml'.format(job.name))  # noqa
            files = filter(lambda x: x.endswith('.hocr'),
                           os.listdir(input_dir))
            files = natsorted(files)
            files = map(lambda x: os.path.join(input_dir, x), files)
            cmd = 'hocrtotei "{}" "{}"'.format('" "'.join(files),
                                               output_file)
            # Trailing "-" avoids prefix collisions (see combined pdf
            # creation above).
            deps = filter(lambda x: x.startswith('ocr_-_{}-'.format(i)), ocr_tasks)  # noqa
            lbl = 'tei_p5_creation_-_{}'.format(i)
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            tei_p5_creation_tasks.append(task)

        '''
        ' ##################################################
        ' # poco bundle creation                           #
        ' ##################################################
        '''
        poco_bundle_creation_tasks = []
        for i, job in enumerate(self.jobs):
            input_dir = job.intermediate_dir
            output_dir = os.path.join(job.output_dir, 'poco')
            files = os.listdir(input_dir)
            if self.binarize:
                files = filter(lambda x: x.endswith(('.bin.png', '.hocr')), files)  # noqa
            else:
                files = filter(lambda x: x.endswith(('.tif', '.hocr')), files)
            files = natsorted(files)
            files = map(lambda x: os.path.join(input_dir, x), files)
            cmd = 'mv "{}" "{}"'.format('" "'.join(files), output_dir)
            # Trailing "-" avoids prefix collisions (see combined pdf
            # creation above). The hocr files must not be moved before
            # tei p5 creation has read them, hence the extra dependency.
            deps = filter(lambda x: x.startswith('ocr_-_{}-'.format(i)), ocr_tasks)  # noqa
            deps.append('tei_p5_creation_-_{}'.format(i))
            lbl = 'poco_bundle_creation_-_{}'.format(i)
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            poco_bundle_creation_tasks.append(task)

        '''
        ' The following jobs are created based of the output files of the
        ' combined_pdf_creation_tasks. So wait until they are finished.
        '''
        self.waitForTasks()

        '''
        ' ##################################################
        ' # cleanup                                        #
        ' ##################################################
        '''
        cleanup_tasks = []
        for i, job in enumerate(self.jobs):
            input_dir = job.intermediate_dir
            cmd = 'rm -r "{}"'.format(input_dir)
            deps = ['combined_pdf_creation_-_{}'.format(i),
                    'combined_txt_creation_-_{}'.format(i),
                    'poco_bundle_creation_-_{}'.format(i),
                    'tei_p5_creation_-_{}'.format(i)]
            lbl = 'job_cleanup_-_{}'.format(i)
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            cleanup_tasks.append(task)
        # Remove the shared intermediate directory once every per-job
        # cleanup has finished.
        input_dir = self.intermediate_dir
        cmd = 'rm -r "{}"'.format(input_dir)
        deps = cleanup_tasks
        lbl = 'pipeline_cleanup'
        task = self.addTask(command=cmd, dependencies=deps, label=lbl)
        cleanup_tasks.append(task)

        self.waitForTasks()

        '''
        ' ##################################################
        ' # zip creation                                   #
        ' ##################################################
        '''
        zip_creation_tasks = []
        if self.zip is not None:
            # zip all files
            cmd = 'cd "{}"'.format(self.output_dir)
            cmd += ' && '
            cmd += 'zip'
            cmd += ' -r'
            cmd += ' "{}.all.zip" .'.format(self.zip)
            cmd += ' -x "pyflow.data*" "*tmp*"'
            cmd += ' -i "*.pdf" "*.txt" "*.xml" "*.hocr" "*.{}"'.format('bin.png' if self.binarize else 'tif')  # noqa
            cmd += ' && '
            cmd += 'cd -'
            deps = combined_pdf_creation_tasks + combined_txt_creation_tasks + poco_bundle_creation_tasks  # noqa
            lbl = 'zip_creation_-_all'
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            zip_creation_tasks.append(task)
            # zip PDF files
            cmd = 'cd "{}"'.format(self.output_dir)
            cmd += ' && '
            cmd += 'zip'
            cmd += ' -r'
            cmd += ' "{}.pdf.zip" .'.format(self.zip)
            cmd += ' -x "pyflow.data*" "*tmp*"'
            cmd += ' -i "*.pdf"'
            cmd += ' && '
            cmd += 'cd -'
            deps = combined_pdf_creation_tasks
            lbl = 'zip_creation_-_pdf'
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            zip_creation_tasks.append(task)
            # zip TXT files
            cmd = 'cd "{}"'.format(self.output_dir)
            cmd += ' && '
            cmd += 'zip'
            cmd += ' -r'
            cmd += ' "{}.txt.zip" .'.format(self.zip)
            cmd += ' -x "pyflow.data*" "*tmp*"'
            cmd += ' -i "*.txt"'
            cmd += ' && '
            cmd += 'cd -'
            deps = combined_txt_creation_tasks
            lbl = 'zip_creation_-_txt'
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            zip_creation_tasks.append(task)
            # zip XML files
            cmd = 'cd "{}"'.format(self.output_dir)
            cmd += ' && '
            cmd += 'zip'
            cmd += ' -r'
            cmd += ' "{}.xml.zip" .'.format(self.zip)
            cmd += ' -x "pyflow.data*" "*tmp*"'
            cmd += ' -i "*.xml"'
            cmd += ' && '
            cmd += 'cd -'
            deps = tei_p5_creation_tasks
            lbl = 'zip_creation_-_xml'
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            zip_creation_tasks.append(task)
            # zip PoCo bundles
            cmd = 'cd "{}"'.format(self.output_dir)
            cmd += ' && '
            cmd += 'zip'
            cmd += ' -r'
            cmd += ' "{}.poco.zip" .'.format(self.zip)
            cmd += ' -x "pyflow.data*" "*tmp*"'
            cmd += ' -i "*.hocr" "*.{}"'.format('bin.png' if self.binarize else 'tif')  # noqa
            cmd += ' && '
            cmd += 'cd -'
            deps = poco_bundle_creation_tasks
            lbl = 'zip_creation_-_poco'
            task = self.addTask(command=cmd, dependencies=deps, label=lbl)
            zip_creation_tasks.append(task)
def collect_jobs(input_dir, output_dir, intermediate_dir):
    """Recursively collect one OCRPipelineJob per PDF file in input_dir.

    Subdirectories are mirrored into output_dir and intermediate_dir;
    files whose name does not end in ".pdf" (case-insensitive) are
    ignored.

    Arguments:
    input_dir -- Directory to scan for PDF files
    output_dir -- Directory where the jobs' results are stored
    intermediate_dir -- Directory for the jobs' intermediate files

    Returns a list of OCRPipelineJob objects.
    """
    jobs = []
    for file in os.listdir(input_dir):
        if os.path.isdir(os.path.join(input_dir, file)):
            # Bug fix: the recursive call previously omitted the
            # intermediate_dir argument, raising a TypeError for any
            # nested input directory.
            jobs += collect_jobs(os.path.join(input_dir, file),
                                 os.path.join(output_dir, file),
                                 os.path.join(intermediate_dir, file))
        elif file.lower().endswith('.pdf'):
            job = OCRPipelineJob(os.path.join(input_dir, file),
                                 os.path.join(output_dir, file),
                                 os.path.join(intermediate_dir, file))
            jobs.append(job)
    return jobs
def main():
    """Entry point: parse arguments, build and run the OCR pipeline."""
    args = parse_args()
    pipeline = OCRPipeline(args.input_directory, args.language,
                           args.output_directory, args.binarize,
                           args.intermediate_directory, args.n_cores,
                           args.zip)
    # pyflow writes its bookkeeping data into dataDirRoot; fall back to
    # the output directory when no dedicated log directory is given.
    exit_code = pipeline.run(
        dataDirRoot=(args.log_dir or args.output_directory),
        nCores=args.n_cores
    )
    sys.exit(exit_code)


if __name__ == '__main__':
    main()