from datetime import datetime, timedelta
from time import sleep
import base64
import os
import shutil
import xml.etree.ElementTree as ET

from flask import current_app, url_for
from flask_login import UserMixin
from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer
from werkzeug.security import generate_password_hash, check_password_hash

from . import db, login


class Permission:
    '''
    Defines User permissions as integers by the power of 2.

    Permissions are combined into a bit mask and evaluated using the bitwise
    operator &. A mask of 3 equals MANAGE_CORPORA and MANAGE_JOBS and so on.
    '''
    MANAGE_CORPORA = 1
    MANAGE_JOBS = 2
    # PERMISSION_NAME = 4
    # PERMISSION_NAME = 8
    ADMIN = 16


class Role(db.Model):
    '''
    Model for the different roles Users can have.

    Is a one-to-many relationship. A Role can be associated with many User
    rows.
    '''
    __tablename__ = 'roles'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Fields
    default = db.Column(db.Boolean, default=False, index=True)
    name = db.Column(db.String(64), unique=True)
    permissions = db.Column(db.Integer)
    # Relationships
    users = db.relationship('User', backref='role', lazy='dynamic')

    def __init__(self, **kwargs):
        '''
        Create a Role; a Role without explicit permissions starts with none.
        '''
        super().__init__(**kwargs)
        if self.permissions is None:
            self.permissions = 0

    def __repr__(self):
        '''
        String representation of the Role. For human readability.
        '''
        return '<Role {}>'.format(self.name)

    def to_dict(self, include_relationships=True):
        '''
        Return the Role's columns as a dictionary.

        include_relationships is accepted for signature parity with the other
        models; a Role serializes no relationships.
        '''
        return {'id': self.id,
                'default': self.default,
                'name': self.name,
                'permissions': self.permissions}

    def add_permission(self, perm):
        '''
        Add new permission to Role. Input is a Permission.
        '''
        # Bitwise OR instead of +=: adding a mask whose bits are only partly
        # set must not carry into unrelated permission bits.
        self.permissions |= perm

    def remove_permission(self, perm):
        '''
        Removes permission from a Role. Input is a Permission.

        Nothing happens unless the Role currently has every bit of perm.
        '''
        if self.has_permission(perm):
            self.permissions &= ~perm

    def reset_permissions(self):
        '''
        Resets permissions to zero. Zero equals no permissions at all.
        '''
        self.permissions = 0

    def has_permission(self, perm):
        '''
        Checks if a Role has a specific Permission. Does this with the
        bitwise operator.
        '''
        return (self.permissions & perm) == perm

    @staticmethod
    def insert_roles():
        '''
        Inserts roles into the database (or updates existing ones).

        This has to be executed before Users are added to the database.
        Otherwise Users will not have a Role assigned to them. Order of the
        roles dictionary determines the ID of each role. Users have the ID 1
        and Administrators have the ID 2.
        '''
        roles = {
            'User': [Permission.MANAGE_CORPORA, Permission.MANAGE_JOBS],
            'Administrator': [Permission.MANAGE_CORPORA,
                              Permission.MANAGE_JOBS,
                              Permission.ADMIN],
        }
        default_role = 'User'
        for role_name, permissions in roles.items():
            role = Role.query.filter_by(name=role_name).first()
            if role is None:
                role = Role(name=role_name)
            # Rebuild the mask from scratch so removed permissions disappear.
            role.reset_permissions()
            for perm in permissions:
                role.add_permission(perm)
            role.default = (role.name == default_role)
            db.session.add(role)
        db.session.commit()


