from datetime import datetime, timedelta
from enum import IntEnum
from flask import current_app, url_for
from flask_hashids import HashidMixin
from flask_login import UserMixin
from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer
from time import sleep
from tqdm import tqdm
from werkzeug.security import generate_password_hash, check_password_hash
from . import db, login
import base64
import json
import os
import requests
import shutil
import xml.etree.ElementTree as ET
import yaml


class CorpusStatus(IntEnum):
    '''
    Status values of a corpus, stored as integers in the database.
    '''
    UNPREPARED = 1
    SUBMITTED = 2
    QUEUED = 3
    BUILDING = 4
    BUILT = 5
    FAILED = 6
    STARTING_ANALYSIS_SESSION = 7
    RUNNING_ANALYSIS_SESSION = 8
    CANCELING_ANALYSIS_SESSION = 9


class JobStatus(IntEnum):
    '''
    Status values of a job, stored as integers in the database.
    '''
    INITIALIZING = 1
    SUBMITTED = 2
    QUEUED = 3
    RUNNING = 4
    CANCELING = 5
    CANCELED = 6
    COMPLETED = 7
    FAILED = 8


class JobStatusMailNotificationLevel(IntEnum):
    '''
    Per-user setting values for job status mail notifications.
    '''
    NONE = 1
    END = 2
    ALL = 3


class Permission(IntEnum):
    '''
    Defines User permissions as integers by the power of 2. User permission
    can be evaluated using the bitwise operator &.
    '''
    ADMINISTRATE = 4
    CONTRIBUTE = 2
    USE_API = 1


class FileMixin:
    '''
    Mixin that adds file metadata columns (filename, mimetype and two
    timestamps) to a model and a helper to serialize them.
    '''
    creation_date = db.Column(db.DateTime, default=datetime.utcnow)
    filename = db.Column(db.String(255))
    last_edited_date = db.Column(db.DateTime, default=datetime.utcnow)
    mimetype = db.Column(db.String(255))

    def file_mixin_to_dict(self, backrefs=False, relationships=False):
        '''
        Return the mixin's columns as a dict. Dates are ISO 8601 strings
        with a 'Z' suffix appended. Both keyword arguments are accepted for
        signature parity with the to_dict methods but are not used.
        '''
        return {
            'creation_date': self.creation_date.isoformat() + 'Z',
            'filename': self.filename,
            'last_edited_date': self.last_edited_date.isoformat() + 'Z',
            'mimetype': self.mimetype
        }


