from datetime import datetime, timedelta from enum import Enum, IntEnum from flask import abort, current_app, url_for from flask_hashids import HashidMixin from flask_login import UserMixin from sqlalchemy.ext.associationproxy import association_proxy from time import sleep from tqdm import tqdm from typing import Union from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename import json import jwt import os import re import requests import secrets import shutil import xml.etree.ElementTree as ET import yaml from app import db, hashids, login, mail, socketio from app.converters.vrt import normalize_vrt_file from app.email import create_message ############################################################################## # enums # ############################################################################## # region enums class CorpusStatus(IntEnum): UNPREPARED = 1 SUBMITTED = 2 QUEUED = 3 BUILDING = 4 BUILT = 5 FAILED = 6 STARTING_ANALYSIS_SESSION = 7 RUNNING_ANALYSIS_SESSION = 8 CANCELING_ANALYSIS_SESSION = 9 @staticmethod def get(corpus_status: Union['CorpusStatus', int, str]) -> 'CorpusStatus': if isinstance(corpus_status, CorpusStatus): return corpus_status if isinstance(corpus_status, int): return CorpusStatus(corpus_status) if isinstance(corpus_status, str): return CorpusStatus[corpus_status] raise TypeError('corpus_status must be CorpusStatus, int, or str') class JobStatus(IntEnum): INITIALIZING = 1 SUBMITTED = 2 QUEUED = 3 RUNNING = 4 CANCELING = 5 CANCELED = 6 COMPLETED = 7 FAILED = 8 @staticmethod def get(job_status: Union['JobStatus', int, str]) -> 'JobStatus': if isinstance(job_status, JobStatus): return job_status if isinstance(job_status, int): return JobStatus(job_status) if isinstance(job_status, str): return JobStatus[job_status] raise TypeError('job_status must be JobStatus, int, or str') class Permission(IntEnum): ''' Defines User permissions as integers by the power of 2. 
User permission can be evaluated using the bitwise operator &. ''' ADMINISTRATE = 1 CONTRIBUTE = 2 USE_API = 4 @staticmethod def get(permission: Union['Permission', int, str]) -> 'Permission': if isinstance(permission, Permission): return permission if isinstance(permission, int): return Permission(permission) if isinstance(permission, str): return Permission[permission] raise TypeError('permission must be Permission, int, or str') class UserSettingJobStatusMailNotificationLevel(IntEnum): NONE = 1 END = 2 ALL = 3 class ProfilePrivacySettings(IntEnum): SHOW_EMAIL = 1 SHOW_LAST_SEEN = 2 SHOW_MEMBER_SINCE = 4 @staticmethod def get(profile_privacy_setting: Union['ProfilePrivacySettings', int, str]) -> 'ProfilePrivacySettings': if isinstance(profile_privacy_setting, ProfilePrivacySettings): return profile_privacy_setting if isinstance(profile_privacy_setting, int): return ProfilePrivacySettings(profile_privacy_setting) if isinstance(profile_privacy_setting, str): return ProfilePrivacySettings[profile_privacy_setting] raise TypeError('profile_privacy_setting must be ProfilePrivacySettings, int, or str') class CorpusFollowerPermission(IntEnum): VIEW = 1 MANAGE_FILES = 2 MANAGE_FOLLOWERS = 4 MANAGE_CORPUS = 8 @staticmethod def get(corpus_follower_permission: Union['CorpusFollowerPermission', int, str]) -> 'CorpusFollowerPermission': if isinstance(corpus_follower_permission, CorpusFollowerPermission): return corpus_follower_permission if isinstance(corpus_follower_permission, int): return CorpusFollowerPermission(corpus_follower_permission) if isinstance(corpus_follower_permission, str): return CorpusFollowerPermission[corpus_follower_permission] raise TypeError('corpus_follower_permission must be CorpusFollowerPermission, int, or str') # endregion enums ############################################################################## # mixins # ############################################################################## # region mixins class FileMixin: ''' Mixin for 
db.Model classes. All file related models should use this. ''' creation_date = db.Column(db.DateTime, default=datetime.utcnow) filename = db.Column(db.String(255)) mimetype = db.Column(db.String(255)) def file_mixin_to_json_serializeable(self, backrefs=False, relationships=False): return { 'creation_date': f'{self.creation_date.isoformat()}Z', 'filename': self.filename, 'mimetype': self.mimetype } @classmethod def create(cls, file_storage, **kwargs): filename = kwargs.pop('filename', file_storage.filename) mimetype = kwargs.pop('mimetype', file_storage.mimetype) obj = cls( filename=secure_filename(filename), mimetype=mimetype, **kwargs ) db.session.add(obj) db.session.flush(objects=[obj]) db.session.refresh(obj) try: file_storage.save(obj.path) except (AttributeError, OSError) as e: current_app.logger.error(e) db.session.rollback() raise e return obj # endregion mixins ############################################################################## # type_decorators # ############################################################################## # region type_decorators class IntEnumColumn(db.TypeDecorator): impl = db.Integer def __init__(self, enum_type, *args, **kwargs): super().__init__(*args, **kwargs) self.enum_type = enum_type def process_bind_param(self, value, dialect): if isinstance(value, self.enum_type) and isinstance(value.value, int): return value.value elif isinstance(value, int): return self.enum_type(value).value elif isinstance(value, str): return self.enum_type[value].value else: return TypeError() def process_result_value(self, value, dialect): return self.enum_type(value) class ContainerColumn(db.TypeDecorator): impl = db.String def __init__(self, container_type, *args, **kwargs): super().__init__(*args, **kwargs) self.container_type = container_type def process_bind_param(self, value, dialect): if isinstance(value, self.container_type): return json.dumps(value) elif isinstance(value, str) and isinstance(json.loads(value), self.container_type): 
return value else: return TypeError() def process_result_value(self, value, dialect): return json.loads(value) # endregion type_decorators ############################################################################## # Models # ############################################################################## # region models class Role(HashidMixin, db.Model): __tablename__ = 'roles' # Primary key id = db.Column(db.Integer, primary_key=True) # Fields name = db.Column(db.String(64), unique=True) default = db.Column(db.Boolean, default=False, index=True) permissions = db.Column(db.Integer, default=0) # Relationships users = db.relationship('User', back_populates='role', lazy='dynamic') def __repr__(self): return f'' def has_permission(self, permission: Union[Permission, int, str]): p = Permission.get(permission) return self.permissions & p.value == p.value def add_permission(self, permission: Union[Permission, int, str]): p = Permission.get(permission) if not self.has_permission(p): self.permissions += p.value def remove_permission(self, permission: Union[Permission, int, str]): p = Permission.get(permission) if self.has_permission(p): self.permissions -= p.value def reset_permissions(self): self.permissions = 0 def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'default': self.default, 'name': self.name, 'permissions': [ x.name for x in Permission if self.has_permission(x.value) ] } if backrefs: pass if relationships: json_serializeable['users'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.users } return json_serializeable @staticmethod def insert_defaults(): roles = { 'User': [], 'API user': [Permission.USE_API], 'Contributor': [Permission.CONTRIBUTE], 'Administrator': [ Permission.ADMINISTRATE, Permission.CONTRIBUTE, Permission.USE_API ], 'System user': [] } default_role_name = 'User' for role_name, permissions in roles.items(): role = Role.query.filter_by(name=role_name).first() 
if role is None: role = Role(name=role_name) role.reset_permissions() for permission in permissions: role.add_permission(permission) role.default = role.name == default_role_name db.session.add(role) db.session.commit() class Token(db.Model): __tablename__ = 'tokens' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields access_token = db.Column(db.String(64), index=True) access_expiration = db.Column(db.DateTime) refresh_token = db.Column(db.String(64), index=True) refresh_expiration = db.Column(db.DateTime) # Relationships user = db.relationship('User', back_populates='tokens') def expire(self): self.access_expiration = datetime.utcnow() self.refresh_expiration = datetime.utcnow() def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'access_token': self.access_token, 'access_expiration': ( None if self.access_expiration is None else f'{self.access_expiration.isoformat()}Z' ), 'refresh_token': self.refresh_token, 'refresh_expiration': ( None if self.refresh_expiration is None else f'{self.refresh_expiration.isoformat()}Z' ) } if backrefs: json_serializeable['user'] = \ self.user.to_json_serializeable(backrefs=True) if relationships: pass return json_serializeable @staticmethod def clean(): """Remove any tokens that have been expired for more than a day.""" yesterday = datetime.utcnow() - timedelta(days=1) Token.query.filter(Token.refresh_expiration < yesterday).delete() class Avatar(HashidMixin, FileMixin, db.Model): __tablename__ = 'avatars' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Relationships user = db.relationship('User', back_populates='avatar') @property def path(self): return os.path.join(self.user.path, 'avatar') def delete(self): try: os.remove(self.path) except OSError as e: current_app.logger.error(e) 
db.session.delete(self) def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, **self.file_mixin_to_json_serializeable() } if backrefs: json_serializeable['user'] = \ self.user.to_json_serializeable(backrefs=True) if relationships: pass return json_serializeable class CorpusFollowerRole(HashidMixin, db.Model): __tablename__ = 'corpus_follower_roles' # Primary key id = db.Column(db.Integer, primary_key=True) # Fields name = db.Column(db.String(64), unique=True) default = db.Column(db.Boolean, default=False, index=True) permissions = db.Column(db.Integer, default=0) # Relationships corpus_follower_associations = db.relationship( 'CorpusFollowerAssociation', back_populates='role' ) def __repr__(self): return f'' def has_permission(self, permission: Union[CorpusFollowerPermission, int, str]): perm = CorpusFollowerPermission.get(permission) return self.permissions & perm.value == perm.value def add_permission(self, permission: Union[CorpusFollowerPermission, int, str]): perm = CorpusFollowerPermission.get(permission) if not self.has_permission(perm): self.permissions += perm.value def remove_permission(self, permission: Union[CorpusFollowerPermission, int, str]): perm = CorpusFollowerPermission.get(permission) if self.has_permission(perm): self.permissions -= perm.value def reset_permissions(self): self.permissions = 0 def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'default': self.default, 'name': self.name, 'permissions': [ x.name for x in CorpusFollowerPermission if self.has_permission(x) ] } if backrefs: pass if relationships: json_serializeable['corpus_follower_association'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.corpus_follower_association } return json_serializeable @staticmethod def insert_defaults(): roles = { 'Anonymous': [], 'Viewer': [ CorpusFollowerPermission.VIEW ], 'Contributor': [ 
CorpusFollowerPermission.VIEW, CorpusFollowerPermission.MANAGE_FILES ], 'Administrator': [ CorpusFollowerPermission.VIEW, CorpusFollowerPermission.MANAGE_FILES, CorpusFollowerPermission.MANAGE_FOLLOWERS, CorpusFollowerPermission.MANAGE_CORPUS ] } default_role_name = 'Viewer' for role_name, permissions in roles.items(): role = CorpusFollowerRole.query.filter_by(name=role_name).first() if role is None: role = CorpusFollowerRole(name=role_name) role.reset_permissions() for permission in permissions: role.add_permission(permission) role.default = role.name == default_role_name db.session.add(role) db.session.commit() class CorpusFollowerAssociation(HashidMixin, db.Model): __tablename__ = 'corpus_follower_associations' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id')) follower_id = db.Column(db.Integer, db.ForeignKey('users.id')) role_id = db.Column(db.Integer, db.ForeignKey('corpus_follower_roles.id')) # Relationships corpus = db.relationship( 'Corpus', back_populates='corpus_follower_associations' ) follower = db.relationship( 'User', back_populates='corpus_follower_associations' ) role = db.relationship( 'CorpusFollowerRole', back_populates='corpus_follower_associations' ) def __init__(self, **kwargs): if 'role' not in kwargs: kwargs['role'] = CorpusFollowerRole.query.filter_by(default=True).first() super().__init__(**kwargs) def __repr__(self): return f'' def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'corpus': self.corpus.to_json_serializeable(backrefs=True), 'follower': self.follower.to_json_serializeable(), 'role': self.role.to_json_serializeable() } if backrefs: pass if relationships: pass return json_serializeable class User(HashidMixin, UserMixin, db.Model): __tablename__ = 'users' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys role_id = db.Column(db.Integer, 
db.ForeignKey('roles.id')) # Fields email = db.Column(db.String(254), index=True, unique=True) username = db.Column(db.String(64), index=True, unique=True) username_pattern = re.compile(r'^[A-Za-zÄÖÜäöüß0-9_.]*$') password_hash = db.Column(db.String(128)) confirmed = db.Column(db.Boolean, default=False) terms_of_use_accepted = db.Column(db.Boolean, default=False) member_since = db.Column(db.DateTime(), default=datetime.utcnow) setting_job_status_mail_notification_level = db.Column( IntEnumColumn(UserSettingJobStatusMailNotificationLevel), default=UserSettingJobStatusMailNotificationLevel.END ) last_seen = db.Column(db.DateTime()) full_name = db.Column(db.String(64)) about_me = db.Column(db.String(256)) location = db.Column(db.String(64)) website = db.Column(db.String(128)) organization = db.Column(db.String(128)) is_public = db.Column(db.Boolean, default=False) profile_privacy_settings = db.Column(db.Integer(), default=0) # Relationships avatar = db.relationship( 'Avatar', back_populates='user', cascade='all, delete-orphan', uselist=False ) corpora = db.relationship( 'Corpus', back_populates='user', cascade='all, delete-orphan', lazy='dynamic' ) corpus_follower_associations = db.relationship( 'CorpusFollowerAssociation', back_populates='follower', cascade='all, delete-orphan' ) followed_corpora = association_proxy( 'corpus_follower_associations', 'corpus', creator=lambda c: CorpusFollowerAssociation(corpus=c) ) jobs = db.relationship( 'Job', back_populates='user', cascade='all, delete-orphan', lazy='dynamic' ) role = db.relationship( 'Role', back_populates='users' ) spacy_nlp_pipeline_models = db.relationship( 'SpaCyNLPPipelineModel', back_populates='user', cascade='all, delete-orphan', lazy='dynamic' ) tesseract_ocr_pipeline_models = db.relationship( 'TesseractOCRPipelineModel', back_populates='user', cascade='all, delete-orphan', lazy='dynamic' ) tokens = db.relationship( 'Token', back_populates='user', cascade='all, delete-orphan', lazy='dynamic' ) def 
__init__(self, **kwargs): if 'role' not in kwargs: kwargs['role'] = ( Role.query.filter_by(name='Administrator').first() if kwargs['email'] == current_app.config['NOPAQUE_ADMIN'] else Role.query.filter_by(default=True).first() ) super().__init__(**kwargs) def __repr__(self): return f'' @property def jsonpatch_path(self): return f'/users/{self.hashid}' @property def password(self): raise AttributeError('password is not a readable attribute') @password.setter def password(self, password): self.password_hash = generate_password_hash(password) @property def path(self): return os.path.join( current_app.config.get('NOPAQUE_DATA_DIR'), 'users', str(self.id)) @staticmethod def create(**kwargs): user = User(**kwargs) db.session.add(user) db.session.flush(objects=[user]) db.session.refresh(user) try: os.mkdir(user.path) os.mkdir(os.path.join(user.path, 'spacy_nlp_pipeline_models')) os.mkdir(os.path.join(user.path, 'tesseract_ocr_pipeline_models')) os.mkdir(os.path.join(user.path, 'corpora')) os.mkdir(os.path.join(user.path, 'jobs')) except OSError as e: current_app.logger.error(e) db.session.rollback() raise e return user @staticmethod def insert_defaults(): nopaque_user = User.query.filter_by(username='nopaque').first() system_user_role = Role.query.filter_by(name='System user').first() if nopaque_user is None: nopaque_user = User.create( username='nopaque', role=system_user_role ) db.session.add(nopaque_user) elif nopaque_user.role != system_user_role: nopaque_user.role = system_user_role db.session.commit() @staticmethod def reset_password(token, new_password): try: payload = jwt.decode( token, current_app.config['SECRET_KEY'], algorithms=['HS256'], issuer=current_app.config['SERVER_NAME'], options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']} ) except jwt.PyJWTError: return False if payload.get('purpose') != 'User.reset_password': return False user_hashid = payload.get('sub') user_id = hashids.decode(user_hashid) user = User.query.get(user_id) if user is None: 
return False user.password = new_password db.session.add(user) return True @staticmethod def verify_access_token(access_token, refresh_token=None): token = Token.query.filter(Token.access_token == access_token).first() if token is not None: if token.access_expiration > datetime.utcnow(): token.user.ping() db.session.commit() if token.user.role.name != 'System user': return token.user @staticmethod def verify_refresh_token(refresh_token, access_token): token = Token.query.filter((Token.refresh_token == refresh_token) & (Token.access_token == access_token)).first() if token is not None: if token.refresh_expiration > datetime.utcnow(): return token # someone tried to refresh with an expired token # revoke all tokens from this user as a precaution token.user.revoke_auth_tokens() db.session.commit() def can(self, permission): return self.role is not None and self.role.has_permission(permission) def confirm(self, confirmation_token): try: payload = jwt.decode( confirmation_token, current_app.config['SECRET_KEY'], algorithms=['HS256'], issuer=current_app.config['SERVER_NAME'], options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']} ) except jwt.PyJWTError: return False if payload.get('purpose') != 'user.confirm': return False if payload.get('sub') != self.hashid: return False self.confirmed = True db.session.add(self) return True def delete(self): shutil.rmtree(self.path, ignore_errors=True) db.session.delete(self) def generate_auth_token(self): return Token( access_token=secrets.token_urlsafe(), access_expiration=datetime.utcnow() + timedelta(minutes=15), refresh_token=secrets.token_urlsafe(), refresh_expiration=datetime.utcnow() + timedelta(days=7), user=self ) def generate_confirm_token(self, expiration=3600): now = datetime.utcnow() payload = { 'exp': now + timedelta(seconds=expiration), 'iat': now, 'iss': current_app.config['SERVER_NAME'], 'purpose': 'user.confirm', 'sub': self.hashid } return jwt.encode( payload, current_app.config['SECRET_KEY'], 
algorithm='HS256' ) def generate_reset_password_token(self, expiration=3600): now = datetime.utcnow() payload = { 'exp': now + timedelta(seconds=expiration), 'iat': now, 'iss': current_app.config['SERVER_NAME'], 'purpose': 'User.reset_password', 'sub': self.hashid } return jwt.encode( payload, current_app.config['SECRET_KEY'], algorithm='HS256' ) def is_administrator(self): return self.can(Permission.ADMINISTRATE) def ping(self): self.last_seen = datetime.utcnow() def revoke_auth_tokens(self): for token in self.tokens: db.session.delete(token) def verify_password(self, password): if self.role.name == 'System user': return False return check_password_hash(self.password_hash, password) #region Profile Privacy settings def has_profile_privacy_setting(self, setting): s = ProfilePrivacySettings.get(setting) return self.profile_privacy_settings & s.value == s.value def add_profile_privacy_setting(self, setting): s = ProfilePrivacySettings.get(setting) if not self.has_profile_privacy_setting(s): self.profile_privacy_settings += s.value def remove_profile_privacy_setting(self, setting): s = ProfilePrivacySettings.get(setting) if self.has_profile_privacy_setting(s): self.profile_privacy_settings -= s.value def reset_profile_privacy_settings(self): self.profile_privacy_settings = 0 #endregion Profile Privacy settings def follow_corpus(self, corpus, role=None): if role is None: cfr = CorpusFollowerRole.query.filter_by(default=True).first() else: cfr = role if self.is_following_corpus(corpus): cfa = CorpusFollowerAssociation.query.filter_by(corpus=corpus, follower=self).first() if cfa.role != cfr: cfa.role = cfr else: cfa = CorpusFollowerAssociation(corpus=corpus, role=cfr, follower=self) db.session.add(cfa) def unfollow_corpus(self, corpus): if not self.is_following_corpus(corpus): return self.followed_corpora.remove(corpus) def is_following_corpus(self, corpus): return corpus in self.followed_corpora def generate_follow_corpus_token(self, corpus_hashid, role_name, 
expiration=7): now = datetime.utcnow() payload = { 'exp': expiration, 'iat': now, 'iss': current_app.config['SERVER_NAME'], 'purpose': 'User.follow_corpus', 'role_name': role_name, 'sub': corpus_hashid } return jwt.encode( payload, current_app.config['SECRET_KEY'], algorithm='HS256' ) def follow_corpus_by_token(self, token): try: payload = jwt.decode( token, current_app.config['SECRET_KEY'], algorithms=['HS256'], issuer=current_app.config['SERVER_NAME'], options={'require': ['exp', 'iat', 'iss', 'purpose', 'role_name', 'sub']} ) except jwt.PyJWTError: return False if payload.get('purpose') != 'User.follow_corpus': return False corpus_hashid = payload.get('sub') corpus_id = hashids.decode(corpus_hashid) corpus = Corpus.query.get_or_404(corpus_id) if corpus is None: return False role_name = payload.get('role_name') role = CorpusFollowerRole.query.filter_by(name=role_name).first() if role is None: return False self.follow_corpus(corpus, role) # db.session.add(self) return True def to_json_serializeable(self, backrefs=False, relationships=False, filter_by_privacy_settings=False): json_serializeable = { 'id': self.hashid, 'confirmed': self.confirmed, 'avatar': url_for('users.user_avatar', user_id=self.id), 'email': self.email, 'last_seen': ( None if self.last_seen is None else f'{self.last_seen.isoformat()}Z' ), 'member_since': f'{self.member_since.isoformat()}Z', 'username': self.username, 'full_name': self.full_name, 'about_me': self.about_me, 'website': self.website, 'location': self.location, 'organization': self.organization, 'job_status_mail_notification_level': \ self.setting_job_status_mail_notification_level.name, 'profile_privacy_settings': { 'is_public': self.is_public, 'show_email': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL), 'show_last_seen': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN), 'show_member_since': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE) } } if backrefs: 
json_serializeable['role'] = \ self.role.to_json_serializeable(backrefs=True) if relationships: json_serializeable['corpus_follower_associations'] = { x.hashid: x.to_json_serializeable() for x in self.corpus_follower_associations } json_serializeable['corpora'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.corpora } json_serializeable['jobs'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.jobs } json_serializeable['tesseract_ocr_pipeline_models'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.tesseract_ocr_pipeline_models } json_serializeable['spacy_nlp_pipeline_models'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.spacy_nlp_pipeline_models } if filter_by_privacy_settings: if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL): json_serializeable.pop('email') if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN): json_serializeable.pop('last_seen') if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE): json_serializeable.pop('member_since') return json_serializeable class TesseractOCRPipelineModel(FileMixin, HashidMixin, db.Model): __tablename__ = 'tesseract_ocr_pipeline_models' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields title = db.Column(db.String(64)) description = db.Column(db.String(255)) version = db.Column(db.String(16)) compatible_service_versions = db.Column(ContainerColumn(list, 255)) publisher = db.Column(db.String(128)) publisher_url = db.Column(db.String(512)) publishing_url = db.Column(db.String(512)) publishing_year = db.Column(db.Integer) is_public = db.Column(db.Boolean, default=False) # Relationships user = db.relationship('User', back_populates='tesseract_ocr_pipeline_models') @property def path(self): return os.path.join( self.user.path, 'tesseract_ocr_pipeline_models', 
str(self.id) ) @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/tesseract_ocr_pipeline_models/{self.hashid}' @property def url(self): return url_for( 'contributions.tesseract_ocr_pipeline_model', tesseract_ocr_pipeline_model_id=self.id ) @property def user_hashid(self): return self.user.hashid @staticmethod def insert_defaults(force_download=False): nopaque_user = User.query.filter_by(username='nopaque').first() defaults_file = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'TesseractOCRPipelineModel.defaults.yml' ) with open(defaults_file, 'r') as f: defaults = yaml.safe_load(f) for m in defaults: model = TesseractOCRPipelineModel.query.filter_by(title=m['title'], version=m['version']).first() # noqa if model is not None: model.compatible_service_versions = m['compatible_service_versions'] model.description = m['description'] model.filename = f'{model.id}.traineddata' model.publisher = m['publisher'] model.publisher_url = m['publisher_url'] model.publishing_url = m['publishing_url'] model.publishing_year = m['publishing_year'] model.is_public = True model.title = m['title'] model.version = m['version'] else: model = TesseractOCRPipelineModel( compatible_service_versions=m['compatible_service_versions'], description=m['description'], publisher=m['publisher'], publisher_url=m['publisher_url'], publishing_url=m['publishing_url'], publishing_year=m['publishing_year'], is_public=True, title=m['title'], user=nopaque_user, version=m['version'] ) db.session.add(model) db.session.flush(objects=[model]) db.session.refresh(model) model.filename = f'{model.id}.traineddata' if not os.path.exists(model.path) or force_download: r = requests.get(m['url'], stream=True) pbar = tqdm( desc=f'{model.title} ({model.filename})', unit="B", unit_scale=True, unit_divisor=1024, total=int(r.headers['Content-Length']) ) pbar.clear() with open(model.path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks 
pbar.update(len(chunk)) f.write(chunk) pbar.close() db.session.commit() def delete(self): try: os.remove(self.path) except OSError as e: current_app.logger.error(e) db.session.delete(self) def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'compatible_service_versions': self.compatible_service_versions, 'description': self.description, 'publisher': self.publisher, 'publisher_url': self.publisher_url, 'publishing_url': self.publishing_url, 'publishing_year': self.publishing_year, 'is_public': self.is_public, 'title': self.title, 'version': self.version, **self.file_mixin_to_json_serializeable() } if backrefs: json_serializeable['user'] = \ self.user.to_json_serializeable(backrefs=True) if relationships: pass return json_serializeable class SpaCyNLPPipelineModel(FileMixin, HashidMixin, db.Model): __tablename__ = 'spacy_nlp_pipeline_models' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields title = db.Column(db.String(64)) description = db.Column(db.String(255)) version = db.Column(db.String(16)) compatible_service_versions = db.Column(ContainerColumn(list, 255)) publisher = db.Column(db.String(128)) publisher_url = db.Column(db.String(512)) publishing_url = db.Column(db.String(512)) publishing_year = db.Column(db.Integer) pipeline_name = db.Column(db.String(64)) is_public = db.Column(db.Boolean, default=False) # Relationships user = db.relationship('User', back_populates='spacy_nlp_pipeline_models') @property def path(self): return os.path.join( self.user.path, 'spacy_nlp_pipeline_models', str(self.id) ) @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/spacy_nlp_pipeline_models/{self.hashid}' @property def url(self): return url_for( 'contributions.spacy_nlp_pipeline_model', spacy_nlp_pipeline_model_id=self.id ) @property def user_hashid(self): return self.user.hashid @staticmethod def 
insert_defaults(force_download=False): nopaque_user = User.query.filter_by(username='nopaque').first() defaults_file = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'SpaCyNLPPipelineModel.defaults.yml' ) with open(defaults_file, 'r') as f: defaults = yaml.safe_load(f) for m in defaults: model = SpaCyNLPPipelineModel.query.filter_by(title=m['title'], version=m['version']).first() # noqa if model is not None: model.compatible_service_versions = m['compatible_service_versions'] model.description = m['description'] model.filename = m['url'].split('/')[-1] model.publisher = m['publisher'] model.publisher_url = m['publisher_url'] model.publishing_url = m['publishing_url'] model.publishing_year = m['publishing_year'] model.is_public = True model.title = m['title'] model.version = m['version'] model.pipeline_name = m['pipeline_name'] else: model = SpaCyNLPPipelineModel( compatible_service_versions=m['compatible_service_versions'], description=m['description'], filename=m['url'].split('/')[-1], publisher=m['publisher'], publisher_url=m['publisher_url'], publishing_url=m['publishing_url'], publishing_year=m['publishing_year'], is_public=True, title=m['title'], user=nopaque_user, version=m['version'], pipeline_name=m['pipeline_name'] ) db.session.add(model) db.session.flush(objects=[model]) db.session.refresh(model) if not os.path.exists(model.path) or force_download: r = requests.get(m['url'], stream=True) pbar = tqdm( desc=f'{model.title} ({model.filename})', unit="B", unit_scale=True, unit_divisor=1024, total=int(r.headers['Content-Length']) ) pbar.clear() with open(model.path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks pbar.update(len(chunk)) f.write(chunk) pbar.close() db.session.commit() def delete(self): try: os.remove(self.path) except OSError as e: current_app.logger.error(e) db.session.delete(self) def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': 
self.hashid, 'compatible_service_versions': self.compatible_service_versions, 'description': self.description, 'publisher': self.publisher, 'publisher_url': self.publisher_url, 'publishing_url': self.publishing_url, 'publishing_year': self.publishing_year, 'pipeline_name': self.pipeline_name, 'is_public': self.is_public, 'title': self.title, 'version': self.version, **self.file_mixin_to_json_serializeable() } if backrefs: json_serializeable['user'] = \ self.user.to_json_serializeable(backrefs=True) if relationships: pass return json_serializeable class JobInput(FileMixin, HashidMixin, db.Model): __tablename__ = 'job_inputs' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys job_id = db.Column(db.Integer, db.ForeignKey('jobs.id')) # Relationships job = db.relationship( 'Job', back_populates='inputs' ) def __repr__(self): return f'' @property def content_url(self): return url_for( 'jobs.download_job_input', job_id=self.job.id, job_input_id=self.id ) @property def jsonpatch_path(self): return f'{self.job.jsonpatch_path}/inputs/{self.hashid}' @property def path(self): return os.path.join(self.job.path, 'inputs', str(self.id)) @property def url(self): return url_for( 'jobs.job', job_id=self.job_id, _anchor=f'job-{self.job.hashid}-input-{self.hashid}' ) @property def user_hashid(self): return self.job.user.hashid @property def user_id(self): return self.job.user.id def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, **self.file_mixin_to_json_serializeable() } if backrefs: json_serializeable['job'] = \ self.job.to_json_serializeable(backrefs=True) if relationships: pass return json_serializeable class JobResult(FileMixin, HashidMixin, db.Model): __tablename__ = 'job_results' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys job_id = db.Column(db.Integer, db.ForeignKey('jobs.id')) # Fields description = db.Column(db.String(255)) # Relationships job = db.relationship( 
class JobResult(FileMixin, HashidMixin, db.Model):
    '''
    Class to define a result file of a Job.

    The file is located in the ``results`` directory of the owning job
    (see ``path``).
    '''
    __tablename__ = 'job_results'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
    # Fields
    description = db.Column(db.String(255))
    # Relationships
    job = db.relationship(
        'Job',
        back_populates='results'
    )

    def __repr__(self):
        # An empty representation makes log output and debugging useless,
        # so identify the instance by class name and primary key.
        return f'<JobResult {self.id}>'

    @property
    def download_url(self):
        '''URL built for the ``jobs.download_job_result`` endpoint.'''
        return url_for(
            'jobs.download_job_result',
            job_id=self.job_id,
            job_result_id=self.id
        )

    @property
    def jsonpatch_path(self):
        '''JSON Patch path of this result below the owning job's path.'''
        return f'{self.job.jsonpatch_path}/results/{self.hashid}'

    @property
    def path(self):
        '''Filesystem path of this result inside the job's directory.'''
        return os.path.join(self.job.path, 'results', str(self.id))

    @property
    def url(self):
        '''URL of the job page, anchored at this result.'''
        return url_for(
            'jobs.job',
            job_id=self.job_id,
            _anchor=f'job-{self.job.hashid}-result-{self.hashid}'
        )

    @property
    def user_hashid(self):
        '''Hashid of the user owning the job.'''
        return self.job.user.hashid

    @property
    def user_id(self):
        '''Primary key of the user owning the job.'''
        return self.job.user.id

    def to_json_serializeable(self, backrefs=False, relationships=False):
        '''
        Return a JSON serializable dict describing this result.

        :param backrefs: also include the serialized owning job
            under the key ``job``.
        :param relationships: forwarded to the file mixin serializer;
            this model adds no relationship data of its own.
        '''
        json_serializeable = {
            'id': self.hashid,
            'description': self.description,
            **self.file_mixin_to_json_serializeable(
                backrefs=backrefs,
                relationships=relationships
            )
        }
        if backrefs:
            json_serializeable['job'] = \
                self.job.to_json_serializeable(backrefs=True)
        if relationships:
            pass
        return json_serializeable