class User(UserMixin, db.Model):
    '''
    Model for Users that are registered to nopaque.
    '''
    __tablename__ = 'users'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    # Fields
    confirmed = db.Column(db.Boolean, default=False)
    email = db.Column(db.String(254), unique=True, index=True)
    last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
    member_since = db.Column(db.DateTime(), default=datetime.utcnow)
    password_hash = db.Column(db.String(128))
    setting_dark_mode = db.Column(db.Boolean, default=False)
    setting_job_status_mail_notifications = db.Column(db.String(16),
                                                      default='end')
    setting_job_status_site_notifications = db.Column(db.String(16),
                                                      default='all')
    token = db.Column(db.String(32), index=True, unique=True)
    token_expiration = db.Column(db.DateTime)
    username = db.Column(db.String(64), unique=True, index=True)
    # Relationships
    corpora = db.relationship('Corpus', backref='creator', lazy='dynamic',
                              cascade='save-update, merge, delete')
    jobs = db.relationship('Job', backref='creator', lazy='dynamic',
                           cascade='save-update, merge, delete')
    query_results = db.relationship('QueryResult', backref='creator',
                                    cascade='save-update, merge, delete',
                                    lazy='dynamic')

    def __init__(self, **kwargs):
        '''
        Create a User and assign a Role if none was given.

        The account whose email equals the NOPAQUE_ADMIN config value gets
        the Administrator role; everyone else gets the default role.
        '''
        super().__init__(**kwargs)
        if self.role is None:
            if self.email == current_app.config['NOPAQUE_ADMIN']:
                self.role = Role.query.filter_by(name='Administrator').first()
            if self.role is None:
                self.role = Role.query.filter_by(default=True).first()

    def __repr__(self):
        '''
        String representation of the User. For human readability.
        '''
        return '<User {}>'.format(self.username)

    @property
    def path(self):
        '''
        Directory of this User below NOPAQUE_DATA_DIR.
        '''
        return os.path.join(current_app.config['NOPAQUE_DATA_DIR'],
                            str(self.id))

    @property
    def password(self):
        '''
        Write-only attribute; reading it raises AttributeError.
        '''
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        # Only the hash is stored, never the plain text password.
        self.password_hash = generate_password_hash(password)

    def to_dict(self, include_relationships=True):
        '''
        Return the User as a dictionary.

        If include_relationships is true, the User's corpora, jobs and query
        results are included, each keyed by its id.
        '''
        dict_user = {
            'id': self.id,
            'role_id': self.role_id,
            'confirmed': self.confirmed,
            'email': self.email,
            'last_seen': self.last_seen.isoformat() + 'Z',
            'member_since': self.member_since.isoformat() + 'Z',
            'settings': {
                'dark_mode': self.setting_dark_mode,
                'job_status_mail_notifications':
                    self.setting_job_status_mail_notifications,
                'job_status_site_notifications':
                    self.setting_job_status_site_notifications,
            },
            'username': self.username,
            'role': self.role.to_dict()
        }
        if include_relationships:
            dict_user['corpora'] = {corpus.id: corpus.to_dict()
                                    for corpus in self.corpora}
            dict_user['jobs'] = {job.id: job.to_dict() for job in self.jobs}
            dict_user['query_results'] = {
                query_result.id: query_result.to_dict()
                for query_result in self.query_results
            }
        return dict_user

    def generate_confirmation_token(self, expiration=3600):
        '''
        Generates a confirmation token for user confirmation via email.

        expiration is passed to the serializer as the token lifetime.
        '''
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'],
                                            expiration)
        return s.dumps({'confirm': self.id}).decode('utf-8')

    def generate_reset_token(self, expiration=3600):
        '''
        Generates a reset token for password reset via email.

        expiration is passed to the serializer as the token lifetime.
        '''
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'],
                                            expiration)
        return s.dumps({'reset': self.id}).decode('utf-8')

    def confirm(self, token):
        '''
        Confirms User if the given token is valid and not expired.

        Returns True on success, False if the token cannot be loaded or was
        issued for a different User.
        '''
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except BadSignature:
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        db.session.add(self)
        return True

    @staticmethod
    def reset_password(token, new_password):
        '''
        Resets password for User if the given token is valid and not expired.

        Returns True on success, False if the token cannot be loaded or
        references no existing User.
        '''
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except BadSignature:
            return False
        user = User.query.get(data.get('reset'))
        if user is None:
            return False
        user.password = new_password
        db.session.add(user)
        return True

    def verify_password(self, password):
        '''
        Checks the given plain text password against the stored hash.
        '''
        return check_password_hash(self.password_hash, password)

    def can(self, perm):
        '''
        Checks if a User with its current role can do something. Checks if
        the associated role actually has the needed Permission.
        '''
        return self.role is not None and self.role.has_permission(perm)

    def is_administrator(self):
        '''
        Checks if User has Admin permissions.
        '''
        return self.can(Permission.ADMIN)

    def delete(self):
        '''
        Delete the user and its corpora and jobs from database and
        filesystem.
        '''
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def get_token(self, expires_in=3600):
        '''
        Return the User's API token, creating a new one when needed.

        The current token is reused while it stays valid for more than 60
        seconds; otherwise a new random token valid for expires_in seconds
        is generated.
        '''
        now = datetime.utcnow()
        if self.token and self.token_expiration > now + timedelta(seconds=60):
            return self.token
        # 24 random bytes encode to exactly 32 base64 characters, which
        # matches the String(32) token column.
        self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
        self.token_expiration = now + timedelta(seconds=expires_in)
        db.session.add(self)
        return self.token

    def revoke_token(self):
        '''
        Invalidate the current token by moving its expiration into the past.
        '''
        self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

    @staticmethod
    def check_token(token):
        '''
        Return the User owning the token, or None if the token is unknown or
        expired.
        '''
        user = User.query.filter_by(token=token).first()
        if user is None or user.token_expiration < datetime.utcnow():
            return None
        return user


