nopaque/app/models.py

304 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import current_app
from flask_login import UserMixin, AnonymousUserMixin
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from werkzeug.security import generate_password_hash, check_password_hash
from . import db
from . import login_manager
from datetime import datetime
class Permission:
    """
    Bit flags describing what a User is allowed to do.

    Every permission is a distinct power of two, so several permissions can
    be stored in one integer and tested with the bitwise operator &. A value
    of 3, for example, stands for CREATE_JOB combined with DELETE_JOB.
    """
    CREATE_JOB = 1 << 0
    DELETE_JOB = 1 << 1
    # Bit 2 (WRITE = 4) and bit 3 (MODERATE = 8) are not defined at present.
    ADMIN = 1 << 4
class Role(db.Model):
    """
    Model for the different roles Users can have. This is a one-to-many
    relationship: one Role can be associated with many User rows.
    """
    __tablename__ = 'roles'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    default = db.Column(db.Boolean, default=False, index=True)
    name = db.Column(db.String(64), unique=True)
    # Bitmask built from the values defined in Permission.
    permissions = db.Column(db.Integer)
    # Relationships
    users = db.relationship('User', backref='role', lazy='dynamic')

    def __init__(self, **kwargs):
        """
        Create a Role; permissions start at 0 when none are passed in.
        """
        super().__init__(**kwargs)
        if self.permissions is None:
            self.permissions = 0

    def __repr__(self):
        """
        String representation of the Role. For human readability.
        """
        return '<Role %r>' % self.name

    def add_permission(self, perm):
        """
        Add a permission to the Role. Input is a Permission value (or an
        OR-combination of several).

        The bits are OR-ed in instead of added arithmetically, so a combined
        mask whose bits are only partially set cannot corrupt the stored
        value (1 + 3 would give 4, whereas 1 | 3 gives 3).
        """
        self.permissions = (self.permissions or 0) | perm

    def remove_permission(self, perm):
        """
        Remove a permission from the Role. Input is a Permission value (or
        an OR-combination of several).

        The given bits are cleared; bits that are not set stay untouched.
        """
        self.permissions = (self.permissions or 0) & ~perm

    def reset_permissions(self):
        """
        Reset permissions to zero. Zero equals no permissions at all.
        """
        self.permissions = 0

    def has_permission(self, perm):
        """
        Check whether the Role has a specific Permission, using the bitwise
        operator &. Returns True only if every bit of perm is set.
        """
        # A missing (None) permissions value is treated as "no permissions".
        return ((self.permissions or 0) & perm) == perm

    @staticmethod
    def insert_roles():
        """
        Insert the roles into the database. This has to be executed before
        Users are added to the database, otherwise Users will not have a
        Role assigned to them. The order of the roles dictionary determines
        the ID of each role: User has the ID 1 and Administrator the ID 2.
        """
        roles = {
            'User': [Permission.CREATE_JOB],
            'Administrator': [Permission.ADMIN,
                              Permission.CREATE_JOB,
                              Permission.DELETE_JOB],
        }
        default_role = 'User'
        for role_name, perms in roles.items():
            role = Role.query.filter_by(name=role_name).first()
            if role is None:
                role = Role(name=role_name)
            # Rebuild the bitmask from scratch so that permissions dropped
            # from the dictionary above are dropped from existing rows too.
            role.reset_permissions()
            for perm in perms:
                role.add_permission(perm)
            role.default = (role.name == default_role)
            db.session.add(role)
        db.session.commit()
