mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/file-setup.git
synced 2024-12-25 02:44:18 +00:00
191 lines · 5.5 KiB · Python · Executable File
#!/usr/bin/env python2.7
|
|
# coding=utf-8
|
|
|
|
''' File setup pipeline for image file merging. '''
|
|
__version__ = '0.1.0'
|
|
|
|
from argparse import ArgumentParser
|
|
from pyflow import WorkflowRunner
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
|
|
class PipelineJob:
    """A single unit of work for the file setup pipeline.

    Wraps one input directory full of images together with everything
    the pipeline needs in order to process it.

    Arguments:
    dir -- Path to the directory
    output_dir -- Path to a directory, where job results are stored
    """

    def __init__(self, dir, output_dir):
        # The job is named after the final path component of its input
        # directory; the merged PDF will be called '<name>.pdf'.
        self.name = os.path.basename(dir)
        self.dir = dir
        self.output_dir = output_dir
|
class CreatePDFWorkflow(WorkflowRunner):
    """Workflow that merges all images of one job directory into a PDF.

    Arguments:
    job -- PipelineJob describing the input and output directories
    """

    def __init__(self, job):
        self.job = job

    def workflow(self):
        '''
        ' ##################################################
        ' # convert #
        ' ##################################################
        '''
        n_cores = min(2, self.getNCores())
        mem_mb = min(n_cores * 256, self.getMemMb())
        # Build the shared paths once instead of re-joining them for
        # every command fragment. (The original wrapped self.job.dir in
        # a single-argument os.path.join, which is a no-op.)
        input_list = os.path.join(self.job.output_dir, 'inputs.txt')
        output_pdf = os.path.join(
            self.job.output_dir,
            '{}.pdf'.format(self.job.name)
        )
        # 'ls -dv' lists the image files in natural (version) sort
        # order, so the pages of the resulting PDF keep their sequence.
        cmd = 'ls -dv "{}/"* > "{}"'.format(self.job.dir, input_list)
        cmd += ' && '
        # ImageMagick's '@file' syntax reads the input file names from
        # inputs.txt and merges them into a single PDF.
        cmd += 'convert "@{}" "{}"'.format(input_list, output_pdf)
        cmd += ' && '
        # Remove the temporary file list on success.
        cmd += 'rm "{}"'.format(input_list)
        self.addTask(
            'convert',
            command=cmd,
            memMb=mem_mb,
            nCores=n_cores
        )
|
class MainWorkflow(WorkflowRunner):
    """Top-level workflow: one CreatePDFWorkflow per input sub-directory.

    Arguments:
    input_dir -- Directory whose sub-directories are the pipeline inputs
    output_dir -- Directory where all job results and the manifest
                  (outputs.json) are stored
    """

    def __init__(self, input_dir, output_dir):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.jobs = []

    def collect_jobs(self):
        """Scan the input directory and (re)build the job list.

        Only non-empty sub-directories of input_dir become jobs; plain
        files and empty directories are skipped.
        """
        self.jobs = []
        # Sort for a deterministic job order (os.listdir order is
        # platform-dependent); this also keeps outputs.json stable
        # across runs.
        for entry in sorted(os.listdir(self.input_dir)):
            input_path = os.path.join(self.input_dir, entry)
            if not os.path.isdir(input_path):
                continue
            if not os.listdir(input_path):
                continue
            # TODO: Filter for file types within the directory
            job = PipelineJob(
                input_path,
                os.path.join(self.output_dir, entry)
            )
            self.jobs.append(job)

    def workflow(self):
        if not self.jobs:
            return

        # Create one output directory per job
        for job in self.jobs:
            os.mkdir(job.output_dir)

        '''
        ' ##################################################
        ' # create-pdf #
        ' ##################################################
        '''
        for i, job in enumerate(self.jobs):
            self.addWorkflowTask(
                'create_pdf_-_{}'.format(i),
                CreatePDFWorkflow(job)
            )

        # Block until every create-pdf sub-workflow has finished before
        # writing the output manifest. (The task labels collected by the
        # original were never used; waitForTasks() waits for all tasks.)
        self.waitForTasks()

        outputs = []
        for job in self.jobs:
            # Track output files relative to the output directory so the
            # manifest stays valid if the whole tree is moved.
            relative_output_dir = os.path.relpath(
                job.output_dir,
                start=self.output_dir
            )
            outputs.append(
                {
                    'description': 'PDF file without text layer.',
                    'file': os.path.join(
                        relative_output_dir,
                        '{}.pdf'.format(job.name)
                    ),
                    'mimetype': 'application/pdf'
                }
            )
        with open(os.path.join(self.output_dir, 'outputs.json'), 'w') as f:
            json.dump(outputs, f, indent=4)
|
def parse_args():
    """Parse and validate the command line arguments.

    Resolves the dependent defaults (--log-dir falls back to
    --output-dir, --mem-mb to min(--n-cores * 256, total system
    memory)) and raises on insufficient resource settings.
    """
    parser = ArgumentParser(description='Pipeline for merging images')
    parser.add_argument(
        '-i', '--input-dir',
        required=True,
        help='Input directory'
    )
    parser.add_argument(
        '-o', '--output-dir',
        required=True,
        help='Output directory'
    )
    parser.add_argument(
        '--log-dir',
        help='Logging directory (Default: --output-dir)'
    )
    parser.add_argument(
        '--mem-mb',
        type=int,
        help='Amount of system memory to be used '
             '(Default: min(--n-cores * 256, available system memory))'
    )
    parser.add_argument(
        '--n-cores',
        type=int,
        default=1,
        help='Number of CPU threads to be used'
    )
    parser.add_argument(
        '-v', '--version',
        action='version',
        version='%(prog)s {}'.format(__version__),
        help='Returns the current version of the file setup pipeline'
    )
    args = parser.parse_args()

    # Set some tricky default values and check for insufficient input
    if args.log_dir is None:
        args.log_dir = args.output_dir
    if args.n_cores < 1:
        raise Exception('--n-cores must be greater or equal 1')
    if args.mem_mb is None:
        # Total system memory in MiB, read from the 'Total' row of free(1)
        max_mem_mb = int(os.popen('free -t -m').readlines()[-1].split()[1:][0])
        args.mem_mb = min(args.n_cores * 256, max_mem_mb)
    if args.mem_mb < 256:
        raise Exception('--mem-mb must be greater or equal 256')
    return args
|
def main():
    """Entry point: parse arguments, run the main workflow, exit.

    The pyflow return value of MainWorkflow.run() becomes the process
    exit status.
    """
    args = parse_args()
    workflow = MainWorkflow(args.input_dir, args.output_dir)
    workflow.collect_jobs()
    sys.exit(
        workflow.run(
            dataDirRoot=args.log_dir,
            memMb=args.mem_mb,
            nCores=args.n_cores
        )
    )


if __name__ == '__main__':
    main()