class JobInput(db.Model):
    '''
    Class to define JobInputs.
    '''
    __tablename__ = 'job_inputs'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
    # Fields
    filename = db.Column(db.String(255))

    @property
    def download_url(self):
        return url_for('jobs.download_job_input', job_id=self.job_id,
                       job_input_id=self.id)

    @property
    def jsonpatch_path(self):
        return '/jobs/{}/inputs/{}'.format(self.job_id, self.id)

    @property
    def path(self):
        # Input files live directly in the job directory.
        return os.path.join(self.job.path, self.filename)

    @property
    def url(self):
        return url_for('jobs.job', job_id=self.job_id,
                       _anchor='job-{}-input-{}'.format(self.job_id, self.id))

    @property
    def user_id(self):
        return self.job.user_id

    def __repr__(self):
        '''
        String representation of the JobInput. For human readability.
        '''
        return '<JobInput {}>'.format(self.filename)

    def to_dict(self, include_relationships=True):
        '''
        Return the JobInput as a dictionary.

        include_relationships is accepted for signature parity with the other
        models; a JobInput serializes no relationships.
        '''
        return {'download_url': self.download_url,
                'url': self.url,
                'id': self.id,
                'job_id': self.job_id,
                'filename': self.filename}


class JobResult(db.Model):
    '''
    Class to define JobResults.
    '''
    __tablename__ = 'job_results'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
    # Fields
    filename = db.Column(db.String(255))

    @property
    def download_url(self):
        return url_for('jobs.download_job_result', job_id=self.job_id,
                       job_result_id=self.id)

    @property
    def jsonpatch_path(self):
        return '/jobs/{}/results/{}'.format(self.job_id, self.id)

    @property
    def path(self):
        # Result files live in the 'output' subdirectory of the job.
        return os.path.join(self.job.path, 'output', self.filename)

    @property
    def url(self):
        return url_for('jobs.job', job_id=self.job_id,
                       _anchor='job-{}-result-{}'.format(self.job_id, self.id))

    @property
    def user_id(self):
        return self.job.user_id

    def __repr__(self):
        '''
        String representation of the JobResult. For human readability.
        '''
        return '<JobResult {}>'.format(self.filename)

    def to_dict(self, include_relationships=True):
        '''
        Return the JobResult as a dictionary.

        include_relationships is accepted for signature parity with the other
        models; a JobResult serializes no relationships.
        '''
        return {'download_url': self.download_url,
                'url': self.url,
                'id': self.id,
                'job_id': self.job_id,
                'filename': self.filename}


