From db0c2532adcd505376ae880b41fdd03c018e9de2 Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Tue, 13 Aug 2019 14:10:50 +0200
Subject: [PATCH] Use Flask-APScheduler. Move docker swarm logic to scheduler.
---
app/__init__.py | 21 ++++++---
app/scheduler.py | 45 ------------------
app/{swarm.py => scheduler_functions.py} | 58 +++++++++++-------------
app/services/views.py | 2 -
config.py | 8 ++++
requirements.txt | 2 +-
6 files changed, 50 insertions(+), 86 deletions(-)
delete mode 100644 app/scheduler.py
rename app/{swarm.py => scheduler_functions.py} (67%)
diff --git a/app/__init__.py b/app/__init__.py
index 0a814722..4432a4e8 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -1,29 +1,28 @@
from config import config
from flask import Flask
+from flask_apscheduler import APScheduler
from flask_login import LoginManager
from flask_mail import Mail
from flask_sqlalchemy import SQLAlchemy
-from .scheduler import Scheduler
-from .swarm import Swarm
+
db = SQLAlchemy()
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
mail = Mail()
-scheduler = Scheduler()
-swarm = Swarm()
+scheduler = APScheduler()
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
- config[config_name].init_app(app)
+ config[config_name].init_app(app)
db.init_app(app)
login_manager.init_app(app)
mail.init_app(app)
scheduler.init_app(app)
- swarm.init_app(app)
+ scheduler.start()
from .auth import auth as auth_blueprint
app.register_blueprint(auth_blueprint, url_prefix='/auth')
@@ -35,3 +34,13 @@ def create_app(config_name):
app.register_blueprint(main_blueprint)
return app
+
+
+def create_minimal_app(config_name):
+ app = Flask(__name__)
+ app.config.from_object(config[config_name])
+
+ config[config_name].init_app(app)
+ db.init_app(app)
+
+ return app
diff --git a/app/scheduler.py b/app/scheduler.py
deleted file mode 100644
index cf8b9f1e..00000000
--- a/app/scheduler.py
+++ /dev/null
@@ -1,45 +0,0 @@
-from apscheduler.schedulers.background import BackgroundScheduler
-from datetime import datetime
-from sqlalchemy import create_engine
-from sqlalchemy.orm import scoped_session, sessionmaker
-import os
-
-
-class Scheduler(BackgroundScheduler):
- def __init__(self, app=None):
- super().__init__()
- self.app = app
- if app is not None:
- self.init_app(app)
-
- def init_app(self, app):
- engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
- self.Session = scoped_session(sessionmaker(bind=engine))
- if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
- self.add_job(self.check_jobs, id='jobs', seconds=10, trigger='interval')
- self.start()
-
- def check_jobs(self):
- from .models import Job
- from . import swarm
-
- session = self.Session()
- jobs = session.query(Job)
- submitted_jobs = jobs.filter_by(status='submitted').all()
- foo_jobs = jobs.filter(Job.status != 'complete',
- Job.status != 'failed',
- Job.status != 'submitted').all()
- for job in submitted_jobs:
- swarm.run(job)
- job.status = 'scheduled'
- for job in foo_jobs:
- '''
- ' TODO: Handle service not found error.
- '''
- service = swarm.docker.services.get(str(job.id))
- job.status = service.tasks()[0].get('Status').get('State')
- if job.status == 'complete' or job.status == 'failed':
- job.end_date = datetime.utcnow()
- service.remove()
- session.commit()
- self.Session.remove()
diff --git a/app/swarm.py b/app/scheduler_functions.py
similarity index 67%
rename from app/swarm.py
rename to app/scheduler_functions.py
index d79c3d53..6afba342 100644
--- a/app/swarm.py
+++ b/app/scheduler_functions.py
@@ -1,40 +1,25 @@
+from datetime import datetime
+from . import create_minimal_app, db
+from .models import Job
import docker
import json
import os
-class Swarm:
- def __init__(self, app=None):
- self.app = app
- if app is not None:
- self.init_app(app)
- self.docker = docker.from_env()
-
- def init_app(self, app):
- pass
-
- '''
- ' Swarm mode is intendet to run containers which serve a non terminating
- ' service like a webserver. For processing an occuring job it is necessary
- ' to use an one-shot container, which stops after the wrapped job process
- ' is completly executed. In order to run these one-shot containers in Swarm
- ' mode, the following run method is implemented analog to the presented
- ' implementation in Alex Ellis' blog post "One-shot containers on Docker
- ' Swarm"¹.
- '
- ' ¹ https://blog.alexellis.io/containers-on-swarm/
- '''
-
- def run(self, job):
- '''
- Input is a job.
- '''
- # Prepare argument values needed for the service creation.
- service_args = json.loads(job.service_args)
+def check_jobs():
+ app = create_minimal_app(os.getenv('FLASK_CONFIG') or 'default')
+ app.app_context().push()
+ docker_client = docker.from_env()
+ jobs = db.session.query(Job)
+ submitted_jobs = jobs.filter_by(status='submitted').all()
+ foo_jobs = jobs.filter(Job.status != 'complete',
+ Job.status != 'failed',
+ Job.status != 'submitted').all()
+ for job in submitted_jobs:
_command = (job.service
+ ' -i /files'
+ ' -o /files/output'
- + ' ' + ' '.join(service_args))
+ + ' ' + ' '.join(json.loads(job.service_args)))
_constraints = ['node.role==worker']
_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
job.service,
@@ -74,7 +59,7 @@ class Swarm:
' (name=_name). Because there is no id generator for now, it is
' not set, so that the Docker engine assigns a random name.
'''
- service = self.docker.services.create(
+ service = docker_client.services.create(
_image,
command=_command,
constraints=_constraints,
@@ -84,5 +69,14 @@ class Swarm:
resources=_resources,
restart_policy=_restart_policy
)
-
- return service
+ job.status = 'scheduled'
+ for job in foo_jobs:
+ '''
+ ' TODO: Handle service not found error.
+ '''
+ service = docker_client.services.get(str(job.id))
+ job.status = service.tasks()[0].get('Status').get('State')
+ if job.status == 'complete' or job.status == 'failed':
+ job.end_date = datetime.utcnow()
+ service.remove()
+ db.session.commit()
diff --git a/app/services/views.py b/app/services/views.py
index 9fdc4e0d..e86664f1 100644
--- a/app/services/views.py
+++ b/app/services/views.py
@@ -3,9 +3,7 @@ from . import services
from flask_login import current_user, login_required
from .forms import NewOCRJobForm, NewNLPJobForm
from ..models import Job
-from ..import swarm
from .. import db
-from threading import Thread
import json
import os
diff --git a/config.py b/config.py
index 13061f25..7358052e 100644
--- a/config.py
+++ b/config.py
@@ -5,6 +5,14 @@ basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
+ JOBS = [
+ {
+ 'id': 'check_jobs',
+ 'func': 'app.scheduler_functions:check_jobs',
+ 'seconds': 3,
+ 'trigger': 'interval'
+ }
+ ]
MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.gmail.com')
MAIL_PORT = int(os.environ.get('MAIL_PORT', '587'))
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in \
diff --git a/requirements.txt b/requirements.txt
index 7f9f4ae6..55c5be5e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
-APScheduler==3.6.1
docker==4.0.2
Flask==1.0.3
+Flask-APScheduler==1.11.0
Flask-Login==0.4.1
Flask-Mail==0.9.1
Flask-Migrate==2.5.2