nopaque/app/models/job.py

172 lines
5.3 KiB
Python
Raw Normal View History

2024-03-05 15:02:23 +00:00
from datetime import datetime
from enum import IntEnum
from flask import current_app, url_for
from flask_hashids import HashidMixin
from time import sleep
2024-03-07 14:49:04 +00:00
from pathlib import Path
2024-03-05 15:02:23 +00:00
import shutil
from app import db
2024-11-07 07:35:02 +00:00
from app.extensions.nopaque_sqlalchemy_extras import ContainerColumn, IntEnumColumn
2024-03-05 15:02:23 +00:00
class JobStatus(IntEnum):
INITIALIZING = 1
SUBMITTED = 2
QUEUED = 3
RUNNING = 4
CANCELING = 5
CANCELED = 6
COMPLETED = 7
FAILED = 8
@staticmethod
2024-09-25 15:46:53 +00:00
def get(job_status: 'JobStatus | int | str') -> 'JobStatus':
2024-03-05 15:02:23 +00:00
if isinstance(job_status, JobStatus):
return job_status
if isinstance(job_status, int):
return JobStatus(job_status)
if isinstance(job_status, str):
return JobStatus[job_status]
raise TypeError('job_status must be JobStatus, int, or str')
class Job(HashidMixin, db.Model):
'''
Class to define Jobs.
'''
__tablename__ = 'jobs'
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Foreign keys
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# Fields
creation_date = \
db.Column(db.DateTime(), default=datetime.utcnow)
description = db.Column(db.String(255))
end_date = db.Column(db.DateTime())
service = db.Column(db.String(64))
service_args = db.Column(ContainerColumn(dict, 255))
service_version = db.Column(db.String(16))
status = db.Column(
IntEnumColumn(JobStatus),
default=JobStatus.INITIALIZING
)
title = db.Column(db.String(32))
# Relationships
inputs = db.relationship(
'JobInput',
back_populates='job',
cascade='all, delete-orphan',
lazy='dynamic'
)
results = db.relationship(
'JobResult',
back_populates='job',
cascade='all, delete-orphan',
lazy='dynamic'
)
user = db.relationship(
'User',
back_populates='jobs'
)
def __repr__(self):
return f'<Job {self.title}>'
@property
def jsonpatch_path(self):
return f'{self.user.jsonpatch_path}/jobs/{self.hashid}'
@property
2024-03-07 14:49:04 +00:00
def path(self) -> Path:
return self.user.path / 'jobs' / f'{self.id}'
2024-03-05 15:02:23 +00:00
@property
def url(self):
return url_for('jobs.job', job_id=self.id)
@property
def user_hashid(self):
return self.user.hashid
@staticmethod
def create(**kwargs):
job = Job(**kwargs)
db.session.add(job)
db.session.flush(objects=[job])
db.session.refresh(job)
2024-03-07 14:49:04 +00:00
job_inputs_dir = job.path / 'inputs'
job_pipeline_data_dir = job.path / 'pipeline_data'
job_results_dir = job.path / 'results'
2024-03-05 15:02:23 +00:00
try:
2024-03-07 14:49:04 +00:00
job.path.mkdir()
job_inputs_dir.mkdir()
job_pipeline_data_dir.mkdir()
job_results_dir.mkdir()
2024-03-05 15:02:23 +00:00
except OSError as e:
2024-03-07 14:49:04 +00:00
# TODO: Potential leftover cleanup
2024-03-05 15:02:23 +00:00
current_app.logger.error(e)
db.session.rollback()
2024-03-07 14:49:04 +00:00
raise
2024-03-05 15:02:23 +00:00
return job
def delete(self):
''' Delete the job and its inputs and results from the database. '''
if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa
self.status = JobStatus.CANCELING
db.session.commit()
while self.status != JobStatus.CANCELED:
# In case the daemon handled a job in any way
if self.status != JobStatus.CANCELING:
self.status = JobStatus.CANCELING
db.session.commit()
sleep(1)
db.session.refresh(self)
try:
shutil.rmtree(self.path)
except OSError as e:
current_app.logger.error(e)
db.session.rollback()
raise e
db.session.delete(self)
def restart(self):
''' Restart a job - only if the status is failed '''
if self.status != JobStatus.FAILED:
raise Exception('Job status is not "failed"')
2024-03-07 14:49:04 +00:00
shutil.rmtree(self.path / 'results', ignore_errors=True)
shutil.rmtree(self.path / 'pyflow.data', ignore_errors=True)
2024-03-05 15:02:23 +00:00
for result in self.results:
db.session.delete(result)
self.end_date = None
self.status = JobStatus.SUBMITTED
def to_json_serializeable(self, backrefs=False, relationships=False):
json_serializeable = {
'id': self.hashid,
'creation_date': f'{self.creation_date.isoformat()}Z',
'description': self.description,
'end_date': (
None if self.end_date is None
else f'{self.end_date.isoformat()}Z'
),
'service': self.service,
'service_args': self.service_args,
'service_version': self.service_version,
'status': self.status.name,
'title': self.title
}
if backrefs:
json_serializeable['user'] = \
self.user.to_json_serializeable(backrefs=True)
if relationships:
json_serializeable['inputs'] = {
x.hashid: x.to_json_serializeable(relationships=True)
for x in self.inputs
}
json_serializeable['results'] = {
x.hashid: x.to_json_serializeable(relationships=True)
for x in self.results
}
return json_serializeable