class Job(db.Model):
    '''
    Class to define Jobs.
    '''
    __tablename__ = 'jobs'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    end_date = db.Column(db.DateTime())
    service = db.Column(db.String(64))
    # Service specific arguments as string list.
    # Example: ["-l eng", "--binarize"]
    service_args = db.Column(db.String(255))
    service_version = db.Column(db.String(16))
    status = db.Column(db.String(16))
    title = db.Column(db.String(32))
    # Relationships
    inputs = db.relationship('JobInput', backref='job', lazy='dynamic',
                             cascade='save-update, merge, delete')
    results = db.relationship('JobResult', backref='job', lazy='dynamic',
                              cascade='save-update, merge, delete')

    @property
    def jsonpatch_path(self):
        return '/jobs/{}'.format(self.id)

    @property
    def path(self):
        return os.path.join(self.creator.path, 'jobs', str(self.id))

    @property
    def url(self):
        return url_for('jobs.job', job_id=self.id)

    def __repr__(self):
        '''
        String representation of the Job. For human readability.
        '''
        return '<Job {}>'.format(self.title)

    def delete(self):
        '''
        Delete the job and its inputs and results from the database.

        A job that is neither complete nor failed is first marked as
        'canceling'; this method then polls the database once per second
        until the status reads 'canceled'.
        '''
        if self.status not in ['complete', 'failed']:
            self.status = 'canceling'
            db.session.commit()
            # NOTE(review): this loop has no timeout; it relies on another
            # process setting the status to 'canceled'.
            while self.status != 'canceled':
                # In case the daemon handled a job in any way
                if self.status != 'canceling':
                    self.status = 'canceling'
                    db.session.commit()
                sleep(1)
                db.session.refresh(self)
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def restart(self):
        '''
        Restart a job - only if the status is complete or failed.

        Removes previous output from disk, deletes the result rows and sets
        the status back to 'submitted'. Raises Exception for any other
        status.
        '''
        if self.status not in ['complete', 'failed']:
            raise Exception(
                'Could not restart job: status is not "complete/failed"'
            )
        shutil.rmtree(os.path.join(self.path, 'output'), ignore_errors=True)
        shutil.rmtree(os.path.join(self.path, 'pyflow.data'),
                      ignore_errors=True)
        for result in self.results:
            db.session.delete(result)
        self.end_date = None
        self.status = 'submitted'

    def to_dict(self, include_relationships=True):
        '''
        Return the Job as a dictionary.

        If include_relationships is true, the Job's inputs and results are
        included, each keyed by its id.
        '''
        dict_job = {
            'url': self.url,
            'id': self.id,
            'user_id': self.user_id,
            'creation_date': self.creation_date.isoformat() + 'Z',
            'description': self.description,
            'end_date': (self.end_date.isoformat() + 'Z'
                         if self.end_date else None),
            'service': self.service,
            'service_args': self.service_args,
            'service_version': self.service_version,
            'status': self.status,
            'title': self.title,
        }
        if include_relationships:
            dict_job['inputs'] = {job_input.id: job_input.to_dict()
                                  for job_input in self.inputs}
            dict_job['results'] = {result.id: result.to_dict()
                                   for result in self.results}
        return dict_job


