mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
				synced 2025-11-04 12:22:47 +00:00 
			
		
		
		
	Create dedicated '/users' Socket.IO Namespace
This commit is contained in:
		@@ -132,6 +132,9 @@ def create_app(config: Config = Config) -> Flask:
 | 
			
		||||
    # region SocketIO Namespaces
 | 
			
		||||
    from .namespaces.cqi_over_sio import CQiOverSocketIONamespace
 | 
			
		||||
    socketio.on_namespace(CQiOverSocketIONamespace('/cqi_over_sio'))
 | 
			
		||||
 | 
			
		||||
    from .namespaces.users import UsersNamespace
 | 
			
		||||
    socketio.on_namespace(UsersNamespace('/users'))
 | 
			
		||||
    # endregion SocketIO Namespaces
 | 
			
		||||
 | 
			
		||||
    # region Database event Listeners
 | 
			
		||||
 
 | 
			
		||||
@@ -1,138 +0,0 @@
 | 
			
		||||
from flask import current_app
 | 
			
		||||
from flask_login import current_user
 | 
			
		||||
from flask_socketio import Namespace
 | 
			
		||||
from app import db, hashids, socketio
 | 
			
		||||
from app.extensions.flask_socketio import admin_required, login_required
 | 
			
		||||
from app.models import Job, JobStatus
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class JobsNamespace(Namespace):
 | 
			
		||||
    @login_required
 | 
			
		||||
    def on_delete(self, job_hashid: str):
 | 
			
		||||
        # Decode the job hashid
 | 
			
		||||
        job_id = hashids.decode(job_hashid)
 | 
			
		||||
 | 
			
		||||
        # Validate job_id
 | 
			
		||||
        if not isinstance(job_id, int):
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 400,
 | 
			
		||||
                'body': 'job_id is invalid'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # Load job from database
 | 
			
		||||
        job = Job.query.get(job_id)
 | 
			
		||||
        if job is None:
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 404,
 | 
			
		||||
                'body': 'Job not found'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # Check if the current user is allowed to delete the job
 | 
			
		||||
        if not (job.user == current_user or current_user.is_administrator):
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 403,
 | 
			
		||||
                'body': 'Forbidden'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # TODO: This should be a method in the Job model
 | 
			
		||||
        def _delete_job(app, job_id):
 | 
			
		||||
            with app.app_context():
 | 
			
		||||
                job = Job.query.get(job_id)
 | 
			
		||||
                job.delete()
 | 
			
		||||
                db.session.commit()
 | 
			
		||||
 | 
			
		||||
        # Delete the job in a background task
 | 
			
		||||
        socketio.start_background_task(
 | 
			
		||||
            target=_delete_job,
 | 
			
		||||
            app=current_app._get_current_object(),
 | 
			
		||||
            job_id=job_id
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return {
 | 
			
		||||
            'code': 202,
 | 
			
		||||
            'body': f'Job "{job.title}" marked for deletion'
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @admin_required
 | 
			
		||||
    def on_get_log(self, job_hashid: str):
 | 
			
		||||
        # Decode the job hashid
 | 
			
		||||
        job_id = hashids.decode(job_hashid)
 | 
			
		||||
 | 
			
		||||
        # Validate job_id
 | 
			
		||||
        if not isinstance(job_id, int):
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 400,
 | 
			
		||||
                'body': 'job_id is invalid'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # Load job from database
 | 
			
		||||
        job = Job.query.get(job_id)
 | 
			
		||||
        if job is None:
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 404,
 | 
			
		||||
                'body': 'Job not found'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # Check if the job is already processed
 | 
			
		||||
        if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 409,
 | 
			
		||||
                'body': 'Job is not done processing'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # Read the log file
 | 
			
		||||
        with open(job.path / 'pipeline_data' / 'logs' / 'pyflow_log.txt') as log_file:
 | 
			
		||||
            job_log = log_file.read()
 | 
			
		||||
 | 
			
		||||
        return {
 | 
			
		||||
            'code': 200,
 | 
			
		||||
            'body': job_log
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @login_required
 | 
			
		||||
    def on_restart(self, job_hashid: str):
 | 
			
		||||
        # Decode the job hashid
 | 
			
		||||
        job_id = hashids.decode(job_hashid)
 | 
			
		||||
 | 
			
		||||
        # Validate job_id
 | 
			
		||||
        if not isinstance(job_id, int):
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 400,
 | 
			
		||||
                'body': 'job_id is invalid'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # Load job from database
 | 
			
		||||
        job = Job.query.get(job_id)
 | 
			
		||||
        if job is None:
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 404,
 | 
			
		||||
                'body': 'Job not found'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # Check if the current user is allowed to restart the job
 | 
			
		||||
        if not (job.user == current_user or current_user.is_administrator):
 | 
			
		||||
            return {
 | 
			
		||||
                'code': 403,
 | 
			
		||||
                'body': 'Forbidden'
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        # TODO: This should be a method in the Job model
 | 
			
		||||
        def _restart_job(app, job_id):
 | 
			
		||||
            with app.app_context():
 | 
			
		||||
                job = Job.query.get(job_id)
 | 
			
		||||
                job.restart()
 | 
			
		||||
                db.session.commit()
 | 
			
		||||
 | 
			
		||||
        # Restart the job in a background task
 | 
			
		||||
        socketio.start_background_task(
 | 
			
		||||
            target=_restart_job,
 | 
			
		||||
            app=current_app._get_current_object(),
 | 
			
		||||
            job_id=job_id
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return {
 | 
			
		||||
            'code': 202,
 | 
			
		||||
            'body': f'Job "{job.title}" restarted'
 | 
			
		||||
        }
 | 
			
		||||
@@ -15,4 +15,4 @@ def before_request():
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
from . import cli, events, json_routes, routes, settings
 | 
			
		||||
from . import cli, json_routes, routes, settings
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										78
									
								
								app/namespaces/users/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										78
									
								
								app/namespaces/users/__init__.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,78 @@
 | 
			
		||||
from flask_login import current_user
 | 
			
		||||
from flask_socketio import join_room, leave_room, Namespace
 | 
			
		||||
from app import hashids
 | 
			
		||||
from app.decorators import socketio_login_required
 | 
			
		||||
from app.models import User
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class UsersNamespace(Namespace):
 | 
			
		||||
    @socketio_login_required
 | 
			
		||||
    def on_get_user(self, user_hashid: str) -> dict:
 | 
			
		||||
        user_id = hashids.decode(user_hashid)
 | 
			
		||||
 | 
			
		||||
        if not isinstance(user_id, int):
 | 
			
		||||
            return {'code': 400, 'msg': 'Bad Request'}
 | 
			
		||||
 | 
			
		||||
        user = User.query.get(user_id)
 | 
			
		||||
 | 
			
		||||
        if user is None:
 | 
			
		||||
            return {'status': 404, 'statusText': 'Not found'}
 | 
			
		||||
 | 
			
		||||
        if not (
 | 
			
		||||
            user == current_user
 | 
			
		||||
            or current_user.is_administrator
 | 
			
		||||
        ):
 | 
			
		||||
            return {'status': 403, 'statusText': 'Forbidden'}
 | 
			
		||||
 | 
			
		||||
        return {
 | 
			
		||||
            'body': user.to_json_serializeable(
 | 
			
		||||
                backrefs=True,
 | 
			
		||||
                relationships=True
 | 
			
		||||
            ),
 | 
			
		||||
            'status': 200,
 | 
			
		||||
            'statusText': 'OK'
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    @socketio_login_required
 | 
			
		||||
    def on_subscribe_user(self, user_hashid: str) -> dict:
 | 
			
		||||
        user_id = hashids.decode(user_hashid)
 | 
			
		||||
 | 
			
		||||
        if not isinstance(user_id, int):
 | 
			
		||||
            return {'code': 400, 'msg': 'Bad Request'}
 | 
			
		||||
 | 
			
		||||
        user = User.query.get(user_id)
 | 
			
		||||
 | 
			
		||||
        if user is None:
 | 
			
		||||
            return {'status': 404, 'statusText': 'Not found'}
 | 
			
		||||
 | 
			
		||||
        if not (
 | 
			
		||||
            user == current_user
 | 
			
		||||
            or current_user.is_administrator
 | 
			
		||||
        ):
 | 
			
		||||
            return {'status': 403, 'statusText': 'Forbidden'}
 | 
			
		||||
 | 
			
		||||
        join_room(f'/users/{user.hashid}')
 | 
			
		||||
 | 
			
		||||
        return {'status': 200, 'statusText': 'OK'}
 | 
			
		||||
 | 
			
		||||
    @socketio_login_required
 | 
			
		||||
    def on_unsubscribe_user(self, user_hashid: str) -> dict:
 | 
			
		||||
        user_id = hashids.decode(user_hashid)
 | 
			
		||||
 | 
			
		||||
        if not isinstance(user_id, int):
 | 
			
		||||
            return {'code': 400, 'msg': 'Bad Request'}
 | 
			
		||||
 | 
			
		||||
        user = User.query.get(user_id)
 | 
			
		||||
 | 
			
		||||
        if user is None:
 | 
			
		||||
            return {'status': 404, 'statusText': 'Not found'}
 | 
			
		||||
 | 
			
		||||
        if not (
 | 
			
		||||
            user == current_user
 | 
			
		||||
            or current_user.is_administrator
 | 
			
		||||
        ):
 | 
			
		||||
            return {'status': 403, 'statusText': 'Forbidden'}
 | 
			
		||||
 | 
			
		||||
        leave_room(f'/users/{user.hashid}')
 | 
			
		||||
 | 
			
		||||
        return {'status': 200, 'statusText': 'OK'}
 | 
			
		||||
@@ -4,7 +4,16 @@ nopaque.App = class App {
 | 
			
		||||
      promises: {getUser: {}, subscribeUser: {}},
 | 
			
		||||
      users: {},
 | 
			
		||||
    };
 | 
			
		||||
    this.socket = io({transports: ['websocket'], upgrade: false});
 | 
			
		||||
    this.sockets = {};
 | 
			
		||||
    this.sockets['/users'] = io(
 | 
			
		||||
      '/users',
 | 
			
		||||
      {
 | 
			
		||||
        transports: ['websocket'],
 | 
			
		||||
        upgrade: false
 | 
			
		||||
      }
 | 
			
		||||
    );
 | 
			
		||||
    // this.socket = io({transports: ['websocket'], upgrade: false});
 | 
			
		||||
    this.socket = this.sockets['/users'];
 | 
			
		||||
    this.socket.on('PATCH', (patch) => {this.onPatch(patch);});
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@@ -14,7 +23,7 @@ nopaque.App = class App {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    this.data.promises.getUser[userId] = new Promise((resolve, reject) => {
 | 
			
		||||
      this.socket.emit('GET /users/<user_id>', userId, (response) => {
 | 
			
		||||
      this.socket.emit('get_user', userId, (response) => {
 | 
			
		||||
        if (response.status === 200) {
 | 
			
		||||
          this.data.users[userId] = response.body;
 | 
			
		||||
          resolve(this.data.users[userId]);
 | 
			
		||||
@@ -33,7 +42,7 @@ nopaque.App = class App {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    this.data.promises.subscribeUser[userId] = new Promise((resolve, reject) => {
 | 
			
		||||
      this.socket.emit('SUBSCRIBE /users/<user_id>', userId, (response) => {
 | 
			
		||||
      this.socket.emit('subscribe_user', userId, (response) => {
 | 
			
		||||
        if (response.status !== 200) {
 | 
			
		||||
          reject(response);
 | 
			
		||||
          return;
 | 
			
		||||
@@ -82,7 +91,7 @@ nopaque.App = class App {
 | 
			
		||||
    let toastCloseActionElement = toast.el.querySelector('.action-button[data-action="close"]');
 | 
			
		||||
    toastCloseActionElement.addEventListener('click', () => {toast.dismiss();});
 | 
			
		||||
  }
 | 
			
		||||
  
 | 
			
		||||
 | 
			
		||||
  onPatch(patch) {
 | 
			
		||||
    // Filter Patch to only include operations on users that are initialized
 | 
			
		||||
    let regExp = new RegExp(`^/users/(${Object.keys(this.data.users).join('|')})`);
 | 
			
		||||
@@ -139,7 +148,7 @@ nopaque.App = class App {
 | 
			
		||||
 | 
			
		||||
    /* Initialize Materialize Components */
 | 
			
		||||
    // #region
 | 
			
		||||
  
 | 
			
		||||
 | 
			
		||||
    // Automatically initialize Materialize Components that do not require
 | 
			
		||||
    // additional configuration.
 | 
			
		||||
    M.AutoInit();
 | 
			
		||||
@@ -173,7 +182,7 @@ nopaque.App = class App {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    /* Initialize nopaque Components */
 | 
			
		||||
    // #region 
 | 
			
		||||
    // #region
 | 
			
		||||
    nopaque.resource_displays.AutoInit();
 | 
			
		||||
    nopaque.resource_lists.AutoInit();
 | 
			
		||||
    nopaque.forms.AutoInit();
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user