from datetime import datetime, timedelta
from enum import IntEnum
from flask import current_app, url_for
from flask_hashids import HashidMixin
from flask_login import UserMixin
from sqlalchemy.ext.associationproxy import association_proxy
from pathlib import Path
from typing import Union
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import re
import secrets
import shutil
from app import db, hashids
from app.extensions.sqlalchemy_extras import IntEnumColumn
from .corpus import Corpus
from .corpus_follower_association import CorpusFollowerAssociation
from .corpus_follower_role import CorpusFollowerRole
from .role import Permission, Role
from .token import Token


class ProfilePrivacySettings(IntEnum):
    """Bit flags stored in ``User.profile_privacy_settings``.

    Each member is a distinct power of two so that several settings can be
    combined in a single integer column.
    """
    SHOW_EMAIL = 1
    SHOW_LAST_SEEN = 2
    SHOW_MEMBER_SINCE = 4

    @staticmethod
    def get(profile_privacy_setting: Union['ProfilePrivacySettings', int, str]) -> 'ProfilePrivacySettings':
        """Coerce a member, its integer value or its name into a member.

        Raises:
            ValueError: for an int that is not a member value.
            KeyError: for a str that is not a member name.
            TypeError: for any other argument type.
        """
        if isinstance(profile_privacy_setting, ProfilePrivacySettings):
            return profile_privacy_setting
        if isinstance(profile_privacy_setting, int):
            return ProfilePrivacySettings(profile_privacy_setting)
        if isinstance(profile_privacy_setting, str):
            return ProfilePrivacySettings[profile_privacy_setting]
        raise TypeError('profile_privacy_setting must be ProfilePrivacySettings, int, or str')


class UserSettingJobStatusMailNotificationLevel(IntEnum):
    """Values for ``User.setting_job_status_mail_notification_level``."""
    NONE = 1
    END = 2
    ALL = 3