class Job(HashidMixin, db.Model):
    '''
    Class to define Jobs.
    '''
    __tablename__ = 'jobs'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    creation_date = \
        db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    end_date = db.Column(db.DateTime())
    service = db.Column(db.String(64))
    service_args = db.Column(ContainerColumn(dict, 255))
    service_version = db.Column(db.String(16))
    status = db.Column(
        IntEnumColumn(JobStatus),
        default=JobStatus.INITIALIZING
    )
    title = db.Column(db.String(32))
    # Relationships
    inputs = db.relationship(
        'JobInput',
        back_populates='job',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    results = db.relationship(
        'JobResult',
        back_populates='job',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    user = db.relationship(
        'User',
        back_populates='jobs'
    )

    def __repr__(self):
        # An empty representation makes log output and debugging useless,
        # so identify the instance by class name and primary key.
        return f'<Job {self.id}>'

    @property
    def jsonpatch_path(self):
        '''JSON Patch path of this job below the owning user's path.'''
        return f'{self.user.jsonpatch_path}/jobs/{self.hashid}'

    @property
    def path(self):
        '''Filesystem path of the job directory inside the user's one.'''
        return os.path.join(self.user.path, 'jobs', str(self.id))

    @property
    def url(self):
        '''URL built for the ``jobs.job`` endpoint.'''
        return url_for('jobs.job', job_id=self.id)

    @property
    def user_hashid(self):
        '''Hashid of the user owning this job.'''
        return self.user.hashid

    @staticmethod
    def create(**kwargs):
        '''
        Create a job, add it to the session and create its directories.

        The job is flushed (not committed) so that its primary key, which
        is part of ``path``, is available.

        :param kwargs: passed unchanged to the ``Job`` constructor.
        :returns: the new ``Job``.
        :raises OSError: if a directory cannot be created; the session is
            rolled back before the error is re-raised.
        '''
        job = Job(**kwargs)
        db.session.add(job)
        db.session.flush(objects=[job])
        db.session.refresh(job)
        try:
            os.mkdir(job.path)
            os.mkdir(os.path.join(job.path, 'inputs'))
            os.mkdir(os.path.join(job.path, 'pipeline_data'))
            os.mkdir(os.path.join(job.path, 'results'))
        except OSError as e:
            current_app.logger.error(e)
            db.session.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        return job

    def delete(self):
        '''
        Delete the job and its inputs and results from the database.

        A job that is neither completed nor failed is first marked as
        canceling; the method then polls once per second until the status
        becomes canceled.

        NOTE(review): the wait has no timeout. It presumably relies on a
        separate daemon process setting the status to CANCELED - confirm
        that the daemon is always running when this is called.

        :raises OSError: if the job directory cannot be removed; the
            session is rolled back before the error is re-raised.
        '''
        if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:  # noqa
            self.status = JobStatus.CANCELING
            db.session.commit()
            while self.status != JobStatus.CANCELED:
                # In case the daemon handled a job in any way
                if self.status != JobStatus.CANCELING:
                    self.status = JobStatus.CANCELING
                    db.session.commit()
                sleep(1)
                db.session.refresh(self)
        try:
            shutil.rmtree(self.path)
        except OSError as e:
            current_app.logger.error(e)
            db.session.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        db.session.delete(self)

    def restart(self):
        '''
        Restart a job - only if the status is failed

        Removes the ``results`` and ``pyflow.data`` directories, deletes
        all result records, clears the end date and sets the status to
        SUBMITTED.

        :raises Exception: if the job status is not FAILED.
        '''
        if self.status != JobStatus.FAILED:
            raise Exception('Job status is not "failed"')
        shutil.rmtree(os.path.join(self.path, 'results'), ignore_errors=True)
        shutil.rmtree(os.path.join(self.path, 'pyflow.data'), ignore_errors=True)
        for result in self.results:
            db.session.delete(result)
        self.end_date = None
        self.status = JobStatus.SUBMITTED

    def to_json_serializeable(self, backrefs=False, relationships=False):
        '''
        Return a JSON serializable dict describing this job.

        Dates are ISO formatted with a ``Z`` suffix and the status is
        represented by its enum member name.

        :param backrefs: also include the serialized owning user
            under the key ``user``.
        :param relationships: also include the serialized inputs and
            results, each as a dict keyed by hashid.
        '''
        json_serializeable = {
            'id': self.hashid,
            'creation_date': f'{self.creation_date.isoformat()}Z',
            'description': self.description,
            'end_date': (
                None if self.end_date is None
                else f'{self.end_date.isoformat()}Z'
            ),
            'service': self.service,
            'service_args': self.service_args,
            'service_version': self.service_version,
            'status': self.status.name,
            'title': self.title
        }
        if backrefs:
            json_serializeable['user'] = \
                self.user.to_json_serializeable(backrefs=True)
        if relationships:
            json_serializeable['inputs'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.inputs
            }
            json_serializeable['results'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.results
            }
        return json_serializeable
class CorpusFile(FileMixin, HashidMixin, db.Model):
    '''
    Class to define a file that belongs to a Corpus.

    Besides the file itself (see ``path``) a record stores
    bibliographic metadata about the file.
    '''
    __tablename__ = 'corpus_files'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id'))
    # Fields
    author = db.Column(db.String(255))
    description = db.Column(db.String(255))
    publishing_year = db.Column(db.Integer)
    title = db.Column(db.String(255))
    address = db.Column(db.String(255))
    booktitle = db.Column(db.String(255))
    chapter = db.Column(db.String(255))
    editor = db.Column(db.String(255))
    institution = db.Column(db.String(255))
    journal = db.Column(db.String(255))
    pages = db.Column(db.String(255))
    publisher = db.Column(db.String(255))
    school = db.Column(db.String(255))
    # Relationships
    corpus = db.relationship(
        'Corpus',
        back_populates='files'
    )

    @property
    def download_url(self):
        '''URL built for the ``corpora.download_corpus_file`` endpoint.'''
        url_values = {
            'corpus_id': self.corpus_id,
            'corpus_file_id': self.id
        }
        return url_for('corpora.download_corpus_file', **url_values)

    @property
    def jsonpatch_path(self):
        '''JSON Patch path of this file below the owning corpus' path.'''
        corpus_jsonpatch_path = self.corpus.jsonpatch_path
        return f'{corpus_jsonpatch_path}/files/{self.hashid}'

    @property
    def path(self):
        '''Filesystem path of this file inside the corpus directory.'''
        files_dir = os.path.join(self.corpus.path, 'files')
        return os.path.join(files_dir, str(self.id))

    @property
    def url(self):
        '''URL built for the ``corpora.corpus_file`` endpoint.'''
        url_values = {
            'corpus_id': self.corpus_id,
            'corpus_file_id': self.id
        }
        return url_for('corpora.corpus_file', **url_values)

    @property
    def user_hashid(self):
        '''Hashid of the user owning the corpus.'''
        owner = self.corpus.user
        return owner.hashid

    @property
    def user_id(self):
        '''Primary key of the user owning the corpus.'''
        return self.corpus.user_id

    def delete(self):
        '''
        Remove the file from disk and mark the record for deletion.

        A failing file removal is only logged. Afterwards the owning
        corpus is set back to UNPREPARED.
        '''
        try:
            os.remove(self.path)
        except OSError as error:
            current_app.logger.error(error)
        db.session.delete(self)
        self.corpus.status = CorpusStatus.UNPREPARED

    def to_json_serializeable(self, backrefs=False, relationships=False):
        '''
        Return a JSON serializable dict describing this corpus file.

        :param backrefs: also include the serialized owning corpus
            under the key ``corpus``.
        :param relationships: forwarded to the file mixin serializer;
            this model adds no relationship data of its own.
        '''
        # Metadata columns in the order they appear in the result.
        field_names = (
            'address',
            'author',
            'description',
            'booktitle',
            'chapter',
            'editor',
            'institution',
            'journal',
            'pages',
            'publisher',
            'publishing_year',
            'school',
            'title'
        )
        serialized = {'id': self.hashid}
        for field_name in field_names:
            serialized[field_name] = getattr(self, field_name)
        serialized.update(
            self.file_mixin_to_json_serializeable(
                backrefs=backrefs,
                relationships=relationships
            )
        )
        if backrefs:
            serialized['corpus'] = \
                self.corpus.to_json_serializeable(backrefs=True)
        return serialized
class Corpus(HashidMixin, db.Model):
    '''
    Class to define a corpus.
    '''
    __tablename__ = 'corpora'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    status = db.Column(
        IntEnumColumn(CorpusStatus),
        default=CorpusStatus.UNPREPARED
    )
    title = db.Column(db.String(32))
    num_analysis_sessions = db.Column(db.Integer, default=0)
    num_tokens = db.Column(db.Integer, default=0)
    is_public = db.Column(db.Boolean, default=False)
    # Relationships
    files = db.relationship(
        'CorpusFile',
        back_populates='corpus',
        lazy='dynamic',
        cascade='all, delete-orphan'
    )
    corpus_follower_associations = db.relationship(
        'CorpusFollowerAssociation',
        back_populates='corpus',
        cascade='all, delete-orphan'
    )
    followers = association_proxy(
        'corpus_follower_associations',
        'follower',
        creator=lambda u: CorpusFollowerAssociation(follower=u)
    )
    user = db.relationship('User', back_populates='corpora')
    # "static" attributes
    max_num_tokens = 2_147_483_647

    def __repr__(self):
        # An empty representation makes log output and debugging useless,
        # so identify the instance by class name and primary key.
        return f'<Corpus {self.id}>'

    @property
    def analysis_url(self):
        '''URL built for the ``corpora.analysis`` endpoint.'''
        return url_for('corpora.analysis', corpus_id=self.id)

    @property
    def jsonpatch_path(self):
        '''JSON Patch path of this corpus below the owning user's path.'''
        return f'{self.user.jsonpatch_path}/corpora/{self.hashid}'

    @property
    def path(self):
        '''Filesystem path of the corpus directory inside the user's one.'''
        return os.path.join(self.user.path, 'corpora', str(self.id))

    @property
    def url(self):
        '''URL built for the ``corpora.corpus`` endpoint.'''
        return url_for('corpora.corpus', corpus_id=self.id)

    @property
    def user_hashid(self):
        '''Hashid of the user owning this corpus.'''
        return self.user.hashid

    @staticmethod
    def create(**kwargs):
        '''
        Create a corpus, add it to the session and create its directories.

        The corpus is flushed (not committed) so that its primary key,
        which is part of ``path``, is available.

        :param kwargs: passed unchanged to the ``Corpus`` constructor.
        :returns: the new ``Corpus``.
        :raises OSError: if a directory cannot be created; the session is
            rolled back before the error is re-raised.
        '''
        corpus = Corpus(**kwargs)
        db.session.add(corpus)
        db.session.flush(objects=[corpus])
        db.session.refresh(corpus)
        try:
            os.mkdir(corpus.path)
            os.mkdir(os.path.join(corpus.path, 'files'))
            os.mkdir(os.path.join(corpus.path, 'cwb'))
            os.mkdir(os.path.join(corpus.path, 'cwb', 'data'))
            os.mkdir(os.path.join(corpus.path, 'cwb', 'registry'))
        except OSError as e:
            current_app.logger.error(e)
            db.session.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        return corpus

    def build(self):
        '''
        Merge all corpus files into ``cwb/corpus.vrt`` and submit the corpus.

        The ``cwb`` directory is recreated from scratch. Every corpus file
        is normalized, its root element is annotated with the file's
        metadata and appended to one common root element. On success the
        status is set to SUBMITTED. If normalizing a file fails, the error
        is logged, the status is set to FAILED and the build is aborted.
        '''
        build_dir = os.path.join(self.path, 'cwb')
        shutil.rmtree(build_dir, ignore_errors=True)
        os.mkdir(build_dir)
        os.mkdir(os.path.join(build_dir, 'data'))
        os.mkdir(os.path.join(build_dir, 'registry'))
        # Parsing a string without any element always raises a ParseError,
        # so the root element is constructed directly.
        # NOTE(review): the root tag name 'corpus' is an assumption derived
        # from the output file name - confirm against the CWB import step.
        corpus_element = ET.Element('corpus')
        corpus_element.text = '\n'
        for corpus_file in self.files:
            normalized_vrt_path = \
                os.path.join(build_dir, f'{corpus_file.id}.norm.vrt')
            try:
                normalize_vrt_file(corpus_file.path, normalized_vrt_path)
            except Exception as e:
                # Do not swallow SystemExit/KeyboardInterrupt and leave a
                # trace of what went wrong.
                current_app.logger.error(e)
                self.status = CorpusStatus.FAILED
                return
            element_tree = ET.parse(normalized_vrt_path)
            text_element = element_tree.getroot()
            text_element.set('author', corpus_file.author)
            text_element.set('title', corpus_file.title)
            text_element.set(
                'publishing_year',
                f'{corpus_file.publishing_year}'
            )
            text_element.set('address', corpus_file.address or 'NULL')
            text_element.set('booktitle', corpus_file.booktitle or 'NULL')
            text_element.set('chapter', corpus_file.chapter or 'NULL')
            text_element.set('editor', corpus_file.editor or 'NULL')
            text_element.set('institution', corpus_file.institution or 'NULL')
            text_element.set('journal', corpus_file.journal or 'NULL')
            # Formatting before the fallback turned a missing value into
            # the truthy string 'None', so 'NULL' was never used.
            text_element.set('pages', corpus_file.pages or 'NULL')
            text_element.set('publisher', corpus_file.publisher or 'NULL')
            text_element.set('school', corpus_file.school or 'NULL')
            text_element.tail = '\n'
            corpus_element.append(text_element)
        ET.ElementTree(corpus_element).write(
            os.path.join(build_dir, 'corpus.vrt'),
            encoding='utf-8'
        )
        self.status = CorpusStatus.SUBMITTED

    def delete(self):
        '''
        Remove the corpus directory and mark the record for deletion.

        Errors while removing the directory are ignored.
        '''
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def to_json_serializeable(self, backrefs=False, relationships=False):
        '''
        Return a JSON serializable dict describing this corpus.

        The creation date is ISO formatted with a ``Z`` suffix and the
        status is represented by its enum member name.

        :param backrefs: also include the serialized owning user
            under the key ``user``.
        :param relationships: also include the serialized corpus follower
            associations and files, each as a dict keyed by hashid.
        '''
        json_serializeable = {
            'id': self.hashid,
            'creation_date': f'{self.creation_date.isoformat()}Z',
            'description': self.description,
            'max_num_tokens': self.max_num_tokens,
            'num_analysis_sessions': self.num_analysis_sessions,
            'num_tokens': self.num_tokens,
            'status': self.status.name,
            'title': self.title,
            'is_public': self.is_public
        }
        if backrefs:
            json_serializeable['user'] = \
                self.user.to_json_serializeable(backrefs=True)
        if relationships:
            json_serializeable['corpus_follower_associations'] = {
                x.hashid: x.to_json_serializeable()
                for x in self.corpus_follower_associations
            }
            json_serializeable['files'] = {
                x.hashid: x.to_json_serializeable(relationships=True)
                for x in self.files
            }
        return json_serializeable
# endregion models


##############################################################################
# event_handlers                                                             #
##############################################################################
# region event_handlers
@db.event.listens_for(Corpus, 'after_delete')
@db.event.listens_for(CorpusFile, 'after_delete')
@db.event.listens_for(Job, 'after_delete')
@db.event.listens_for(JobInput, 'after_delete')
@db.event.listens_for(JobResult, 'after_delete')
@db.event.listens_for(SpaCyNLPPipelineModel, 'after_delete')
@db.event.listens_for(TesseractOCRPipelineModel, 'after_delete')
def resource_after_delete(mapper, connection, resource):
    '''
    Emit a JSON Patch "remove" operation for a deleted resource.

    The patch is sent as a ``PATCH`` event to the room of the user who
    owns the resource.
    '''
    remove_operation = {'op': 'remove', 'path': resource.jsonpatch_path}
    socketio.emit(
        'PATCH',
        [remove_operation],
        room=f'/users/{resource.user_hashid}'
    )


@db.event.listens_for(CorpusFollowerAssociation, 'after_delete')
def cfa_after_delete_handler(mapper, connection, cfa):
    '''
    Emit a JSON Patch "remove" operation for a deleted follower association.

    The patch is sent as a ``PATCH`` event to the room of the user who
    owns the corpus of the association.
    '''
    remove_operation = {
        'op': 'remove',
        'path': f'/users/{cfa.corpus.user.hashid}/corpora/{cfa.corpus.hashid}/corpus_follower_associations/{cfa.hashid}'
    }
    socketio.emit(
        'PATCH',
        [remove_operation],
        room=f'/users/{cfa.corpus.user.hashid}'
    )
@db.event.listens_for(Corpus, 'after_insert')
@db.event.listens_for(CorpusFile, 'after_insert')
@db.event.listens_for(Job, 'after_insert')
@db.event.listens_for(JobInput, 'after_insert')
@db.event.listens_for(JobResult, 'after_insert')
@db.event.listens_for(SpaCyNLPPipelineModel, 'after_insert')
@db.event.listens_for(TesseractOCRPipelineModel, 'after_insert')
def resource_after_insert_handler(mapper, connection, resource):
    '''
    Emit a JSON Patch "add" operation for a newly inserted resource.

    The value is the serialized resource in which every relationship of
    the mapper is set to an empty dict. The patch is sent as a ``PATCH``
    event to the room of the user who owns the resource.
    '''
    serialized = resource.to_json_serializeable()
    serialized.update(
        {relationship.key: {} for relationship in mapper.relationships}
    )
    add_operation = {
        'op': 'add',
        'path': resource.jsonpatch_path,
        'value': serialized
    }
    socketio.emit(
        'PATCH',
        [add_operation],
        room=f'/users/{resource.user_hashid}'
    )


@db.event.listens_for(CorpusFollowerAssociation, 'after_insert')
def cfa_after_insert_handler(mapper, connection, cfa):
    '''
    Emit a JSON Patch "add" operation for a new follower association.

    The patch is sent as a ``PATCH`` event to the room of the user who
    owns the corpus of the association.
    '''
    add_operation = {
        'op': 'add',
        'path': f'/users/{cfa.corpus.user.hashid}/corpora/{cfa.corpus.hashid}/corpus_follower_associations/{cfa.hashid}',
        'value': cfa.to_json_serializeable()
    }
    socketio.emit(
        'PATCH',
        [add_operation],
        room=f'/users/{cfa.corpus.user.hashid}'
    )


@db.event.listens_for(Corpus, 'after_update')
@db.event.listens_for(CorpusFile, 'after_update')
@db.event.listens_for(Job, 'after_update')
@db.event.listens_for(JobInput, 'after_update')
@db.event.listens_for(JobResult, 'after_update')
@db.event.listens_for(SpaCyNLPPipelineModel, 'after_update')
@db.event.listens_for(TesseractOCRPipelineModel, 'after_update')
def resource_after_update_handler(mapper, connection, resource):
    '''
    Emit JSON Patch "replace" operations for the changed attributes.

    Relationships and unchanged attributes are skipped. Datetime values
    are ISO formatted with a ``Z`` suffix, enum values are represented by
    their member name. Nothing is emitted if no attribute changed.
    '''
    replace_operations = []
    for attribute_state in db.inspect(resource).attrs:
        # The history is only loaded for non-relationship attributes.
        if (
            attribute_state.key in mapper.relationships
            or not attribute_state.load_history().has_changes()
        ):
            continue
        value = attribute_state.value
        if isinstance(value, datetime):
            value = f'{value.isoformat()}Z'
        elif isinstance(value, Enum):
            value = value.name
        replace_operations.append(
            {
                'op': 'replace',
                'path': f'{resource.jsonpatch_path}/{attribute_state.key}',
                'value': value
            }
        )
    if not replace_operations:
        return
    socketio.emit(
        'PATCH',
        replace_operations,
        room=f'/users/{resource.user_hashid}'
    )
@db.event.listens_for(Job, 'after_update')
def job_after_update_handler(mapper, connection, job):
    '''
    Send a status notification mail when the status of a job changed.

    No mail is sent if the status did not change, if the user's
    notification level is NONE, or if the level is END and the job is
    neither completed nor failed.
    '''
    status_state = db.inspect(job).attrs.status
    if not status_state.load_history().has_changes():
        return
    notification_level = job.user.setting_job_status_mail_notification_level
    if notification_level == UserSettingJobStatusMailNotificationLevel.NONE:
        return
    if (
        notification_level == UserSettingJobStatusMailNotificationLevel.END
        and job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]
    ):
        return
    message = create_message(
        job.user.email,
        f'Status update for your Job "{job.title}"',
        'tasks/email/notification',
        job=job
    )
    mail.send(message)
# endregion event_handlers


##############################################################################
# misc                                                                       #
##############################################################################
# region misc
@login.user_loader
def load_user(user_id):
    '''Return the User whose primary key is ``int(user_id)``.'''
    primary_key = int(user_id)
    return User.query.get(primary_key)
# endregion misc