Use Flask-APScheduler. Move docker swarm logic to scheduler.

This commit is contained in:
Patrick Jentsch 2019-08-13 14:10:50 +02:00
parent 1fa9cbc586
commit db0c2532ad
6 changed files with 50 additions and 86 deletions

View File

@ -1,29 +1,28 @@
from config import config from config import config
from flask import Flask from flask import Flask
from flask_apscheduler import APScheduler
from flask_login import LoginManager from flask_login import LoginManager
from flask_mail import Mail from flask_mail import Mail
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from .scheduler import Scheduler
from .swarm import Swarm
db = SQLAlchemy() db = SQLAlchemy()
login_manager = LoginManager() login_manager = LoginManager()
login_manager.login_view = 'auth.login' login_manager.login_view = 'auth.login'
mail = Mail() mail = Mail()
scheduler = Scheduler() scheduler = APScheduler()
swarm = Swarm()
def create_app(config_name): def create_app(config_name):
app = Flask(__name__) app = Flask(__name__)
app.config.from_object(config[config_name]) app.config.from_object(config[config_name])
config[config_name].init_app(app)
config[config_name].init_app(app)
db.init_app(app) db.init_app(app)
login_manager.init_app(app) login_manager.init_app(app)
mail.init_app(app) mail.init_app(app)
scheduler.init_app(app) scheduler.init_app(app)
swarm.init_app(app) scheduler.start()
from .auth import auth as auth_blueprint from .auth import auth as auth_blueprint
app.register_blueprint(auth_blueprint, url_prefix='/auth') app.register_blueprint(auth_blueprint, url_prefix='/auth')
@ -35,3 +34,13 @@ def create_app(config_name):
app.register_blueprint(main_blueprint) app.register_blueprint(main_blueprint)
return app return app
def create_minimal_app(config_name):
    """Build a stripped-down Flask app: configuration and database only.

    Intended for background workers (e.g. scheduler jobs) that need an
    application context and database access without the full set of
    extensions and blueprints.
    """
    minimal_app = Flask(__name__)
    selected_config = config[config_name]
    minimal_app.config.from_object(selected_config)
    selected_config.init_app(minimal_app)
    db.init_app(minimal_app)
    return minimal_app

View File

