from datetime import datetime, timedelta from flask import current_app, url_for from flask_login import UserMixin from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer from time import sleep from werkzeug.security import generate_password_hash, check_password_hash import xml.etree.ElementTree as ET from . import db, hashids, login import base64 import enum import os import shutil class HashidMixin: @property def hashid(self): return hashids.encode(self.id) class FileMixin: creation_date = db.Column(db.DateTime, default=datetime.utcnow) filename = db.Column(db.String(256)) last_edited_date = db.Column(db.DateTime, default=datetime.utcnow) mimetype = db.Column(db.String(255)) def file_mixin_to_dict(self, backrefs=False, relationships=False): return { 'creation_date': self.creation_date.isoformat() + 'Z', 'filename': self.filename, 'last_edited_date': self.last_edited_date.isoformat() + 'Z', 'mimetype': self.mimetype } class Permission(enum.IntEnum): ''' Defines User permissions as integers by the power of 2. User permission can be evaluated using the bitwise operator &. ''' ADMINISTRATE = 1 USE_API = 2 class Role(HashidMixin, db.Model): __tablename__ = 'roles' # Primary key id = db.Column(db.Integer, primary_key=True) # Fields default = db.Column(db.Boolean, default=False, index=True) name = db.Column(db.String(64), unique=True) permissions = db.Column(db.Integer) # Relationships users = db.relationship('User', backref='role', lazy='dynamic') def __init__(self, **kwargs): super().__init__(**kwargs) if self.permissions is None: self.permissions = 0 def __repr__(self): return f'' def add_permission(self, permission): if not self.has_permission(permission): self.permissions += permission def has_permission(self, permission): return self.permissions & permission == permission def remove_permission(self, permission): if self.has_permission(permission): self.permissions -= permission def reset_permissions(self): self.permissions = 0 def to_dict(self, backrefs=False, relationships=False): dict_role = { 'id': self.hashid, 'default': self.default, 'name': self.name, 'permissions': self.permissions } if relationships: dict_role['users']: { x.to_dict(backrefs=False, relationships=True) for x in self.users } return dict_role @staticmethod def insert_roles(): roles = { 'User': [], 'Administrator': [Permission.USE_API, Permission.ADMINISTRATE] } default_role_name = 'User' for role_name, permissions in roles.items(): role = Role.query.filter_by(name=role_name).first() if role is None: role = Role(name=role_name) role.reset_permissions() for permission in permissions: role.add_permission(permission) role.default = role.name == default_role_name db.session.add(role) db.session.commit() class User(HashidMixin, UserMixin, db.Model): __tablename__ = 'users' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) # Fields confirmed = db.Column(db.Boolean, default=False) email = db.Column(db.String(254), unique=True, index=True) last_seen = db.Column(db.DateTime(), default=datetime.utcnow) member_since = db.Column(db.DateTime(), default=datetime.utcnow) password_hash = db.Column(db.String(128)) token = db.Column(db.String(32), index=True, unique=True) token_expiration = db.Column(db.DateTime) username = db.Column(db.String(64), unique=True, index=True) setting_dark_mode = db.Column(db.Boolean, default=False) setting_job_status_mail_notifications = db.Column( db.String(16), default='end') setting_job_status_site_notifications = db.Column( db.String(16), default='all') # Backrefs: role: Role # Relationships corpora = db.relationship( 'Corpus', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) jobs = db.relationship( 'Job', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) query_results = db.relationship( 'QueryResult', backref='user', cascade='all, delete-orphan', lazy='dynamic' ) def __init__(self, **kwargs): super().__init__(**kwargs) if self.role is not None: return if self.email == current_app.config['NOPAQUE_ADMIN']: self.role = Role.query.filter_by(name='Administrator').first() else: self.role = Role.query.filter_by(default=True).first() def __repr__(self): return f'' @property def jsonpatch_path(self): return f'/users/{self.hashid}' @property def password(self): raise AttributeError('password is not a readable attribute') @password.setter def password(self, password): self.password_hash = generate_password_hash(password) @property def path(self): return os.path.join( current_app.config.get('NOPAQUE_DATA_DIR'), str(self.id)) def can(self, permission): return self.role.has_permission(permission) def confirm(self, token): s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) try: data = s.loads(token.encode('utf-8')) except BadSignature: return False if data.get('confirm') != self.hashid: return False self.confirmed = True db.session.add(self) return True def delete(self): shutil.rmtree(self.path, ignore_errors=True) db.session.delete(self) def generate_confirmation_token(self, expiration=3600): s = TimedJSONWebSignatureSerializer( current_app.config['SECRET_KEY'], expiration) return s.dumps({'confirm': self.hashid}).decode('utf-8') def generate_reset_token(self, expiration=3600): s = TimedJSONWebSignatureSerializer( current_app.config['SECRET_KEY'], expiration) return s.dumps({'reset': self.hashid}).decode('utf-8') def get_token(self, expires_in=3600): now = datetime.utcnow() if self.token and self.token_expiration > now + timedelta(seconds=60): return self.token self.token = base64.b64encode(os.urandom(24)).decode('utf-8') self.token_expiration = now + timedelta(seconds=expires_in) db.session.add(self) return self.token def is_administrator(self): return self.can(Permission.ADMINISTRATE) def revoke_token(self): self.token_expiration = datetime.utcnow() - timedelta(seconds=1) def to_dict(self, backrefs=False, relationships=False): dict_user = { 'id': self.hashid, 'role_id': self.role.hashid, 'confirmed': self.confirmed, 'email': self.email, 'last_seen': self.last_seen.isoformat() + 'Z', 'member_since': self.member_since.isoformat() + 'Z', 'username': self.username, 'settings': { 'dark_mode': self.setting_dark_mode, 'job_status_mail_notifications': self.setting_job_status_mail_notifications, 'job_status_site_notifications': self.setting_job_status_site_notifications } } if backrefs: dict_user['role'] = self.role.to_dict( backrefs=True, relationships=False) if relationships: dict_user['corpora'] = { x.hashid: x.to_dict(backrefs=False, relationships=True) for x in self.corpora } dict_user['jobs'] = { x.hashid: x.to_dict(backrefs=False, relationships=True) for x in self.jobs } dict_user['query_results'] = { x.hashid: x.to_dict(backrefs=False, relationships=True) for x in self.query_results } return dict_user def verify_password(self, password): return check_password_hash(self.password_hash, password) @staticmethod def check_token(token): user = User.query.filter_by(token=token).first() if user is None or user.token_expiration < datetime.utcnow(): return None return user @staticmethod def reset_password(token, new_password): s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) try: data = s.loads(token.encode('utf-8')) except BadSignature: return False user = User.query.get(data.get('reset')) if user is None: return False user.password = new_password db.session.add(user) return True class JobInput(FileMixin, HashidMixin, db.Model): __tablename__ = 'job_inputs' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys job_id = db.Column(db.Integer, db.ForeignKey('jobs.id')) # Backrefs: job: Job def __repr__(self): return f'' @property def download_url(self): return url_for( 'jobs.download_job_input', job_id=self.job.id, job_input_id=self.id ) @property def jsonpatch_path(self): return f'{self.job.jsonpatch_path}/inputs/{self.hashid}' @property def path(self): return os.path.join(self.job.path, self.filename) def to_dict(self, backrefs=False, relationships=False): dict_job_input = { 'id': self.hashid, 'job_id': self.job.hashid, 'download_url': self.download_url, 'url': self.url, **self.file_mixin_to_dict() } if backrefs: dict_job_input['job'] = self.job.to_dict( backrefs=True, relationships=False) return dict_job_input @property def url(self): return url_for( 'jobs.job', job_id=self.job_id, _anchor=f'job-{self.job.hashid}-input-{self.hashid}' ) @property def user_hashid(self): return self.job.user.hashid @property def user_id(self): return self.job.user_id class JobResult(FileMixin, HashidMixin, db.Model): __tablename__ = 'job_results' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys job_id = db.Column(db.Integer, db.ForeignKey('jobs.id')) # Backrefs: job: Job def __repr__(self): return f'' @property def download_url(self): return url_for( 'jobs.download_job_result', job_id=self.job_id, job_result_id=self.id ) @property def jsonpatch_path(self): return f'{self.job.jsonpatch_path}/results/{self.hashid}' @property def path(self): return os.path.join(self.job.path, 'output', self.filename) def to_dict(self, backrefs=False, relationships=False): dict_job_result = { 'id': self.hashid, 'job_id': self.job.hashid, 'download_url': self.download_url, 'url': self.url, **self.file_mixin_to_dict( backrefs=backrefs, relationships=relationships) } if backrefs: dict_job_result['job'] = self.job.to_dict( backrefs=True, relationships=False) return dict_job_result @property def url(self): return url_for( 'jobs.job', job_id=self.job_id, _anchor=f'job-{self.job.hashid}-result-{self.hashid}' ) @property def user_hashid(self): return self.job.user.hashid @property def user_id(self): return self.job.user_id class Job(HashidMixin, db.Model): ''' Class to define Jobs. ''' __tablename__ = 'jobs' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields creation_date = db.Column(db.DateTime(), default=datetime.utcnow) description = db.Column(db.String(255)) end_date = db.Column(db.DateTime()) service = db.Column(db.String(64)) ''' ' Service specific arguments as string list. ' Example: ["-l eng", "--binarize"] ''' service_args = db.Column(db.String(255)) service_version = db.Column(db.String(16)) status = db.Column(db.String(16)) title = db.Column(db.String(32)) # Backrefs: user: User # Relationships inputs = db.relationship( 'JobInput', backref='job', cascade='all, delete-orphan', lazy='dynamic' ) results = db.relationship( 'JobResult', backref='job', cascade='all, delete-orphan', lazy='dynamic' ) def __repr__(self): return f'' @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/jobs/{self.hashid}' @property def path(self): return os.path.join(self.user.path, 'jobs', str(self.id)) @property def url(self): return url_for('jobs.job', job_id=self.id) @property def user_hashid(self): return self.user.hashid def delete(self): ''' Delete the job and its inputs and results from the database. ''' if self.status not in ['complete', 'failed']: self.status = 'canceling' db.session.commit() while self.status != 'canceled': # In case the daemon handled a job in any way if self.status != 'canceling': self.status = 'canceling' db.session.commit() sleep(1) db.session.refresh(self) shutil.rmtree(self.path, ignore_errors=True) db.session.delete(self) def restart(self): ''' Restart a job - only if the status is complete or failed ''' if self.status not in ['complete', 'failed']: raise Exception('Could not restart job: status is not "complete/failed"') # noqa shutil.rmtree(os.path.join(self.path, 'output'), ignore_errors=True) shutil.rmtree(os.path.join(self.path, 'pyflow.data'), ignore_errors=True) # noqa for result in self.results: db.session.delete(result) self.end_date = None self.status = 'submitted' def to_dict(self, backrefs=False, relationships=False): dict_job = { 'id': self.hashid, 'user_id': self.user.hashid, 'creation_date': self.creation_date.isoformat() + 'Z', 'description': self.description, 'end_date': None if self.end_date is None else f'{self.end_date.isoformat()}Z', # noqa 'service': self.service, 'service_args': self.service_args, 'service_version': self.service_version, 'status': self.status, 'title': self.title, 'url': self.url } if backrefs: dict_job['user'] = self.user.to_dict( backrefs=True, relationships=False) if relationships: dict_job['inputs'] = { x.hashid: x.to_dict(backrefs=False, relationships=True) for x in self.inputs } dict_job['results'] = { x.hashid: x.to_dict(backrefs=False, relationships=True) for x in self.results } return dict_job class CorpusFile(FileMixin, HashidMixin, db.Model): __tablename__ = 'corpus_files' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id')) # Fields address = db.Column(db.String(255)) author = db.Column(db.String(255)) booktitle = db.Column(db.String(255)) chapter = db.Column(db.String(255)) editor = db.Column(db.String(255)) institution = db.Column(db.String(255)) journal = db.Column(db.String(255)) pages = db.Column(db.String(255)) publisher = db.Column(db.String(255)) publishing_year = db.Column(db.Integer) school = db.Column(db.String(255)) title = db.Column(db.String(255)) # Backrefs: corpus: Corpus @property def download_url(self): return url_for( 'corpora.download_corpus_file', corpus_id=self.corpus_id, corpus_file_id=self.id ) @property def jsonpatch_path(self): return f'/{self.corpus.jsonpatch_path}/files/{self.hashid}' @property def path(self): return os.path.join(self.corpus.path, self.filename) @property def url(self): return url_for( 'corpora.corpus_file', corpus_id=self.corpus_id, corpus_file_id=self.id ) @property def user_hashid(self): return self.corpus.user.hashid @property def user_id(self): return self.corpus.user_id def delete(self): try: os.remove(self.path) except OSError: current_app.logger.error( 'Removing {} led to an OSError!'.format(self.path) ) pass db.session.delete(self) self.corpus.status = 'unprepared' def to_dict(self, backrefs=False, relationships=False): dict_corpus_file = { 'id': self.hashid, 'corpus_id': self.corpus.hashid, 'download_url': self.download_url, 'url': self.url, 'address': self.address, 'author': self.author, 'booktitle': self.booktitle, 'chapter': self.chapter, 'editor': self.editor, 'institution': self.institution, 'journal': self.journal, 'pages': self.pages, 'publisher': self.publisher, 'publishing_year': self.publishing_year, 'school': self.school, 'title': self.title, **self.file_mixin_to_dict( backrefs=backrefs, relationships=relationships) } if backrefs: dict_corpus_file['corpus'] = self.corpus.to_dict( backrefs=True, relationships=False) class Corpus(HashidMixin, db.Model): ''' Class to define a corpus. ''' __tablename__ = 'corpora' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields creation_date = db.Column(db.DateTime(), default=datetime.utcnow) description = db.Column(db.String(255)) last_edited_date = db.Column(db.DateTime(), default=datetime.utcnow) status = db.Column(db.String(16), default='unprepared') title = db.Column(db.String(32)) num_analysis_sessions = db.Column(db.Integer, default=0) num_tokens = db.Column(db.Integer, default=0) archive_file = db.Column(db.String(255)) # Backrefs: user: User # Relationships files = db.relationship( 'CorpusFile', backref='corpus', lazy='dynamic', cascade='all, delete-orphan' ) # Python class variables max_num_tokens = 2147483647 def __repr__(self): return f'' @property def analysis_url(self): return url_for('corpora.analyse_corpus', corpus_id=self.id) @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/corpora/{self.hashid}' @property def path(self): return os.path.join(self.user.path, 'corpora', str(self.id)) @property def url(self): return url_for('corpora.corpus', corpus_id=self.id) @property def user_hashid(self): return self.user.hashid def build(self): output_dir = os.path.join(self.path, 'merged') shutil.rmtree(output_dir, ignore_errors=True) os.mkdir(output_dir) output_file = os.path.join(output_dir, 'corpus.vrt') corpus_element = ET.fromstring('\n') for corpus_file in self.files: element_tree = ET.parse(corpus_file.path) text_node = element_tree.find('text') text_node.set('address', corpus_file.address or 'NULL') text_node.set('author', corpus_file.author) text_node.set('booktitle', corpus_file.booktitle or 'NULL') text_node.set('chapter', corpus_file.chapter or 'NULL') text_node.set('editor', corpus_file.editor or 'NULL') text_node.set('institution', corpus_file.institution or 'NULL') text_node.set('journal', corpus_file.journal or 'NULL') text_node.set('pages', corpus_file.pages or 'NULL') text_node.set('publisher', corpus_file.publisher or 'NULL') text_node.set('publishing_year', str(corpus_file.publishing_year)) text_node.set('school', corpus_file.school or 'NULL') text_node.set('title', corpus_file.title) corpus_element.insert(1, text_node) ET.ElementTree(corpus_element).write(output_file, encoding='utf-8') self.last_edited_date = datetime.utcnow() self.status = 'submitted' def delete(self): shutil.rmtree(self.path, ignore_errors=True) db.session.delete(self) def to_dict(self, backrefs=False, relationships=False): dict_corpus = { 'id': self.hashid, 'user_id': self.user.hashid, 'analysis_url': self.analysis_url, 'url': self.url, 'creation_date': self.creation_date.isoformat() + 'Z', 'description': self.description, 'max_num_tokens': self.max_num_tokens, 'num_analysis_sessions': self.num_analysis_sessions, 'num_tokens': self.num_tokens, 'status': self.status, 'last_edited_date': self.last_edited_date.isoformat() + 'Z', 'title': self.title } if backrefs: dict_corpus['user'] = self.user.to_dict( backrefs=True, relationships=False) if relationships: dict_corpus['files'] = { x.id: x.to_dict(backrefs=False, relationships=True) for x in self.files } return dict_corpus class QueryResult(FileMixin, HashidMixin, db.Model): __tablename__ = 'query_results' # Primary key id = db.Column(db.Integer, primary_key=True) # Foreign keys user_id = db.Column(db.Integer, db.ForeignKey('users.id')) # Fields description = db.Column(db.String(255)) query_metadata = db.Column(db.JSON()) title = db.Column(db.String(32)) # Backrefs: user: User def __repr__(self): ''' String representation of the QueryResult. For human readability. ''' return f'' @property def download_url(self): return url_for( 'corpora.download_query_result', query_result_id=self.id) @property def jsonpatch_path(self): return f'{self.user.jsonpatch_path}/query_results/{self.hashid}' @property def path(self): return os.path.join( self.user.path, 'query_results', str(self.id), self.filename) @property def url(self): return url_for('corpora.query_result', query_result_id=self.id) @property def user_hashid(self): return self.user.hashid def delete(self): shutil.rmtree(self.path, ignore_errors=True) db.session.delete(self) def to_dict(self, backrefs=False, relationships=False): dict_query_result = { 'id': self.hashid, 'user_id': self.user.hashid, 'download_url': self.download_url, 'url': self.url, 'corpus_title': self.query_metadata['corpus_name'], 'description': self.description, 'filename': self.filename, 'query': self.query_metadata['query'], 'query_metadata': self.query_metadata, 'title': self.title, **self.file_mixin_to_dict( backrefs=backrefs, relationships=relationships) } if backrefs: dict_query_result['user'] = self.user.to_dict( backrefs=True, relationships=False) @login.user_loader def load_user(user_id): return User.query.get(int(user_id))