mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2025-06-21 05:20:36 +00:00
move models in seperate modules
This commit is contained in:
168
app/models/job.py
Normal file
168
app/models/job.py
Normal file
@ -0,0 +1,168 @@
|
||||
from datetime import datetime
|
||||
from enum import IntEnum
|
||||
from flask import current_app, url_for
|
||||
from flask_hashids import HashidMixin
|
||||
from time import sleep
|
||||
from typing import Union
|
||||
import os
|
||||
import shutil
|
||||
from app import db
|
||||
from app.ext.flask_sqlalchemy import ContainerColumn, IntEnumColumn
|
||||
|
||||
|
||||
class JobStatus(IntEnum):
|
||||
INITIALIZING = 1
|
||||
SUBMITTED = 2
|
||||
QUEUED = 3
|
||||
RUNNING = 4
|
||||
CANCELING = 5
|
||||
CANCELED = 6
|
||||
COMPLETED = 7
|
||||
FAILED = 8
|
||||
|
||||
@staticmethod
|
||||
def get(job_status: Union['JobStatus', int, str]) -> 'JobStatus':
|
||||
if isinstance(job_status, JobStatus):
|
||||
return job_status
|
||||
if isinstance(job_status, int):
|
||||
return JobStatus(job_status)
|
||||
if isinstance(job_status, str):
|
||||
return JobStatus[job_status]
|
||||
raise TypeError('job_status must be JobStatus, int, or str')
|
||||
|
||||
|
||||
class Job(HashidMixin, db.Model):
|
||||
'''
|
||||
Class to define Jobs.
|
||||
'''
|
||||
__tablename__ = 'jobs'
|
||||
# Primary key
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
# Foreign keys
|
||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
||||
# Fields
|
||||
creation_date = \
|
||||
db.Column(db.DateTime(), default=datetime.utcnow)
|
||||
description = db.Column(db.String(255))
|
||||
end_date = db.Column(db.DateTime())
|
||||
service = db.Column(db.String(64))
|
||||
service_args = db.Column(ContainerColumn(dict, 255))
|
||||
service_version = db.Column(db.String(16))
|
||||
status = db.Column(
|
||||
IntEnumColumn(JobStatus),
|
||||
default=JobStatus.INITIALIZING
|
||||
)
|
||||
title = db.Column(db.String(32))
|
||||
# Relationships
|
||||
inputs = db.relationship(
|
||||
'JobInput',
|
||||
back_populates='job',
|
||||
cascade='all, delete-orphan',
|
||||
lazy='dynamic'
|
||||
)
|
||||
results = db.relationship(
|
||||
'JobResult',
|
||||
back_populates='job',
|
||||
cascade='all, delete-orphan',
|
||||
lazy='dynamic'
|
||||
)
|
||||
user = db.relationship(
|
||||
'User',
|
||||
back_populates='jobs'
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f'<Job {self.title}>'
|
||||
|
||||
@property
|
||||
def jsonpatch_path(self):
|
||||
return f'{self.user.jsonpatch_path}/jobs/{self.hashid}'
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return os.path.join(self.user.path, 'jobs', str(self.id))
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return url_for('jobs.job', job_id=self.id)
|
||||
|
||||
@property
|
||||
def user_hashid(self):
|
||||
return self.user.hashid
|
||||
|
||||
@staticmethod
|
||||
def create(**kwargs):
|
||||
job = Job(**kwargs)
|
||||
db.session.add(job)
|
||||
db.session.flush(objects=[job])
|
||||
db.session.refresh(job)
|
||||
try:
|
||||
os.mkdir(job.path)
|
||||
os.mkdir(os.path.join(job.path, 'inputs'))
|
||||
os.mkdir(os.path.join(job.path, 'pipeline_data'))
|
||||
os.mkdir(os.path.join(job.path, 'results'))
|
||||
except OSError as e:
|
||||
current_app.logger.error(e)
|
||||
db.session.rollback()
|
||||
raise e
|
||||
return job
|
||||
|
||||
def delete(self):
|
||||
''' Delete the job and its inputs and results from the database. '''
|
||||
if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa
|
||||
self.status = JobStatus.CANCELING
|
||||
db.session.commit()
|
||||
while self.status != JobStatus.CANCELED:
|
||||
# In case the daemon handled a job in any way
|
||||
if self.status != JobStatus.CANCELING:
|
||||
self.status = JobStatus.CANCELING
|
||||
db.session.commit()
|
||||
sleep(1)
|
||||
db.session.refresh(self)
|
||||
try:
|
||||
shutil.rmtree(self.path)
|
||||
except OSError as e:
|
||||
current_app.logger.error(e)
|
||||
db.session.rollback()
|
||||
raise e
|
||||
db.session.delete(self)
|
||||
|
||||
def restart(self):
|
||||
''' Restart a job - only if the status is failed '''
|
||||
if self.status != JobStatus.FAILED:
|
||||
raise Exception('Job status is not "failed"')
|
||||
shutil.rmtree(os.path.join(self.path, 'results'), ignore_errors=True)
|
||||
shutil.rmtree(os.path.join(self.path, 'pyflow.data'), ignore_errors=True)
|
||||
for result in self.results:
|
||||
db.session.delete(result)
|
||||
self.end_date = None
|
||||
self.status = JobStatus.SUBMITTED
|
||||
|
||||
def to_json_serializeable(self, backrefs=False, relationships=False):
|
||||
json_serializeable = {
|
||||
'id': self.hashid,
|
||||
'creation_date': f'{self.creation_date.isoformat()}Z',
|
||||
'description': self.description,
|
||||
'end_date': (
|
||||
None if self.end_date is None
|
||||
else f'{self.end_date.isoformat()}Z'
|
||||
),
|
||||
'service': self.service,
|
||||
'service_args': self.service_args,
|
||||
'service_version': self.service_version,
|
||||
'status': self.status.name,
|
||||
'title': self.title
|
||||
}
|
||||
if backrefs:
|
||||
json_serializeable['user'] = \
|
||||
self.user.to_json_serializeable(backrefs=True)
|
||||
if relationships:
|
||||
json_serializeable['inputs'] = {
|
||||
x.hashid: x.to_json_serializeable(relationships=True)
|
||||
for x in self.inputs
|
||||
}
|
||||
json_serializeable['results'] = {
|
||||
x.hashid: x.to_json_serializeable(relationships=True)
|
||||
for x in self.results
|
||||
}
|
||||
return json_serializeable
|
Reference in New Issue
Block a user