class Role(HashidMixin, db.Model):
    '''
    A named set of permissions, stored as an integer bitmask built from
    Permission members.
    '''
    __tablename__ = 'roles'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Fields
    default = db.Column(db.Boolean, default=False, index=True)
    name = db.Column(db.String(64), unique=True)
    permissions = db.Column(db.Integer)
    # Relationships
    users = db.relationship('User', backref='role', lazy='dynamic')

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Make sure the bitmask is always an integer so that the bitwise
        # helpers below work on freshly created roles.
        if self.permissions is None:
            self.permissions = 0

    def __repr__(self):
        return f'<Role {self.name}>'

    def add_permission(self, permission):
        '''
        Set the bits of permission in the role's bitmask.
        '''
        # Bitwise OR instead of addition: adding would corrupt the mask if
        # only some bits of a combined permission were already set.
        if not self.has_permission(permission):
            self.permissions |= permission

    def has_permission(self, permission):
        '''
        Return True if all bits of permission are set in the bitmask.
        '''
        return (self.permissions & permission) == permission

    def remove_permission(self, permission):
        '''
        Clear the bits of permission, only if all of them are currently set.
        '''
        if self.has_permission(permission):
            self.permissions &= ~permission

    def reset_permissions(self):
        '''
        Remove all permissions from the role.
        '''
        self.permissions = 0

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the role. With relationships=True the role's users are
        included as a dict keyed by user hashid.
        '''
        dict_role = {
            'id': self.hashid,
            'default': self.default,
            'name': self.name,
            'permissions': self.permissions
        }
        if relationships:
            dict_role['users'] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in self.users
            }
        return dict_role

    @staticmethod
    def insert_defaults():
        '''
        Create or update the default roles and commit them. Existing roles
        with the same name get their permissions reset to the defaults.
        '''
        roles = {
            'User': [],
            'API user': [Permission.USE_API],
            'Contributor': [Permission.CONTRIBUTE],
            'Administrator': [
                Permission.ADMINISTRATE,
                Permission.CONTRIBUTE,
                Permission.USE_API
            ]
        }
        default_role_name = 'User'
        for role_name, permissions in roles.items():
            role = Role.query.filter_by(name=role_name).first()
            if role is None:
                role = Role(name=role_name)
            role.reset_permissions()
            for permission in permissions:
                role.add_permission(permission)
            role.default = role.name == default_role_name
            db.session.add(role)
        db.session.commit()


class User(HashidMixin, UserMixin, db.Model):
    '''
    An application user owning corpora, jobs and Tesseract OCR models.
    '''
    __tablename__ = 'users'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    # Fields
    confirmed = db.Column(db.Boolean, default=False)
    email = db.Column(db.String(254), unique=True, index=True)
    last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
    member_since = db.Column(db.DateTime(), default=datetime.utcnow)
    password_hash = db.Column(db.String(128))
    token = db.Column(db.String(32), index=True, unique=True)
    token_expiration = db.Column(db.DateTime)
    username = db.Column(db.String(64), unique=True, index=True)
    setting_dark_mode = db.Column(db.Boolean, default=False)
    setting_job_status_mail_notification_level_enum_value = db.Column(
        'setting_job_status_mail_notification_level',
        db.Integer,
        default=2
    )
    # Backrefs: role: Role
    # Relationships
    tesseract_ocr_models = db.relationship(
        'TesseractOCRModel',
        backref='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    corpora = db.relationship(
        'Corpus',
        backref='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    jobs = db.relationship(
        'Job',
        backref='user',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if self.role is not None:
            return
        # No role given: the configured admin email gets the Administrator
        # role, everyone else the role flagged as default.
        if self.email == current_app.config['NOPAQUE_ADMIN']:
            self.role = Role.query.filter_by(name='Administrator').first()
        else:
            self.role = Role.query.filter_by(default=True).first()

    def __repr__(self):
        return f'<User {self.username}>'

    @property
    def jsonpatch_path(self):
        return f'/users/{self.hashid}'

    @property
    def password(self):
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        # Only the hash is stored, never the clear text password.
        self.password_hash = generate_password_hash(password)

    @property
    def path(self):
        '''
        Directory of the user below the configured NOPAQUE_DATA_DIR.
        '''
        return os.path.join(
            current_app.config.get('NOPAQUE_DATA_DIR'),
            'users',
            str(self.id)
        )

    @property
    def setting_job_status_mail_notification_level(self):
        return JobStatusMailNotificationLevel(
            self.setting_job_status_mail_notification_level_enum_value
        )

    @setting_job_status_mail_notification_level.setter
    def setting_job_status_mail_notification_level(self, enum_member):
        if not isinstance(enum_member, JobStatusMailNotificationLevel):
            # Raise (instead of returning the exception object) so that
            # invalid values are not silently ignored.
            raise TypeError(
                'enum_member must be a JobStatusMailNotificationLevel'
            )
        self.setting_job_status_mail_notification_level_enum_value = \
            enum_member.value

    def can(self, permission):
        '''
        Return True if the user's role has the given permission.
        '''
        return self.role.has_permission(permission)

    def confirm(self, token):
        '''
        Mark the user as confirmed if token is a valid confirmation token
        for this user. Returns True on success, False otherwise.
        '''
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except BadSignature:
            return False
        if data.get('confirm') != self.hashid:
            return False
        self.confirmed = True
        db.session.add(self)
        return True

    def delete(self):
        '''
        Remove the user's directory (errors are ignored) and mark the user
        for deletion in the current database session.
        '''
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def generate_confirmation_token(self, expiration=3600):
        '''
        Return a signed token carrying the user's hashid under 'confirm'.
        '''
        s = TimedJSONWebSignatureSerializer(
            current_app.config['SECRET_KEY'],
            expiration
        )
        return s.dumps({'confirm': self.hashid}).decode('utf-8')

    def generate_reset_token(self, expiration=3600):
        '''
        Return a signed token carrying the user's hashid under 'reset'.
        '''
        s = TimedJSONWebSignatureSerializer(
            current_app.config['SECRET_KEY'],
            expiration
        )
        return s.dumps({'reset': self.hashid}).decode('utf-8')

    def get_token(self, expires_in=3600):
        '''
        Return the current API token if it is valid for more than another
        60 seconds, otherwise create, store and return a new one.
        '''
        now = datetime.utcnow()
        if (
            self.token
            and self.token_expiration is not None
            and self.token_expiration > now + timedelta(seconds=60)
        ):
            return self.token
        self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
        self.token_expiration = now + timedelta(seconds=expires_in)
        db.session.add(self)
        return self.token

    def is_administrator(self):
        return self.can(Permission.ADMINISTRATE)

    def makedirs(self):
        '''
        Create the user's directory and its subdirectories. Raises OSError
        (from os.mkdir) if a directory can not be created.
        '''
        os.mkdir(self.path)
        os.mkdir(os.path.join(self.path, 'tesseract_ocr_models'))
        os.mkdir(os.path.join(self.path, 'corpora'))
        os.mkdir(os.path.join(self.path, 'jobs'))

    def revoke_token(self):
        '''
        Invalidate the current token by moving its expiration to the past.
        '''
        self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the user. backrefs=True adds the role, relationships=True
        adds corpora, jobs and Tesseract OCR models keyed by hashid.
        '''
        dict_user = {
            'id': self.hashid,
            'role_id': self.role.hashid,
            'confirmed': self.confirmed,
            'email': self.email,
            'last_seen': self.last_seen.isoformat() + 'Z',
            'member_since': self.member_since.isoformat() + 'Z',
            'username': self.username,
            'settings': {
                'dark_mode': self.setting_dark_mode,
                'job_status_mail_notification_level':
                    self.setting_job_status_mail_notification_level.name
            }
        }
        if backrefs:
            dict_user['role'] = self.role.to_dict(
                backrefs=True,
                relationships=False
            )
        if relationships:
            dict_user['corpora'] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in self.corpora
            }
            dict_user['jobs'] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in self.jobs
            }
            dict_user['tesseract_ocr_models'] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in self.tesseract_ocr_models
            }
        return dict_user

    def verify_password(self, password):
        return check_password_hash(self.password_hash, password)

    @staticmethod
    def check_token(token):
        '''
        Return the user owning token, or None if the token is unknown or
        expired.
        '''
        user = User.query.filter_by(token=token).first()
        if user is None or user.token_expiration is None:
            return None
        if user.token_expiration < datetime.utcnow():
            return None
        return user

    @staticmethod
    def insert_defaults():
        '''
        Create the 'nopaque' user and its directories if it does not exist.
        '''
        if User.query.filter_by(username='nopaque').first() is not None:
            return
        user = User(username='nopaque')
        db.session.add(user)
        # Flush and refresh so that the id (needed for the path) is known.
        db.session.flush(objects=[user])
        db.session.refresh(user)
        try:
            user.makedirs()
        except OSError as e:
            current_app.logger.error(e)
            db.session.rollback()
            # Nothing left to commit after the rollback.
            return
        db.session.commit()

    @staticmethod
    def reset_password(token, new_password):
        '''
        Set new_password for the user referenced by a valid reset token.
        Returns True on success, False otherwise.
        '''
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except BadSignature:
            return False
        # NOTE(review): generate_reset_token stores the hashid, but
        # query.get looks up the integer primary key - confirm that this
        # lookup resolves the user or decode the hashid first.
        user = User.query.get(data.get('reset'))
        if user is None:
            return False
        user.password = new_password
        db.session.add(user)
        return True


