mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2024-11-16 01:35:41 +00:00
173 lines
5.3 KiB
Python
173 lines
5.3 KiB
Python
from datetime import datetime
|
|
from enum import IntEnum
|
|
from flask import current_app, url_for
|
|
from flask_hashids import HashidMixin
|
|
from time import sleep
|
|
from typing import Union
|
|
from pathlib import Path
|
|
import shutil
|
|
from app import db
|
|
from app.extensions.sqlalchemy import ContainerColumn, IntEnumColumn
|
|
|
|
|
|
class JobStatus(IntEnum):
|
|
INITIALIZING = 1
|
|
SUBMITTED = 2
|
|
QUEUED = 3
|
|
RUNNING = 4
|
|
CANCELING = 5
|
|
CANCELED = 6
|
|
COMPLETED = 7
|
|
FAILED = 8
|
|
|
|
@staticmethod
|
|
def get(job_status: Union['JobStatus', int, str]) -> 'JobStatus':
|
|
if isinstance(job_status, JobStatus):
|
|
return job_status
|
|
if isinstance(job_status, int):
|
|
return JobStatus(job_status)
|
|
if isinstance(job_status, str):
|
|
return JobStatus[job_status]
|
|
raise TypeError('job_status must be JobStatus, int, or str')
|
|
|
|
|
|
class Job(HashidMixin, db.Model):
|
|
'''
|
|
Class to define Jobs.
|
|
'''
|
|
__tablename__ = 'jobs'
|
|
# Primary key
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
# Foreign keys
|
|
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
|
# Fields
|
|
creation_date = \
|
|
db.Column(db.DateTime(), default=datetime.utcnow)
|
|
description = db.Column(db.String(255))
|
|
end_date = db.Column(db.DateTime())
|
|
service = db.Column(db.String(64))
|
|
service_args = db.Column(ContainerColumn(dict, 255))
|
|
service_version = db.Column(db.String(16))
|
|
status = db.Column(
|
|
IntEnumColumn(JobStatus),
|
|
default=JobStatus.INITIALIZING
|
|
)
|
|
title = db.Column(db.String(32))
|
|
# Relationships
|
|
inputs = db.relationship(
|
|
'JobInput',
|
|
back_populates='job',
|
|
cascade='all, delete-orphan',
|
|
lazy='dynamic'
|
|
)
|
|
results = db.relationship(
|
|
'JobResult',
|
|
back_populates='job',
|
|
cascade='all, delete-orphan',
|
|
lazy='dynamic'
|
|
)
|
|
user = db.relationship(
|
|
'User',
|
|
back_populates='jobs'
|
|
)
|
|
|
|
def __repr__(self):
|
|
return f'<Job {self.title}>'
|
|
|
|
@property
|
|
def jsonpatch_path(self):
|
|
return f'{self.user.jsonpatch_path}/jobs/{self.hashid}'
|
|
|
|
@property
|
|
def path(self) -> Path:
|
|
return self.user.path / 'jobs' / f'{self.id}'
|
|
|
|
@property
|
|
def url(self):
|
|
return url_for('jobs.job', job_id=self.id)
|
|
|
|
@property
|
|
def user_hashid(self):
|
|
return self.user.hashid
|
|
|
|
@staticmethod
|
|
def create(**kwargs):
|
|
job = Job(**kwargs)
|
|
db.session.add(job)
|
|
db.session.flush(objects=[job])
|
|
db.session.refresh(job)
|
|
job_inputs_dir = job.path / 'inputs'
|
|
job_pipeline_data_dir = job.path / 'pipeline_data'
|
|
job_results_dir = job.path / 'results'
|
|
try:
|
|
job.path.mkdir()
|
|
job_inputs_dir.mkdir()
|
|
job_pipeline_data_dir.mkdir()
|
|
job_results_dir.mkdir()
|
|
except OSError as e:
|
|
# TODO: Potential leftover cleanup
|
|
current_app.logger.error(e)
|
|
db.session.rollback()
|
|
raise
|
|
return job
|
|
|
|
def delete(self):
|
|
''' Delete the job and its inputs and results from the database. '''
|
|
if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa
|
|
self.status = JobStatus.CANCELING
|
|
db.session.commit()
|
|
while self.status != JobStatus.CANCELED:
|
|
# In case the daemon handled a job in any way
|
|
if self.status != JobStatus.CANCELING:
|
|
self.status = JobStatus.CANCELING
|
|
db.session.commit()
|
|
sleep(1)
|
|
db.session.refresh(self)
|
|
try:
|
|
shutil.rmtree(self.path)
|
|
except OSError as e:
|
|
current_app.logger.error(e)
|
|
db.session.rollback()
|
|
raise e
|
|
db.session.delete(self)
|
|
|
|
def restart(self):
|
|
''' Restart a job - only if the status is failed '''
|
|
if self.status != JobStatus.FAILED:
|
|
raise Exception('Job status is not "failed"')
|
|
shutil.rmtree(self.path / 'results', ignore_errors=True)
|
|
shutil.rmtree(self.path / 'pyflow.data', ignore_errors=True)
|
|
for result in self.results:
|
|
db.session.delete(result)
|
|
self.end_date = None
|
|
self.status = JobStatus.SUBMITTED
|
|
|
|
def to_json_serializeable(self, backrefs=False, relationships=False):
|
|
json_serializeable = {
|
|
'id': self.hashid,
|
|
'creation_date': f'{self.creation_date.isoformat()}Z',
|
|
'description': self.description,
|
|
'end_date': (
|
|
None if self.end_date is None
|
|
else f'{self.end_date.isoformat()}Z'
|
|
),
|
|
'service': self.service,
|
|
'service_args': self.service_args,
|
|
'service_version': self.service_version,
|
|
'status': self.status.name,
|
|
'title': self.title
|
|
}
|
|
if backrefs:
|
|
json_serializeable['user'] = \
|
|
self.user.to_json_serializeable(backrefs=True)
|
|
if relationships:
|
|
json_serializeable['inputs'] = {
|
|
x.hashid: x.to_json_serializeable(relationships=True)
|
|
for x in self.inputs
|
|
}
|
|
json_serializeable['results'] = {
|
|
x.hashid: x.to_json_serializeable(relationships=True)
|
|
for x in self.results
|
|
}
|
|
return json_serializeable
|