from datetime import datetime
from enum import IntEnum
from pathlib import Path
from time import sleep
from typing import Union
import shutil

from flask import current_app, url_for
from flask_hashids import HashidMixin

from app import db
from app.ext.flask_sqlalchemy import ContainerColumn, IntEnumColumn


class JobStatus(IntEnum):
    ''' Lifecycle states of a job. '''
    INITIALIZING = 1
    SUBMITTED = 2
    QUEUED = 3
    RUNNING = 4
    CANCELING = 5
    CANCELED = 6
    COMPLETED = 7
    FAILED = 8

    @staticmethod
    def get(job_status: Union['JobStatus', int, str]) -> 'JobStatus':
        '''
        Coerce a value into a JobStatus member.

        A JobStatus is returned as is, an int is looked up by value and a
        str is looked up by member name.

        Raises:
            TypeError: if job_status is none of the accepted types.
            ValueError: if an int does not match any member value.
            KeyError: if a str does not match any member name.
        '''
        if isinstance(job_status, JobStatus):
            return job_status
        if isinstance(job_status, int):
            return JobStatus(job_status)
        if isinstance(job_status, str):
            return JobStatus[job_status]
        raise TypeError('job_status must be JobStatus, int, or str')


class Job(HashidMixin, db.Model):
    '''
    Class to define Jobs.
    '''
    __tablename__ = 'jobs'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    end_date = db.Column(db.DateTime())
    service = db.Column(db.String(64))
    service_args = db.Column(ContainerColumn(dict, 255))
    service_version = db.Column(db.String(16))
    status = db.Column(
        IntEnumColumn(JobStatus),
        default=JobStatus.INITIALIZING
    )
    title = db.Column(db.String(32))
    # Relationships
    inputs = db.relationship(
        'JobInput',
        back_populates='job',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    results = db.relationship(
        'JobResult',
        back_populates='job',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    user = db.relationship(
        'User',
        back_populates='jobs'
    )

    def __repr__(self):
        ''' Return a short debugging representation showing the title. '''
        return f'<Job {self.title}>'

    @property
    def jsonpatch_path(self):
        ''' JSON Patch path of this job below its user's path. '''
        return f'{self.user.jsonpatch_path}/jobs/{self.hashid}'

    @property
    def path(self) -> Path:
        ''' Directory of this job: <user path>/jobs/<id>. '''
        return self.user.path / 'jobs' / f'{self.id}'

    @property
    def url(self):
        ''' URL built for the 'jobs.job' endpoint with this job's id. '''
        return url_for('jobs.job', job_id=self.id)

    @property
    def user_hashid(self):
        ''' Hashid of the user this job belongs to. '''
        return self.user.hashid

    @staticmethod
    def create(**kwargs):
        '''
        Create a job, flush it to the database and create its directories.

        The keyword arguments are passed to the Job constructor. The session
        is flushed but not committed. The job directory and its 'inputs',
        'pipeline_data' and 'results' subdirectories are created.

        Raises:
            OSError: if a directory cannot be created. Directories created
                by this call are removed again and the session is rolled
                back before the error is re-raised.
        '''
        job = Job(**kwargs)
        db.session.add(job)
        db.session.flush(objects=[job])
        db.session.refresh(job)
        job_dir = job.path
        directories = (
            job_dir,
            job_dir / 'inputs',
            job_dir / 'pipeline_data',
            job_dir / 'results'
        )
        created_directories = []
        try:
            for directory in directories:
                directory.mkdir()
                created_directories.append(directory)
        except OSError as e:
            current_app.logger.error(e)
            # Remove only what this call created (children first), so a
            # directory that already existed beforehand is never touched.
            for directory in reversed(created_directories):
                try:
                    directory.rmdir()
                except OSError as cleanup_error:
                    # Best effort: never mask the original error.
                    current_app.logger.error(cleanup_error)
            db.session.rollback()
            raise
        return job

    def delete(self):
        '''
        Delete the job and its inputs and results from the database.

        A job that is neither completed nor failed is first set to
        CANCELING; this method then polls once per second until the status
        becomes CANCELED. NOTE(review): the status change is presumably
        performed by the daemon - there is no timeout, confirm that this
        is intended. Afterwards the job directory is removed and the job is
        marked for deletion in the session; the deletion itself is not
        committed here.

        Raises:
            OSError: if the job directory cannot be removed. The session is
                rolled back before the error is re-raised.
        '''
        if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:
            self.status = JobStatus.CANCELING
            db.session.commit()
            while self.status != JobStatus.CANCELED:
                # In case the daemon handled a job in any way
                if self.status != JobStatus.CANCELING:
                    self.status = JobStatus.CANCELING
                    db.session.commit()
                sleep(1)
                db.session.refresh(self)
        try:
            shutil.rmtree(self.path)
        except OSError as e:
            current_app.logger.error(e)
            db.session.rollback()
            # Bare raise keeps the original traceback untouched.
            raise
        db.session.delete(self)

    def restart(self):
        '''
        Restart a job - only if the status is failed

        Removes the 'results' and 'pyflow.data' directories (errors are
        ignored), marks all results for deletion, clears the end date and
        sets the status to SUBMITTED. Nothing is committed here.

        Raises:
            Exception: if the job status is not FAILED.
        '''
        if self.status != JobStatus.FAILED:
            raise Exception('Job status is not "failed"')
        shutil.rmtree(self.path / 'results', ignore_errors=True)
        shutil.rmtree(self.path / 'pyflow.data', ignore_errors=True)
        for result in self.results:
            db.session.delete(result)
        self.end_date = None
        self.status = JobStatus.SUBMITTED

    def to_json_serializeable(self, backrefs=False, relationships=False):
        '''
        Build a JSON serializable dict describing this job.

        Dates are ISO formatted with a 'Z' suffix appended and the status
        is given by its member name.

        Args:
            backrefs: also include the owning user under 'user'.
            relationships: also include 'inputs' and 'results', each a
                dict keyed by the hashid of the related object.
        '''
        json_serializeable = {
            'id': self.hashid,
            'creation_date': f'{self.creation_date.isoformat()}Z',
            'description': self.description,
            'end_date': (
                None if self.end_date is None
                else f'{self.end_date.isoformat()}Z'
            ),
            'service': self.service,
            'service_args': self.service_args,
            'service_version': self.service_version,
            'status': self.status.name,
            'title': self.title
        }
        if backrefs:
            json_serializeable['user'] = \
                self.user.to_json_serializeable(backrefs=True)
        if relationships:
            json_serializeable['inputs'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.inputs
            }
            json_serializeable['results'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.results
            }
        return json_serializeable