from flask import current_app
from flask_login import current_user
from flask_socketio import Namespace
from app import db, hashids, socketio
from app.extensions.flask_socketio import admin_required, login_required
from app.models import Job, JobStatus


def _response(code: int, body: str) -> dict:
    """Build the ``{'code': ..., 'body': ...}`` dict every handler returns."""
    return {'code': code, 'body': body}


def _resolve_job(job_hashid: str):
    """Decode *job_hashid* and load the matching job.

    Returns a ``(job_id, job, error)`` triple. On success ``error`` is
    ``None``. On failure ``job`` is ``None`` and ``error`` is the response
    dict to hand back to the client: 400 when the hashid does not decode to a
    single integer, 404 when no job with that id exists.
    """
    job_id = hashids.decode(job_hashid)
    if not isinstance(job_id, int):
        return None, None, _response(400, 'job_id is invalid')
    job = Job.query.get(job_id)
    if job is None:
        return job_id, None, _response(404, 'Job not found')
    return job_id, job, None


def _may_modify(job) -> bool:
    """Return whether the current user owns *job* or is an administrator."""
    return job.user == current_user or current_user.is_administrator


# TODO: This should be a method in the Job model
def _delete_job(app, job_id):
    """Delete the job with *job_id* inside an application context."""
    with app.app_context():
        job = Job.query.get(job_id)
        if job is None:
            # The job vanished between the request and this task (for
            # example a second delete request won the race); nothing to do.
            return
        job.delete()
        db.session.commit()


# TODO: This should be a method in the Job model
def _restart_job(app, job_id):
    """Restart the job with *job_id* inside an application context."""
    with app.app_context():
        job = Job.query.get(job_id)
        if job is None:
            # The job was removed before the task ran; nothing to restart.
            return
        job.restart()
        db.session.commit()


class JobsNamespace(Namespace):
    """Socket.IO namespace with the job events ``delete``, ``get_log`` and
    ``restart``.

    Every handler takes the hashid of a job and returns a dict with an
    HTTP-style ``code`` and a ``body`` (a message, or the log text for
    ``get_log``).
    """

    @login_required
    def on_delete(self, job_hashid: str):
        """Schedule the deletion of a job.

        Responds with 400 for an undecodable hashid, 404 for an unknown job,
        403 when the current user is neither the job's owner nor an
        administrator, and 202 once the deletion has been handed to a
        background task.
        """
        job_id, job, error = _resolve_job(job_hashid)
        if error is not None:
            return error
        if not _may_modify(job):
            return _response(403, 'Forbidden')
        # Read the title before the task is started so the response does not
        # touch the instance while it may be getting deleted.
        job_title = job.title
        socketio.start_background_task(
            target=_delete_job,
            app=current_app._get_current_object(),
            job_id=job_id
        )
        return _response(202, f'Job "{job_title}" marked for deletion')

    @admin_required
    def on_get_log(self, job_hashid: str):
        """Return the pipeline log of a finished job.

        Responds with 400 for an undecodable hashid, 404 for an unknown job
        or a missing log file, 409 while the job is neither completed nor
        failed, and 200 with the log text otherwise.
        """
        _, job, error = _resolve_job(job_hashid)
        if error is not None:
            return error
        if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:
            return _response(409, 'Job is not done processing')
        log_path = job.path / 'pipeline_data' / 'logs' / 'pyflow_log.txt'
        try:
            # No explicit encoding: the file is read with the platform
            # default, exactly as before.
            with open(log_path) as log_file:
                job_log = log_file.read()
        except FileNotFoundError:
            # A finished job is not guaranteed to have a log on disk; answer
            # in the same shape as the other errors instead of raising.
            return _response(404, 'Job log not found')
        return _response(200, job_log)

    @login_required
    def on_restart(self, job_hashid: str):
        """Schedule the restart of a job.

        Responds with 400 for an undecodable hashid, 404 for an unknown job,
        403 when the current user is neither the job's owner nor an
        administrator, and 202 once the restart has been handed to a
        background task.
        """
        job_id, job, error = _resolve_job(job_hashid)
        if error is not None:
            return error
        if not _may_modify(job):
            return _response(403, 'Forbidden')
        # Read the title before the task is started (see on_delete).
        job_title = job.title
        socketio.start_background_task(
            target=_restart_job,
            app=current_app._get_current_object(),
            job_id=job_id
        )
        return _response(202, f'Job "{job_title}" restarted')