mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/file-setup.git
				synced 2025-10-31 10:42:45 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			191 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			191 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python2.7
 | |
| # coding=utf-8
 | |
| 
 | |
| ''' File setup pipeline for image file merging. '''
 | |
| __version__ = '0.1.0'
 | |
| 
 | |
| from argparse import ArgumentParser
 | |
| from pyflow import WorkflowRunner
 | |
| import json
 | |
| import os
 | |
| import sys
 | |
| 
 | |
| 
 | |
class PipelineJob:
    '''
    Represents one unit of work for the file setup pipeline.

    Wraps a single input directory of images together with the location
    where the results for that directory are to be written.

    Arguments:
    dir -- Path to the directory
    output_dir -- Path to a directory, where job results are stored
    '''

    def __init__(self, dir, output_dir):
        # The job is named after the final path component of its input
        # directory (a trailing slash would yield an empty name).
        self.name = os.path.basename(dir)
        self.dir = dir
        self.output_dir = output_dir
 | |
| 
 | |
| 
 | |
class CreatePDFWorkflow(WorkflowRunner):
    '''
    Sub-workflow that merges all image files of one PipelineJob into a
    single PDF inside the job's output directory.
    '''

    def __init__(self, job):
        self.job = job

    def workflow(self):
        '''
        ' ##################################################
        ' # convert                                        #
        ' ##################################################
        '''
        # Cap the task's resources at 2 cores / 256 MB per core, but
        # never above what the surrounding workflow was granted.
        n_cores = min(2, self.getNCores())
        mem_mb = min(n_cores * 256, self.getMemMb())
        inputs_file = os.path.join(self.job.output_dir, 'inputs.txt')
        pdf_file = os.path.join(
            self.job.output_dir,
            '{}.pdf'.format(self.job.name)
        )
        # Three chained shell steps: list the directory entries in
        # version-sort order into inputs.txt, feed that list to
        # ImageMagick's convert ("@file" reads input paths from a file),
        # then remove the intermediate list again.
        cmd = ' && '.join([
            'ls -dv "{}/"* > "{}"'.format(self.job.dir, inputs_file),
            'convert "@{}" "{}"'.format(inputs_file, pdf_file),
            'rm "{}"'.format(inputs_file)
        ])
        self.addTask(
            'convert',
            command=cmd,
            memMb=mem_mb,
            nCores=n_cores
        )
 | |
| 
 | |
| 
 | |
class MainWorkflow(WorkflowRunner):
    '''
    Top-level workflow: discovers one job per non-empty input
    subdirectory, runs a CreatePDFWorkflow for each, and records the
    produced files in an outputs.json manifest.
    '''

    def __init__(self, input_dir, output_dir):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.jobs = []

    def collect_jobs(self):
        # Rebuild the job list from scratch on every call.
        self.jobs = []
        for entry in os.listdir(self.input_dir):
            input_path = os.path.join(self.input_dir, entry)
            # Only non-empty directories qualify as jobs.
            if not os.path.isdir(input_path):
                continue
            if not os.listdir(input_path):
                continue
            # TODO: Filter for file types within the directory
            output_path = os.path.join(self.output_dir, entry)
            self.jobs.append(PipelineJob(input_path, output_path))

    def workflow(self):
        # Nothing to do when collect_jobs found no work.
        if not self.jobs:
            return

        # Create output and temporary directories
        for job in self.jobs:
            os.mkdir(job.output_dir)

        '''
        ' ##################################################
        ' # create-pdf                                     #
        ' ##################################################
        '''
        create_pdf_tasks = []
        for i, job in enumerate(self.jobs):
            workflow_task = self.addWorkflowTask(
                'create_pdf_-_{}'.format(i),
                CreatePDFWorkflow(job)
            )
            create_pdf_tasks.append(workflow_task)

        # Block until every submitted sub-workflow has finished.
        self.waitForTasks()

        # Track output files; paths in the manifest are relative to the
        # top-level output directory.
        outputs = []
        for job in self.jobs:
            relative_output_dir = os.path.relpath(
                job.output_dir,
                start=self.output_dir
            )
            outputs.append(
                {
                    'description': 'PDF file without text layer.',
                    'file': os.path.join(
                        relative_output_dir,
                        '{}.pdf'.format(job.name)
                    ),
                    'mimetype': 'application/pdf'
                }
            )
        with open(os.path.join(self.output_dir, 'outputs.json'), 'w') as f:
            json.dump(outputs, f, indent=4)
 | |
| 
 | |
| 
 | |
def parse_args():
    '''
    Parse and validate command line arguments.

    Fills in dependent defaults (--log-dir falls back to --output-dir,
    --mem-mb to min(--n-cores * 256, total system memory)) and raises
    an Exception for insufficient resource requests.

    Returns: the populated argparse.Namespace.
    Raises: Exception if --n-cores < 1 or the effective --mem-mb < 256.
    '''
    parser = ArgumentParser(description='Pipeline for merging images')
    parser.add_argument(
        '-i', '--input-dir',
        help='Input directory',
        required=True
    )
    parser.add_argument(
        '-o', '--output-dir',
        help='Output directory',
        required=True
    )
    parser.add_argument(
        '--log-dir',
        help='Logging directory (Default: --output-dir)'
    )
    parser.add_argument(
        '--mem-mb',
        help='Amount of system memory to be used '
             '(Default: min(--n-cores * 256, available system memory))',
        type=int
    )
    parser.add_argument(
        '--n-cores',
        default=1,
        help='Number of CPU threads to be used',
        type=int
    )
    parser.add_argument(
        '-v', '--version',
        action='version',
        help='Returns the current version of the file setup pipeline',
        version='%(prog)s {}'.format(__version__)
    )
    args = parser.parse_args()

    # Set some tricky default values and check for insufficient input
    if args.log_dir is None:
        args.log_dir = args.output_dir
    if args.n_cores < 1:
        raise Exception('--n-cores must be greater or equal 1')
    if args.mem_mb is None:
        # Derive the default from the total memory column on the last
        # ("Total") line of `free -t -m` (Linux-only). The with block
        # closes the pipe instead of leaking the file handle, and
        # split()[1] replaces the redundant split()[1:][0] slicing.
        with os.popen('free -t -m') as free_output:
            max_mem_mb = int(free_output.readlines()[-1].split()[1])
        args.mem_mb = min(args.n_cores * 256, max_mem_mb)
    if args.mem_mb < 256:
        raise Exception('--mem-mb must be greater or equal 256')
    return args
 | |
| 
 | |
| 
 | |
def main():
    '''
    Entry point: parse arguments, collect jobs from the input
    directory, run the main workflow and exit with its return value.
    '''
    args = parse_args()
    workflow = MainWorkflow(args.input_dir, args.output_dir)
    workflow.collect_jobs()
    # pyflow's run() returns a status code suitable as the process
    # exit status.
    sys.exit(workflow.run(
        dataDirRoot=args.log_dir,
        memMb=args.mem_mb,
        nCores=args.n_cores
    ))
 | |
| 
 | |
| 
 | |
# Run the pipeline only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == '__main__':
    main()
 |