Merge branch 'public-corpus' of gitlab.ub.uni-bielefeld.de:sfb1288inf/nopaque into public-corpus

This commit is contained in:
Patrick Jentsch
2023-03-01 16:33:03 +01:00
10 changed files with 150 additions and 64 deletions

View File

@ -3,27 +3,24 @@ from flask_login import current_user
from functools import wraps
from app.models import Corpus, CorpusFollowerAssociation
def corpus_follower_permission_required(permissions):
def corpus_follower_permission_required(*permissions):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
corpus_id = kwargs.get('corpus_id')
corpus = Corpus.query.get_or_404(corpus_id)
if current_user == corpus.user or current_user.is_administrator():
print('user or admin')
return f(*args, **kwargs)
if not current_user.is_following_corpus(corpus):
print('not following corpus')
abort(403)
corpus_follower_association = CorpusFollowerAssociation.query.filter_by(corpus_id=corpus_id, follower_id=current_user.id).first_or_404()
for permission in permissions:
if not corpus_follower_association.role.has_permission(permission):
abort(403)
if not all([corpus_follower_association.role.has_permission(p) for p in permissions]):
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
def owner_or_admin_required():
def corpus_owner_or_admin_required():
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):

View File

@ -16,7 +16,7 @@ from flask_login import current_user, login_required
from threading import Thread
import jwt
import os
from .decorators import corpus_follower_permission_required, owner_or_admin_required
from .decorators import corpus_follower_permission_required, corpus_owner_or_admin_required
from app import db, hashids
from app.models import (
Corpus,
@ -34,12 +34,6 @@ from .forms import (
UpdateCorpusFileForm
)
@bp.route('/<hashid:corpus_id>/test')
@login_required
@corpus_follower_permission_required(['VIEW', 'ADD_CORPUS_FILE'])
def test(corpus_id):
return 'ok'
@bp.route('/fake-add')
@login_required
def fake_add():
@ -52,7 +46,7 @@ def fake_add():
@bp.route('/<hashid:corpus_id>/is_public', methods=['POST'])
@login_required
@owner_or_admin_required()
@corpus_owner_or_admin_required()
def update_corpus_is_public(corpus_id):
is_public = request.json
if not isinstance(is_public, bool):
@ -67,7 +61,7 @@ def update_corpus_is_public(corpus_id):
@bp.route('/<hashid:corpus_id>/followers/add', methods=['POST'])
@login_required
@owner_or_admin_required()
@corpus_owner_or_admin_required()
def add_corpus_followers(corpus_id):
usernames = request.json
if not (isinstance(usernames, list) or all(isinstance(u, str) for u in usernames)):
@ -124,7 +118,7 @@ def current_user_unfollow_corpus(corpus_id):
@bp.route('/<hashid:corpus_id>/followers/<hashid:follower_id>/role', methods=['POST'])
@corpus_follower_permission_required(['REMOVE_FOLLOWER', 'UPDATE_FOLLOWER'])
@corpus_follower_permission_required('REMOVE_FOLLOWER', 'UPDATE_FOLLOWER')
def add_permission(corpus_id, follower_id):
corpus_follower_association = CorpusFollowerAssociation.query.filter_by(corpus_id=corpus_id, follower_id=follower_id).first_or_404()
if not (corpus_follower_association.corpus.user == current_user or current_user.is_administrator()):
@ -218,6 +212,7 @@ def generate_corpus_share_link(corpus_id):
@bp.route('/<hashid:corpus_id>', methods=['DELETE'])
@login_required
@corpus_owner_or_admin_required()
def delete_corpus(corpus_id):
def _delete_corpus(app, corpus_id):
with app.app_context():
@ -226,8 +221,6 @@ def delete_corpus(corpus_id):
db.session.commit()
corpus = Corpus.query.get_or_404(corpus_id)
if not (corpus.user == current_user or current_user.is_administrator()):
abort(403)
thread = Thread(
target=_delete_corpus,
args=(current_app._get_current_object(), corpus_id)
@ -238,12 +231,9 @@ def delete_corpus(corpus_id):
@bp.route('/<hashid:corpus_id>/analyse')
@login_required
@corpus_follower_permission_required('VIEW')
def analyse_corpus(corpus_id):
corpus = Corpus.query.get_or_404(corpus_id)
if not (corpus.user == current_user
or current_user.is_administrator()
or current_user.is_following_corpus(corpus)):
abort(403)
return render_template(
'corpora/analyse_corpus.html.j2',
corpus=corpus,
@ -253,6 +243,7 @@ def analyse_corpus(corpus_id):
@bp.route('/<hashid:corpus_id>/build', methods=['POST'])
@login_required
@corpus_owner_or_admin_required()
def build_corpus(corpus_id):
def _build_corpus(app, corpus_id):
with app.app_context():
@ -277,6 +268,7 @@ def build_corpus(corpus_id):
@bp.route('/<hashid:corpus_id>/files/create', methods=['GET', 'POST'])
@login_required
@corpus_follower_permission_required('ADD_CORPUS_FILE')
def create_corpus_file(corpus_id):
corpus = Corpus.query.get_or_404(corpus_id)
if not (corpus.user == current_user or current_user.is_administrator()):
@ -324,10 +316,9 @@ def create_corpus_file(corpus_id):
@bp.route('/<hashid:corpus_id>/files/<hashid:corpus_file_id>', methods=['GET', 'POST'])
@login_required
@corpus_follower_permission_required('ADD_CORPUS_FILE', 'UPDATE_CORPUS_FILE', 'REMOVE_CORPUS_FILE')
def corpus_file(corpus_id, corpus_file_id):
corpus_file = CorpusFile.query.filter_by(corpus_id = corpus_id, id=corpus_file_id).first_or_404()
if not (corpus_file.corpus.user == current_user or current_user.is_administrator()):
abort(403)
form = UpdateCorpusFileForm(data=corpus_file.to_json_serializeable())
if form.validate_on_submit():
form.populate_obj(corpus_file)
@ -348,6 +339,7 @@ def corpus_file(corpus_id, corpus_file_id):
@bp.route('/<hashid:corpus_id>/files/<hashid:corpus_file_id>', methods=['DELETE'])
@login_required
@corpus_follower_permission_required('REMOVE_CORPUS_FILE')
def delete_corpus_file(corpus_id, corpus_file_id):
def _delete_corpus_file(app, corpus_file_id):
with app.app_context():
@ -368,6 +360,7 @@ def delete_corpus_file(corpus_id, corpus_file_id):
@bp.route('/<hashid:corpus_id>/files/<hashid:corpus_file_id>/download')
@login_required
@corpus_follower_permission_required('VIEW')
def download_corpus_file(corpus_id, corpus_file_id):
corpus_file = CorpusFile.query.filter_by(corpus_id = corpus_id, id=corpus_file_id).first_or_404()
if not (corpus_file.corpus.user == current_user or current_user.is_administrator()):