class CorpusFile(db.Model):
    '''
    Class to define Files.
    '''
    __tablename__ = 'corpus_files'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id'))
    # Fields
    address = db.Column(db.String(255))
    author = db.Column(db.String(255))
    booktitle = db.Column(db.String(255))
    chapter = db.Column(db.String(255))
    editor = db.Column(db.String(255))
    filename = db.Column(db.String(255))
    institution = db.Column(db.String(255))
    journal = db.Column(db.String(255))
    pages = db.Column(db.String(255))
    publisher = db.Column(db.String(255))
    publishing_year = db.Column(db.Integer)
    school = db.Column(db.String(255))
    title = db.Column(db.String(255))

    @property
    def download_url(self):
        return url_for('corpora.download_corpus_file',
                       corpus_id=self.corpus_id, corpus_file_id=self.id)

    @property
    def jsonpatch_path(self):
        return '/corpora/{}/files/{}'.format(self.corpus_id, self.id)

    @property
    def path(self):
        return os.path.join(self.corpus.path, self.filename)

    @property
    def url(self):
        return url_for('corpora.corpus_file', corpus_id=self.corpus_id,
                       corpus_file_id=self.id)

    @property
    def user_id(self):
        return self.corpus.user_id

    def delete(self):
        '''
        Delete the file from disk and database.

        A failing file removal is logged but does not prevent the database
        row from being deleted. The owning corpus is marked 'unprepared'.
        '''
        try:
            os.remove(self.path)
        except OSError:
            current_app.logger.error(
                'Removing {} led to an OSError!'.format(self.path)
            )
        db.session.delete(self)
        self.corpus.status = 'unprepared'

    def to_dict(self, include_relationships=True):
        '''
        Return the CorpusFile as a dictionary.

        include_relationships is accepted for signature parity with the other
        models; a CorpusFile serializes no relationships.
        '''
        return {'download_url': self.download_url,
                'url': self.url,
                'id': self.id,
                'corpus_id': self.corpus_id,
                'address': self.address,
                'author': self.author,
                'booktitle': self.booktitle,
                'chapter': self.chapter,
                'editor': self.editor,
                'filename': self.filename,
                'institution': self.institution,
                'journal': self.journal,
                'pages': self.pages,
                'publisher': self.publisher,
                'publishing_year': self.publishing_year,
                'school': self.school,
                'title': self.title}


class Corpus(db.Model):
    '''
    Class to define a corpus.
    '''
    __tablename__ = 'corpora'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    last_edited_date = db.Column(db.DateTime(), default=datetime.utcnow)
    status = db.Column(db.String(16), default='unprepared')
    title = db.Column(db.String(32))
    num_analysis_sessions = db.Column(db.Integer, default=0)
    num_tokens = db.Column(db.Integer, default=0)
    archive_file = db.Column(db.String(255))
    # Relationships
    files = db.relationship('CorpusFile', backref='corpus', lazy='dynamic',
                            cascade='save-update, merge, delete')
    # Python class variables
    max_num_tokens = 2147483647

    @property
    def analysis_url(self):
        return url_for('corpora.analyse_corpus', corpus_id=self.id)

    @property
    def jsonpatch_path(self):
        return '/corpora/{}'.format(self.id)

    @property
    def path(self):
        return os.path.join(self.creator.path, 'corpora', str(self.id))

    @property
    def url(self):
        return url_for('corpora.corpus', corpus_id=self.id)

    def to_dict(self, include_relationships=True):
        '''
        Return the Corpus as a dictionary.

        If include_relationships is true, the Corpus' files are included,
        keyed by their id.
        '''
        dict_corpus = {
            'analysis_url': self.analysis_url,
            'url': self.url,
            'id': self.id,
            'user_id': self.user_id,
            'creation_date': self.creation_date.isoformat() + 'Z',
            'description': self.description,
            'max_num_tokens': self.max_num_tokens,
            'num_analysis_sessions': self.num_analysis_sessions,
            'num_tokens': self.num_tokens,
            'status': self.status,
            'last_edited_date': self.last_edited_date.isoformat() + 'Z',
            'title': self.title
        }
        if include_relationships:
            dict_corpus['files'] = {file.id: file.to_dict()
                                    for file in self.files}
        return dict_corpus

    def build(self):
        '''
        Merge all corpus files into 'merged/corpus.vrt' inside the corpus
        directory.

        The 'text' element of every file is annotated with the file's
        metadata and collected below one root element. Afterwards the corpus
        is marked as 'submitted' and its last edited date is updated.
        '''
        output_dir = os.path.join(self.path, 'merged')
        # Always start from an empty output directory.
        shutil.rmtree(output_dir, ignore_errors=True)
        os.mkdir(output_dir)
        output_file = os.path.join(output_dir, 'corpus.vrt')
        # ET.fromstring needs a document with a root element; a bare '\n'
        # raises ParseError.
        # NOTE(review): the root tag name 'corpus' is reconstructed - confirm
        # against the consumer of corpus.vrt.
        corpus_element = ET.fromstring('<corpus>\n</corpus>')
        for corpus_file in self.files:
            element_tree = ET.parse(corpus_file.path)
            text_node = element_tree.find('text')
            # Optional metadata falls back to the literal string 'NULL'.
            text_node.set('address', corpus_file.address or 'NULL')
            text_node.set('author', corpus_file.author)
            text_node.set('booktitle', corpus_file.booktitle or 'NULL')
            text_node.set('chapter', corpus_file.chapter or 'NULL')
            text_node.set('editor', corpus_file.editor or 'NULL')
            text_node.set('institution', corpus_file.institution or 'NULL')
            text_node.set('journal', corpus_file.journal or 'NULL')
            text_node.set('pages', corpus_file.pages or 'NULL')
            text_node.set('publisher', corpus_file.publisher or 'NULL')
            text_node.set('publishing_year', str(corpus_file.publishing_year))
            text_node.set('school', corpus_file.school or 'NULL')
            text_node.set('title', corpus_file.title)
            # NOTE(review): inserting at index 1 means the texts do not end
            # up in iteration order - confirm this is intended.
            corpus_element.insert(1, text_node)
        ET.ElementTree(corpus_element).write(output_file, encoding='utf-8')
        self.last_edited_date = datetime.utcnow()
        self.status = 'submitted'

    def delete(self):
        '''
        Delete the corpus from filesystem and database.
        '''
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def __repr__(self):
        '''
        String representation of the corpus. For human readability.
        '''
        return '<Corpus {}>'.format(self.title)