class TesseractOCRModel(FileMixin, HashidMixin, db.Model):
    '''
    A Tesseract OCR model file owned by a user.
    '''
    __tablename__ = 'tesseract_ocr_models'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    # JSON formatted string (see to_dict and insert_defaults)
    compatible_service_versions = db.Column(db.String(255))
    description = db.Column(db.String(255))
    publisher = db.Column(db.String(128))
    publishing_year = db.Column(db.Integer)
    title = db.Column(db.String(64))
    version = db.Column(db.String(16))
    # Backrefs: user: User

    @property
    def path(self):
        return os.path.join(
            self.user.path,
            'tesseract_ocr_models',
            str(self.id)
        )

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the model. compatible_service_versions is decoded from
        its JSON string. backrefs=True adds the owning user.
        '''
        compatible_service_versions = json.loads(self.compatible_service_versions)  # noqa
        dict_tesseract_ocr_model = {
            'id': self.hashid,
            'user_id': self.user.hashid,
            'compatible_service_versions': compatible_service_versions,
            'description': self.description,
            'publisher': self.publisher,
            'publishing_year': self.publishing_year,
            'title': self.title,
            **self.file_mixin_to_dict()
        }
        if backrefs:
            dict_tesseract_ocr_model['user'] = self.user.to_dict(
                backrefs=True,
                relationships=False
            )
        if relationships:
            pass
        return dict_tesseract_ocr_model

    @staticmethod
    def insert_defaults():
        '''
        Read TesseractOCRModel.defaults.yml (next to this module), create a
        database entry for every model that does not exist yet (matched by
        title and version), download its file and commit.
        '''
        user = User.query.filter_by(username='nopaque').first()
        defaults_file = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'TesseractOCRModel.defaults.yml'
        )
        with open(defaults_file, 'r') as f:
            defaults = yaml.safe_load(f)
        for m in defaults:
            if TesseractOCRModel.query.filter_by(title=m['title'], version=m['version']).first() is not None:  # noqa
                continue
            tesseract_ocr_model = TesseractOCRModel(
                compatible_service_versions=json.dumps(m['compatible_service_versions']),  # noqa
                description=m['description'],
                publisher=m['publisher'],
                publishing_year=m['publishing_year'],
                title=m['title'],
                user=user,
                version=m['version']
            )
            db.session.add(tesseract_ocr_model)
            # Flush and refresh so that the id (used for the filename and
            # the path) is known.
            db.session.flush(objects=[tesseract_ocr_model])
            db.session.refresh(tesseract_ocr_model)
            tesseract_ocr_model.filename = f'{tesseract_ocr_model.id}.traineddata'  # noqa
            # NOTE(review): the response status is not checked and no
            # timeout is set - confirm this is acceptable for the setup.
            r = requests.get(m['url'], stream=True)
            pbar = tqdm(
                desc=f'{tesseract_ocr_model.title} ({tesseract_ocr_model.filename})',  # noqa
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
                total=int(r.headers['Content-Length'])
            )
            pbar.clear()
            with open(tesseract_ocr_model.path, 'wb') as model_file:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:  # filter out keep-alive new chunks
                        pbar.update(len(chunk))
                        model_file.write(chunk)
                pbar.close()
        db.session.commit()


class JobInput(FileMixin, HashidMixin, db.Model):
    '''
    An input file of a job.
    '''
    __tablename__ = 'job_inputs'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
    # Backrefs: job: Job

    def __repr__(self):
        return f'<JobInput {self.filename}>'

    @property
    def download_url(self):
        return url_for(
            'jobs.download_job_input',
            job_id=self.job.id,
            job_input_id=self.id
        )

    @property
    def jsonpatch_path(self):
        return f'{self.job.jsonpatch_path}/inputs/{self.hashid}'

    @property
    def path(self):
        return os.path.join(self.job.path, 'inputs', str(self.id))

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the job input. backrefs=True adds the owning job.
        '''
        dict_job_input = {
            'id': self.hashid,
            'job_id': self.job.hashid,
            'download_url': self.download_url,
            'url': self.url,
            **self.file_mixin_to_dict()
        }
        if backrefs:
            dict_job_input['job'] = self.job.to_dict(
                backrefs=True,
                relationships=False
            )
        return dict_job_input

    @property
    def url(self):
        return url_for(
            'jobs.job',
            job_id=self.job_id,
            _anchor=f'job-{self.job.hashid}-input-{self.hashid}'
        )

    @property
    def user_hashid(self):
        return self.job.user.hashid

    @property
    def user_id(self):
        return self.job.user_id


