#!/usr/bin/env python3
# coding=utf-8
"""Dispatch queued VRE jobs to idle docker compute nodes.

For each service the script asks the VRE manager for queued jobs, looks up the
running containers of the matching image, and starts one job per idle
container in its own thread.
"""

import json
import os
import subprocess
from threading import Thread
import urllib.parse
import urllib.request


# Pre-quoted so it can be appended to a query string as-is.
ERRORRMESSAGE = urllib.parse.quote(
    "Bei der Verarbeitung der Daten ist ein Fehler aufgetreten."
)
VRE_MANAGER = "http://localhost:5000/vre/jobs"
# Docker image of the compute nodes, keyed by service name.
IMAGES = {
    "nlp": "gitlab.ub.uni-bielefeld.de:4567/pjentsch/vre_nlp_node",
    "ocr": "gitlab.ub.uni-bielefeld.de:4567/pjentsch/vre_ocr_node",
}


def _put_job_update(job, query):
    """Send a PUT for *job* to the VRE manager with the raw *query* string."""
    # str() keeps string ids untouched and tolerates numeric ids.
    url = VRE_MANAGER + "/" + str(job["id"]) + "?" + query
    request = urllib.request.Request(url=url, method="PUT")
    # The response body is not needed; the context manager just closes it.
    with urllib.request.urlopen(request):
        pass


def _is_idle(compute_node):
    """Return True if the marker-file test in *compute_node* exits non-zero.

    A non-zero exit of ``test -f`` (marker file absent, or ``docker exec``
    itself failing) is treated as "not occupied".
    NOTE(review): the marker is presumably written by pyflow while a workflow
    is running - confirm against the node image.
    """
    probe = subprocess.run(
        ["docker", "exec", compute_node,
         "test", "-f", "pyflow.data/active_pyflow_process.txt"]
    )
    return probe.returncode != 0


def manage_jobs(service):
    """Assign queued jobs of *service* to idle compute nodes.

    Args:
        service: Service name; must be a key of ``IMAGES`` ("nlp" or "ocr").

    Jobs and idle nodes are paired in order; surplus jobs stay queued and
    surplus nodes stay idle. Each pairing is run by ``start_job`` in a new
    thread. Returns ``None``.
    """
    # Get queued jobs
    queue_url = VRE_MANAGER + "?service=" + service + "&status=queued"
    with urllib.request.urlopen(queue_url) as response:
        queued = json.loads(response.read().decode('utf-8'))

    # Return if no jobs are available
    if not queued:
        return

    # Get a list of compute nodes (container names, one per line)
    listing = subprocess.run(
        ["docker", "ps",
         "--filter", "ancestor=" + IMAGES[service],
         "--format", "{{.Names}}"],
        stdout=subprocess.PIPE
    )
    compute_nodes = listing.stdout.decode('utf-8').split()

    # Filter occupied compute nodes out
    idle_nodes = [node for node in compute_nodes if _is_idle(node)]

    # Return if no compute nodes are available
    if not idle_nodes:
        return

    # Assign jobs to compute nodes; zip stops at the shorter of the two lists
    for job, compute_node in zip(queued, idle_nodes):
        # Prepare and start the assigned jobs
        Thread(target=start_job, args=(service, compute_node, job)).start()


def start_job(service, compute_node, job):
    """Run *job* on *compute_node* and report its status to the VRE manager.

    Args:
        service: Name of the executable invoked inside the container.
        compute_node: Name of the docker container to run in.
        job: Mapping with at least the keys ``"id"`` and ``"language"``.

    The job is set to "running", then to "finished" on exit code 0 or to
    "failed" (with ``ERRORRMESSAGE`` as report) otherwise.

    Raises:
        OSError: If ``docker`` cannot be launched; the job is marked as failed
            before the error is re-raised.
    """
    # Update job status to "running"
    _put_job_update(job, "status=running")

    failed_query = "status=failed&report=" + ERRORRMESSAGE
    job_dir = "/root/vre_files/jobs/" + str(job["id"])

    # Start the service; input and output share the job directory
    try:
        process = subprocess.run(
            ["docker", "exec", compute_node, service,
             "-i", job_dir,
             "-o", job_dir,
             "-l", job["language"]],
            stderr=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL
        )
    except OSError:
        # Do not leave the job stuck in "running" when docker cannot start.
        _put_job_update(job, failed_query)
        raise

    if process.returncode != 0:
        _put_job_update(job, failed_query)
        return

    # Update job status to "finished"
    _put_job_update(job, "status=finished")
    # TODO: Send E-Mail to user


def main():
    """Dispatch queued jobs for every supported service."""
    manage_jobs("ocr")
    manage_jobs("nlp")


if __name__ == "__main__":
    main()