diff --git a/app/__init__.py b/app/__init__.py index 0a814722..4432a4e8 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,29 +1,28 @@ from config import config from flask import Flask +from flask_apscheduler import APScheduler from flask_login import LoginManager from flask_mail import Mail from flask_sqlalchemy import SQLAlchemy -from .scheduler import Scheduler -from .swarm import Swarm + db = SQLAlchemy() login_manager = LoginManager() login_manager.login_view = 'auth.login' mail = Mail() -scheduler = Scheduler() -swarm = Swarm() +scheduler = APScheduler() def create_app(config_name): app = Flask(__name__) app.config.from_object(config[config_name]) - config[config_name].init_app(app) + config[config_name].init_app(app) db.init_app(app) login_manager.init_app(app) mail.init_app(app) scheduler.init_app(app) - swarm.init_app(app) + scheduler.start() from .auth import auth as auth_blueprint app.register_blueprint(auth_blueprint, url_prefix='/auth') @@ -35,3 +34,13 @@ def create_app(config_name): app.register_blueprint(main_blueprint) return app + + +def create_minimal_app(config_name): + app = Flask(__name__) + app.config.from_object(config[config_name]) + + config[config_name].init_app(app) + db.init_app(app) + + return app diff --git a/app/scheduler.py b/app/scheduler.py deleted file mode 100644 index cf8b9f1e..00000000 --- a/app/scheduler.py +++ /dev/null @@ -1,45 +0,0 @@ -from apscheduler.schedulers.background import BackgroundScheduler -from datetime import datetime -from sqlalchemy import create_engine -from sqlalchemy.orm import scoped_session, sessionmaker -import os - - -class Scheduler(BackgroundScheduler): - def __init__(self, app=None): - super().__init__() - self.app = app - if app is not None: - self.init_app(app) - - def init_app(self, app): - engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI']) - self.Session = scoped_session(sessionmaker(bind=engine)) - if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true': - self.add_job(self.check_jobs, id='jobs', seconds=10, trigger='interval') - self.start() - - def check_jobs(self): - from .models import Job - from . import swarm - - session = self.Session() - jobs = session.query(Job) - submitted_jobs = jobs.filter_by(status='submitted').all() - foo_jobs = jobs.filter(Job.status != 'complete', - Job.status != 'failed', - Job.status != 'submitted').all() - for job in submitted_jobs: - swarm.run(job) - job.status = 'scheduled' - for job in foo_jobs: - ''' - ' TODO: Handle service not found error. - ''' - service = swarm.docker.services.get(str(job.id)) - job.status = service.tasks()[0].get('Status').get('State') - if job.status == 'complete' or job.status == 'failed': - job.end_date = datetime.utcnow() - service.remove() - session.commit() - self.Session.remove() diff --git a/app/swarm.py b/app/scheduler_functions.py similarity index 67% rename from app/swarm.py rename to app/scheduler_functions.py index d79c3d53..6afba342 100644 --- a/app/swarm.py +++ b/app/scheduler_functions.py @@ -1,40 +1,25 @@ +from datetime import datetime +from . import create_minimal_app, db +from .models import Job import docker import json import os -class Swarm: - def __init__(self, app=None): - self.app = app - if app is not None: - self.init_app(app) - self.docker = docker.from_env() - - def init_app(self, app): - pass - - ''' - ' Swarm mode is intendet to run containers which serve a non terminating - ' service like a webserver. For processing an occuring job it is necessary - ' to use an one-shot container, which stops after the wrapped job process - ' is completly executed. In order to run these one-shot containers in Swarm - ' mode, the following run method is implemented analog to the presented - ' implementation in Alex Ellis' blog post "One-shot containers on Docker - ' Swarm"¹. - ' - ' ¹ https://blog.alexellis.io/containers-on-swarm/ - ''' - - def run(self, job): - ''' - Input is a job. - ''' - # Prepare argument values needed for the service creation. - service_args = json.loads(job.service_args) +def check_jobs(): + app = create_minimal_app(os.getenv('FLASK_CONFIG') or 'default') + app.app_context().push() + docker_client = docker.from_env() + jobs = db.session.query(Job) + submitted_jobs = jobs.filter_by(status='submitted').all() + foo_jobs = jobs.filter(Job.status != 'complete', + Job.status != 'failed', + Job.status != 'submitted').all() + for job in submitted_jobs: _command = (job.service + ' -i /files' + ' -o /files/output' - + ' ' + ' '.join(service_args)) + + ' ' + ' '.join(json.loads(job.service_args))) _constraints = ['node.role==worker'] _image = 'gitlab.ub.uni-bielefeld.de:4567/sfb1288inf/{}:{}'.format( job.service, @@ -74,7 +59,7 @@ class Swarm: ' (name=_name). Because there is no id generator for now, it is ' not set, so that the Docker engine assigns a random name. ''' - service = self.docker.services.create( + service = docker_client.services.create( _image, command=_command, constraints=_constraints, @@ -84,5 +69,14 @@ class Swarm: resources=_resources, restart_policy=_restart_policy ) - - return service + job.status = 'scheduled' + for job in foo_jobs: + ''' + ' TODO: Handle service not found error. + ''' + service = docker_client.services.get(str(job.id)) + job.status = service.tasks()[0].get('Status').get('State') + if job.status == 'complete' or job.status == 'failed': + job.end_date = datetime.utcnow() + service.remove() + db.session.commit() diff --git a/app/services/views.py b/app/services/views.py index 9fdc4e0d..e86664f1 100644 --- a/app/services/views.py +++ b/app/services/views.py @@ -3,9 +3,7 @@ from . import services from flask_login import current_user, login_required from .forms import NewOCRJobForm, NewNLPJobForm from ..models import Job -from ..import swarm from .. import db -from threading import Thread import json import os diff --git a/config.py b/config.py index 13061f25..7358052e 100644 --- a/config.py +++ b/config.py @@ -5,6 +5,14 @@ basedir = os.path.abspath(os.path.dirname(__file__)) class Config: + JOBS = [ + { + 'id': 'check_jobs', + 'func': 'app.scheduler_functions:check_jobs', + 'seconds': 3, + 'trigger': 'interval' + } + ] MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.gmail.com') MAIL_PORT = int(os.environ.get('MAIL_PORT', '587')) MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in \ diff --git a/requirements.txt b/requirements.txt index 7f9f4ae6..55c5be5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ -APScheduler==3.6.1 docker==4.0.2 Flask==1.0.3 +Flask-APScheduler==1.11.0 Flask-Login==0.4.1 Flask-Mail==0.9.1 Flask-Migrate==2.5.2