from datetime import datetime, timedelta from enum import Enum, IntEnum from flask import current_app, url_for from flask_hashids import HashidMixin from flask_login import UserMixin from time import sleep from tqdm import tqdm from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename import json import jwt import os import requests import secrets import shutil import xml.etree.ElementTree as ET import yaml from app import db, hashids, login, mail, socketio from app.converters.vrt import normalize_vrt_file from app.email import create_message ############################################################################## # enums # ############################################################################## # region enums class CorpusStatus(IntEnum): UNPREPARED = 1 SUBMITTED = 2 QUEUED = 3 BUILDING = 4 BUILT = 5 FAILED = 6 STARTING_ANALYSIS_SESSION = 7 RUNNING_ANALYSIS_SESSION = 8 CANCELING_ANALYSIS_SESSION = 9 class JobStatus(IntEnum): INITIALIZING = 1 SUBMITTED = 2 QUEUED = 3 RUNNING = 4 CANCELING = 5 CANCELED = 6 COMPLETED = 7 FAILED = 8 class Permission(IntEnum): ''' Defines User permissions as integers by the power of 2. User permission can be evaluated using the bitwise operator &. ''' ADMINISTRATE = 1 CONTRIBUTE = 2 USE_API = 4 class UserSettingJobStatusMailNotificationLevel(IntEnum): NONE = 1 END = 2 ALL = 3 class ProfilePrivacySettings(IntEnum): SHOW_EMAIL = 1 SHOW_LAST_SEEN = 2 SHOW_MEMBER_SINCE = 4 # endregion enums ############################################################################## # mixins # ############################################################################## # region mixins class FileMixin: ''' Mixin for db.Model classes. All file related models should use this. ''' creation_date = db.Column(db.DateTime, default=datetime.utcnow) filename = db.Column(db.String(255)) mimetype = db.Column(db.String(255)) def file_mixin_to_json_serializeable(self, backrefs=False, relationships=False): return { 'creation_date': f'{self.creation_date.isoformat()}Z', 'filename': self.filename, 'mimetype': self.mimetype } @classmethod def create(cls, file_storage, **kwargs): filename = kwargs.pop('filename', file_storage.filename) mimetype = kwargs.pop('mimetype', file_storage.mimetype) obj = cls( filename=secure_filename(filename), mimetype=mimetype, **kwargs ) db.session.add(obj) db.session.flush(objects=[obj]) db.session.refresh(obj) try: file_storage.save(obj.path) except (AttributeError, OSError) as e: current_app.logger.error(e) db.session.rollback() raise e return obj # endregion mixins ############################################################################## # type_decorators # ############################################################################## # region type_decorators class IntEnumColumn(db.TypeDecorator): impl = db.Integer def __init__(self, enum_type, *args, **kwargs): super().__init__(*args, **kwargs) self.enum_type = enum_type def process_bind_param(self, value, dialect): if isinstance(value, self.enum_type) and isinstance(value.value, int): return value.value elif isinstance(value, int): return self.enum_type(value).value else: return TypeError() def process_result_value(self, value, dialect): return self.enum_type(value) class ContainerColumn(db.TypeDecorator): impl = db.String def __init__(self, container_type, *args, **kwargs): super().__init__(*args, **kwargs) self.container_type = container_type def process_bind_param(self, value, dialect): if isinstance(value, self.container_type): return json.dumps(value) elif (isinstance(value, str) and isinstance(json.loads(value), self.container_type)): return value else: return TypeError() def process_result_value(self, value, dialect): return json.loads(value) # endregion type_decorators ############################################################################## # Models # ############################################################################## # region models class Role(HashidMixin, db.Model): __tablename__ = 'roles' # Primary key id = db.Column(db.Integer, primary_key=True) # Fields name = db.Column(db.String(64), unique=True) default = db.Column(db.Boolean, default=False, index=True) permissions = db.Column(db.Integer, default=0) # Relationships users = db.relationship('User', backref='role', lazy='dynamic') def __repr__(self): return f'' def add_permission(self, permission): if not self.has_permission(permission): self.permissions += permission def has_permission(self, permission): return self.permissions & permission == permission def remove_permission(self, permission): if self.has_permission(permission): self.permissions -= permission def reset_permissions(self): self.permissions = 0 def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'default': self.default, 'name': self.name, 'permissions': self.permissions } if relationships: json_serializeable['users'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.users } return json_serializeable @staticmethod def insert_defaults(): roles = { 'User': [], 'API user': [Permission.USE_API], 'Contributor': [Permission.CONTRIBUTE], 'Administrator': [ Permission.ADMINISTRATE, Permission.CONTRIBUTE, Permission.USE_API ], 'System user': [] } default_role_name = 'User' for role_name, permissions in roles.items(): role = Role.query.filter_by(name=role_name).first() if role is None: role = Role(name=role_name) role.reset_permissions() for permission in permissions: role.add_permission(permission) role.default = role.name == default_role_name db.session.add(role) db.session.commit() class Token(db.Model): __tablename__ = 'tokens' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields access_token = db.Column(db.String(64), index=True) access_expiration = db.Column(db.DateTime) refresh_token = db.Column(db.String(64), index=True) refresh_expiration = db.Column(db.DateTime) # Backrefs: user: User def expire(self): self.access_expiration = datetime.utcnow() self.refresh_expiration = datetime.utcnow() @staticmethod def clean(): """Remove any tokens that have been expired for more than a day.""" yesterday = datetime.utcnow() - timedelta(days=1) Token.query.filter(Token.refresh_expiration < yesterday).delete() class Avatar(HashidMixin, FileMixin, db.Model): __tablename__ = 'avatars' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True) @property def path(self): return os.path.join(self.user.path, 'avatar') def delete(self): try: os.remove(self.path) except OSError as e: current_app.logger.error(e) db.session.delete(self) def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.id } return json_serializeable class User(HashidMixin, UserMixin, db.Model): __tablename__ = 'users' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) # Fields email = db.Column(db.String(254), index=True, unique=True) username = db.Column(db.String(64), index=True, unique=True) password_hash = db.Column(db.String(128)) confirmed = db.Column(db.Boolean, default=False) member_since = db.Column(db.DateTime(), default=datetime.utcnow) setting_job_status_mail_notification_level = db.Column( IntEnumColumn(UserSettingJobStatusMailNotificationLevel), default=UserSettingJobStatusMailNotificationLevel.END ) last_seen = db.Column(db.DateTime()) full_name = db.Column(db.String(64)) about_me = db.Column(db.String(256)) location = db.Column(db.String(64)) website = db.Column(db.String(128)) organization = db.Column(db.String(128)) is_public = db.Column(db.Boolean, default=False) profile_privacy_settings = db.Column(db.Integer(), default=0) # Backrefs: role: Role # Relationships avatar = db.relationship( 'Avatar', backref='user', cascade='all, delete-orphan', uselist=False ) tesseract_ocr_pipeline_models = db.relationship( 'TesseractOCRPipelineModel', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) spacy_nlp_pipeline_models = db.relationship( 'SpaCyNLPPipelineModel', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) corpora = db.relationship( 'Corpus', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) jobs = db.relationship( 'Job', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) tokens = db.relationship( 'Token', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) def __init__(self, **kwargs): super().__init__(**kwargs) if self.role is not None: return if self.email == current_app.config['NOPAQUE_ADMIN']: self.role = Role.query.filter_by(name='Administrator').first() else: self.role = Role.query.filter_by(default=True).first() def __repr__(self): return f'' @property def jsonpatch_path(self): return f'/users/{self.hashid}' @property def password(self): raise AttributeError('password is not a readable attribute') @password.setter def password(self, password): self.password_hash = generate_password_hash(password) @property def path(self): return os.path.join( current_app.config.get('NOPAQUE_DATA_DIR'), 'users', str(self.id)) @staticmethod def create(**kwargs): user = User(**kwargs) db.session.add(user) db.session.flush(objects=[user]) db.session.refresh(user) try: os.mkdir(user.path) os.mkdir(os.path.join(user.path, 'spacy_nlp_pipeline_models')) os.mkdir(os.path.join(user.path, 'tesseract_ocr_pipeline_models')) os.mkdir(os.path.join(user.path, 'corpora')) os.mkdir(os.path.join(user.path, 'jobs')) except OSError as e: current_app.logger.error(e) db.session.rollback() raise e return user @staticmethod def insert_defaults(): nopaque_user = User.query.filter_by(username='nopaque').first() system_user_role = Role.query.filter_by(name='System user').first() if nopaque_user is None: nopaque_user = User.create( username='nopaque', role=system_user_role ) db.session.add(nopaque_user) elif nopaque_user.role != system_user_role: nopaque_user.role = system_user_role db.session.commit() @staticmethod def reset_password(token, new_password): try: payload = jwt.decode( token, current_app.config['SECRET_KEY'], algorithms=['HS256'], issuer=current_app.config['SERVER_NAME'], options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']} ) except jwt.PyJWTError: return False if payload.get('purpose') != 'User.reset_password': return False user_hashid = payload.get('sub') user_id = hashids.decode(user_hashid) user = User.query.get(user_id) if user is None: return False user.password = new_password db.session.add(user) return True @staticmethod def verify_access_token(access_token, refresh_token=None): token = Token.query.filter(Token.access_token == access_token).first() if token is not None: if token.access_expiration > datetime.utcnow(): token.user.ping() db.session.commit() if token.user.role.name != 'System user': return token.user @staticmethod def verify_refresh_token(refresh_token, access_token): token = Token.query.filter((Token.refresh_token == refresh_token) & (Token.access_token == access_token)).first() if token is not None: if token.refresh_expiration > datetime.utcnow(): return token # someone tried to refresh with an expired token # revoke all tokens from this user as a precaution token.user.revoke_auth_tokens() db.session.commit() def can(self, permission): return self.role.has_permission(permission) def confirm(self, confirmation_token): try: payload = jwt.decode( confirmation_token, current_app.config['SECRET_KEY'], algorithms=['HS256'], issuer=current_app.config['SERVER_NAME'], options={'require': ['exp', 'iat', 'iss', 'purpose', 'sub']} ) current_app.logger.warning(payload) except jwt.PyJWTError: return False if payload.get('purpose') != 'user.confirm': return False if payload.get('sub') != self.hashid: return False self.confirmed = True db.session.add(self) return True def delete(self): shutil.rmtree(self.path, ignore_errors=True) db.session.delete(self) def generate_auth_token(self): return Token( access_token=secrets.token_urlsafe(), access_expiration=datetime.utcnow() + timedelta(minutes=15), refresh_token=secrets.token_urlsafe(), refresh_expiration=datetime.utcnow() + timedelta(days=7), user=self ) def generate_confirm_token(self, expiration=3600): now = datetime.utcnow() payload = { 'exp': now + timedelta(seconds=expiration), 'iat': now, 'iss': current_app.config['SERVER_NAME'], 'purpose': 'user.confirm', 'sub': self.hashid } return jwt.encode( payload, current_app.config['SECRET_KEY'], algorithm='HS256' ) def generate_reset_password_token(self, expiration=3600): now = datetime.utcnow() payload = { 'exp': now + timedelta(seconds=expiration), 'iat': now, 'iss': current_app.config['SERVER_NAME'], 'purpose': 'User.reset_password', 'sub': self.hashid } return jwt.encode( payload, current_app.config['SECRET_KEY'], algorithm='HS256' ) def is_administrator(self): return self.can(Permission.ADMINISTRATE) def ping(self): self.last_seen = datetime.utcnow() def revoke_auth_tokens(self): for token in self.tokens: db.session.delete(token) def verify_password(self, password): if self.role.name == 'System user': return False return check_password_hash(self.password_hash, password) #region Profile Privacy settings def has_profile_privacy_setting(self, setting): return self.profile_privacy_settings & setting == setting def add_profile_privacy_setting(self, setting): if not self.has_profile_privacy_setting(setting): self.profile_privacy_settings += setting def remove_profile_privacy_setting(self, setting): if self.has_profile_privacy_setting(setting): self.profile_privacy_settings -= setting def reset_profile_privacy_settings(self): self.profile_privacy_settings = 0 #endregion Profile Privacy settings def to_json_serializeable( self, backrefs=False, relationships=False, filter_by_privacy_settings=False, include_corpus_relationships=False, include_job_relationships=False, include_tesseract_ocr_pipeline_model_relationships=False, include_spacy_nlp_pipeline_model_relationships=False, include_avatar_relationship=False ): json_serializeable = { 'id': self.hashid, 'confirmed': self.confirmed, 'email': self.email, 'last_seen': ( None if self.last_seen is None else f'{self.last_seen.isoformat()}Z' ), 'member_since': f'{self.member_since.isoformat()}Z', 'username': self.username, 'full_name': self.full_name, 'about_me': self.about_me, 'website': self.website, 'location': self.location, 'organization': self.organization, 'job_status_mail_notification_level': \ self.setting_job_status_mail_notification_level.name, 'is_public': self.is_public, 'show_email': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL), 'show_last_seen': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN), 'show_member_since': self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE) } if backrefs: json_serializeable['role'] = \ self.role.to_json_serializeable(backrefs=True) if relationships or include_corpus_relationships: json_serializeable['corpora'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.corpora } if relationships or include_job_relationships: json_serializeable['jobs'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.jobs } if relationships or include_tesseract_ocr_pipeline_model_relationships: json_serializeable['tesseract_ocr_pipeline_models'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.tesseract_ocr_pipeline_models } if relationships or include_spacy_nlp_pipeline_model_relationships: json_serializeable['spacy_nlp_pipeline_models'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.spacy_nlp_pipeline_models } if relationships or include_avatar_relationship: json_serializeable['avatar'] = ( None if self.avatar is None else self.avatar.to_json_serializeable(relationships=True) ) if filter_by_privacy_settings: if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_EMAIL): json_serializeable.pop('email') if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_LAST_SEEN): json_serializeable.pop('last_seen') if not self.has_profile_privacy_setting(ProfilePrivacySettings.SHOW_MEMBER_SINCE): json_serializeable.pop('member_since') return json_serializeable class TesseractOCRPipelineModel(FileMixin, HashidMixin, db.Model): __tablename__ = 'tesseract_ocr_pipeline_models' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields title = db.Column(db.String(64)) description = db.Column(db.String(255)) version = db.Column(db.String(16)) compatible_service_versions = db.Column(ContainerColumn(list, 255)) publisher = db.Column(db.String(128)) publisher_url = db.Column(db.String(512)) publishing_url = db.Column(db.String(512)) publishing_year = db.Column(db.Integer) is_public = db.Column(db.Boolean, default=False) # Backrefs: user: User @property def path(self): return os.path.join( self.user.path, 'tesseract_ocr_pipeline_models', str(self.id) ) @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/tesseract_ocr_pipeline_models/{self.hashid}' @property def url(self): return url_for( 'contributions.tesseract_ocr_pipeline_model', tesseract_ocr_pipeline_model_id=self.id ) @property def user_hashid(self): return self.user.hashid @staticmethod def insert_defaults(): nopaque_user = User.query.filter_by(username='nopaque').first() defaults_file = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'TesseractOCRPipelineModel.defaults.yml' ) with open(defaults_file, 'r') as f: defaults = yaml.safe_load(f) for m in defaults: model = TesseractOCRPipelineModel.query.filter_by(title=m['title'], version=m['version']).first() # noqa if model is not None: model.compatible_service_versions = m['compatible_service_versions'] model.description = m['description'] model.publisher = m['publisher'] model.publisher_url = m['publisher_url'] model.publishing_url = m['publishing_url'] model.publishing_year = m['publishing_year'] model.is_public = True model.title = m['title'] model.version = m['version'] continue model = TesseractOCRPipelineModel( compatible_service_versions=m['compatible_service_versions'], description=m['description'], publisher=m['publisher'], publisher_url=m['publisher_url'], publishing_url=m['publishing_url'], publishing_year=m['publishing_year'], is_public=True, title=m['title'], user=nopaque_user, version=m['version'] ) db.session.add(model) db.session.flush(objects=[model]) db.session.refresh(model) model.filename = f'{model.id}.traineddata' r = requests.get(m['url'], stream=True) pbar = tqdm( desc=f'{model.title} ({model.filename})', unit="B", unit_scale=True, unit_divisor=1024, total=int(r.headers['Content-Length']) ) pbar.clear() with open(model.path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks pbar.update(len(chunk)) f.write(chunk) pbar.close() db.session.commit() def delete(self): try: os.remove(self.path) except OSError as e: current_app.logger.error(e) db.session.delete(self) def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'compatible_service_versions': self.compatible_service_versions, 'description': self.description, 'publisher': self.publisher, 'publisher_url': self.publisher_url, 'publishing_url': self.publishing_url, 'publishing_year': self.publishing_year, 'is_public': self.is_public, 'title': self.title, 'version': self.version, **self.file_mixin_to_json_serializeable() } if backrefs: json_serializeable['user'] = \ self.user.to_json_serializeable(backrefs=True) return json_serializeable class SpaCyNLPPipelineModel(FileMixin, HashidMixin, db.Model): __tablename__ = 'spacy_nlp_pipeline_models' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields title = db.Column(db.String(64)) description = db.Column(db.String(255)) version = db.Column(db.String(16)) compatible_service_versions = db.Column(ContainerColumn(list, 255)) publisher = db.Column(db.String(128)) publisher_url = db.Column(db.String(512)) publishing_url = db.Column(db.String(512)) publishing_year = db.Column(db.Integer) pipeline_name = db.Column(db.String(64)) is_public = db.Column(db.Boolean, default=False) # Backrefs: user: User @property def path(self): return os.path.join( self.user.path, 'spacy_nlp_pipeline_models', str(self.id) ) @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/spacy_nlp_pipeline_models/{self.hashid}' @property def url(self): return url_for( 'contributions.spacy_nlp_pipeline_model', spacy_nlp_pipeline_model_id=self.id ) @property def user_hashid(self): return self.user.hashid @staticmethod def insert_defaults(): nopaque_user = User.query.filter_by(username='nopaque').first() defaults_file = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'SpaCyNLPPipelineModel.defaults.yml' ) with open(defaults_file, 'r') as f: defaults = yaml.safe_load(f) for m in defaults: model = SpaCyNLPPipelineModel.query.filter_by(title=m['title'], version=m['version']).first() # noqa if model is not None: model.compatible_service_versions = m['compatible_service_versions'] model.description = m['description'] model.publisher = m['publisher'] model.publisher_url = m['publisher_url'] model.publishing_url = m['publishing_url'] model.publishing_year = m['publishing_year'] model.is_public = True model.title = m['title'] model.version = m['version'] model.pipeline_name = m['pipeline_name'] continue model = SpaCyNLPPipelineModel( compatible_service_versions=m['compatible_service_versions'], description=m['description'], publisher=m['publisher'], publisher_url=m['publisher_url'], publishing_url=m['publishing_url'], publishing_year=m['publishing_year'], is_public=True, title=m['title'], user=nopaque_user, version=m['version'], pipeline_name=m['pipeline_name'] ) db.session.add(model) db.session.flush(objects=[model]) db.session.refresh(model) model.filename = m['url'].split('/')[-1] r = requests.get(m['url'], stream=True) pbar = tqdm( desc=f'{model.title} ({model.filename})', unit="B", unit_scale=True, unit_divisor=1024, total=int(r.headers['Content-Length']) ) pbar.clear() with open(model.path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks pbar.update(len(chunk)) f.write(chunk) pbar.close() db.session.commit() def delete(self): try: os.remove(self.path) except OSError as e: current_app.logger.error(e) db.session.delete(self) def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'compatible_service_versions': self.compatible_service_versions, 'description': self.description, 'publisher': self.publisher, 'publisher_url': self.publisher_url, 'publishing_url': self.publishing_url, 'publishing_year': self.publishing_year, 'pipeline_name': self.pipeline_name, 'is_public': self.is_public, 'title': self.title, 'version': self.version, **self.file_mixin_to_json_serializeable() } if backrefs: json_serializeable['user'] = self.user.to_json_serializeable(backrefs=True) return json_serializeable class JobInput(FileMixin, HashidMixin, db.Model): __tablename__ = 'job_inputs' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys job_id = db.Column(db.Integer, db.ForeignKey('jobs.id')) # Backrefs: job: Job def __repr__(self): return f'' @property def content_url(self): return url_for( 'jobs.download_job_input', job_id=self.job.id, job_input_id=self.id ) @property def jsonpatch_path(self): return f'{self.job.jsonpatch_path}/inputs/{self.hashid}' @property def path(self): return os.path.join(self.job.path, 'inputs', str(self.id)) @property def url(self): return url_for( 'jobs.job', job_id=self.job_id, _anchor=f'job-{self.job.hashid}-input-{self.hashid}' ) @property def user_hashid(self): return self.job.user.hashid @property def user_id(self): return self.job.user_id def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, **self.file_mixin_to_json_serializeable() } if backrefs: json_serializeable['job'] = \ self.job.to_json_serializeable(backrefs=True) return json_serializeable class JobResult(FileMixin, HashidMixin, db.Model): __tablename__ = 'job_results' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys job_id = db.Column(db.Integer, db.ForeignKey('jobs.id')) # Fields description = db.Column(db.String(255)) # Backrefs: job: Job def __repr__(self): return f'' @property def download_url(self): return url_for( 'jobs.download_job_result', job_id=self.job_id, job_result_id=self.id ) @property def jsonpatch_path(self): return f'{self.job.jsonpatch_path}/results/{self.hashid}' @property def path(self): return os.path.join(self.job.path, 'results', str(self.id)) @property def url(self): return url_for( 'jobs.job', job_id=self.job_id, _anchor=f'job-{self.job.hashid}-result-{self.hashid}' ) @property def user_hashid(self): return self.job.user.hashid @property def user_id(self): return self.job.user_id def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'description': self.description, **self.file_mixin_to_json_serializeable( backrefs=backrefs, relationships=relationships ) } if backrefs: json_serializeable['job'] = \ self.job.to_json_serializeable(backrefs=True) return json_serializeable class Job(HashidMixin, db.Model): ''' Class to define Jobs. ''' __tablename__ = 'jobs' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields creation_date = \ db.Column(db.DateTime(), default=datetime.utcnow) description = db.Column(db.String(255)) end_date = db.Column(db.DateTime()) service = db.Column(db.String(64)) service_args = db.Column(ContainerColumn(dict, 255)) service_version = db.Column(db.String(16)) status = db.Column( IntEnumColumn(JobStatus), default=JobStatus.INITIALIZING ) title = db.Column(db.String(32)) # Backrefs: user: User # Relationships inputs = db.relationship( 'JobInput', backref='job', cascade='all, delete-orphan', lazy='dynamic' ) results = db.relationship( 'JobResult', backref='job', cascade='all, delete-orphan', lazy='dynamic' ) def __repr__(self): return f'' @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/jobs/{self.hashid}' @property def path(self): return os.path.join(self.user.path, 'jobs', str(self.id)) @property def url(self): return url_for('jobs.job', job_id=self.id) @property def user_hashid(self): return self.user.hashid @staticmethod def create(**kwargs): job = Job(**kwargs) db.session.add(job) db.session.flush(objects=[job]) db.session.refresh(job) try: os.mkdir(job.path) os.mkdir(os.path.join(job.path, 'inputs')) os.mkdir(os.path.join(job.path, 'pipeline_data')) os.mkdir(os.path.join(job.path, 'results')) except OSError as e: current_app.logger.error(e) db.session.rollback() raise e return job def delete(self): ''' Delete the job and its inputs and results from the database. ''' if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: # noqa self.status = JobStatus.CANCELING db.session.commit() while self.status != JobStatus.CANCELED: # In case the daemon handled a job in any way if self.status != JobStatus.CANCELING: self.status = JobStatus.CANCELING db.session.commit() sleep(1) db.session.refresh(self) try: shutil.rmtree(self.path) except OSError as e: current_app.logger.error(e) db.session.rollback() raise e db.session.delete(self) def restart(self): ''' Restart a job - only if the status is failed ''' if self.status != JobStatus.FAILED: raise Exception('Job status is not "failed"') shutil.rmtree(os.path.join(self.path, 'results'), ignore_errors=True) shutil.rmtree(os.path.join(self.path, 'pyflow.data'), ignore_errors=True) for result in self.results: db.session.delete(result) self.end_date = None self.status = JobStatus.SUBMITTED def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'creation_date': f'{self.creation_date.isoformat()}Z', 'description': self.description, 'end_date': ( None if self.end_date is None else f'{self.end_date.isoformat()}Z' ), 'service': self.service, 'service_args': self.service_args, 'service_version': self.service_version, 'status': self.status.name, 'title': self.title, 'url': self.url } if backrefs: json_serializeable['user'] = \ self.user.to_json_serializeable(backrefs=True) if relationships: json_serializeable['inputs'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.inputs } json_serializeable['results'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.results } return json_serializeable class CorpusFile(FileMixin, HashidMixin, db.Model): __tablename__ = 'corpus_files' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id')) # Fields author = db.Column(db.String(255)) description = db.Column(db.String(255)) publishing_year = db.Column(db.Integer) title = db.Column(db.String(255)) address = db.Column(db.String(255)) booktitle = db.Column(db.String(255)) chapter = db.Column(db.String(255)) editor = db.Column(db.String(255)) institution = db.Column(db.String(255)) journal = db.Column(db.String(255)) pages = db.Column(db.String(255)) publisher = db.Column(db.String(255)) school = db.Column(db.String(255)) # Backrefs: corpus: Corpus @property def download_url(self): return url_for( 'corpora.download_corpus_file', corpus_id=self.corpus_id, corpus_file_id=self.id ) @property def jsonpatch_path(self): return f'{self.corpus.jsonpatch_path}/files/{self.hashid}' @property def path(self): return os.path.join(self.corpus.path, 'files', str(self.id)) @property def url(self): return url_for( 'corpora.corpus_file', corpus_id=self.corpus_id, corpus_file_id=self.id ) @property def user_hashid(self): return self.corpus.user.hashid @property def user_id(self): return self.corpus.user_id def delete(self): try: os.remove(self.path) except OSError as e: current_app.logger.error(e) db.session.delete(self) self.corpus.status = CorpusStatus.UNPREPARED def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'url': self.url, 'address': self.address, 'author': self.author, 'booktitle': self.booktitle, 'chapter': self.chapter, 'editor': self.editor, 'institution': self.institution, 'journal': self.journal, 'pages': self.pages, 'publisher': self.publisher, 'publishing_year': self.publishing_year, 'school': self.school, 'title': self.title, **self.file_mixin_to_json_serializeable( backrefs=backrefs, relationships=relationships ) } if backrefs: json_serializeable['corpus'] = \ self.corpus.to_json_serializeable(backrefs=True) return json_serializeable class Corpus(HashidMixin, db.Model): ''' Class to define a corpus. ''' __tablename__ = 'corpora' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields creation_date = db.Column(db.DateTime(), default=datetime.utcnow) description = db.Column(db.String(255)) status = db.Column( IntEnumColumn(CorpusStatus), default=CorpusStatus.UNPREPARED ) title = db.Column(db.String(32)) num_analysis_sessions = db.Column(db.Integer, default=0) num_tokens = db.Column(db.Integer, default=0) is_public = db.Column(db.Boolean, default=False) # Backrefs: user: User # Relationships files = db.relationship( 'CorpusFile', backref='corpus', lazy='dynamic', cascade='all, delete-orphan' ) # "static" attributes max_num_tokens = 2_147_483_647 def __repr__(self): return f'' @property def analysis_url(self): return url_for('corpora.analyse_corpus', corpus_id=self.id) @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/corpora/{self.hashid}' @property def path(self): return os.path.join(self.user.path, 'corpora', str(self.id)) @property def url(self): return url_for('corpora.corpus', corpus_id=self.id) @property def user_hashid(self): return self.user.hashid @staticmethod def create(**kwargs): corpus = Corpus(**kwargs) db.session.add(corpus) db.session.flush(objects=[corpus]) db.session.refresh(corpus) try: os.mkdir(corpus.path) os.mkdir(os.path.join(corpus.path, 'files')) os.mkdir(os.path.join(corpus.path, 'cwb')) os.mkdir(os.path.join(corpus.path, 'cwb', 'data')) os.mkdir(os.path.join(corpus.path, 'cwb', 'registry')) except OSError as e: current_app.logger.error(e) db.session.rollback() raise e return corpus def build(self): corpus_element = ET.fromstring('\n') for corpus_file in self.files: normalized_vrt_path = os.path.join(self.path, 'cwb', f'{corpus_file.id}.norm.vrt') try: normalize_vrt_file(corpus_file.path, normalized_vrt_path) except: self.status = CorpusStatus.FAILED return element_tree = ET.parse(normalized_vrt_path) text_element = element_tree.getroot() text_element.set('author', corpus_file.author) text_element.set('title', corpus_file.title) text_element.set( 'publishing_year', f'{corpus_file.publishing_year}' ) text_element.set('address', corpus_file.address or 'NULL') text_element.set('booktitle', corpus_file.booktitle or 'NULL') text_element.set('chapter', corpus_file.chapter or 'NULL') text_element.set('editor', corpus_file.editor or 'NULL') text_element.set('institution', corpus_file.institution or 'NULL') text_element.set('journal', corpus_file.journal or 'NULL') text_element.set('pages', f'{corpus_file.pages}' or 'NULL') text_element.set('publisher', corpus_file.publisher or 'NULL') text_element.set('school', corpus_file.school or 'NULL') text_element.tail = '\n' # corpus_element.insert(1, text_element) corpus_element.append(text_element) ET.ElementTree(corpus_element).write( os.path.join(self.path, 'cwb', 'corpus.vrt'), encoding='utf-8' ) self.status = CorpusStatus.SUBMITTED def delete(self): shutil.rmtree(self.path, ignore_errors=True) db.session.delete(self) def to_json_serializeable(self, backrefs=False, relationships=False): json_serializeable = { 'id': self.hashid, 'creation_date': f'{self.creation_date.isoformat()}Z', 'description': self.description, 'max_num_tokens': self.max_num_tokens, 'num_analysis_sessions': self.num_analysis_sessions, 'num_tokens': self.num_tokens, 'status': self.status.name, 'title': self.title, 'is_public': self.is_public } if backrefs: json_serializeable['user'] = self.user.to_json_serializeable(backrefs=True) if relationships: json_serializeable['files'] = { x.hashid: x.to_json_serializeable(relationships=True) for x in self.files } return json_serializeable # endregion models ############################################################################## # event_handlers # ############################################################################## # region event_handlers @db.event.listens_for(Corpus, 'after_delete') @db.event.listens_for(CorpusFile, 'after_delete') @db.event.listens_for(Job, 'after_delete') @db.event.listens_for(JobInput, 'after_delete') @db.event.listens_for(JobResult, 'after_delete') def ressource_after_delete(mapper, connection, ressource): jsonpatch = [{'op': 'remove', 'path': ressource.jsonpatch_path}] room = f'users.{ressource.user_hashid}' socketio.emit('users.patch', jsonpatch, room=room) room = f'/users/{ressource.user_hashid}' socketio.emit('PATCH', jsonpatch, room=room) @db.event.listens_for(Corpus, 'after_insert') @db.event.listens_for(CorpusFile, 'after_insert') @db.event.listens_for(Job, 'after_insert') @db.event.listens_for(JobInput, 'after_insert') @db.event.listens_for(JobResult, 'after_insert') def ressource_after_insert_handler(mapper, connection, ressource): value = ressource.to_json_serializeable() for attr in mapper.relationships: value[attr.key] = {} jsonpatch = [ {'op': 'add', 'path': ressource.jsonpatch_path, 'value': value} ] room = f'/users/{ressource.user_hashid}' socketio.emit('PATCH', jsonpatch, room=room) @db.event.listens_for(Corpus, 'after_update') @db.event.listens_for(CorpusFile, 'after_update') @db.event.listens_for(Job, 'after_update') @db.event.listens_for(JobInput, 'after_update') @db.event.listens_for(JobResult, 'after_update') def ressource_after_update_handler(mapper, connection, ressource): jsonpatch = [] for attr in db.inspect(ressource).attrs: if attr.key in mapper.relationships: continue if not attr.load_history().has_changes(): continue if isinstance(attr.value, datetime): value = f'{attr.value.isoformat()}Z' elif isinstance(attr.value, Enum): value = attr.value.name else: value = attr.value jsonpatch.append( { 'op': 'replace', 'path': f'{ressource.jsonpatch_path}/{attr.key}', 'value': value } ) if jsonpatch: room = f'/users/{ressource.user_hashid}' socketio.emit('PATCH', jsonpatch, room=room) @db.event.listens_for(Job, 'after_update') def job_after_update_handler(mapper, connection, job): for attr in db.inspect(job).attrs: if attr.key != 'status': continue if not attr.load_history().has_changes(): return if job.user.setting_job_status_mail_notification_level == UserSettingJobStatusMailNotificationLevel.NONE: return if job.user.setting_job_status_mail_notification_level == UserSettingJobStatusMailNotificationLevel.END: if job.status not in [JobStatus.COMPLETED, JobStatus.FAILED]: return msg = create_message( job.user.email, f'Status update for your Job "{job.title}"', 'tasks/email/notification', job=job ) mail.send(msg) # endregion event_handlers ############################################################################## # misc # ############################################################################## # region misc @login.user_loader def load_user(user_id): return User.query.get(int(user_id)) # endregion misc