nopaque/app/events/socketio.py

74 lines
2.4 KiB
Python
Raw Normal View History

from flask import request
from flask_login import current_user
from flask_socketio import join_room, leave_room
from .. import socketio
from ..decorators import socketio_login_required
from ..models import User
2019-09-02 08:59:13 +00:00
# Session ids (sids) of the Socket.IO sessions that are currently connected.
# The connect handler below appends a sid and the disconnect handler removes
# it again, so the list tracks all connected sessions; it can be used to
# determine the runtimes of associated background tasks.
sessions = []
###############################################################################
# Socket.IO event handlers #
###############################################################################
@socketio.on('connect')
@socketio_login_required
def socketio_connect():
    '''
    Register the connecting Socket.IO session.

    The Socket.IO module creates a session id (sid) for each request; on
    connect that sid is appended to the module-level sessions list.

    Returns a dict with 'code' 200 and 'msg' 'OK'.
    '''
    sid = request.sid
    sessions.append(sid)
    response = {'code': 200, 'msg': 'OK'}
    return response
@socketio.on('disconnect')
def socketio_disconnect():
    '''
    Forget the disconnecting Socket.IO session.

    Removes the session id of the current request from the module-level
    sessions list. A sid that is not in the list is ignored.

    Returns a dict with 'code' 200 and 'msg' 'OK'.
    '''
    sid = request.sid
    try:
        sessions.remove(sid)
    except ValueError:
        # The sid is not in the list, so there is nothing to clean up.
        pass
    response = {'code': 200, 'msg': 'OK'}
    return response
2020-12-15 13:38:52 +00:00
@socketio.on('start_user_session')
@socketio_login_required
def socketio_start_user_session(user_id):
    '''
    Start a user session for the requesting Socket.IO client.

    Looks up the user with the given id and emits a 'start_user_session'
    event carrying a status dict to the requesting session only:
      - 404 'Not found' if no such user exists,
      - 403 'Forbidden' if the user is neither the current user nor is the
        current user an administrator,
      - 200 'OK' otherwise.
    On success the user's dict representation is additionally emitted as
    'user_<id>_init' to the requesting session, which then joins the room
    'user_<id>'.
    '''
    user = User.query.get(user_id)
    # NOTE(review): is_administrator is read as an attribute here; if it is
    # a plain method on User the check is always truthy - confirm.
    if user is None:
        status = {'code': 404, 'msg': 'Not found'}
    elif user == current_user or current_user.is_administrator:
        status = {'code': 200, 'msg': 'OK'}
    else:
        status = {'code': 403, 'msg': 'Forbidden'}
    granted = status['code'] == 200

    # The status is always reported back to the requesting session only.
    sid = request.sid
    socketio.emit('start_user_session', status, room=sid)
    if not granted:
        return

    init_event = 'user_{}_init'.format(user.id)
    socketio.emit(init_event, user.to_dict(), room=sid)
    join_room('user_{}'.format(user.id))
2019-11-07 09:44:02 +00:00
@socketio.on('users.request')
@socketio_login_required
def socketio_start_session(user_id):
    '''
    Answer a 'users.request' event for the user with the given id.

    Returns a status dict:
      - 404 'Not found' if no such user exists,
      - 403 'Forbidden' if the user is neither the current user nor is the
        current user an administrator,
      - 200 'OK' with the user's dict representation under 'payload'
        otherwise; in that case the requesting session also joins the room
        'users.<id>'.
    '''
    user = User.query.get(user_id)
    if user is None:
        return {'code': 404, 'msg': 'Not found'}

    # NOTE(review): is_administrator is read as an attribute here; if it is
    # a plain method on User the check is always truthy - confirm.
    allowed = user == current_user or current_user.is_administrator
    if not allowed:
        return {'code': 403, 'msg': 'Forbidden'}

    # Serialize before joining the room, as the original did.
    payload = user.to_dict()
    join_room('users.{}'.format(user.id))
    return {'code': 200, 'msg': 'OK', 'payload': payload}