From 289a551122fe6106d2fb653ac938338e993aadaa Mon Sep 17 00:00:00 2001
From: Patrick Jentsch
Date: Wed, 6 Nov 2024 13:04:30 +0100
Subject: [PATCH] Create dedicated '/users' Socket.IO Namespace
---
app/__init__.py | 3 +
app/blueprints/jobs/events.py | 138 -------------------------------
app/blueprints/users/__init__.py | 2 +-
app/namespaces/users/__init__.py | 78 +++++++++++++++++
app/static/js/app.js | 21 +++--
5 files changed, 97 insertions(+), 145 deletions(-)
delete mode 100644 app/blueprints/jobs/events.py
create mode 100644 app/namespaces/users/__init__.py
diff --git a/app/__init__.py b/app/__init__.py
index eb4f7bd6..9953a1cf 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -132,6 +132,9 @@ def create_app(config: Config = Config) -> Flask:
# region SocketIO Namespaces
from .namespaces.cqi_over_sio import CQiOverSocketIONamespace
socketio.on_namespace(CQiOverSocketIONamespace('/cqi_over_sio'))
+
+ from .namespaces.users import UsersNamespace
+ socketio.on_namespace(UsersNamespace('/users'))
# endregion SocketIO Namespaces
# region Database event Listeners
diff --git a/app/blueprints/jobs/events.py b/app/blueprints/jobs/events.py
deleted file mode 100644
index 0a1673d6..00000000
--- a/app/blueprints/jobs/events.py
+++ /dev/null
@@ -1,138 +0,0 @@
-from flask import current_app
-from flask_login import current_user
-from flask_socketio import Namespace
-from app import db, hashids, socketio
-from app.extensions.flask_socketio import admin_required, login_required
-from app.models import Job, JobStatus
-
-
-class JobsNamespace(Namespace):
- @login_required
- def on_delete(self, job_hashid: str):
- # Decode the job hashid
- job_id = hashids.decode(job_hashid)
-
- # Validate job_id
- if not isinstance(job_id, int):
- return {
- 'code': 400,
- 'body': 'job_id is invalid'
- }
-
- # Load job from database
- job = Job.query.get(job_id)
- if job is None:
- return {
- 'code': 404,
- 'body': 'Job not found'
- }
-
- # Check if the current user is allowed to delete the job
- if not (job.user == current_user or current_user.is_administrator):
- return {
- 'code': 403,
- 'body': 'Forbidden'
- }
-
- # TODO: This should be a method in the Job model
- def _delete_job(app, job_id):
- with app.app_context():
- job = Job.query.get(job_id)
- job.delete()
- db.session.commit()
-
- # Delete the job in a background task
- socketio.start_background_task(
- target=_delete_job,
- app=current_app._get_current_object(),
- job_id=job_id
- )
-
- return {
- 'code': 202,
- 'body': f'Job "{job.title}" marked for deletion'
- }
-
-
- @admin_required
- def on_get_log(self, job_hashid: str):
- # Decode the job hashid
- job_id = hashids.decode(job_hashid)
-
- # Validate job_id
- if not isinstance(job_id, int):
- return {
- 'code': 400,
- 'body': 'job_id is invalid'
- }
-
- # Load job from database
- job = Job.query.get(job_id)
- if job is None:
- return {
- 'code': 404,
- 'body': 'Job not found'
- }
-
- # Check if the job is already processed
- if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:
- return {
- 'code': 409,
- 'body': 'Job is not done processing'
- }
-
- # Read the log file
- with open(job.path / 'pipeline_data' / 'logs' / 'pyflow_log.txt') as log_file:
- job_log = log_file.read()
-
- return {
- 'code': 200,
- 'body': job_log
- }
-
-
- @login_required
- def on_restart(self, job_hashid: str):
- # Decode the job hashid
- job_id = hashids.decode(job_hashid)
-
- # Validate job_id
- if not isinstance(job_id, int):
- return {
- 'code': 400,
- 'body': 'job_id is invalid'
- }
-
- # Load job from database
- job = Job.query.get(job_id)
- if job is None:
- return {
- 'code': 404,
- 'body': 'Job not found'
- }
-
- # Check if the current user is allowed to restart the job
- if not (job.user == current_user or current_user.is_administrator):
- return {
- 'code': 403,
- 'body': 'Forbidden'
- }
-
- # TODO: This should be a method in the Job model
- def _restart_job(app, job_id):
- with app.app_context():
- job = Job.query.get(job_id)
- job.restart()
- db.session.commit()
-
- # Restart the job in a background task
- socketio.start_background_task(
- target=_restart_job,
- app=current_app._get_current_object(),
- job_id=job_id
- )
-
- return {
- 'code': 202,
- 'body': f'Job "{job.title}" restarted'
- }
diff --git a/app/blueprints/users/__init__.py b/app/blueprints/users/__init__.py
index d305e242..21e9c382 100644
--- a/app/blueprints/users/__init__.py
+++ b/app/blueprints/users/__init__.py
@@ -15,4 +15,4 @@ def before_request():
pass
-from . import cli, events, json_routes, routes, settings
+from . import cli, json_routes, routes, settings
diff --git a/app/namespaces/users/__init__.py b/app/namespaces/users/__init__.py
new file mode 100644
index 00000000..37025224
--- /dev/null
+++ b/app/namespaces/users/__init__.py
@@ -0,0 +1,78 @@
+from flask_login import current_user
+from flask_socketio import join_room, leave_room, Namespace
+from app import hashids
+from app.decorators import socketio_login_required
+from app.models import User
+
+
+class UsersNamespace(Namespace):
+ @socketio_login_required
+ def on_get_user(self, user_hashid: str) -> dict:
+ user_id = hashids.decode(user_hashid)
+
+ if not isinstance(user_id, int):
+ return {'code': 400, 'msg': 'Bad Request'}
+
+ user = User.query.get(user_id)
+
+ if user is None:
+ return {'status': 404, 'statusText': 'Not found'}
+
+ if not (
+ user == current_user
+ or current_user.is_administrator
+ ):
+ return {'status': 403, 'statusText': 'Forbidden'}
+
+ return {
+ 'body': user.to_json_serializeable(
+ backrefs=True,
+ relationships=True
+ ),
+ 'status': 200,
+ 'statusText': 'OK'
+ }
+
+ @socketio_login_required
+ def on_subscribe_user(self, user_hashid: str) -> dict:
+ user_id = hashids.decode(user_hashid)
+
+ if not isinstance(user_id, int):
+ return {'code': 400, 'msg': 'Bad Request'}
+
+ user = User.query.get(user_id)
+
+ if user is None:
+ return {'status': 404, 'statusText': 'Not found'}
+
+ if not (
+ user == current_user
+ or current_user.is_administrator
+ ):
+ return {'status': 403, 'statusText': 'Forbidden'}
+
+ join_room(f'/users/{user.hashid}')
+
+ return {'status': 200, 'statusText': 'OK'}
+
+ @socketio_login_required
+ def on_unsubscribe_user(self, user_hashid: str) -> dict:
+ user_id = hashids.decode(user_hashid)
+
+ if not isinstance(user_id, int):
+ return {'code': 400, 'msg': 'Bad Request'}
+
+ user = User.query.get(user_id)
+
+ if user is None:
+ return {'status': 404, 'statusText': 'Not found'}
+
+ if not (
+ user == current_user
+ or current_user.is_administrator
+ ):
+ return {'status': 403, 'statusText': 'Forbidden'}
+
+ leave_room(f'/users/{user.hashid}')
+
+ return {'status': 200, 'statusText': 'OK'}
diff --git a/app/static/js/app.js b/app/static/js/app.js
index 8e291ba9..688af5ab 100644
--- a/app/static/js/app.js
+++ b/app/static/js/app.js
@@ -4,7 +4,16 @@ nopaque.App = class App {
promises: {getUser: {}, subscribeUser: {}},
users: {},
};
- this.socket = io({transports: ['websocket'], upgrade: false});
+ this.sockets = {};
+ this.sockets['/users'] = io(
+ '/users',
+ {
+ transports: ['websocket'],
+ upgrade: false
+ }
+ );
+ // this.socket = io({transports: ['websocket'], upgrade: false});
+ this.socket = this.sockets['/users'];
this.socket.on('PATCH', (patch) => {this.onPatch(patch);});
}
@@ -14,7 +23,7 @@ nopaque.App = class App {
}
this.data.promises.getUser[userId] = new Promise((resolve, reject) => {
- this.socket.emit('GET /users/', userId, (response) => {
+ this.socket.emit('get_user', userId, (response) => {
if (response.status === 200) {
this.data.users[userId] = response.body;
resolve(this.data.users[userId]);
@@ -33,7 +42,7 @@ nopaque.App = class App {
}
this.data.promises.subscribeUser[userId] = new Promise((resolve, reject) => {
- this.socket.emit('SUBSCRIBE /users/', userId, (response) => {
+ this.socket.emit('subscribe_user', userId, (response) => {
if (response.status !== 200) {
reject(response);
return;
@@ -82,7 +91,7 @@ nopaque.App = class App {
let toastCloseActionElement = toast.el.querySelector('.action-button[data-action="close"]');
toastCloseActionElement.addEventListener('click', () => {toast.dismiss();});
}
-
+
onPatch(patch) {
// Filter Patch to only include operations on users that are initialized
let regExp = new RegExp(`^/users/(${Object.keys(this.data.users).join('|')})`);
@@ -139,7 +148,7 @@ nopaque.App = class App {
/* Initialize Materialize Components */
// #region
-
+
// Automatically initialize Materialize Components that do not require
// additional configuration.
M.AutoInit();
@@ -173,7 +182,7 @@ nopaque.App = class App {
/* Initialize nopaque Components */
- // #region
+ // #region
nopaque.resource_displays.AutoInit();
nopaque.resource_lists.AutoInit();
nopaque.forms.AutoInit();