@ -1,45 +0,0 @@
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import os
class Scheduler(BackgroundScheduler):
    """Background scheduler bound to a Flask app.

    Periodically polls the job table and dispatches work to the Docker
    swarm (see :meth:`check_jobs`).
    """

    def __init__(self, app=None):
        super().__init__()
        self.app = app
        # Support both direct construction and the app-factory pattern.
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Create a session factory for the app's database and start polling.

        A separate engine/session is built from SQLALCHEMY_DATABASE_URI
        because scheduler callbacks run outside any Flask request context.
        """
        engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
        # scoped_session hands each scheduler thread its own session.
        self.Session = scoped_session(sessionmaker(bind=engine))
        # Under the Werkzeug reloader the module is imported twice; the
        # WERKZEUG_RUN_MAIN check makes sure only one scheduler runs.
        if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
            self.add_job(self.check_jobs, id='jobs', seconds=10,
                         trigger='interval')
            # NOTE(review): indentation was lost in extraction; start() is
            # assumed to sit inside the reloader guard — confirm.
            self.start()

    def check_jobs(self):
        """Poll the job table once.

        Submits jobs with status 'submitted' to the swarm, and for every
        job that is neither finished nor waiting, mirrors the state of its
        Docker service back onto the job row, removing the service once it
        reaches a terminal state.
        """
        # Imported here to avoid circular imports at module load time.
        from .models import Job
        from . import swarm
        session = self.Session()
        jobs = session.query(Job)
        submitted_jobs = jobs.filter_by(status='submitted').all()
        # Jobs that are neither terminal nor waiting, i.e. in flight.
        foo_jobs = jobs.filter(Job.status != 'complete',
                               Job.status != 'failed',
                               Job.status != 'submitted').all()
        for job in submitted_jobs:
            swarm.run(job)
            job.status = 'scheduled'
        for job in foo_jobs:
            '''
            ' TODO: Handle service not found error.
            '''
            # The service is created under the job id, so look it up by id.
            service = swarm.docker.services.get(str(job.id))
            # Mirror the first task's state onto the job row.
            job.status = service.tasks()[0].get('Status').get('State')
            if job.status == 'complete' or job.status == 'failed':
                job.end_date = datetime.utcnow()
                service.remove()
        session.commit()
        # Release the thread-local session back to the registry.
        self.Session.remove()

View File

@ -1,40 +1,25 @@
from datetime import datetime
from . import create_minimal_app, db
from .models import Job
import docker import docker
import json import json
import os import os
class Swarm: def check_jobs():
def __init__(self, app=None): app = create_minimal_app(os.getenv('FLASK_CONFIG') or 'default')
self.app = app app.app_context().push()
if app is not None: docker_client = docker.from_env()
self.init_app(app) jobs = db.session.query(Job)
self.docker = docker.from_env() submitted_jobs = jobs.filter_by(status='submitted').all()
foo_jobs = jobs.filter(Job.status != 'complete',
def init_app(self, app): Job.status != 'failed',
pass Job.status != 'submitted').all()
for job in submitted_jobs:
'''
' Swarm mode is intended to run containers which serve a non-terminating
' service like a webserver. For processing an occurring job it is necessary
' to use a one-shot container, which stops after the wrapped job process
' is completely executed. In order to run these one-shot containers in Swarm
' mode, the following run method is implemented analogously to the
' implementation presented in Alex Ellis' blog post "One-shot containers on
' Docker Swarm"¹.
'
' ¹ https://blog.alexellis.io/containers-on-swarm/
'''
def run(self, job):
'''
Input is a job.
'''
# Prepare argument values needed for the service creation.
service_args = json.loads(job.service_args)
_command = (job.service _command = (job.service
+ ' -i /files' + ' -i /files'
+ ' -o /files/output' + ' -o /files/output'
+ ' ' + ' '.join(service_args)) + ' ' + ' '.join(json.loads(job.service_args)))
_constraints = ['node.role==worker'] _constraints = ['node.role==worker']
_image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format( _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format(
job.service, job.service,
@ -74,7 +59,7 @@ class Swarm:
' (name=_name). Because there is no id generator for now, it is ' (name=_name). Because there is no id generator for now, it is
' not set, so that the Docker engine assigns a random name. ' not set, so that the Docker engine assigns a random name.
''' '''
service = self.docker.services.create( service = docker_client.services.create(
_image, _image,
command=_command, command=_command,
constraints=_constraints, constraints=_constraints,
@ -84,5 +69,14 @@ class Swarm:
resources=_resources, resources=_resources,
restart_policy=_restart_policy restart_policy=_restart_policy
) )
job.status = 'scheduled'
return service for job in foo_jobs:
'''
' TODO: Handle service not found error.
'''
service = docker_client.services.get(str(job.id))
job.status = service.tasks()[0].get('Status').get('State')
if job.status == 'complete' or job.status == 'failed':
job.end_date = datetime.utcnow()
service.remove()
db.session.commit()

View File

@ -3,9 +3,7 @@ from . import services
from flask_login import current_user, login_required from flask_login import current_user, login_required
from .forms import NewOCRJobForm, NewNLPJobForm from .forms import NewOCRJobForm, NewNLPJobForm
from ..models import Job from ..models import Job
from ..import swarm
from .. import db from .. import db
from threading import Thread
import json import json
import os import os

View File

@ -5,6 +5,14 @@ basedir = os.path.abspath(os.path.dirname(__file__))
class Config: class Config:
JOBS = [
{
'id': 'check_jobs',
'func': 'app.scheduler_functions:check_jobs',
'seconds': 3,
'trigger': 'interval'
}
]
MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.gmail.com') MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.gmail.com')
MAIL_PORT = int(os.environ.get('MAIL_PORT', '587')) MAIL_PORT = int(os.environ.get('MAIL_PORT', '587'))
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in \ MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in \

View File

@ -1,6 +1,6 @@
APScheduler==3.6.1
docker==4.0.2 docker==4.0.2
Flask==1.0.3 Flask==1.0.3
Flask-APScheduler==1.11.0
Flask-Login==0.4.1 Flask-Login==0.4.1
Flask-Mail==0.9.1 Flask-Mail==0.9.1
Flask-Migrate==2.5.2 Flask-Migrate==2.5.2