from apifairy import authenticate, body, response, other_responses from flask import Blueprint, request, abort from app import db from app.models import Token, User from .auth import basic_auth from .schemas import EmptySchema, TokenSchema bp = Blueprint('tokens', __name__) token_schema = TokenSchema() @bp.route('', methods=['DELETE']) @response(EmptySchema, status_code=204, description='Token revoked') @other_responses({401: 'Invalid access token'}) def delete_token(): """Revoke an access token""" access_token = request.headers['Authorization'].split()[1] token = Token.query.filter(Token.access_token == access_token).first() if token is None: # pragma: no cover abort(401) token.expire() db.session.commit() return {} @bp.route('', methods=['POST']) @authenticate(basic_auth) @response(token_schema) @other_responses({401: 'Invalid username or password'}) def create_token(): """Create new access and refresh tokens""" user = basic_auth.current_user() token = user.generate_auth_token() db.session.add(token) Token.clean() # keep token table clean of old tokens db.session.commit() return token, 200 @bp.route('', methods=['PUT']) @body(token_schema) @response(token_schema, description='Newly issued access and refresh tokens') @other_responses({401: 'Invalid access or refresh token'}) def refresh_token(args): """Refresh an access token""" access_token = args.get('access_token') refresh_token = args.get('refresh_token') if access_token is None or refresh_token is None: abort(401) token = User.verify_refresh_token(refresh_token, access_token) if token is None: abort(401) token.expire() new_token = token.user.generate_auth_token() db.session.add_all([token, new_token]) db.session.commit() return new_token, 200