From df2bffe0fd28c122fb20056b34f3058e282ca02f Mon Sep 17 00:00:00 2001
From: Patrick Jentsch 
Date: Tue, 3 Dec 2024 16:09:14 +0100
Subject: [PATCH] implement first version of jobs socketio namespace
---
 app/blueprints/users/events.py | 122 ---------------------------------
 app/namespaces/jobs.py         | 109 +++++++++++++++++++++++++++++
 2 files changed, 109 insertions(+), 122 deletions(-)
 delete mode 100644 app/blueprints/users/events.py
 create mode 100644 app/namespaces/jobs.py
diff --git a/app/blueprints/users/events.py b/app/blueprints/users/events.py
deleted file mode 100644
index 4f1284a6..00000000
--- a/app/blueprints/users/events.py
+++ /dev/null
@@ -1,122 +0,0 @@
-from flask import current_app, Flask
-from flask_login import current_user
-from flask_socketio import join_room, leave_room
-from app import db, hashids, socketio
-from app.decorators import socketio_login_required
-from app.models import User
-
-
-def _delete_user(app: Flask, user_id: int):
-    with app.app_context():
-        user = User.query.get(user_id)
-        user.delete()
-        db.session.commit()
-
-
-@socketio.on('users.delete')
-@socketio_login_required
-def delete_user(user_hashid: str) -> dict:
-    user_id = hashids.decode(user_hashid)
-
-    if not isinstance(user_id, int):
-        return {'status': 400, 'statusText': 'Bad Request'}
-
-    user = User.query.get(user_id)
-
-    if user is None:
-        return {'status': 404, 'statusText': 'Not found'}
-
-    if not (
-        user == current_user
-        or current_user.is_administrator
-    ):
-        return {'status': 403, 'statusText': 'Forbidden'}
-
-    socketio.start_background_task(
-        _delete_user,
-        current_app._get_current_object(),
-        user.id
-    )
-
-    return {
-        'body': f'User "{user.username}" marked for deletion',
-        'status': 202,
-        'statusText': 'Accepted'
-    }
-
-
-@socketio.on('users.get')
-@socketio_login_required
-def get_user(user_hashid: str) -> dict:
-    user_id = hashids.decode(user_hashid)
-
-    if not isinstance(user_id, int):
-        return {'status': 400, 'statusText': 'Bad Request'}
-
-    user = User.query.get(user_id)
-
-    if user is None:
-        return {'status': 404, 'statusText': 'Not found'}
-
-    if not (
-        user == current_user
-        or current_user.is_administrator
-    ):
-        return {'status': 403, 'statusText': 'Forbidden'}
-
-    return {
-        'body': user.to_json_serializeable(
-            backrefs=True,
-            relationships=True
-        ),
-        'status': 200,
-        'statusText': 'OK'
-    }
-
-
-@socketio.on('users.subscribe')
-@socketio_login_required
-def subscribe_user(user_hashid: str) -> dict:
-    user_id = hashids.decode(user_hashid)
-
-    if not isinstance(user_id, int):
-        return {'status': 400, 'statusText': 'Bad Request'}
-
-    user = User.query.get(user_id)
-
-    if user is None:
-        return {'status': 404, 'statusText': 'Not found'}
-
-    if not (
-        user == current_user
-        or current_user.is_administrator
-    ):
-        return {'status': 403, 'statusText': 'Forbidden'}
-
-    join_room(f'/users/{user.hashid}')
-
-    return {'status': 200, 'statusText': 'OK'}
-
-
-@socketio.on('users.unsubscribe')
-@socketio_login_required
-def unsubscribe_user(user_hashid: str) -> dict:
-    user_id = hashids.decode(user_hashid)
-
-    if not isinstance(user_id, int):
-        return {'status': 400, 'statusText': 'Bad Request'}
-
-    user = User.query.get(user_id)
-
-    if user is None:
-        return {'status': 404, 'statusText': 'Not found'}
-
-    if not (
-        user == current_user
-        or current_user.is_administrator
-    ):
-        return {'status': 403, 'statusText': 'Forbidden'}
-
-    leave_room(f'/users/{user.hashid}')
-
-    return {'status': 200, 'statusText': 'OK'}
diff --git a/app/namespaces/jobs.py b/app/namespaces/jobs.py
new file mode 100644
index 00000000..03da5543
--- /dev/null
+++ b/app/namespaces/jobs.py
@@ -0,0 +1,109 @@
+from flask import current_app, Flask
+from flask_login import current_user
+from flask_socketio import Namespace
+from app import db, hashids, socketio
+from app.decorators import socketio_admin_required, socketio_login_required
+from app.models import Job, JobStatus
+
+
+def _delete_job(app: Flask, job_id: int):
+    with app.app_context():
+        job = Job.query.get(job_id)
+        job.delete()
+        db.session.commit()
+
+
+def _restart_job(app, job_id):
+    with app.app_context():
+        job = Job.query.get(job_id)
+        job.restart()
+        db.session.commit()
+
+
+class UsersNamespace(Namespace):
+    @socketio_login_required
+    def on_delete(self, job_hashid: str) -> dict:
+        job_id = hashids.decode(job_hashid)
+
+        if not isinstance(job_id, int):
+            return {'status': 400, 'statusText': 'Bad Request'}
+
+        job = Job.query.get(job_id)
+
+        if job is None:
+            return {'status': 404, 'statusText': 'Not found'}
+
+        if not (
+            job.user == current_user
+            or current_user.is_administrator
+        ):
+            return {'status': 403, 'statusText': 'Forbidden'}
+
+        socketio.start_background_task(
+            _delete_job,
+            current_app._get_current_object(),
+            job_id
+        )
+
+        return {
+            'body': f'Job "{job.title}" marked for deletion',
+            'status': 202,
+            'statusText': 'Accepted'
+        }
+
+    @socketio_admin_required
+    def on_log(self, job_hashid: str):
+        job_id = hashids.decode(job_hashid)
+
+        if not isinstance(job_id, int):
+            return {'status': 400, 'statusText': 'Bad Request'}
+
+        job = Job.query.get(job_id)
+
+        if job is None:
+            return {'status': 404, 'statusText': 'Not found'}
+
+        if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:
+            return {'status': 409, 'statusText': 'Conflict'}
+
+        with open(job.path / 'pipeline_data' / 'logs' / 'pyflow_log.txt') as log_file:
+            log = log_file.read()
+
+        return {
+            'body': log,
+            'status': 200,
+            'statusText': 'Forbidden'
+        }
+
+    socketio_login_required
+    def on_restart(self, job_hashid: str):
+        job_id = hashids.decode(job_hashid)
+
+        if not isinstance(job_id, int):
+            return {'status': 400, 'statusText': 'Bad Request'}
+
+        job = Job.query.get(job_id)
+
+        if job is None:
+            return {'status': 404, 'statusText': 'Not found'}
+
+        if not (
+            job.user == current_user
+            or current_user.is_administrator
+        ):
+            return {'status': 403, 'statusText': 'Forbidden'}
+
+        if job.status == JobStatus.FAILED:
+            return {'status': 409, 'statusText': 'Conflict'}
+
+        socketio.start_background_task(
+            _restart_job,
+            current_app._get_current_object(),
+            job_id
+        )
+
+        return {
+            'body': f'Job "{job.title}" marked for restarting',
+            'status': 202,
+            'statusText': 'Accepted'
+        }