file-setup/file-setup
2021-04-12 14:55:14 +02:00

190 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python2.7
# coding=utf-8
"""A file setup pipeline for image file merging."""
__author__ = 'Patrick Jentsch <p.jentsch@uni-bielefeld.de>,' \
'Stephan Porada <porada@posteo.de>'
__version__ = '1.0.0'
from argparse import ArgumentParser
from pyflow import WorkflowRunner
import os
import sys
class FileSetupPipelineJob:
"""An file setup pipeline job class
Each image containing input directory of the pipeline is represented as an
file setup pipeline job, which holds all necessary information for the
pipeline to process it.
Arguments:
dir -- Path to the directory
output_dir -- Path to a directory, where job results a stored
"""
def __init__(self, dir, output_dir):
self.dir = dir
self.name = os.path.basename(dir)
self.output_dir = output_dir
class FileSetupPipeline(WorkflowRunner):
def __init__(self, input_dir, output_dir, zip):
self.input_dir = input_dir
self.output_dir = output_dir
self.zip = zip
self.jobs = collect_jobs(self.input_dir, self.output_dir)
def workflow(self):
if not self.jobs:
return
'''
' ##################################################
' # setup output directory #
' ##################################################
'''
setup_output_directory_tasks = []
for i, job in enumerate(self.jobs):
cmd = 'mkdir -p "{}"'.format(job.output_dir)
lbl = 'setup_output_directory_-_{}'.format(i)
task = self.addTask(command=cmd, label=lbl)
setup_output_directory_tasks.append(task)
'''
' ##################################################
' # pre file setup #
' ##################################################
'''
pre_file_setup_tasks = []
for i, job in enumerate(self.jobs):
input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa
cmd = 'ls -dQv "{}/"* >> "{}"'.format(job.dir, input_file)
deps = 'setup_output_directory_-_{}'.format(i)
lbl = 'pre_file_setup_-_{}'.format(i)
task = self.addTask(command=cmd, dependencies=deps, label=lbl)
pre_file_setup_tasks.append(task)
'''
' ##################################################
' # file setup #
' ##################################################
'''
file_setup_tasks = []
n_cores = max(1, int(self.getNCores() / len(self.jobs)))
for i, job in enumerate(self.jobs):
input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa
output_file = os.path.join(job.output_dir, '{}.pdf'.format(job.name)) # noqa
cmd = 'convert "@{}" "{}"'.format(input_file, output_file)
deps = 'pre_file_setup_-_{}'.format(i)
lbl = 'file_setup_-_{}'.format(i)
task = self.addTask(command=cmd, dependencies=deps, label=lbl,
nCores=n_cores)
file_setup_tasks.append(task)
'''
' ##################################################
' # post file setup #
' ##################################################
'''
post_file_setup_tasks = []
for i, job in enumerate(self.jobs):
input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa
cmd = 'rm "{}"'.format(input_file)
deps = 'file_setup_-_{}'.format(i)
lbl = 'post_file_setup_-_{}'.format(i)
task = self.addTask(command=cmd, dependencies=deps, label=lbl)
post_file_setup_tasks.append(task)
'''
' ##################################################
' # zip creation #
' ##################################################
'''
zip_creation_tasks = []
if self.zip is not None:
cmd = 'cd "{}"'.format(self.output_dir)
cmd += ' && '
cmd += 'zip'
cmd += ' -r'
cmd += ' "{}.zip" .'.format(self.zip)
cmd += ' -x "pyflow.data*"'
cmd += ' -i "*.pdf"'
cmd += ' && '
cmd += 'cd -'
deps = file_setup_tasks
lbl = 'zip_creation'
task = self.addTask(command=cmd, dependencies=deps, label=lbl)
zip_creation_tasks.append(task)
def collect_jobs(input_dir, output_dir):
jobs = []
for dir in os.listdir(input_dir):
if not os.path.isdir(os.path.join(input_dir, dir)):
continue
# TODO: Filter for file types
if not os.listdir(os.path.join(input_dir, dir)):
continue
job = FileSetupPipelineJob(os.path.join(input_dir, dir),
os.path.join(output_dir, dir))
jobs.append(job)
return jobs
def parse_args():
parser = ArgumentParser(description='A file setup pipeline for image file merging', # noqa
prog='File setup pipeline')
parser.add_argument('-i', '--input-dir',
help='Input directory',
required=True)
parser.add_argument('-o', '--output-dir',
help='Output directory',
required=True)
parser.add_argument('--log-dir',
help='Logging directory')
parser.add_argument('--mem-mb',
help='Amount of system memory to be used (Default: min(--n-cores * 2048, available system memory))', # noqa
type=int)
parser.add_argument('--n-cores',
default=1,
help='Number of CPU threads to be used (Default: 1)',
type=int)
parser.add_argument('--zip',
help='Create one zip file per filetype')
parser.add_argument('-v', '--version',
action='version',
help='Returns the current version of the file setup pipeline', # noqa
version='%(prog)s {}'.format(__version__))
args = parser.parse_args()
# Set some tricky default values and check for insufficient input
if args.log_dir is None:
args.log_dir = args.output_dir
if args.n_cores < 1:
raise Exception('--n-cores must be greater or equal 1')
if args.mem_mb is None:
max_mem_mb = int(os.popen('free -t -m').readlines()[-1].split()[1:][0])
args.mem_mb = min(args.n_cores * 2048, max_mem_mb)
if args.mem_mb < 2048:
raise Exception('--mem-mb must be greater or equal 2048')
if args.zip is not None and args.zip.lower().endswith('.zip'):
# Remove .zip file extension if provided
args.zip = args.zip[:-4]
args.zip = args.zip if args.zip else 'output'
return args
def main():
args = parse_args()
file_setup_pipeline = FileSetupPipeline(args.input_dir, args.output_dir, args.zip) # noqa
retval = file_setup_pipeline.run(dataDirRoot=args.log_dir, memMb=args.mem_mb, nCores=args.n_cores) # noqa
sys.exit(retval)
if __name__ == '__main__':
main()