class User(HashidMixin, UserMixin, db.Model):
    """Database model of a user account.

    Besides the persisted columns and relationships, this class bundles
    password handling, JWT based confirmation / password reset / corpus
    follow tokens, API auth tokens and JSON serialization.
    """
    __tablename__ = 'users'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    # Fields
    email = db.Column(db.String(254), index=True, unique=True)
    username = db.Column(db.String(64), index=True, unique=True)
    # Not a column: compiled pattern of the characters allowed in usernames
    username_pattern = re.compile(r'^[A-Za-zÄÖÜäöüß0-9_.]*$')
    password_hash = db.Column(db.String(128))
    confirmed = db.Column(db.Boolean, default=False)
    terms_of_use_accepted = db.Column(db.Boolean, default=False)
    member_since = db.Column(db.DateTime(), default=datetime.utcnow)
    setting_job_status_mail_notification_level = db.Column(
        IntEnumColumn(UserSettingJobStatusMailNotificationLevel),
        default=UserSettingJobStatusMailNotificationLevel.END
    )
    last_seen = db.Column(db.DateTime())
    full_name = db.Column(db.String(64))
    about_me = db.Column(db.String(256))
    location = db.Column(db.String(64))
    website = db.Column(db.String(128))
    organization = db.Column(db.String(128))
    is_public = db.Column(db.Boolean, default=False)
    # Bitwise combination of ProfilePrivacySettings values
    profile_privacy_settings = db.Column(db.Integer(), default=0)
    # Relationships
    avatar = db.relationship(
        'Avatar',
        back_populates='user',
        cascade='all, delete-orphan',
        uselist=False
    )
    corpora = db.relationship(
        'Corpus',
        back_populates='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    corpus_follower_associations = db.relationship(
        'CorpusFollowerAssociation',
        back_populates='follower',
        cascade='all, delete-orphan'
    )
    followed_corpora = association_proxy(
        'corpus_follower_associations',
        'corpus',
        creator=lambda c: CorpusFollowerAssociation(corpus=c)
    )
    jobs = db.relationship(
        'Job',
        back_populates='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    role = db.relationship(
        'Role',
        back_populates='users'
    )
    spacy_nlp_pipeline_models = db.relationship(
        'SpaCyNLPPipelineModel',
        back_populates='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    tesseract_ocr_pipeline_models = db.relationship(
        'TesseractOCRPipelineModel',
        back_populates='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    tokens = db.relationship(
        'Token',
        back_populates='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )

    def __init__(self, **kwargs):
        """Create a user, assigning a role when none is passed.

        Without an explicit ``role`` keyword, the 'Administrator' role is
        used if ``email`` equals the ``NOPAQUE_ADMIN`` config value,
        otherwise the role flagged as default. In that case ``email`` is a
        required keyword (a missing one raises ``KeyError``).
        """
        if 'role' not in kwargs:
            kwargs['role'] = (
                Role.query.filter_by(name='Administrator').first()
                if kwargs['email'] == current_app.config['NOPAQUE_ADMIN']
                else Role.query.filter_by(default=True).first()
            )
        super().__init__(**kwargs)

    def __repr__(self):
        # Previously returned an empty string, which made log output and
        # debugger views of User objects useless.
        return f'<User {self.username}>'

    @property
    def is_administrator(self):
        """Whether the user's role grants ``Permission.ADMINISTRATE``."""
        return self.can(Permission.ADMINISTRATE)

    @property
    def jsonpatch_path(self):
        """Path of this user, built from its hashid."""
        return f'/users/{self.hashid}'

    @property
    def password(self):
        """Write-only attribute; reading always raises ``AttributeError``."""
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        # Only the hash is stored, never the plain text password
        self.password_hash = generate_password_hash(password)

    @property
    def path(self) -> Path:
        """Data directory of this user below ``NOPAQUE_DATA_DIR``."""
        return current_app.config.get('NOPAQUE_DATA_DIR') / 'users' / f'{self.id}'

    @staticmethod
    def create(**kwargs):
        """Create a user, flush it and create its data directories.

        The session is flushed (not committed) so that ``user.id``, which
        ``user.path`` is derived from, is available.

        Raises:
            OSError: if a directory cannot be created. The error is logged
                and the session rolled back before re-raising.
        """
        user = User(**kwargs)
        db.session.add(user)
        db.session.flush(objects=[user])
        db.session.refresh(user)
        user_spacy_nlp_pipeline_models_dir = user.path / 'spacy_nlp_pipeline_models'
        user_tesseract_ocr_pipeline_models_dir = user.path / 'tesseract_ocr_pipeline_models'
        user_corpora_dir = user.path / 'corpora'
        user_jobs_dir = user.path / 'jobs'
        try:
            user.path.mkdir()
            user_spacy_nlp_pipeline_models_dir.mkdir()
            user_tesseract_ocr_pipeline_models_dir.mkdir()
            user_corpora_dir.mkdir()
            user_jobs_dir.mkdir()
        except OSError as e:
            # TODO: Potential leftover cleanup
            current_app.logger.error(e)
            db.session.rollback()
            raise
        return user

    @staticmethod
    def insert_defaults():
        """Ensure the 'nopaque' user exists with the 'System user' role.

        Commits the session.
        """
        nopaque_user = User.query.filter_by(username='nopaque').first()
        system_user_role = Role.query.filter_by(name='System user').first()
        if nopaque_user is None:
            nopaque_user = User.create(
                username='nopaque',
                role=system_user_role
            )
            db.session.add(nopaque_user)
        elif nopaque_user.role != system_user_role:
            nopaque_user.role = system_user_role
        db.session.commit()

    @staticmethod
    def reset_password(token, new_password):
        """Set a new password for the user identified by a reset token.

        Returns:
            True if the token is valid and the user exists, else False.
            The change is added to the session but not committed.
        """
        try:
            payload = jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256'],
                issuer=current_app.config['SERVER_NAME'],
                options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']}
            )
        except jwt.PyJWTError:
            return False
        if payload.get('purpose') != 'User.reset_password':
            return False
        user_hashid = payload.get('sub')
        user_id = hashids.decode(user_hashid)
        user = User.query.get(user_id)
        if user is None:
            return False
        user.password = new_password
        db.session.add(user)
        return True

    @staticmethod
    def verify_access_token(access_token, refresh_token=None):
        """Return the user owning a non-expired access token.

        For a valid token the user's ``last_seen`` is updated and committed.
        Returns None (implicitly) if the token is unknown or expired, or if
        it belongs to a user with the 'System user' role.
        ``refresh_token`` is accepted but not used.
        """
        token = Token.query.filter(Token.access_token == access_token).first()
        if token is not None:
            if token.access_expiration > datetime.utcnow():
                token.user.ping()
                db.session.commit()
                if token.user.role.name != 'System user':
                    return token.user

    @staticmethod
    def verify_refresh_token(refresh_token, access_token):
        """Return the Token matching both values if it can still be refreshed.

        Returns None (implicitly) if no such token exists or if its refresh
        period has expired; in the latter case all tokens of the owning user
        are revoked and the session is committed.
        """
        token = Token.query.filter((Token.refresh_token == refresh_token) & (Token.access_token == access_token)).first()
        if token is not None:
            if token.refresh_expiration > datetime.utcnow():
                return token
            # someone tried to refresh with an expired token
            # revoke all tokens from this user as a precaution
            token.user.revoke_auth_tokens()
            db.session.commit()

    def can(self, permission):
        """Whether the user has a role that grants ``permission``."""
        return self.role is not None and self.role.has_permission(permission)

    def confirm(self, confirmation_token):
        """Mark the account as confirmed if the token was issued for it.

        Returns:
            True on success, False if the token is invalid, has another
            purpose or belongs to a different user. Not committed.
        """
        try:
            payload = jwt.decode(
                confirmation_token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256'],
                issuer=current_app.config['SERVER_NAME'],
                options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']}
            )
        except jwt.PyJWTError:
            return False
        if payload.get('purpose') != 'user.confirm':
            return False
        if payload.get('sub') != self.hashid:
            return False
        self.confirmed = True
        db.session.add(self)
        return True

    def delete(self):
        """Remove the user's data directory and mark the user for deletion.

        Errors while removing the directory are ignored; the session is
        not committed.
        """
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def generate_auth_token(self):
        """Build a new Token (15 min access, 7 days refresh) for this user."""
        return Token(
            access_token=secrets.token_urlsafe(),
            access_expiration=datetime.utcnow() + timedelta(minutes=15),
            refresh_token=secrets.token_urlsafe(),
            refresh_expiration=datetime.utcnow() + timedelta(days=7),
            user=self
        )

    def generate_confirm_token(self, expiration=3600):
        """Return a signed account confirmation JWT.

        Args:
            expiration: lifetime of the token in seconds.
        """
        now = datetime.utcnow()
        payload = {
            'exp': now + timedelta(seconds=expiration),
            'iat': now,
            'iss': current_app.config['SERVER_NAME'],
            'purpose': 'user.confirm',
            'sub': self.hashid
        }
        return jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm='HS256'
        )

    def generate_reset_password_token(self, expiration=3600):
        """Return a signed password reset JWT.

        Args:
            expiration: lifetime of the token in seconds.
        """
        now = datetime.utcnow()
        payload = {
            'exp': now + timedelta(seconds=expiration),
            'iat': now,
            'iss': current_app.config['SERVER_NAME'],
            'purpose': 'User.reset_password',
            'sub': self.hashid
        }
        return jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm='HS256'
        )

    def ping(self):
        """Set ``last_seen`` to the current UTC time (not committed)."""
        self.last_seen = datetime.utcnow()

    def revoke_auth_tokens(self):
        """Mark all auth tokens of this user for deletion (not committed)."""
        for token in self.tokens:
            db.session.delete(token)

    def verify_password(self, password):
        """Check ``password`` against the stored hash.

        Always False for users with the 'System user' role and for users
        without a stored password hash.
        """
        if self.role.name == 'System user':
            return False
        if self.password_hash is None:
            # check_password_hash() needs a string; no hash means no login
            return False
        return check_password_hash(self.password_hash, password)

    #region Profile Privacy settings
    def has_profile_privacy_setting(self, setting):
        """Whether the given privacy flag is set.

        ``setting`` may be anything ``ProfilePrivacySettings.get`` accepts.
        """
        s = ProfilePrivacySettings.get(setting)
        # The column default is only applied on INSERT, so the attribute can
        # still be None on a new, unflushed instance; treat that as "no flags".
        current = self.profile_privacy_settings or 0
        return current & s.value == s.value

    def add_profile_privacy_setting(self, setting):
        """Set the given privacy flag (no-op if it is already set)."""
        s = ProfilePrivacySettings.get(setting)
        self.profile_privacy_settings = (self.profile_privacy_settings or 0) | s.value

    def remove_profile_privacy_setting(self, setting):
        """Clear the given privacy flag (no-op if it is not set)."""
        s = ProfilePrivacySettings.get(setting)
        self.profile_privacy_settings = (self.profile_privacy_settings or 0) & ~s.value

    def reset_profile_privacy_settings(self):
        """Clear all privacy flags."""
        self.profile_privacy_settings = 0
    #endregion Profile Privacy settings

    def follow_corpus(self, corpus, role=None):
        """Follow ``corpus`` with ``role``, or update the role if following.

        Without ``role`` the CorpusFollowerRole flagged as default is used.
        Not committed.
        """
        if role is None:
            cfr = CorpusFollowerRole.query.filter_by(default=True).first()
        else:
            cfr = role
        if self.is_following_corpus(corpus):
            cfa = CorpusFollowerAssociation.query.filter_by(corpus=corpus, follower=self).first()
            if cfa.role != cfr:
                cfa.role = cfr
        else:
            cfa = CorpusFollowerAssociation(corpus=corpus, role=cfr, follower=self)
            db.session.add(cfa)

    def unfollow_corpus(self, corpus):
        """Stop following ``corpus``; does nothing if it is not followed."""
        if not self.is_following_corpus(corpus):
            return
        self.followed_corpora.remove(corpus)

    def is_following_corpus(self, corpus):
        """Whether ``corpus`` is among the corpora this user follows."""
        return corpus in self.followed_corpora

    def generate_follow_corpus_token(self, corpus_hashid, role_name, expiration=7):
        """Return a signed JWT that lets its bearer follow a corpus.

        Args:
            corpus_hashid: hashid of the corpus, stored in the 'sub' claim.
            role_name: name of the CorpusFollowerRole to grant.
            expiration: either a ``datetime`` (absolute expiry, used as is)
                or a number of days from now.
                NOTE(review): numbers are read as days because the default
                of 7 is not a meaningful timestamp; confirm that no caller
                passes a Unix timestamp here.
        """
        now = datetime.utcnow()
        # The raw number used to be written to 'exp', i.e. a few seconds
        # after the Unix epoch, so tokens created with the default were
        # rejected as expired by follow_corpus_by_token().
        if isinstance(expiration, datetime):
            expires_at = expiration
        else:
            expires_at = now + timedelta(days=expiration)
        payload = {
            'exp': expires_at,
            'iat': now,
            'iss': current_app.config['SERVER_NAME'],
            'purpose': 'User.follow_corpus',
            'role_name': role_name,
            'sub': corpus_hashid
        }
        return jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm='HS256'
        )

    def follow_corpus_by_token(self, token):
        """Follow the corpus named in a follow token with the role it grants.

        Returns:
            True on success, False if the token is invalid, has another
            purpose or names an unknown role. Not committed.
        """
        try:
            payload = jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256'],
                issuer=current_app.config['SERVER_NAME'],
                options={'require': ['exp', 'iat', 'iss', 'purpose', 'role_name', 'sub']}
            )
        except jwt.PyJWTError:
            return False
        if payload.get('purpose') != 'User.follow_corpus':
            return False
        corpus_hashid = payload.get('sub')
        corpus_id = hashids.decode(corpus_hashid)
        # NOTE(review): get_or_404() aborts with a 404 for an unknown corpus,
        # so the None check below looks unreachable. Left unchanged because
        # callers may rely on the 404 response.
        corpus = Corpus.query.get_or_404(corpus_id)
        if corpus is None:
            return False
        role_name = payload.get('role_name')
        role = CorpusFollowerRole.query.filter_by(name=role_name).first()
        if role is None:
            return False
        self.follow_corpus(corpus, role)
        # db.session.add(self)
        return True

    def to_json_serializeable(self, backrefs=False, relationships=False, filter_by_privacy_settings=False):
        """Return a dict representation of the user.

        Args:
            backrefs: also include the serialized role.
            relationships: also include follower associations, corpora,
                jobs and pipeline models, each keyed by hashid.
            filter_by_privacy_settings: drop 'email', 'last_seen' and
                'member_since' unless the matching privacy flag is set.
        """
        json_serializeable = {
            'id': self.hashid,
            'confirmed': self.confirmed,
            'avatar': url_for('users.user_avatar', user_id=self.id),
            'email': self.email,
            'last_seen': (
                None if self.last_seen is None
                else f'{self.last_seen.isoformat()}Z'
            ),
            'member_since': f'{self.member_since.isoformat()}Z',
            'username': self.username,
            'full_name': self.full_name,
            'about_me': self.about_me,
            'website': self.website,
            'location': self.location,
            'organization': self.organization,
            'job_status_mail_notification_level': \
                    self.setting_job_status_mail_notification_level.name,
            'profile_privacy_settings': {
                'is_public': self.is_public,
                'show_email': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL),
                'show_last_seen': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN),
                'show_member_since': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE)
            }
        }
        if backrefs:
            json_serializeable['role'] = \
                self.role.to_json_serializeable(backrefs=True)
        if relationships:
            json_serializeable['corpus_follower_associations'] = {
                x.hashid: x.to_json_serializeable()
                for x in self.corpus_follower_associations
            }
            json_serializeable['corpora'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.corpora
            }
            json_serializeable['jobs'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.jobs
            }
            json_serializeable['tesseract_ocr_pipeline_models'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.tesseract_ocr_pipeline_models
            }
            json_serializeable['spacy_nlp_pipeline_models'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.spacy_nlp_pipeline_models
            }
        if filter_by_privacy_settings:
            if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL):
                json_serializeable.pop('email')
            if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN):
                json_serializeable.pop('last_seen')
            if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE):
                json_serializeable.pop('member_since')
        return json_serializeable