From c29c50feb91b18a27718fc7e1539835a62256e0d Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Thu, 18 Apr 2024 15:37:17 +0200
Subject: [PATCH] new event system first draft
---
app/admin/events.py | 49 ++++++++++++++++
app/jobs/events.py | 138 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 187 insertions(+)
create mode 100644 app/admin/events.py
create mode 100644 app/jobs/events.py
diff --git a/app/admin/events.py b/app/admin/events.py
new file mode 100644
index 00000000..e5ed7b0b
--- /dev/null
+++ b/app/admin/events.py
@@ -0,0 +1,49 @@
+from flask_login import current_user
+from flask_socketio import disconnect, Namespace
+from app import db, hashids
+from app.extensions.flask_socketio_extras import admin_required
+from app.models import User
+
+
+class AdminNamespace(Namespace):
+ def on_connect(self):
+ # Check if the user is authenticated and is an administrator
+ if not (current_user.is_authenticated and current_user.is_administrator):
+ disconnect()
+
+
+ @admin_required
+ def on_set_user_confirmed(self, user_hashid: str, confirmed_value: bool):
+ # Decode the user hashid
+ user_id = hashids.decode(user_hashid)
+
+ # Validate user_id
+ if not isinstance(user_id, int):
+ return {
+ 'code': 400,
+ 'body': 'user_id is invalid'
+ }
+
+ # Validate confirmed_value
+ if not isinstance(confirmed_value, bool):
+ return {
+ 'code': 400,
+ 'body': 'confirmed_value is invalid'
+ }
+
+ # Load user from database
+ user = User.query.get(user_id)
+ if user is None:
+ return {
+ 'code': 404,
+ 'body': 'User not found'
+ }
+
+ # Update user confirmed status
+ user.confirmed = confirmed_value
+ db.session.commit()
+
+ return {
+ 'code': 200,
+ 'body': f'User "{user.username}" is now {"confirmed" if confirmed_value else "unconfirmed"}'
+ }
diff --git a/app/jobs/events.py b/app/jobs/events.py
new file mode 100644
index 00000000..0a1673d6
--- /dev/null
+++ b/app/jobs/events.py
@@ -0,0 +1,138 @@
+from flask import current_app
+from flask_login import current_user
+from flask_socketio import Namespace
+from app import db, hashids, socketio
+from app.extensions.flask_socketio import admin_required, login_required
+from app.models import Job, JobStatus
+
+
+class JobsNamespace(Namespace):
+ @login_required
+ def on_delete(self, job_hashid: str):
+ # Decode the job hashid
+ job_id = hashids.decode(job_hashid)
+
+ # Validate job_id
+ if not isinstance(job_id, int):
+ return {
+ 'code': 400,
+ 'body': 'job_id is invalid'
+ }
+
+ # Load job from database
+ job = Job.query.get(job_id)
+ if job is None:
+ return {
+ 'code': 404,
+ 'body': 'Job not found'
+ }
+
+ # Check if the current user is allowed to delete the job
+ if not (job.user == current_user or current_user.is_administrator):
+ return {
+ 'code': 403,
+ 'body': 'Forbidden'
+ }
+
+ # TODO: This should be a method in the Job model
+ def _delete_job(app, job_id):
+ with app.app_context():
+ job = Job.query.get(job_id)
+ job.delete()
+ db.session.commit()
+
+ # Delete the job in a background task
+ socketio.start_background_task(
+ target=_delete_job,
+ app=current_app._get_current_object(),
+ job_id=job_id
+ )
+
+ return {
+ 'code': 202,
+ 'body': f'Job "{job.title}" marked for deletion'
+ }
+
+
+ @admin_required
+ def on_get_log(self, job_hashid: str):
+ # Decode the job hashid
+ job_id = hashids.decode(job_hashid)
+
+ # Validate job_id
+ if not isinstance(job_id, int):
+ return {
+ 'code': 400,
+ 'body': 'job_id is invalid'
+ }
+
+ # Load job from database
+ job = Job.query.get(job_id)
+ if job is None:
+ return {
+ 'code': 404,
+ 'body': 'Job not found'
+ }
+
+ # Check if the job is already processed
+ if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:
+ return {
+ 'code': 409,
+ 'body': 'Job is not done processing'
+ }
+
+ # Read the log file
+ with open(job.path / 'pipeline_data' / 'logs' / 'pyflow_log.txt') as log_file:
+ job_log = log_file.read()
+
+ return {
+ 'code': 200,
+ 'body': job_log
+ }
+
+
+ @login_required
+ def on_restart(self, job_hashid: str):
+ # Decode the job hashid
+ job_id = hashids.decode(job_hashid)
+
+ # Validate job_id
+ if not isinstance(job_id, int):
+ return {
+ 'code': 400,
+ 'body': 'job_id is invalid'
+ }
+
+ # Load job from database
+ job = Job.query.get(job_id)
+ if job is None:
+ return {
+ 'code': 404,
+ 'body': 'Job not found'
+ }
+
+ # Check if the current user is allowed to restart the job
+ if not (job.user == current_user or current_user.is_administrator):
+ return {
+ 'code': 403,
+ 'body': 'Forbidden'
+ }
+
+ # TODO: This should be a method in the Job model
+ def _restart_job(app, job_id):
+ with app.app_context():
+ job = Job.query.get(job_id)
+ job.restart()
+ db.session.commit()
+
+ # Restart the job in a background task
+ socketio.start_background_task(
+ target=_restart_job,
+ app=current_app._get_current_object(),
+ job_id=job_id
+ )
+
+ return {
+ 'code': 202,
+ 'body': f'Job "{job.title}" restarted'
+ }