class User(UserMixin, db.Model):
    """
    Model for Users that are registered to Opaque.
    """
    __tablename__ = 'users'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    confirmed = db.Column(db.Boolean, default=False)
    email = db.Column(db.String(64), unique=True, index=True)
    password_hash = db.Column(db.String(128))
    registration_date = db.Column(db.DateTime(), default=datetime.utcnow)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    username = db.Column(db.String(64), unique=True, index=True)
    # Relationships
    corpora = db.relationship('Corpus', backref='creator', lazy='dynamic',
                              cascade='save-update, merge, delete')
    jobs = db.relationship('Job', backref='creator', lazy='dynamic',
                           cascade='save-update, merge, delete')

    def __repr__(self):
        """
        String representation of the User. For human readability.
        """
        return '<User %r>' % self.username

    def __init__(self, **kwargs):
        """
        Create a User and assign a Role if none was passed in: the
        Administrator role when the email equals the OPAQUE_ADMIN config
        value, otherwise the role flagged as default.
        """
        super().__init__(**kwargs)
        if self.role is None:
            if self.email == current_app.config['OPAQUE_ADMIN']:
                self.role = Role.query.filter_by(name='Administrator').first()
            # Also covers the case where no Administrator role exists yet.
            if self.role is None:
                self.role = Role.query.filter_by(default=True).first()

    def generate_confirmation_token(self, expiration=3600):
        """
        Generates a confirmation token for user confirmation via email.
        The expiration argument is passed to the serializer as its
        expiry time.
        """
        s = Serializer(current_app.config['SECRET_KEY'], expiration)
        return s.dumps({'confirm': self.id}).decode('utf-8')

    def generate_reset_token(self, expiration=3600):
        """
        Generates a reset token for password reset via email.
        The expiration argument is passed to the serializer as its
        expiry time.
        """
        s = Serializer(current_app.config['SECRET_KEY'], expiration)
        return s.dumps({'reset': self.id}).decode('utf-8')

    def confirm(self, token):
        """
        Confirms User if the given token is valid and not expired.
        Returns True on success and False otherwise. The change is added to
        the session but not committed here.
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except Exception:
            # Any failure to decode means "invalid token". Exception (not a
            # bare except) lets KeyboardInterrupt and SystemExit propagate.
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        db.session.add(self)
        return True

    @staticmethod
    def reset_password(token, new_password):
        """
        Resets password for User if the given token is valid and not expired.
        Returns True on success and False otherwise. The change is added to
        the session but not committed here.
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except Exception:
            # Any failure to decode means "invalid token". Exception (not a
            # bare except) lets KeyboardInterrupt and SystemExit propagate.
            return False
        user_id = data.get('reset')
        if user_id is None:
            # A validly signed token without a 'reset' entry (for example a
            # confirmation token) must not reach the database lookup.
            return False
        user = User.query.get(user_id)
        if user is None:
            return False
        user.password = new_password
        db.session.add(user)
        return True

    @property
    def password(self):
        """
        Write-only attribute: only the hash is stored, so reading raises
        an AttributeError.
        """
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """
        Checks the given password against the stored hash. Returns False
        when no hash has been stored for this User.
        """
        if self.password_hash is None:
            return False
        return check_password_hash(self.password_hash, password)

    def can(self, perm):
        """
        Checks if a User with its current role can do something. Checks if
        the associated role actually has the needed Permission.
        """
        return self.role is not None and self.role.has_permission(perm)

    def is_administrator(self):
        """
        Checks if User has Admin permissions.
        """
        return self.can(Permission.ADMIN)
class AnonymousUser(AnonymousUserMixin):
    """
    Replacement for the default anonymous user. It offers the same
    permission checks as User, all of which are denied.
    """

    def can(self, permissions):
        """
        Anonymous users have no permissions at all.
        """
        return False

    def is_administrator(self):
        """
        Anonymous users are never administrators.
        """
        return False
class Job(db.Model):
    """
    Class to define Jobs.
    """
    __tablename__ = 'jobs'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    end_date = db.Column(db.DateTime())
    mem_mb = db.Column(db.Integer)
    n_cores = db.Column(db.Integer)
    service = db.Column(db.String(64))
    # Service specific arguments as string list.
    # Example: ["-l eng", "--keep-intermediates", "--skip-binarization"]
    service_args = db.Column(db.String(255))
    service_version = db.Column(db.String(16))
    status = db.Column(db.String(16))
    title = db.Column(db.String(32))
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __repr__(self):
        """
        String representation of the Job. For human readability.
        """
        return '<Job %r>' % self.title

    @staticmethod
    def _utc_timestamp(moment):
        """
        Convert a datetime into a POSIX timestamp; None is passed through.

        creation_date is filled with datetime.utcnow, i.e. a naive value
        holding UTC. datetime.timestamp() reads naive values as local time,
        so naive values are tagged as UTC first.
        NOTE(review): assumes end_date is stored as naive UTC as well -
        confirm against the code that sets it.
        """
        from datetime import timezone

        if moment is None:
            return None
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.timestamp()

    def to_jsonifyable(self):
        """
        Return the Job as a dictionary of plain values. Both dates are
        given as POSIX timestamps, or None when the date is not set
        (creation_date only receives its default on insert).
        """
        return {'id': self.id,
                'creation_date': self._utc_timestamp(self.creation_date),
                'description': self.description,
                'end_date': self._utc_timestamp(self.end_date),
                'mem_mb': self.mem_mb,
                'n_cores': self.n_cores,
                'service': self.service,
                'service_args': self.service_args,
                'service_version': self.service_version,
                'status': self.status,
                'title': self.title,
                'user_id': self.user_id}
class Corpus(db.Model):
    """
    Database model of a corpus created by a User.
    """
    __tablename__ = 'corpora'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    title = db.Column(db.String(32))
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __repr__(self):
        """
        Human readable representation showing the title of the corpus.
        """
        return '<Corpus %r>' % self.title

    def to_jsonifyable(self):
        """
        Return the column values of the corpus as a dictionary.

        NOTE(review): creation_date is handed over as a datetime object,
        not as a timestamp - the serializer used by the caller has to be
        able to cope with that.
        """
        exported = ('id', 'creation_date', 'description', 'title', 'user_id')
        return {column: getattr(self, column) for column in exported}
# Flask-Login is told to use the application's custom anonymous user by
# setting its class in the login_manager.anonymous_user attribute.
login_manager.anonymous_user = AnonymousUser
@login_manager.user_loader
def load_user(user_id):
    """
    Callback registered with Flask-Login to load a User by its ID.

    Returns the User whose primary key equals user_id, or None when the
    ID cannot be turned into an integer or no such User exists.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # A malformed ID counts as "no such user" instead of raising.
        return None
    return User.query.get(primary_key)