class JobResult(FileMixin, HashidMixin, db.Model):
    '''
    A result file of a job.
    '''
    __tablename__ = 'job_results'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
    # Fields
    description = db.Column(db.String(255))
    # Backrefs: job: Job

    def __repr__(self):
        return f'<JobResult {self.filename}>'

    @property
    def download_url(self):
        return url_for(
            'jobs.download_job_result',
            job_id=self.job_id,
            job_result_id=self.id
        )

    @property
    def jsonpatch_path(self):
        return f'{self.job.jsonpatch_path}/results/{self.hashid}'

    @property
    def path(self):
        return os.path.join(self.job.path, 'results', str(self.id))

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the job result. backrefs=True adds the owning job.
        '''
        dict_job_result = {
            'id': self.hashid,
            'job_id': self.job.hashid,
            'description': self.description,
            'download_url': self.download_url,
            'url': self.url,
            **self.file_mixin_to_dict(
                backrefs=backrefs,
                relationships=relationships
            )
        }
        if backrefs:
            dict_job_result['job'] = self.job.to_dict(
                backrefs=True,
                relationships=False
            )
        return dict_job_result

    @property
    def url(self):
        return url_for(
            'jobs.job',
            job_id=self.job_id,
            _anchor=f'job-{self.job.hashid}-result-{self.hashid}'
        )

    @property
    def user_hashid(self):
        return self.job.user.hashid

    @property
    def user_id(self):
        return self.job.user_id


class Job(HashidMixin, db.Model):
    '''
    Class to define Jobs.
    '''
    __tablename__ = 'jobs'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    end_date = db.Column(db.DateTime())
    service = db.Column(db.String(64))
    # Dictionary as JSON formatted string.
    # Example: {"binarization": true}
    service_args = db.Column(db.String(255))
    service_version = db.Column(db.String(16))
    status_enum_value = db.Column('status', db.Integer, default=1)
    title = db.Column(db.String(32))
    # Backrefs: user: User
    # Relationships
    inputs = db.relationship(
        'JobInput',
        backref='job',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )
    results = db.relationship(
        'JobResult',
        backref='job',
        cascade='all, delete-orphan',
        lazy='dynamic'
    )

    def __repr__(self):
        return f'<Job {self.title}>'

    @property
    def jsonpatch_path(self):
        return f'{self.user.jsonpatch_path}/jobs/{self.hashid}'

    @property
    def path(self):
        return os.path.join(self.user.path, 'jobs', str(self.id))

    @property
    def status(self):
        return JobStatus(self.status_enum_value)

    @status.setter
    def status(self, enum_member):
        if not isinstance(enum_member, JobStatus):
            # Raise (instead of returning the exception object) so that
            # invalid values are not silently ignored.
            raise TypeError('enum_member must be a JobStatus')
        self.status_enum_value = enum_member.value

    @property
    def url(self):
        return url_for('jobs.job', job_id=self.id)

    @property
    def user_hashid(self):
        return self.user.hashid

    def delete(self):
        '''
        Delete the job and its inputs and results from the database.
        '''
        if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:  # noqa
            self.status = JobStatus.CANCELING
            db.session.commit()
            # Poll the database once per second until the status is
            # CANCELED (set outside of this method).
            while self.status != JobStatus.CANCELED:
                # In case the daemon handled a job in any way
                if self.status != JobStatus.CANCELING:
                    self.status = JobStatus.CANCELING
                    db.session.commit()
                sleep(1)
                db.session.refresh(self)
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def makedirs(self):
        '''
        Create the job's directory and its subdirectories. Raises OSError
        (from os.mkdir) if a directory can not be created.
        '''
        os.mkdir(self.path)
        os.mkdir(os.path.join(self.path, 'inputs'))
        os.mkdir(os.path.join(self.path, 'pipeline_data'))
        os.mkdir(os.path.join(self.path, 'results'))

    def restart(self):
        '''
        Restart a job - only if the status is complete or failed
        '''
        if self.status not in [JobStatus.COMPLETED, JobStatus.FAILED]:  # noqa
            raise Exception('Could not restart job: status is not "completed/failed"')  # noqa
        shutil.rmtree(os.path.join(self.path, 'results'), ignore_errors=True)
        # NOTE(review): makedirs creates 'pipeline_data' but this removes
        # 'pyflow.data' - confirm which directory name is current.
        shutil.rmtree(os.path.join(self.path, 'pyflow.data'), ignore_errors=True)  # noqa
        for result in self.results:
            db.session.delete(result)
        self.end_date = None
        self.status = JobStatus.SUBMITTED

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the job. service_args is decoded from its JSON string;
        for the 'tesseract-ocr' service the 'model' argument is replaced
        by the title of the referenced TesseractOCRModel. backrefs=True
        adds the owning user, relationships=True adds inputs and results
        keyed by hashid.
        '''
        service_args = json.loads(self.service_args)
        if self.service == 'tesseract-ocr' and 'model' in service_args:
            tesseract_ocr_pipeline_model = TesseractOCRModel.query.get(service_args['model'])  # noqa
            # The referenced model may no longer exist; keep the stored
            # value in that case instead of failing the serialization.
            if tesseract_ocr_pipeline_model is not None:
                service_args['model'] = tesseract_ocr_pipeline_model.title
        dict_job = {
            'id': self.hashid,
            'user_id': self.user.hashid,
            'creation_date': self.creation_date.isoformat() + 'Z',
            'description': self.description,
            'end_date': None if self.end_date is None else f'{self.end_date.isoformat()}Z',  # noqa
            'service': self.service,
            'service_args': service_args,
            'service_version': self.service_version,
            'status': self.status.name,
            'title': self.title,
            'url': self.url
        }
        if backrefs:
            dict_job['user'] = self.user.to_dict(
                backrefs=True,
                relationships=False
            )
        if relationships:
            dict_job['inputs'] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in self.inputs
            }
            dict_job['results'] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in self.results
            }
        return dict_job