class QueryResult(db.Model):
    '''
    Class to define a corpus analysis result.
    '''
    __tablename__ = 'query_results'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    description = db.Column(db.String(255))
    filename = db.Column(db.String(255))
    query_metadata = db.Column(db.JSON())
    title = db.Column(db.String(32))

    @property
    def download_url(self):
        return url_for('corpora.download_query_result',
                       query_result_id=self.id)

    @property
    def jsonpatch_path(self):
        return '/query_results/{}'.format(self.id)

    @property
    def path(self):
        # Path of the result file itself, not of its directory.
        return os.path.join(self.creator.path, 'query_results', str(self.id),
                            self.filename)

    @property
    def url(self):
        return url_for('corpora.query_result', query_result_id=self.id)

    def delete(self):
        '''
        Delete the query result from filesystem and database.
        '''
        # self.path points at the result file; rmtree on a file fails and
        # the error would be swallowed by ignore_errors, leaving the data on
        # disk. Remove the enclosing per-result directory instead.
        shutil.rmtree(os.path.dirname(self.path), ignore_errors=True)
        db.session.delete(self)

    def to_dict(self, include_relationships=True):
        '''
        Return the QueryResult as a dictionary.

        'corpus_title' and 'query' are read from the 'corpus_name' and
        'query' keys of query_metadata. include_relationships is accepted
        for signature parity with the other models.
        '''
        return {'download_url': self.download_url,
                'url': self.url,
                'id': self.id,
                'user_id': self.user_id,
                'corpus_title': self.query_metadata['corpus_name'],
                'description': self.description,
                'filename': self.filename,
                'query': self.query_metadata['query'],
                'query_metadata': self.query_metadata,
                'title': self.title}

    def __repr__(self):
        '''
        String representation of the QueryResult. For human readability.
        '''
        return '<QueryResult {}>'.format(self.title)


@login.user_loader
def load_user(user_id):
    '''
    Load the User for the given session user id (callback registered with
    the login manager).
    '''
    return User.query.get(int(user_id))