diff --git a/app/__init__.py b/app/__init__.py index 41ab537f..0a814722 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -3,12 +3,14 @@ from flask import Flask from flask_login import LoginManager from flask_mail import Mail from flask_sqlalchemy import SQLAlchemy +from .scheduler import Scheduler from .swarm import Swarm db = SQLAlchemy() login_manager = LoginManager() login_manager.login_view = 'auth.login' mail = Mail() +scheduler = Scheduler() swarm = Swarm() @@ -20,6 +22,7 @@ def create_app(config_name): db.init_app(app) login_manager.init_app(app) mail.init_app(app) + scheduler.init_app(app) swarm.init_app(app) from .auth import auth as auth_blueprint diff --git a/app/scheduler.py b/app/scheduler.py new file mode 100644 index 00000000..01437b75 --- /dev/null +++ b/app/scheduler.py @@ -0,0 +1,47 @@ +from apscheduler.schedulers.background import BackgroundScheduler +from datetime import datetime +from sqlalchemy import create_engine +from sqlalchemy.orm import scoped_session, sessionmaker +import os + + +class Scheduler(BackgroundScheduler): + def __init__(self, app=None): + super().__init__() + self.app = app + if app is not None: + self.init_app(app) + + def init_app(self, app): + engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI']) + self.session_factory = sessionmaker(bind=engine) + self.Session = scoped_session(self.session_factory) + if not app.debug or os.environ.get('WERKZEUG_RUN_MAIN') == 'true': + self.add_job(self.check_jobs, id='jobs', seconds=10, trigger='interval') + self.start() + + def check_jobs(self): + from .models import Job + from . 
import swarm + + Session = scoped_session(self.session_factory) + session = Session() + jobs = session.query(Job) + submitted_jobs = jobs.filter_by(status='submitted').all() + foo_jobs = jobs.filter(Job.status != 'complete', + Job.status != 'failed', + Job.status != 'submitted').all() + for job in submitted_jobs: + swarm.run(job.id) + job.status = 'scheduled' + for job in foo_jobs: + ''' + ' TODO: Handle service not found error. + ''' + service = swarm.docker.services.get(str(job.id)) + job.status = service.tasks()[0].get('Status').get('State') + if job.status == 'complete' or job.status == 'failed': + job.end_date = datetime.utcnow() + service.remove() + session.commit() + Session.remove() diff --git a/app/services/views.py b/app/services/views.py index 938e7bd6..9fdc4e0d 100644 --- a/app/services/views.py +++ b/app/services/views.py @@ -22,7 +22,7 @@ def nlp(): service='nlp', service_args=json.dumps(['-l {}'.format(new_nlp_job_form.language.data)]), service_version=new_nlp_job_form.version.data, - status='submitted', + status='preparing', title=new_nlp_job_form.title.data) db.session.add(nlp_job) @@ -48,8 +48,8 @@ def nlp(): ' NOTE: Using self created threads is just for testing purpose as ' there is no scheduler available. ''' - thread = Thread(target=swarm.run, args=(nlp_job.id,)) - thread.start() + nlp_job.status = 'submitted' + db.session.commit() flash('Job created!') return redirect(url_for('services.nlp')) @@ -74,7 +74,7 @@ def ocr(): '-l {}'.format(new_ocr_job_form.language.data), '--skip-binarisation']), service_version=new_ocr_job_form.version.data, - status='submitted', + status='preparing', title=new_ocr_job_form.title.data) db.session.add(ocr_job) @@ -94,14 +94,8 @@ def ocr(): else: for file in new_ocr_job_form.files.data: file.save(os.path.join(dir, file.filename)) - ''' - ' TODO: Let the scheduler run this job in the background. - ' - ' NOTE: Using self created threads is just for testing purpose as - ' there is no scheduler available. 
- ''' - thread = Thread(target=swarm.run, args=(ocr_job.id,)) - thread.start() + ocr_job.status = 'submitted' + db.session.commit() flash('Job created!') return redirect(url_for('services.ocr')) diff --git a/app/swarm.py b/app/swarm.py index 2e57c548..ecd34978 100644 --- a/app/swarm.py +++ b/app/swarm.py @@ -1,8 +1,6 @@ -from datetime import datetime from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import docker -import time import json import os @@ -92,33 +90,5 @@ class Swarm: resources=_resources, restart_policy=_restart_policy ) - ''' - ' Because it takes some time until all data in the service object is - ' initialized (especcially the task list returned by the service.tasks - ' method), a poll loop checks if the task list is empty. - ''' - while not service.tasks(): - time.sleep(1) - service.reload() - ''' - ' The following is scheduler work. - ' Poll the service until the job is completly executed. - ''' - session.add(job) - job.status = 'running' - session.commit() - current_state = None - while True: - current_state = service.tasks()[0].get('Status').get('State') - if current_state == 'complete' or current_state == 'failed': - break - time.sleep(1) - service.reload() - job.end_date = datetime.utcnow() - job.status = current_state - session.commit() - session.close() - # Remove the service from the swarm. - service.remove() - return + return service diff --git a/app/templates/main/dashboard.html.j2 b/app/templates/main/dashboard.html.j2 index 39d58718..3ec36cff 100644 --- a/app/templates/main/dashboard.html.j2 +++ b/app/templates/main/dashboard.html.j2 @@ -45,6 +45,10 @@ pagination: true }; corpusList = new List("corpus-list", corpusListOptions); + /* + * TODO: Hide Pagination if there is only one page + * corpusListPagination = document.querySelector("#corpus-list .pagination"); + */