Use APScheduler to handle job execution.

This commit is contained in:
Patrick Jentsch 2019-08-12 17:03:12 +02:00
parent 344ba1a8e2
commit faf54e57fa
6 changed files with 62 additions and 44 deletions

View File

@ -3,12 +3,14 @@ from flask import Flask
from flask_login import LoginManager
from flask_mail import Mail
from flask_sqlalchemy import SQLAlchemy
from .scheduler import Scheduler
from .swarm import Swarm
db = SQLAlchemy()
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
mail = Mail()
scheduler = Scheduler()
swarm = Swarm()
@ -20,6 +22,7 @@ def create_app(config_name):
db.init_app(app)
login_manager.init_app(app)
mail.init_app(app)
scheduler.init_app(app)
swarm.init_app(app)
from .auth import auth as auth_blueprint

47
app/scheduler.py Normal file
View File

@ -0,0 +1,47 @@
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import os
class Scheduler(BackgroundScheduler):
    """Background scheduler that polls the database for jobs to run.

    Wraps APScheduler's ``BackgroundScheduler`` in the usual Flask
    extension pattern (``init_app``). Every 10 seconds it dispatches
    newly submitted jobs to the Docker swarm and syncs the status of
    running jobs back into the database.
    """

    def __init__(self, app=None):
        super().__init__()
        self.app = app
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Bind the scheduler to *app* and start the polling job.

        A dedicated engine and scoped session registry are created so
        the scheduler thread never shares the request-bound session.
        """
        engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
        self.session_factory = sessionmaker(bind=engine)
        self.Session = scoped_session(self.session_factory)
        # Under the Werkzeug reloader the app module is imported twice;
        # only start the scheduler in the serving process so the polling
        # job is not registered (and run) twice.
        if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
            self.add_job(self.check_jobs, id='jobs', seconds=10,
                         trigger='interval')
            self.start()

    def check_jobs(self):
        """Dispatch submitted jobs and update the status of active ones."""
        # Imported here to avoid a circular import at module load time.
        from . import swarm
        from .models import Job
        # Reuse the scoped-session registry created in init_app instead
        # of building a fresh registry on every tick (the original
        # created a new scoped_session per invocation).
        session = self.Session()
        try:
            jobs = session.query(Job)
            submitted_jobs = jobs.filter_by(status='submitted').all()
            active_jobs = jobs.filter(
                Job.status.notin_(['complete', 'failed', 'submitted'])).all()
            for job in submitted_jobs:
                swarm.run(job.id)
                job.status = 'scheduled'
            for job in active_jobs:
                '''
                ' TODO: Handle service not found error.
                '''
                # Swarm services are named after the job's primary key.
                service = swarm.docker.services.get(str(job.id))
                job.status = service.tasks()[0].get('Status').get('State')
                if job.status in ('complete', 'failed'):
                    job.end_date = datetime.utcnow()
                    service.remove()
            session.commit()
        finally:
            # Always release the thread-local session, even if a docker
            # or database call raised, so connections return to the pool.
            self.Session.remove()

View File

@ -22,7 +22,7 @@ def nlp():
service='nlp',
service_args=json.dumps(['-l {}'.format(new_nlp_job_form.language.data)]),
service_version=new_nlp_job_form.version.data,
status='submitted',
status='preparing',
title=new_nlp_job_form.title.data)
db.session.add(nlp_job)
@ -48,8 +48,8 @@ def nlp():
' NOTE: Using self-created threads is just for testing purposes as
' there is no scheduler available.
'''
thread = Thread(target=swarm.run, args=(nlp_job.id,))
thread.start()
nlp_job.status = 'submitted'
db.session.commit()
flash('Job created!')
return redirect(url_for('services.nlp'))
@ -74,7 +74,7 @@ def ocr():
'-l {}'.format(new_ocr_job_form.language.data),
'--skip-binarisation']),
service_version=new_ocr_job_form.version.data,
status='submitted',
status='preparing',
title=new_ocr_job_form.title.data)
db.session.add(ocr_job)
@ -94,14 +94,8 @@ def ocr():
else:
for file in new_ocr_job_form.files.data:
file.save(os.path.join(dir, file.filename))
'''
' TODO: Let the scheduler run this job in the background.
'
' NOTE: Using self-created threads is just for testing purposes as
' there is no scheduler available.
'''
thread = Thread(target=swarm.run, args=(ocr_job.id,))
thread.start()
ocr_job.status = 'submitted'
db.session.commit()
flash('Job created!')
return redirect(url_for('services.ocr'))

View File

@ -1,8 +1,6 @@
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import docker
import time
import json
import os
@ -92,33 +90,5 @@ class Swarm:
resources=_resources,
restart_policy=_restart_policy
)
'''
' Because it takes some time until all data in the service object is
' initialized (especially the task list returned by the service.tasks
' method), a poll loop checks if the task list is empty.
'''
while not service.tasks():
time.sleep(1)
service.reload()
'''
' The following is scheduler work.
' Poll the service until the job is completely executed.
'''
session.add(job)
job.status = 'running'
session.commit()
current_state = None
while True:
current_state = service.tasks()[0].get('Status').get('State')
if current_state == 'complete' or current_state == 'failed':
break
time.sleep(1)
service.reload()
job.end_date = datetime.utcnow()
job.status = current_state
session.commit()
session.close()
# Remove the service from the swarm.
service.remove()
return
return service

View File

@ -45,6 +45,10 @@
pagination: true
};
corpusList = new List("corpus-list", corpusListOptions);
/*
* TODO: Hide pagination if there is only one page
* corpusListPagination = document.querySelector("#corpus-list .pagination");
*/
</script>
<div class="col s12">

View File

@ -1,6 +1,6 @@
APScheduler==3.6.1
docker==4.0.2
Flask==1.0.3
Flask-APScheduler==1.11.0
Flask-Login==0.4.1
Flask-Mail==0.9.1
Flask-Migrate==2.5.2