class CorpusFile(FileMixin, HashidMixin, db.Model):
    '''
    A file of a corpus together with its bibliographic metadata.
    '''
    __tablename__ = 'corpus_files'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    corpus_id = db.Column(db.Integer, db.ForeignKey('corpora.id'))
    # Fields
    address = db.Column(db.String(255))
    author = db.Column(db.String(255))
    booktitle = db.Column(db.String(255))
    chapter = db.Column(db.String(255))
    editor = db.Column(db.String(255))
    institution = db.Column(db.String(255))
    journal = db.Column(db.String(255))
    pages = db.Column(db.String(255))
    publisher = db.Column(db.String(255))
    publishing_year = db.Column(db.Integer)
    school = db.Column(db.String(255))
    title = db.Column(db.String(255))
    # Backrefs: corpus: Corpus

    @property
    def download_url(self):
        return url_for(
            'corpora.download_corpus_file',
            corpus_id=self.corpus_id,
            corpus_file_id=self.id
        )

    @property
    def jsonpatch_path(self):
        return f'{self.corpus.jsonpatch_path}/files/{self.hashid}'

    @property
    def path(self):
        return os.path.join(self.corpus.path, 'files', str(self.id))

    @property
    def url(self):
        return url_for(
            'corpora.corpus_file',
            corpus_id=self.corpus_id,
            corpus_file_id=self.id
        )

    @property
    def user_hashid(self):
        return self.corpus.user.hashid

    @property
    def user_id(self):
        return self.corpus.user_id

    def delete(self):
        '''
        Remove the file from disk (an OSError is logged, not raised), mark
        the database entry for deletion and set the corpus status back to
        UNPREPARED.
        '''
        try:
            os.remove(self.path)
        except OSError:
            current_app.logger.error(
                f'Removing {self.path} led to an OSError!'
            )
        db.session.delete(self)
        self.corpus.status = CorpusStatus.UNPREPARED

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the corpus file. backrefs=True adds the owning corpus.
        '''
        dict_corpus_file = {
            'id': self.hashid,
            'corpus_id': self.corpus.hashid,
            'download_url': self.download_url,
            'url': self.url,
            'address': self.address,
            'author': self.author,
            'booktitle': self.booktitle,
            'chapter': self.chapter,
            'editor': self.editor,
            'institution': self.institution,
            'journal': self.journal,
            'pages': self.pages,
            'publisher': self.publisher,
            'publishing_year': self.publishing_year,
            'school': self.school,
            'title': self.title,
            **self.file_mixin_to_dict(
                backrefs=backrefs,
                relationships=relationships
            )
        }
        if backrefs:
            dict_corpus_file['corpus'] = self.corpus.to_dict(
                backrefs=True,
                relationships=False
            )
        return dict_corpus_file


class Corpus(HashidMixin, db.Model):
    '''
    Class to define a corpus.
    '''
    __tablename__ = 'corpora'
    # Primary key
    id = db.Column(db.Integer, primary_key=True)
    # Foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Fields
    creation_date = db.Column(db.DateTime(), default=datetime.utcnow)
    description = db.Column(db.String(255))
    last_edited_date = db.Column(db.DateTime(), default=datetime.utcnow)
    status_enum_value = db.Column('status', db.Integer, default=1)
    title = db.Column(db.String(32))
    num_analysis_sessions = db.Column(db.Integer, default=0)
    num_tokens = db.Column(db.Integer, default=0)
    archive_file = db.Column(db.String(255))
    # Backrefs: user: User
    # Relationships
    files = db.relationship(
        'CorpusFile',
        backref='corpus',
        lazy='dynamic',
        cascade='all, delete-orphan'
    )
    # "static" attributes
    max_num_tokens = 2147483647

    def __repr__(self):
        return f'<Corpus {self.title}>'

    @property
    def analysis_url(self):
        return url_for('corpora.analyse_corpus', corpus_id=self.id)

    @property
    def jsonpatch_path(self):
        return f'{self.user.jsonpatch_path}/corpora/{self.hashid}'

    @property
    def path(self):
        return os.path.join(self.user.path, 'corpora', str(self.id))

    @property
    def status(self):
        return CorpusStatus(self.status_enum_value)

    @status.setter
    def status(self, enum_member):
        if not isinstance(enum_member, CorpusStatus):
            # Raise (instead of returning the exception object) so that
            # invalid values are not silently ignored.
            raise TypeError('enum_member must be a CorpusStatus')
        self.status_enum_value = enum_member.value

    @property
    def url(self):
        return url_for('corpora.corpus', corpus_id=self.id)

    @property
    def user_hashid(self):
        return self.user.hashid

    def build(self):
        '''
        Merge all corpus files into cwb/corpus.vrt. Every file is parsed
        as XML, its root element gets the file's metadata as attributes
        (optional fields fall back to 'NULL') and is inserted into one
        common root element. Afterwards the corpus is marked as SUBMITTED.
        '''
        # NOTE(review): the root markup was an empty document ('\n'),
        # which ET.fromstring can not parse; the 'corpus' tag name is
        # inferred from the variable name - confirm against consumers of
        # corpus.vrt.
        corpus_element = ET.fromstring('<corpus>\n</corpus>')
        for corpus_file in self.files:
            element_tree = ET.parse(corpus_file.path)
            text_element = element_tree.getroot()
            text_element.set('address', corpus_file.address or 'NULL')
            text_element.set('author', corpus_file.author)
            text_element.set('booktitle', corpus_file.booktitle or 'NULL')
            text_element.set('chapter', corpus_file.chapter or 'NULL')
            text_element.set('editor', corpus_file.editor or 'NULL')
            text_element.set('institution', corpus_file.institution or 'NULL')
            text_element.set('journal', corpus_file.journal or 'NULL')
            text_element.set('pages', corpus_file.pages or 'NULL')
            text_element.set('publisher', corpus_file.publisher or 'NULL')
            text_element.set('publishing_year', str(corpus_file.publishing_year))  # noqa
            text_element.set('school', corpus_file.school or 'NULL')
            text_element.set('title', corpus_file.title)
            corpus_element.insert(1, text_element)
        ET.ElementTree(corpus_element).write(
            os.path.join(self.path, 'cwb', 'corpus.vrt'),
            encoding='utf-8'
        )
        self.last_edited_date = datetime.utcnow()
        self.status = CorpusStatus.SUBMITTED

    def delete(self):
        '''
        Remove the corpus directory (errors are ignored) and mark the
        corpus for deletion in the current database session.
        '''
        shutil.rmtree(self.path, ignore_errors=True)
        db.session.delete(self)

    def makedirs(self):
        '''
        Create the corpus directory and its subdirectories. Raises OSError
        (from os.mkdir) if a directory can not be created.
        '''
        os.mkdir(self.path)
        os.mkdir(os.path.join(self.path, 'files'))
        os.mkdir(os.path.join(self.path, 'cwb'))
        os.mkdir(os.path.join(self.path, 'cwb', 'data'))
        os.mkdir(os.path.join(self.path, 'cwb', 'registry'))

    def to_dict(self, backrefs=False, relationships=False):
        '''
        Serialize the corpus. backrefs=True adds the owning user,
        relationships=True adds the corpus files keyed by hashid.
        '''
        dict_corpus = {
            'id': self.hashid,
            'user_id': self.user.hashid,
            'analysis_url': self.analysis_url,
            'url': self.url,
            'creation_date': self.creation_date.isoformat() + 'Z',
            'description': self.description,
            'max_num_tokens': self.max_num_tokens,
            'num_analysis_sessions': self.num_analysis_sessions,
            'num_tokens': self.num_tokens,
            'status': self.status.name,
            'last_edited_date': self.last_edited_date.isoformat() + 'Z',
            'title': self.title
        }
        if backrefs:
            dict_corpus['user'] = self.user.to_dict(
                backrefs=True,
                relationships=False
            )
        if relationships:
            dict_corpus['files'] = {
                x.hashid: x.to_dict(backrefs=False, relationships=True)
                for x in self.files
            }
        return dict_corpus


@login.user_loader
def load_user(user_id):
    '''
    flask_login user loader: return the user with the given id.
    '''
    return User.query.get(int(user_id))