From 3c9a800886016541dc48977163083feb81788a05 Mon Sep 17 00:00:00 2001 From: Patrick Jentsch Date: Mon, 12 Apr 2021 14:55:14 +0200 Subject: [PATCH] Use pyFlow for file setup service --- .gitlab-ci.yml | 11 +-- Dockerfile | 31 ++++++- README.md | 37 ++++++++ file-setup | 227 +++++++++++++++++++++++++++++++++------------ wrapper/file-setup | 33 ++++--- 5 files changed, 254 insertions(+), 85 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3d97cac..9ee7b2d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,8 +1,5 @@ image: docker:19.03.13 -variables: - DOCKER_TLS_CERTDIR: "/certs" - services: - docker:19.03.13-dind @@ -10,6 +7,10 @@ stages: - build - push +variables: + DOCKER_TLS_CERTDIR: "/certs" + INTERMEDIATE_IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME-$CI_COMMIT_SHA + .reg_setup: before_script: - apk add --no-cache curl @@ -28,8 +29,6 @@ build_image: stage: build tags: - docker - variables: - INTERMEDIATE_IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA push_master: extends: @@ -47,7 +46,6 @@ push_master: - docker variables: IMAGE_TAG: $CI_REGISTRY_IMAGE:latest - INTERMEDIATE_IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA push_other: extends: @@ -68,4 +66,3 @@ push_other: - docker variables: IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME - INTERMEDIATE_IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA diff --git a/Dockerfile b/Dockerfile index a8d29fd..9cccf1f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ FROM debian:buster-slim -LABEL authors="Patrick Jentsch , Stephan Porada " +LABEL authors="Patrick Jentsch , Stephan Porada " ENV LANG=C.UTF-8 @@ -9,17 +9,38 @@ ENV LANG=C.UTF-8 RUN apt-get update \ && apt-get install --no-install-recommends --yes \ + wget + + +# Install the NLP pipeline and it's dependencies # +## Install pyFlow ## +ENV PYFLOW_VERSION=1.1.20 +RUN wget --no-check-certificate --quiet \ + "https://github.com/Illumina/pyflow/releases/download/v${PYFLOW_VERSION}/pyflow-${PYFLOW_VERSION}.tar.gz" \ +&& tar -xzf 
"pyflow-${PYFLOW_VERSION}.tar.gz" \ +&& cd "pyflow-${PYFLOW_VERSION}" \ +&& apt-get install --no-install-recommends --yes \ + python2.7 \ +&& python2.7 setup.py build install \ +&& cd .. \ +&& rm -r "pyflow-${PYFLOW_VERSION}" "pyflow-${PYFLOW_VERSION}.tar.gz" + + +## Further dependencies ## +RUN apt-get install --no-install-recommends --yes \ imagemagick \ + procps \ python3.7 \ zip \ - && rm -r /var/lib/apt/lists/* - - -RUN mv /etc/ImageMagick-6/policy.xml /etc/ImageMagick-6/policy.xml.bak + && mv /etc/ImageMagick-6/policy.xml /etc/ImageMagick-6/policy.xml.bak +## Install Pipeline ## COPY file-setup /usr/local/bin +RUN rm -r /var/lib/apt/lists/* + + ENTRYPOINT ["file-setup"] CMD ["--help"] diff --git a/README.md b/README.md index e69de29..ec53ab0 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,37 @@ +# File setup + +This software implements a parallelized pipeline to set up image files. It is used for nopaque's File setup service but you can also use it standalone, for that purpose a convenient wrapper script is provided. + +## Software used in this pipeline implementation +- Official Debian Docker image (buster-slim) and programs from its free repositories: https://hub.docker.com/_/debian + + +## Use this image + +1. Create input and output directories for the pipeline. +``` bash +mkdir -p //input //output +``` + +2. Place your image files inside a directory in `//input`. + +3. Start the pipeline process. Check the pipeline help (`file-setup --help`) for more details. +``` +# Option one: Use the wrapper script +## Install the wrapper script (only on first run). 
Get it from https://gitlab.ub.uni-bielefeld.de/sfb1288inf/file-setup/-/raw/1.0.0/wrapper/file-setup, make it executable and add it to your ${PATH} +cd / +file-setup -i input -o output + +# Option two: Classic Docker style +docker run \ + --rm \ + -it \ + -u $(id -u $USER):$(id -g $USER) \ + -v //input:/input \ + -v //output:/output \ + gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/file-setup:1.0.0 \ + -i /input \ + -o /output +``` + +4. Check your results in the `//output` directory. diff --git a/file-setup b/file-setup index 8832cc4..8ccb27f 100755 --- a/file-setup +++ b/file-setup @@ -1,79 +1,188 @@ -#!/usr/bin/env python3.7 +#!/usr/bin/env python2.7 # coding=utf-8 +"""A file setup pipeline for image file merging.""" -""" -file-setup - -Usage: For usage instructions run with option --help -Authors: Patrick Jentsch -""" +__author__ = 'Patrick Jentsch ,' \ + 'Stephan Porada ' +__version__ = '1.0.0' from argparse import ArgumentParser +from pyflow import WorkflowRunner import os -import re -import subprocess +import sys -def parse_arguments(): - parser = ArgumentParser(description='Merge images (JPEG, PNG or TIFF) into one PDF file.') - parser.add_argument('-i', '--input-directory', +class FileSetupPipelineJob: + """An file setup pipeline job class + + Each image containing input directory of the pipeline is represented as an + file setup pipeline job, which holds all necessary information for the + pipeline to process it. 
+ + Arguments: + dir -- Path to the directory + output_dir -- Path to a directory, where job results a stored + """ + + def __init__(self, dir, output_dir): + self.dir = dir + self.name = os.path.basename(dir) + self.output_dir = output_dir + + +class FileSetupPipeline(WorkflowRunner): + def __init__(self, input_dir, output_dir, zip): + self.input_dir = input_dir + self.output_dir = output_dir + self.zip = zip + self.jobs = collect_jobs(self.input_dir, self.output_dir) + + def workflow(self): + if not self.jobs: + return + + ''' + ' ################################################## + ' # setup output directory # + ' ################################################## + ''' + setup_output_directory_tasks = [] + for i, job in enumerate(self.jobs): + cmd = 'mkdir -p "{}"'.format(job.output_dir) + lbl = 'setup_output_directory_-_{}'.format(i) + task = self.addTask(command=cmd, label=lbl) + setup_output_directory_tasks.append(task) + + ''' + ' ################################################## + ' # pre file setup # + ' ################################################## + ''' + pre_file_setup_tasks = [] + for i, job in enumerate(self.jobs): + input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa + cmd = 'ls -dQv "{}/"* >> "{}"'.format(job.dir, input_file) + deps = 'setup_output_directory_-_{}'.format(i) + lbl = 'pre_file_setup_-_{}'.format(i) + task = self.addTask(command=cmd, dependencies=deps, label=lbl) + pre_file_setup_tasks.append(task) + + ''' + ' ################################################## + ' # file setup # + ' ################################################## + ''' + file_setup_tasks = [] + n_cores = max(1, int(self.getNCores() / len(self.jobs))) + for i, job in enumerate(self.jobs): + input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa + output_file = os.path.join(job.output_dir, '{}.pdf'.format(job.name)) # noqa + cmd = 'convert "@{}" "{}"'.format(input_file, output_file) + deps = 
'pre_file_setup_-_{}'.format(i) + lbl = 'file_setup_-_{}'.format(i) + task = self.addTask(command=cmd, dependencies=deps, label=lbl, + nCores=n_cores) + file_setup_tasks.append(task) + + ''' + ' ################################################## + ' # post file setup # + ' ################################################## + ''' + post_file_setup_tasks = [] + for i, job in enumerate(self.jobs): + input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa + cmd = 'rm "{}"'.format(input_file) + deps = 'file_setup_-_{}'.format(i) + lbl = 'post_file_setup_-_{}'.format(i) + task = self.addTask(command=cmd, dependencies=deps, label=lbl) + post_file_setup_tasks.append(task) + + ''' + ' ################################################## + ' # zip creation # + ' ################################################## + ''' + zip_creation_tasks = [] + if self.zip is not None: + cmd = 'cd "{}"'.format(self.output_dir) + cmd += ' && ' + cmd += 'zip' + cmd += ' -r' + cmd += ' "{}.zip" .'.format(self.zip) + cmd += ' -x "pyflow.data*"' + cmd += ' -i "*.pdf"' + cmd += ' && ' + cmd += 'cd -' + deps = file_setup_tasks + lbl = 'zip_creation' + task = self.addTask(command=cmd, dependencies=deps, label=lbl) + zip_creation_tasks.append(task) + + +def collect_jobs(input_dir, output_dir): + jobs = [] + for dir in os.listdir(input_dir): + if not os.path.isdir(os.path.join(input_dir, dir)): + continue + # TODO: Filter for file types + if not os.listdir(os.path.join(input_dir, dir)): + continue + job = FileSetupPipelineJob(os.path.join(input_dir, dir), + os.path.join(output_dir, dir)) + jobs.append(job) + return jobs + + +def parse_args(): + parser = ArgumentParser(description='A file setup pipeline for image file merging', # noqa + prog='File setup pipeline') + parser.add_argument('-i', '--input-dir', help='Input directory', required=True) - parser.add_argument('-o', '--output-directory', + parser.add_argument('-o', '--output-dir', help='Output directory', required=True) - 
parser.add_argument('-f', '--output-file-base', - help='output file base', - required=True) - parser.add_argument('--log-dir') - parser.add_argument('--zip') - return parser.parse_args() + parser.add_argument('--log-dir', + help='Logging directory') + parser.add_argument('--mem-mb', + help='Amount of system memory to be used (Default: min(--n-cores * 2048, available system memory))', # noqa + type=int) + parser.add_argument('--n-cores', + default=1, + help='Number of CPU threads to be used (Default: 1)', + type=int) + parser.add_argument('--zip', + help='Create one zip file per filetype') + parser.add_argument('-v', '--version', + action='version', + help='Returns the current version of the file setup pipeline', # noqa + version='%(prog)s {}'.format(__version__)) + args = parser.parse_args() - -def natural_sorted(iterable): - """ Sort the given list in the way that humans expect. - """ - convert = lambda text: int(text) if text.isdigit() else text - alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] - return sorted(iterable, key=alphanum_key) - - -def merge_images(input_dir, output_dir, output_file_base, zip): - try: - os.mkdir(output_dir) - except FileExistsError: - pass - files = filter(lambda x: x.lower().endswith(('.jpg', '.jpeg', '.png', '.tif', '.tiff')), - os.listdir(input_dir)) - files = natural_sorted(files) - files = map(lambda x: os.path.join(input_dir, x), files) - output_file = os.path.join(output_dir, '{}.pdf'.format(output_file_base)) - # Convert input files to a single PDF - cmd = 'convert "{}" "{}"'.format('" "'.join(files), output_file) - subprocess.run(cmd, shell=True) - # zip stuff - if zip is not None: + # Set some tricky default values and check for insufficient input + if args.log_dir is None: + args.log_dir = args.output_dir + if args.n_cores < 1: + raise Exception('--n-cores must be greater or equal 1') + if args.mem_mb is None: + max_mem_mb = int(os.popen('free -t -m').readlines()[-1].split()[1:][0]) + args.mem_mb = 
min(args.n_cores * 2048, max_mem_mb) + if args.mem_mb < 2048: + raise Exception('--mem-mb must be greater or equal 2048') + if args.zip is not None and args.zip.lower().endswith('.zip'): # Remove .zip file extension if provided - if zip.lower().endswith('.zip'): - zip = zip[:-4] - zip = zip if zip else 'output' - cmd = 'cd "{}"'.format(output_dir) - cmd += ' && ' - cmd += 'zip' - cmd += ' "{}.zip" "{}.pdf"'.format(zip, output_file_base) - cmd += ' && ' - cmd += 'cd -' - subprocess.run(cmd, shell=True) + args.zip = args.zip[:-4] + args.zip = args.zip if args.zip else 'output' + return args def main(): - args = parse_arguments() - merge_images(args.input_directory, - args.output_directory, - args.output_file_base, - args.zip) + args = parse_args() + file_setup_pipeline = FileSetupPipeline(args.input_dir, args.output_dir, args.zip) # noqa + retval = file_setup_pipeline.run(dataDirRoot=args.log_dir, memMb=args.mem_mb, nCores=args.n_cores) # noqa + sys.exit(retval) if __name__ == '__main__': diff --git a/wrapper/file-setup b/wrapper/file-setup index 7dad642..e3ceb8f 100755 --- a/wrapper/file-setup +++ b/wrapper/file-setup @@ -4,30 +4,35 @@ from argparse import ArgumentParser import os import subprocess +import sys -CONTAINER_IMAGE = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/file-setup:latest' +CONTAINER_IMAGE = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/file-setup:1.0.0' CONTAINER_INPUT_DIR = '/input' CONTAINER_OUTPUT_DIR = '/output' +CONTAINER_LOG_DIR = '/logs' UID = str(os.getuid()) GID = str(os.getgid()) parser = ArgumentParser(add_help=False) -parser.add_argument('-i', '--input-directory') -parser.add_argument('-o', '--output-directory') +parser.add_argument('-i', '--input-dir') +parser.add_argument('-o', '--output-dir') +parser.add_argument('--log-dir') args, remaining_args = parser.parse_known_args() cmd = ['docker', 'run', '--rm', '-it', '-u', '{}:{}'.format(UID, GID)] -if args.output_directory is not None: - cmd += ['-v', 
'{}:{}'.format(os.path.abspath(args.output_directory), - CONTAINER_OUTPUT_DIR)] - remaining_args.insert(0, CONTAINER_OUTPUT_DIR) - remaining_args.insert(0, '-o') -if args.input_directory is not None: - cmd += ['-v', '{}:{}'.format(os.path.abspath(args.input_directory), - CONTAINER_INPUT_DIR)] - remaining_args.insert(0, CONTAINER_INPUT_DIR) - remaining_args.insert(0, '-i') +if args.input_dir is not None: + mapping = os.path.abspath(args.input_dir) + ':' + CONTAINER_INPUT_DIR + cmd += ['-v', mapping] + remaining_args += ['-i', CONTAINER_INPUT_DIR] +if args.output_dir is not None: + mapping = os.path.abspath(args.output_dir) + ':' + CONTAINER_OUTPUT_DIR + cmd += ['-v', mapping] + remaining_args += ['-o', CONTAINER_OUTPUT_DIR] +if args.log_dir is not None: + mapping = os.path.abspath(args.log_dir) + ':' + CONTAINER_LOG_DIR + cmd += ['-v', mapping] + remaining_args += ['--log-dir', CONTAINER_LOG_DIR] cmd.append(CONTAINER_IMAGE) cmd += remaining_args -subprocess.run(cmd) +sys.exit(subprocess.run(cmd).returncode)