from flask import abort, current_app, request from flask_login import current_user from flask_socketio import disconnect from functools import wraps from threading import Thread def admin_required(f): @wraps(f) def wrapped(*args, **kwargs): if current_user.is_administrator: return f(*args, **kwargs) else: abort(403) return wrapped def background(f): ''' ' This decorator executes a function in a Thread. ' Decorated functions need to be executed within a code block where an ' app context exists. ' ' NOTE: An app object is passed as a keyword argument to the decorated ' function. ''' @wraps(f) def wrapped(*args, **kwargs): kwargs['app'] = current_app._get_current_object() thread = Thread(target=f, args=args, kwargs=kwargs) thread.start() return thread return wrapped def socketio_admin_required(f): @wraps(f) def wrapped(*args, **kwargs): if current_user.is_administrator: return f(*args, **kwargs) else: response = {'code': 401, 'desc': 'Unauthorized'} socketio.emit(request.event['message'], response, room=request.sid) return wrapped def socketio_login_required(f): @wraps(f) def wrapped(*args, **kwargs): if current_user.is_authenticated: return f(*args, **kwargs) else: response = {'code': 401, 'desc': 'Unauthorized'} socketio.emit(request.event['message'], response, room=request.sid) return wrapped