from flask_login import current_user
from flask_socketio import join_room, leave_room
from app import hashids, socketio
from app.decorators import socketio_login_required
from app.models import User


def _resolve_user(user_hashid):
    """Validate a user hashid and check the caller's access to that user.

    Shared by the subscribe and unsubscribe handlers so both apply the same
    checks in the same order.

    Returns:
        A ``(user, error)`` pair. On success ``user`` is the ``User`` row and
        ``error`` is ``None``; on failure ``user`` is ``None`` and ``error``
        is the status dict the handler should return as-is.
    """
    if not isinstance(user_hashid, str):
        return None, {'status': 400, 'statusText': 'Bad Request'}

    # NOTE(review): this check relies on ``hashids.decode`` yielding a plain
    # ``int`` for a valid hashid -- confirm against the project's wrapper.
    decoded_id = hashids.decode(user_hashid)
    if not isinstance(decoded_id, int):
        return None, {'status': 400, 'statusText': 'Bad Request'}

    target = User.query.get(decoded_id)
    if target is None:
        return None, {'status': 404, 'statusText': 'Not Found'}

    # Access is granted to the user themselves or to an administrator.
    # NOTE(review): ``is_administrator`` is read as an attribute; if it is a
    # plain method on the model this is always truthy -- confirm.
    is_self = target == current_user
    if not is_self and not current_user.is_administrator:
        return None, {'status': 403, 'statusText': 'Forbidden'}

    return target, None


@socketio.on('SUBSCRIBE User')
@socketio_login_required
def subscribe(user_hashid: str) -> dict:
    """Handle 'SUBSCRIBE User': join the room of the referenced user.

    Args:
        user_hashid: Hashid string identifying the user to subscribe to.

    Returns:
        A dict with ``status`` and ``statusText``: 400 for a malformed
        hashid, 404 when no such user exists, 403 when the caller is neither
        that user nor an administrator, and 200 once the room was joined.
    """
    user, error = _resolve_user(user_hashid)
    if error is not None:
        return error
    join_room(f'/users/{user.hashid}')
    return {'status': 200, 'statusText': 'OK'}


@socketio.on('UNSUBSCRIBE User')
@socketio_login_required
def unsubscribe(user_hashid: str) -> dict:
    """Handle 'UNSUBSCRIBE User': leave the room of the referenced user.

    Args:
        user_hashid: Hashid string identifying the user to unsubscribe from.

    Returns:
        A dict with ``status`` and ``statusText``: 400 for a malformed
        hashid, 404 when no such user exists, 403 when the caller is neither
        that user nor an administrator, and 200 once the room was left.
    """
    user, error = _resolve_user(user_hashid)
    if error is not None:
        return error
    leave_room(f'/users/{user.hashid}')
    return {'status': 200, 'statusText': 'OK'}