Update Pipeline structure

This commit is contained in:
Patrick Jentsch 2022-01-18 12:58:42 +01:00
parent 3c9a800886
commit cdb4f1889c
5 changed files with 175 additions and 180 deletions

View File

@ -9,38 +9,32 @@ ENV LANG=C.UTF-8
RUN apt-get update \ RUN apt-get update \
&& apt-get install --no-install-recommends --yes \ && apt-get install --no-install-recommends --yes \
wget imagemagick \
procps \
wget \
&& mv /etc/ImageMagick-6/policy.xml /etc/ImageMagick-6/policy.xml.bak
# Install the NLP pipeline and it's dependencies # # Install the File setup pipeline and it's dependencies #
## Install pyFlow ## ## Install pyFlow ##
ENV PYFLOW_VERSION=1.1.20 ENV PYFLOW_VERSION=1.1.20
RUN wget --no-check-certificate --quiet \ RUN wget --no-check-certificate --quiet \
"https://github.com/Illumina/pyflow/releases/download/v${PYFLOW_VERSION}/pyflow-${PYFLOW_VERSION}.tar.gz" \ "https://github.com/Illumina/pyflow/releases/download/v${PYFLOW_VERSION}/pyflow-${PYFLOW_VERSION}.tar.gz" \
&& tar -xzf "pyflow-${PYFLOW_VERSION}.tar.gz" \ && tar -xzf "pyflow-${PYFLOW_VERSION}.tar.gz" \
&& cd "pyflow-${PYFLOW_VERSION}" \ && cd "pyflow-${PYFLOW_VERSION}" \
&& apt-get install --no-install-recommends --yes \ && apt-get install --no-install-recommends --yes \
python2.7 \ python2.7 \
&& python2.7 setup.py build install \ && python2.7 setup.py build install \
&& cd .. \ && cd - > /dev/null \
&& rm -r "pyflow-${PYFLOW_VERSION}" "pyflow-${PYFLOW_VERSION}.tar.gz" && rm -r "pyflow-${PYFLOW_VERSION}" "pyflow-${PYFLOW_VERSION}.tar.gz"
## Further dependencies ## RUN rm -r /var/lib/apt/lists/*
RUN apt-get install --no-install-recommends --yes \
imagemagick \
procps \
python3.7 \
zip \
&& mv /etc/ImageMagick-6/policy.xml /etc/ImageMagick-6/policy.xml.bak
## Install Pipeline ## ## Install Pipeline ##
COPY file-setup /usr/local/bin COPY file-setup /usr/local/bin
RUN rm -r /var/lib/apt/lists/*
ENTRYPOINT ["file-setup"] ENTRYPOINT ["file-setup"]
CMD ["--help"] CMD ["--help"]

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 Bielefeld University - CRC 1288 - INF
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,37 +1,37 @@
# File setup # File setup
This software implements a parallelized pipeline to setup image files. It is used for nopaque's File setup service but you can also use it standalone, for that purpose a convenient wrapper script is provided. This software implements a parallelized pipeline to setup image files. It is used for nopaque's File setup service but you can also use it standalone, for that purpose a convenient wrapper script is provided. The pipeline is designed to run on Linux operating systems, but with some tweaks it should also run on Windows with WSL installed.
## Software used in this pipeline implementation ## Software used in this pipeline implementation
- Official Debian Docker image (buster-slim) and programs from its free repositories: https://hub.docker.com/_/debian
- Official Debian Docker image (buster-slim): https://hub.docker.com/_/debian
- Software from Debian Buster's free repositories
## Use this image ## Installation
1. Create input and output directories for the pipeline. 1. Install Docker and Python 3.
``` bash 2. Clone this repository: `git clone https://gitlab.ub.uni-bielefeld.de/sfb1288inf/file-setup.git`
mkdir -p /<my_data_location>/input /<my_data_location>/output 2. Build the Docker image: `docker build -t gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/file-setup:v0.1.0 file-setup`
3. Add the wrapper script (`wrapper/filesetup` relative to this README file) to your `${PATH}`.
4. Create working directories for the pipeline: `mkdir -p /<my_data_location>/{input,output}`.
## Use the Pipeline
1. Place your images files inside a subdirectory in `/<my_data_location>/input`. It should look similar to this:
```
.
|-- input
| |-- alice_in_wonderland
| |-- page-1.png
| |-- page-2.png
| |-- ...
| `-- page-x.png
`-- output
``` ```
2. Place your images files inside a directory in `/<my_data_location>/input`.
3. Start the pipeline process. Check the pipeline help (`file-setup --help`) for more details. 3. Start the pipeline process. Check the pipeline help (`file-setup --help`) for more details.
``` ```bash
# Option one: Use the wrapper script
## Install the wrapper script (only on first run). Get it from https://gitlab.ub.uni-bielefeld.de/sfb1288inf/file-setup/-/raw/1.0.0/wrapper/file-setup, make it executeable and add it to your ${PATH}
cd /<my_data_location> cd /<my_data_location>
file-setup -i input -o output file-setup -i input -o output
# Option two: Classic Docker style
docker run \
--rm \
-it \
-u $(id -u $USER):$(id -g $USER) \
-v /<my_data_location>/input:/input \
-v /<my_data_location>/output:/output \
gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/file-setup:1.0.0 \
-i /input \
-o /output
``` ```
4. Check your results in the `/<my_data_location>/output` directory. 4. Check your results in the `/<my_data_location>/output` directory.

View File

@ -1,29 +1,28 @@
#!/usr/bin/env python2.7 #!/usr/bin/env python2.7
# coding=utf-8 # coding=utf-8
"""A file setup pipeline for image file merging.""" ''' File setup pipeline for image file merging. '''
__version__ = '0.1.0'
__author__ = 'Patrick Jentsch <p.jentsch@uni-bielefeld.de>,' \
'Stephan Porada <porada@posteo.de>'
__version__ = '1.0.0'
from argparse import ArgumentParser from argparse import ArgumentParser
from pyflow import WorkflowRunner from pyflow import WorkflowRunner
import json
import multiprocessing
import os import os
import sys import sys
class FileSetupPipelineJob: class PipelineJob:
"""An file setup pipeline job class '''
File setup pipeline job class.
Each image containing input directory of the pipeline is represented as an Each input directory containing images is represented here together with
file setup pipeline job, which holds all necessary information for the all necessary information for the pipeline to process it.
pipeline to process it.
Arguments: Arguments:
dir -- Path to the directory dir -- Path to the directory
output_dir -- Path to a directory, where job results a stored output_dir -- Path to a directory, where job results are stored
""" '''
def __init__(self, dir, output_dir): def __init__(self, dir, output_dir):
self.dir = dir self.dir = dir
@ -31,134 +30,118 @@ class FileSetupPipelineJob:
self.output_dir = output_dir self.output_dir = output_dir
class FileSetupPipeline(WorkflowRunner): class CreatePDFWorkflow(WorkflowRunner):
def __init__(self, input_dir, output_dir, zip): def __init__(self, job):
self.job = job
def workflow(self):
'''
' ##################################################
' # convert #
' ##################################################
'''
n_cores = min(2, self.getNCores())
mem_mb = min(n_cores * 256, self.getMemMb())
cmd = 'ls -dv "{}/"* > "{}"'.format(
os.path.join(self.job.dir),
os.path.join(self.job.output_dir, 'inputs.txt')
)
cmd += ' && '
cmd += 'convert "@{}" "{}"'.format(
os.path.join(self.job.output_dir, 'inputs.txt'),
os.path.join(self.job.output_dir, '{}.pdf'.format(self.job.name))
)
cmd += ' && '
cmd += 'rm "{}"'.format(os.path.join(self.job.output_dir, 'inputs.txt')) # noqa
self.addTask(
'convert',
command=cmd,
memMb=mem_mb,
nCores=n_cores
)
class MainWorkflow(WorkflowRunner):
def __init__(self, input_dir, output_dir):
self.input_dir = input_dir self.input_dir = input_dir
self.output_dir = output_dir self.output_dir = output_dir
self.zip = zip self.jobs = []
self.jobs = collect_jobs(self.input_dir, self.output_dir) self.collect_jobs()
def collect_jobs(self):
for dir in os.listdir(self.input_dir):
if not os.path.isdir(os.path.join(self.input_dir, dir)):
continue
if not os.listdir(os.path.join(self.input_dir, dir)):
continue
# TODO: Filter for file types within the directory
job = PipelineJob(
os.path.join(self.input_dir, dir),
os.path.join(self.output_dir, dir)
)
self.jobs.append(job)
def workflow(self): def workflow(self):
if not self.jobs: if not self.jobs:
return return
# Create output and temporary directories
for job in self.jobs:
os.mkdir(job.output_dir)
''' '''
' ################################################## ' ##################################################
' # setup output directory # ' # create-pdf #
' ################################################## ' ##################################################
''' '''
setup_output_directory_tasks = [] create_pdf_tasks = []
for i, job in enumerate(self.jobs): for i, job in enumerate(self.jobs):
cmd = 'mkdir -p "{}"'.format(job.output_dir) task = self.addWorkflowTask(
lbl = 'setup_output_directory_-_{}'.format(i) 'create_pdf_-_{}'.format(i),
task = self.addTask(command=cmd, label=lbl) CreatePDFWorkflow(job)
setup_output_directory_tasks.append(task) )
create_pdf_tasks.append(task)
''' self.waitForTasks()
' ################################################## for job in self.jobs:
' # pre file setup # # Track output files
' ################################################## relative_output_dir = os.path.relpath(job.output_dir, start=self.output_dir) # noqa
''' self.output_files.append(
pre_file_setup_tasks = [] {
for i, job in enumerate(self.jobs): 'description': 'PDF file without text layer.',
input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa 'file': os.path.join(relative_output_dir, '{}.pdf'.format(job.name)), # noqa
cmd = 'ls -dQv "{}/"* >> "{}"'.format(job.dir, input_file) 'mimetype': 'application/pdf'
deps = 'setup_output_directory_-_{}'.format(i) }
lbl = 'pre_file_setup_-_{}'.format(i) )
task = self.addTask(command=cmd, dependencies=deps, label=lbl) with open(os.path.join(self.output_dir, 'output_records.json'), 'w') as f: # noqa
pre_file_setup_tasks.append(task) json.dump(self.output_files, f, indent=4)
'''
' ##################################################
' # file setup #
' ##################################################
'''
file_setup_tasks = []
n_cores = max(1, int(self.getNCores() / len(self.jobs)))
for i, job in enumerate(self.jobs):
input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa
output_file = os.path.join(job.output_dir, '{}.pdf'.format(job.name)) # noqa
cmd = 'convert "@{}" "{}"'.format(input_file, output_file)
deps = 'pre_file_setup_-_{}'.format(i)
lbl = 'file_setup_-_{}'.format(i)
task = self.addTask(command=cmd, dependencies=deps, label=lbl,
nCores=n_cores)
file_setup_tasks.append(task)
'''
' ##################################################
' # post file setup #
' ##################################################
'''
post_file_setup_tasks = []
for i, job in enumerate(self.jobs):
input_file = os.path.join(job.output_dir, 'file_setup_input_files.txt') # noqa
cmd = 'rm "{}"'.format(input_file)
deps = 'file_setup_-_{}'.format(i)
lbl = 'post_file_setup_-_{}'.format(i)
task = self.addTask(command=cmd, dependencies=deps, label=lbl)
post_file_setup_tasks.append(task)
'''
' ##################################################
' # zip creation #
' ##################################################
'''
zip_creation_tasks = []
if self.zip is not None:
cmd = 'cd "{}"'.format(self.output_dir)
cmd += ' && '
cmd += 'zip'
cmd += ' -r'
cmd += ' "{}.zip" .'.format(self.zip)
cmd += ' -x "pyflow.data*"'
cmd += ' -i "*.pdf"'
cmd += ' && '
cmd += 'cd -'
deps = file_setup_tasks
lbl = 'zip_creation'
task = self.addTask(command=cmd, dependencies=deps, label=lbl)
zip_creation_tasks.append(task)
def collect_jobs(input_dir, output_dir):
jobs = []
for dir in os.listdir(input_dir):
if not os.path.isdir(os.path.join(input_dir, dir)):
continue
# TODO: Filter for file types
if not os.listdir(os.path.join(input_dir, dir)):
continue
job = FileSetupPipelineJob(os.path.join(input_dir, dir),
os.path.join(output_dir, dir))
jobs.append(job)
return jobs
def parse_args(): def parse_args():
parser = ArgumentParser(description='A file setup pipeline for image file merging', # noqa parser = ArgumentParser(description='Pipeline for merging images')
prog='File setup pipeline') parser.add_argument(
parser.add_argument('-i', '--input-dir', '-i', '--input-dir', help='Input directory', required=True)
help='Input directory', parser.add_argument(
required=True) '-o', '--output-dir', help='Output directory', required=True)
parser.add_argument('-o', '--output-dir', parser.add_argument(
help='Output directory', '--log-dir', help='Logging directory (Default: --output-dir)')
required=True) parser.add_argument(
parser.add_argument('--log-dir', '--mem-mb',
help='Logging directory') help='Amount of system memory to be used (Default: min(--n-cores * 256, available system memory))', # noqa
parser.add_argument('--mem-mb', type=int
help='Amount of system memory to be used (Default: min(--n-cores * 2048, available system memory))', # noqa )
type=int) parser.add_argument(
parser.add_argument('--n-cores', '--n-cores',
default=1, default=min(2, multiprocessing.cpu_count()),
help='Number of CPU threads to be used (Default: 1)', help='Number of CPU threads to be used (Default: min(2, CPU count))',
type=int) type=int
parser.add_argument('--zip', )
help='Create one zip file per filetype') parser.add_argument(
parser.add_argument('-v', '--version', '-v', '--version',
action='version', action='version',
help='Returns the current version of the file setup pipeline', # noqa help='Returns the current version of the file setup pipeline',
version='%(prog)s {}'.format(__version__)) version='%(prog)s {}'.format(__version__)
)
args = parser.parse_args() args = parser.parse_args()
# Set some tricky default values and check for insufficient input # Set some tricky default values and check for insufficient input
@ -168,20 +151,17 @@ def parse_args():
raise Exception('--n-cores must be greater or equal 1') raise Exception('--n-cores must be greater or equal 1')
if args.mem_mb is None: if args.mem_mb is None:
max_mem_mb = int(os.popen('free -t -m').readlines()[-1].split()[1:][0]) max_mem_mb = int(os.popen('free -t -m').readlines()[-1].split()[1:][0])
args.mem_mb = min(args.n_cores * 2048, max_mem_mb) args.mem_mb = min(args.n_cores * 256, max_mem_mb)
if args.mem_mb < 2048: if args.mem_mb < 256:
raise Exception('--mem-mb must be greater or equal 2048') raise Exception('--mem-mb must be greater or equal 256')
if args.zip is not None and args.zip.lower().endswith('.zip'):
# Remove .zip file extension if provided
args.zip = args.zip[:-4]
args.zip = args.zip if args.zip else 'output'
return args return args
def main(): def main():
args = parse_args() args = parse_args()
file_setup_pipeline = FileSetupPipeline(args.input_dir, args.output_dir, args.zip) # noqa main_workflow = MainWorkflow(args.input_dir, args.output_dir)
retval = file_setup_pipeline.run(dataDirRoot=args.log_dir, memMb=args.mem_mb, nCores=args.n_cores) # noqa retval = main_workflow.run(
dataDirRoot=args.log_dir, memMb=args.mem_mb, nCores=args.n_cores) # noqa
sys.exit(retval) sys.exit(retval)

View File

@ -6,7 +6,7 @@ import os
import subprocess import subprocess
import sys import sys
CONTAINER_IMAGE = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/file-setup:1.0.0' CONTAINER_IMAGE = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/file-setup:v0.1.0' # noqa
CONTAINER_INPUT_DIR = '/input' CONTAINER_INPUT_DIR = '/input'
CONTAINER_OUTPUT_DIR = '/output' CONTAINER_OUTPUT_DIR = '/output'
CONTAINER_LOG_DIR = '/logs' CONTAINER_LOG_DIR = '/logs'
@ -19,17 +19,17 @@ parser.add_argument('-o', '--output-dir')
parser.add_argument('--log-dir') parser.add_argument('--log-dir')
args, remaining_args = parser.parse_known_args() args, remaining_args = parser.parse_known_args()
cmd = ['docker', 'run', '--rm', '-it', '-u', '{}:{}'.format(UID, GID)] cmd = ['docker', 'run', '--rm', '-it', '-u', f'{UID}:{GID}']
if args.input_dir is not None: if args.input_dir is not None:
mapping = os.path.abspath(args.input_dir) + ':' + CONTAINER_INPUT_DIR mapping = f'{os.path.abspath(args.input_dir)}:{CONTAINER_INPUT_DIR}'
cmd += ['-v', mapping] cmd += ['-v', mapping]
remaining_args += ['-i', CONTAINER_INPUT_DIR] remaining_args += ['-i', CONTAINER_INPUT_DIR]
if args.output_dir is not None: if args.output_dir is not None:
mapping = os.path.abspath(args.output_dir) + ':' + CONTAINER_OUTPUT_DIR mapping = f'{os.path.abspath(args.output_dir)}:{CONTAINER_OUTPUT_DIR}'
cmd += ['-v', mapping] cmd += ['-v', mapping]
remaining_args += ['-o', CONTAINER_OUTPUT_DIR] remaining_args += ['-o', CONTAINER_OUTPUT_DIR]
if args.log_dir is not None: if args.log_dir is not None:
mapping = os.path.abspath(args.log_dir) + ':' + CONTAINER_LOG_DIR mapping = '{os.path.abspath(args.log_dir)}:{CONTAINER_LOG_DIR}'
cmd += ['-v', mapping] cmd += ['-v', mapping]
remaining_args += ['--log-dir', CONTAINER_LOG_DIR] remaining_args += ['--log-dir', CONTAINER_LOG_DIR]
cmd.append(CONTAINER_IMAGE) cmd.append(CONTAINER_IMAGE)