file-setup/file-setup
2022-01-27 13:48:16 +01:00

191 lines
5.5 KiB
Python
Executable File

#!/usr/bin/env python2.7
# coding=utf-8
''' File setup pipeline for image file merging. '''
__version__ = '0.1.0'
from argparse import ArgumentParser
from pyflow import WorkflowRunner
import json
import os
import sys
class PipelineJob:
    '''
    Represents one unit of work for the file setup pipeline.

    Wraps a single input directory of images together with the location
    where its results should be written.

    Arguments:
    dir -- Path to the directory containing the input images
    output_dir -- Path to a directory, where job results are stored
    '''
    def __init__(self, dir, output_dir):
        self.dir = dir
        self.output_dir = output_dir
        # The job is named after the directory it processes; this name is
        # reused for the generated PDF file.
        self.name = os.path.basename(dir)
class CreatePDFWorkflow(WorkflowRunner):
    '''
    Sub-workflow that merges all images of one job directory into a single
    PDF file using ImageMagick's ``convert``.

    Arguments:
    job -- PipelineJob describing the input directory and output location
    '''
    def __init__(self, job):
        self.job = job

    def workflow(self):
        '''
        ' ##################################################
        ' # convert #
        ' ##################################################
        '''
        # Cap resources: at most 2 cores and 256 MB per core, but never
        # more than the workflow runner has available.
        n_cores = min(2, self.getNCores())
        mem_mb = min(n_cores * 256, self.getMemMb())
        # Temporary file list consumed by convert via the "@file" syntax;
        # built once instead of re-joining the path three times.
        inputs_file = os.path.join(self.job.output_dir, 'inputs.txt')
        # "ls -dv" lists the files in natural (version) sort order so the
        # PDF pages end up in the expected sequence.
        cmd = 'ls -dv "{}/"* > "{}"'.format(self.job.dir, inputs_file)
        cmd += ' && '
        cmd += 'convert "@{}" "{}"'.format(
            inputs_file,
            os.path.join(self.job.output_dir, '{}.pdf'.format(self.job.name))
        )
        cmd += ' && '
        # Remove the temporary file list after a successful conversion.
        cmd += 'rm "{}"'.format(inputs_file)
        self.addTask(
            'convert',
            command=cmd,
            memMb=mem_mb,
            nCores=n_cores
        )
class MainWorkflow(WorkflowRunner):
    '''
    Main file setup workflow.

    Scans the input directory for non-empty sub-directories, creates one
    PDF-creation sub-workflow per directory and records the produced files
    in an ``outputs.json`` manifest inside the output directory.

    Arguments:
    input_dir -- Directory whose sub-directories contain the input images
    output_dir -- Directory where all job results are stored
    '''
    def __init__(self, input_dir, output_dir):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.jobs = []

    def collect_jobs(self):
        '''
        Gather one PipelineJob per non-empty sub-directory of input_dir.

        Directories are processed in sorted name order so that task names
        and the outputs.json manifest are deterministic across runs
        (os.listdir returns entries in arbitrary, filesystem-dependent
        order).
        '''
        self.jobs = []
        for dir in sorted(os.listdir(self.input_dir)):
            if not os.path.isdir(os.path.join(self.input_dir, dir)):
                continue
            # Skip empty directories: there is nothing to convert.
            if not os.listdir(os.path.join(self.input_dir, dir)):
                continue
            # TODO: Filter for file types within the directory
            job = PipelineJob(
                os.path.join(self.input_dir, dir),
                os.path.join(self.output_dir, dir)
            )
            self.jobs.append(job)

    def workflow(self):
        # Nothing to do when collect_jobs found no usable directories.
        if not self.jobs:
            return
        # Create output and temporary directories
        for job in self.jobs:
            os.mkdir(job.output_dir)
        '''
        ' ##################################################
        ' # create-pdf #
        ' ##################################################
        '''
        create_pdf_tasks = []
        for i, job in enumerate(self.jobs):
            task = self.addWorkflowTask(
                'create_pdf_-_{}'.format(i),
                CreatePDFWorkflow(job)
            )
            create_pdf_tasks.append(task)
        # Block until every PDF sub-workflow has finished before writing
        # the output manifest.
        self.waitForTasks()
        outputs = []
        for job in self.jobs:
            # Track output files relative to output_dir so the manifest
            # stays valid if the output tree is moved as a whole.
            relative_output_dir = os.path.relpath(
                job.output_dir,
                start=self.output_dir
            )
            outputs.append(
                {
                    'description': 'PDF file without text layer.',
                    'file': os.path.join(
                        relative_output_dir,
                        '{}.pdf'.format(job.name)
                    ),
                    'mimetype': 'application/pdf'
                }
            )
        with open(os.path.join(self.output_dir, 'outputs.json'), 'w') as f:
            json.dump(outputs, f, indent=4)
def parse_args():
    '''
    Parse and validate the command line arguments.

    Resolves the tricky defaults: --log-dir falls back to --output-dir,
    and --mem-mb falls back to min(--n-cores * 256, total system memory).

    Returns the populated argparse.Namespace.
    Raises an Exception when --n-cores < 1 or the resolved --mem-mb < 256.
    '''
    parser = ArgumentParser(description='Pipeline for merging images')
    parser.add_argument(
        '-i', '--input-dir',
        help='Input directory',
        required=True
    )
    parser.add_argument(
        '-o', '--output-dir',
        help='Output directory',
        required=True
    )
    parser.add_argument(
        '--log-dir',
        help='Logging directory (Default: --output-dir)'
    )
    parser.add_argument(
        '--mem-mb',
        help='Amount of system memory to be used '
             '(Default: min(--n-cores * 256, available system memory))',
        type=int
    )
    parser.add_argument(
        '--n-cores',
        default=1,
        help='Number of CPU threads to be used',
        type=int
    )
    parser.add_argument(
        '-v', '--version',
        action='version',
        help='Returns the current version of the file setup pipeline',
        version='%(prog)s {}'.format(__version__)
    )
    args = parser.parse_args()
    # Set some tricky default values and check for insufficient input
    if args.log_dir is None:
        args.log_dir = args.output_dir
    if args.n_cores < 1:
        raise Exception('--n-cores must be greater or equal 1')
    if args.mem_mb is None:
        # "free -t -m" ends with a "Total:" line; its second column holds
        # the total system memory in MiB. NOTE(review): this is Linux/procps
        # specific and will fail where "free" is unavailable.
        max_mem_mb = int(os.popen('free -t -m').readlines()[-1].split()[1])
        args.mem_mb = min(args.n_cores * 256, max_mem_mb)
    if args.mem_mb < 256:
        raise Exception('--mem-mb must be greater or equal 256')
    return args
def main():
    '''
    Entry point: parse the arguments, assemble the main workflow and exit
    the process with the workflow's return value.
    '''
    args = parse_args()
    workflow = MainWorkflow(args.input_dir, args.output_dir)
    workflow.collect_jobs()
    exit_code = workflow.run(
        dataDirRoot=args.log_dir,
        memMb=args.mem_mb,
        nCores=args.n_cores
    )
    sys.exit(exit_code)


if __name__ == '__main__':
    main()