modernize type hinting

This commit is contained in:
Patrick Jentsch 2024-09-25 17:46:53 +02:00
parent 02e6c7c16c
commit 492fdc9d28
13 changed files with 116 additions and 88 deletions

View File

@ -16,6 +16,7 @@ from flask_hashids import Hashids
docker_client = DockerClient.from_env() docker_client = DockerClient.from_env()
apifairy = APIFairy() apifairy = APIFairy()
assets = Environment() assets = Environment()
db = SQLAlchemy() db = SQLAlchemy()
@ -43,6 +44,11 @@ def create_app(config: Config = Config) -> Flask:
_register_socketio_namespaces(app) _register_socketio_namespaces(app)
_register_db_event_listeners(app) _register_db_event_listeners(app)
@app.before_request
def log_headers():
from flask import request
print(request.__dict__)
return app return app
@ -64,16 +70,24 @@ def _configure_logging(app: Flask):
def _configure_middlewares(app: Flask): def _configure_middlewares(app: Flask):
from werkzeug.middleware.proxy_fix import ProxyFix proxy_fix_enabled: bool = app.config['NOPAQUE_PROXY_FIX_ENABLED']
if proxy_fix_enabled:
from werkzeug.middleware.proxy_fix import ProxyFix
proxy_fix_x_for: int = app.config['NOPAQUE_PROXY_FIX_X_FOR']
proxy_fix_x_host: int = app.config['NOPAQUE_PROXY_FIX_X_HOST']
proxy_fix_x_port: int = app.config['NOPAQUE_PROXY_FIX_X_PORT']
proxy_fix_x_prefix: int = app.config['NOPAQUE_PROXY_FIX_X_PREFIX']
proxy_fix_x_proto: int = app.config['NOPAQUE_PROXY_FIX_X_PROTO']
if app.config['NOPAQUE_PROXY_FIX_ENABLED']:
app.wsgi_app = ProxyFix( app.wsgi_app = ProxyFix(
app.wsgi_app, app.wsgi_app,
x_for=app.config['NOPAQUE_PROXY_FIX_X_FOR'], x_for=proxy_fix_x_for,
x_host=app.config['NOPAQUE_PROXY_FIX_X_HOST'], x_host=proxy_fix_x_host,
x_port=app.config['NOPAQUE_PROXY_FIX_X_PORT'], x_port=proxy_fix_x_port,
x_prefix=app.config['NOPAQUE_PROXY_FIX_X_PREFIX'], x_prefix=proxy_fix_x_prefix,
x_proto=app.config['NOPAQUE_PROXY_FIX_X_PROTO'] x_proto=proxy_fix_x_proto
) )
@ -119,32 +133,56 @@ def _init_extensions(app: Flask):
def _register_blueprints(app: Flask): def _register_blueprints(app: Flask):
from .admin import bp as admin_blueprint from .admin import bp as admin_blueprint
from .api import bp as api_blueprint
from .auth import bp as auth_blueprint
from .contributions import bp as contributions_blueprint
from .corpora import bp as corpora_blueprint
from .errors import bp as errors_bp
from .jobs import bp as jobs_blueprint
from .main import bp as main_blueprint
from .services import bp as services_blueprint
from .settings import bp as settings_blueprint
from .users import bp as users_blueprint
from .workshops import bp as workshops_blueprint
app.register_blueprint(admin_blueprint, url_prefix='/admin') app.register_blueprint(admin_blueprint, url_prefix='/admin')
from .api import bp as api_blueprint
app.register_blueprint(api_blueprint, url_prefix='/api') app.register_blueprint(api_blueprint, url_prefix='/api')
from .auth import bp as auth_blueprint
app.register_blueprint(auth_blueprint) app.register_blueprint(auth_blueprint)
from .contributions import bp as contributions_blueprint
app.register_blueprint(contributions_blueprint, url_prefix='/contributions') app.register_blueprint(contributions_blueprint, url_prefix='/contributions')
from .corpora import bp as corpora_blueprint
app.register_blueprint(corpora_blueprint, cli_group='corpus', url_prefix='/corpora') app.register_blueprint(corpora_blueprint, cli_group='corpus', url_prefix='/corpora')
from .errors import bp as errors_bp
app.register_blueprint(errors_bp) app.register_blueprint(errors_bp)
from .jobs import bp as jobs_blueprint
app.register_blueprint(jobs_blueprint, url_prefix='/jobs') app.register_blueprint(jobs_blueprint, url_prefix='/jobs')
from .main import bp as main_blueprint
app.register_blueprint(main_blueprint, cli_group=None) app.register_blueprint(main_blueprint, cli_group=None)
from .services import bp as services_blueprint
app.register_blueprint(services_blueprint, url_prefix='/services') app.register_blueprint(services_blueprint, url_prefix='/services')
from .settings import bp as settings_blueprint
app.register_blueprint(settings_blueprint, url_prefix='/settings') app.register_blueprint(settings_blueprint, url_prefix='/settings')
from .users import bp as users_blueprint
app.register_blueprint(users_blueprint, cli_group='user', url_prefix='/users') app.register_blueprint(users_blueprint, cli_group='user', url_prefix='/users')
from .workshops import bp as workshops_blueprint
app.register_blueprint(workshops_blueprint, url_prefix='/workshops') app.register_blueprint(workshops_blueprint, url_prefix='/workshops')
# def _add_admin_views():
# from flask_admin.contrib.sqla import ModelView
# from . import models
# for v in models.__dict__.values():
# # Check if v is a class
# if not isinstance(v, type):
# continue
# # Check if v is a subclass of db.Model
# if not issubclass(v, db.Model):
# continue
# admin.add_view(ModelView(v, db.session, category='Database'))
def _register_socketio_namespaces(app: Flask): def _register_socketio_namespaces(app: Flask):
from .corpora.cqi_over_sio import CQiOverSocketIO from .corpora.cqi_over_sio import CQiOverSocketIO

View File

@ -1,7 +1,6 @@
from datetime import datetime from datetime import datetime
from flask import current_app from flask import current_app
from pathlib import Path from pathlib import Path
from typing import Dict, List
import json import json
import shutil import shutil
from app import db from app import db
@ -15,7 +14,7 @@ class SandpaperConverter:
def run(self): def run(self):
with self.json_db_file.open('r') as f: with self.json_db_file.open('r') as f:
json_db: List[Dict] = json.load(f) json_db: list[dict] = json.load(f)
for json_user in json_db: for json_user in json_db:
if not json_user['confirmed']: if not json_user['confirmed']:
@ -26,7 +25,7 @@ class SandpaperConverter:
db.session.commit() db.session.commit()
def convert_user(self, json_user: Dict, user_dir: Path): def convert_user(self, json_user: dict, user_dir: Path):
current_app.logger.info(f'Create User {json_user["username"]}...') current_app.logger.info(f'Create User {json_user["username"]}...')
try: try:
user = User.create( user = User.create(
@ -48,7 +47,7 @@ class SandpaperConverter:
current_app.logger.info('Done') current_app.logger.info('Done')
def convert_corpus(self, json_corpus: Dict, user: User, corpus_dir: Path): def convert_corpus(self, json_corpus: dict, user: User, corpus_dir: Path):
current_app.logger.info(f'Create Corpus {json_corpus["title"]}...') current_app.logger.info(f'Create Corpus {json_corpus["title"]}...')
try: try:
corpus = Corpus.create( corpus = Corpus.create(
@ -64,7 +63,7 @@ class SandpaperConverter:
current_app.logger.info('Done') current_app.logger.info('Done')
def convert_corpus_file(self, json_corpus_file: Dict, corpus: Corpus, corpus_dir: Path): def convert_corpus_file(self, json_corpus_file: dict, corpus: Corpus, corpus_dir: Path):
current_app.logger.info(f'Create CorpusFile {json_corpus_file["title"]}...') current_app.logger.info(f'Create CorpusFile {json_corpus_file["title"]}...')
corpus_file = CorpusFile( corpus_file = CorpusFile(
corpus=corpus, corpus=corpus,

View File

@ -7,7 +7,7 @@ from flask_login import current_user
from flask_socketio import Namespace from flask_socketio import Namespace
from inspect import signature from inspect import signature
from threading import Lock from threading import Lock
from typing import Callable, Dict, List, Optional from typing import Callable
from app import db, docker_client, hashids, socketio from app import db, docker_client, hashids, socketio
from app.decorators import socketio_login_required from app.decorators import socketio_login_required
from app.models import Corpus, CorpusStatus from app.models import Corpus, CorpusStatus
@ -37,7 +37,8 @@ Basic concept:
4.2 The CQiClient and (Mutex) Lock belonging to it are teared down. 4.2 The CQiClient and (Mutex) Lock belonging to it are teared down.
''' '''
CQI_API_FUNCTION_NAMES: List[str] = [
CQI_API_FUNCTION_NAMES: list[str] = [
'ask_feature_cl_2_3', 'ask_feature_cl_2_3',
'ask_feature_cqi_1_0', 'ask_feature_cqi_1_0',
'ask_feature_cqp_2_3', 'ask_feature_cqp_2_3',
@ -93,7 +94,7 @@ class CQiOverSocketIO(Namespace):
@socketio_login_required @socketio_login_required
def on_init(self, db_corpus_hashid: str): def on_init(self, db_corpus_hashid: str):
db_corpus_id: int = hashids.decode(db_corpus_hashid) db_corpus_id: int = hashids.decode(db_corpus_hashid)
db_corpus: Optional[Corpus] = Corpus.query.get(db_corpus_id) db_corpus: Corpus | None = Corpus.query.get(db_corpus_id)
if db_corpus is None: if db_corpus is None:
return {'code': 404, 'msg': 'Not Found'} return {'code': 404, 'msg': 'Not Found'}
if not (db_corpus.user == current_user if not (db_corpus.user == current_user
@ -134,7 +135,7 @@ class CQiOverSocketIO(Namespace):
return {'code': 200, 'msg': 'OK'} return {'code': 200, 'msg': 'OK'}
@socketio_login_required @socketio_login_required
def on_exec(self, fn_name: str, fn_args: Dict = {}): def on_exec(self, fn_name: str, fn_args: dict = {}):
try: try:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock'] cqi_client_lock: Lock = session['cqi_over_sio']['cqi_client_lock']
@ -198,7 +199,7 @@ class CQiOverSocketIO(Namespace):
except (BrokenPipeError, CQiException): except (BrokenPipeError, CQiException):
pass pass
cqi_client_lock.release() cqi_client_lock.release()
db_corpus: Optional[Corpus] = Corpus.query.get(db_corpus_id) db_corpus: Corpus | None = Corpus.query.get(db_corpus_id)
if db_corpus is None: if db_corpus is None:
return return
db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1 db_corpus.num_analysis_sessions = Corpus.num_analysis_sessions - 1

View File

@ -8,7 +8,6 @@ from cqi.models.attributes import (
) )
from cqi.status import StatusOk as CQiStatusOk from cqi.status import StatusOk as CQiStatusOk
from flask import session from flask import session
from typing import Dict, List
import gzip import gzip
import json import json
import math import math
@ -17,7 +16,7 @@ from app.models import Corpus
from .utils import lookups_by_cpos, partial_export_subcorpus, export_subcorpus from .utils import lookups_by_cpos, partial_export_subcorpus, export_subcorpus
CQI_EXTENSION_FUNCTION_NAMES: List[str] = [ CQI_EXTENSION_FUNCTION_NAMES: list[str] = [
'ext_corpus_update_db', 'ext_corpus_update_db',
'ext_corpus_static_data', 'ext_corpus_static_data',
'ext_corpus_paginate_corpus', 'ext_corpus_paginate_corpus',
@ -37,7 +36,7 @@ def ext_corpus_update_db(corpus: str) -> CQiStatusOk:
return CQiStatusOk() return CQiStatusOk()
def ext_corpus_static_data(corpus: str) -> Dict: def ext_corpus_static_data(corpus: str) -> dict:
db_corpus_id: int = session['cqi_over_sio']['db_corpus_id'] db_corpus_id: int = session['cqi_over_sio']['db_corpus_id']
db_corpus: Corpus = Corpus.query.get(db_corpus_id) db_corpus: Corpus = Corpus.query.get(db_corpus_id)
@ -48,8 +47,8 @@ def ext_corpus_static_data(corpus: str) -> Dict:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus) cqi_corpus: CQiCorpus = cqi_client.corpora.get(corpus)
cqi_p_attrs: List[CQiPositionalAttribute] = cqi_corpus.positional_attributes.list() cqi_p_attrs: list[CQiPositionalAttribute] = cqi_corpus.positional_attributes.list()
cqi_s_attrs: List[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list() cqi_s_attrs: list[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list()
static_data = { static_data = {
'corpus': { 'corpus': {
@ -64,19 +63,19 @@ def ext_corpus_static_data(corpus: str) -> Dict:
for p_attr in cqi_p_attrs: for p_attr in cqi_p_attrs:
print(f'corpus.freqs.{p_attr.name}') print(f'corpus.freqs.{p_attr.name}')
static_data['corpus']['freqs'][p_attr.name] = [] static_data['corpus']['freqs'][p_attr.name] = []
p_attr_id_list: List[int] = list(range(p_attr.lexicon_size)) p_attr_id_list: list[int] = list(range(p_attr.lexicon_size))
static_data['corpus']['freqs'][p_attr.name].extend(p_attr.freqs_by_ids(p_attr_id_list)) static_data['corpus']['freqs'][p_attr.name].extend(p_attr.freqs_by_ids(p_attr_id_list))
del p_attr_id_list del p_attr_id_list
print(f'p_attrs.{p_attr.name}') print(f'p_attrs.{p_attr.name}')
static_data['p_attrs'][p_attr.name] = [] static_data['p_attrs'][p_attr.name] = []
cpos_list: List[int] = list(range(cqi_corpus.size)) cpos_list: list[int] = list(range(cqi_corpus.size))
static_data['p_attrs'][p_attr.name].extend(p_attr.ids_by_cpos(cpos_list)) static_data['p_attrs'][p_attr.name].extend(p_attr.ids_by_cpos(cpos_list))
del cpos_list del cpos_list
print(f'values.p_attrs.{p_attr.name}') print(f'values.p_attrs.{p_attr.name}')
static_data['values']['p_attrs'][p_attr.name] = [] static_data['values']['p_attrs'][p_attr.name] = []
p_attr_id_list: List[int] = list(range(p_attr.lexicon_size)) p_attr_id_list: list[int] = list(range(p_attr.lexicon_size))
static_data['values']['p_attrs'][p_attr.name].extend(p_attr.values_by_ids(p_attr_id_list)) static_data['values']['p_attrs'][p_attr.name].extend(p_attr.values_by_ids(p_attr_id_list))
del p_attr_id_list del p_attr_id_list
@ -127,23 +126,23 @@ def ext_corpus_static_data(corpus: str) -> Dict:
print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds') print(f's_attrs.{s_attr.name}.lexicon.{id}.bounds')
static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound] static_data['s_attrs'][s_attr.name]['lexicon'][id]['bounds'] = [lbound, rbound]
static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'] = {} static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'] = {}
cpos_list: List[int] = list(range(lbound, rbound + 1)) cpos_list: list[int] = list(range(lbound, rbound + 1))
for p_attr in cqi_p_attrs: for p_attr in cqi_p_attrs:
p_attr_ids: List[int] = [] p_attr_ids: list[int] = []
p_attr_ids.extend(p_attr.ids_by_cpos(cpos_list)) p_attr_ids.extend(p_attr.ids_by_cpos(cpos_list))
print(f's_attrs.{s_attr.name}.lexicon.{id}.freqs.{p_attr.name}') print(f's_attrs.{s_attr.name}.lexicon.{id}.freqs.{p_attr.name}')
static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr_ids)) static_data['s_attrs'][s_attr.name]['lexicon'][id]['freqs'][p_attr.name] = dict(Counter(p_attr_ids))
del p_attr_ids del p_attr_ids
del cpos_list del cpos_list
sub_s_attrs: List[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr}) sub_s_attrs: list[CQiStructuralAttribute] = cqi_corpus.structural_attributes.list(filters={'part_of': s_attr})
print(f's_attrs.{s_attr.name}.values') print(f's_attrs.{s_attr.name}.values')
static_data['s_attrs'][s_attr.name]['values'] = [ static_data['s_attrs'][s_attr.name]['values'] = [
sub_s_attr.name[(len(s_attr.name) + 1):] sub_s_attr.name[(len(s_attr.name) + 1):]
for sub_s_attr in sub_s_attrs for sub_s_attr in sub_s_attrs
] ]
s_attr_id_list: List[int] = list(range(s_attr.size)) s_attr_id_list: list[int] = list(range(s_attr.size))
sub_s_attr_values: List[str] = [] sub_s_attr_values: list[str] = []
for sub_s_attr in sub_s_attrs: for sub_s_attr in sub_s_attrs:
tmp = [] tmp = []
tmp.extend(sub_s_attr.values_by_ids(s_attr_id_list)) tmp.extend(sub_s_attr.values_by_ids(s_attr_id_list))
@ -173,7 +172,7 @@ def ext_corpus_paginate_corpus(
corpus: str, corpus: str,
page: int = 1, page: int = 1,
per_page: int = 20 per_page: int = 20
) -> Dict: ) -> dict:
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_corpus = cqi_client.corpora.get(corpus) cqi_corpus = cqi_client.corpora.get(corpus)
# Sanity checks # Sanity checks
@ -219,7 +218,7 @@ def ext_cqp_paginate_subcorpus(
context: int = 50, context: int = 50,
page: int = 1, page: int = 1,
per_page: int = 20 per_page: int = 20
) -> Dict: ) -> dict:
corpus_name, subcorpus_name = subcorpus.split(':', 1) corpus_name, subcorpus_name = subcorpus.split(':', 1)
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_corpus = cqi_client.corpora.get(corpus_name)
@ -266,7 +265,7 @@ def ext_cqp_partial_export_subcorpus(
subcorpus: str, subcorpus: str,
match_id_list: list, match_id_list: list,
context: int = 50 context: int = 50
) -> Dict: ) -> dict:
corpus_name, subcorpus_name = subcorpus.split(':', 1) corpus_name, subcorpus_name = subcorpus.split(':', 1)
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_corpus = cqi_client.corpora.get(corpus_name)
@ -278,7 +277,7 @@ def ext_cqp_partial_export_subcorpus(
def ext_cqp_export_subcorpus( def ext_cqp_export_subcorpus(
subcorpus: str, subcorpus: str,
context: int = 50 context: int = 50
) -> Dict: ) -> dict:
corpus_name, subcorpus_name = subcorpus.split(':', 1) corpus_name, subcorpus_name = subcorpus.split(':', 1)
cqi_client: CQiClient = session['cqi_over_sio']['cqi_client'] cqi_client: CQiClient = session['cqi_over_sio']['cqi_client']
cqi_corpus = cqi_client.corpora.get(corpus_name) cqi_corpus = cqi_client.corpora.get(corpus_name)

View File

@ -1,13 +1,12 @@
from cqi.models.corpora import Corpus as CQiCorpus from cqi.models.corpora import Corpus as CQiCorpus
from cqi.models.subcorpora import Subcorpus as CQiSubcorpus from cqi.models.subcorpora import Subcorpus as CQiSubcorpus
from typing import Dict, List
def lookups_by_cpos(corpus: CQiCorpus, cpos_list: List[int]) -> Dict: def lookups_by_cpos(corpus: CQiCorpus, cpos_list: list[int]) -> dict:
lookups = {} lookups = {}
lookups['cpos_lookup'] = {cpos: {} for cpos in cpos_list} lookups['cpos_lookup'] = {cpos: {} for cpos in cpos_list}
for attr in corpus.positional_attributes.list(): for attr in corpus.positional_attributes.list():
cpos_attr_values: List[str] = attr.values_by_cpos(cpos_list) cpos_attr_values: list[str] = attr.values_by_cpos(cpos_list)
for i, cpos in enumerate(cpos_list): for i, cpos in enumerate(cpos_list):
lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_values[i] lookups['cpos_lookup'][cpos][attr.name] = cpos_attr_values[i]
for attr in corpus.structural_attributes.list(): for attr in corpus.structural_attributes.list():
@ -15,7 +14,7 @@ def lookups_by_cpos(corpus: CQiCorpus, cpos_list: List[int]) -> Dict:
# attr.has_values == False # attr.has_values == False
if attr.has_values: if attr.has_values:
continue continue
cpos_attr_ids: List[int] = attr.ids_by_cpos(cpos_list) cpos_attr_ids: list[int] = attr.ids_by_cpos(cpos_list)
for i, cpos in enumerate(cpos_list): for i, cpos in enumerate(cpos_list):
if cpos_attr_ids[i] == -1: if cpos_attr_ids[i] == -1:
continue continue
@ -39,9 +38,9 @@ def lookups_by_cpos(corpus: CQiCorpus, cpos_list: List[int]) -> Dict:
def partial_export_subcorpus( def partial_export_subcorpus(
subcorpus: CQiSubcorpus, subcorpus: CQiSubcorpus,
match_id_list: List[int], match_id_list: list[int],
context: int = 25 context: int = 25
) -> Dict: ) -> dict:
if subcorpus.size == 0: if subcorpus.size == 0:
return {"matches": []} return {"matches": []}
match_boundaries = [] match_boundaries = []
@ -91,7 +90,7 @@ def export_subcorpus(
context: int = 25, context: int = 25,
cutoff: float = float('inf'), cutoff: float = float('inf'),
offset: int = 0 offset: int = 0
) -> Dict: ) -> dict:
if subcorpus.size == 0: if subcorpus.size == 0:
return {"matches": []} return {"matches": []}
first_match = max(0, offset) first_match = max(0, offset)

View File

@ -1,7 +1,7 @@
from flask import abort, request from flask import abort, request
from flask_login import current_user from flask_login import current_user
from functools import wraps from functools import wraps
from typing import List, Union from typing import Optional
from werkzeug.exceptions import NotAcceptable from werkzeug.exceptions import NotAcceptable
from app.models import Permission from app.models import Permission
@ -46,8 +46,8 @@ def socketio_admin_required(f):
def content_negotiation( def content_negotiation(
produces: Union[str, List[str], None] = None, produces: Optional[str | list[str]] = None,
consumes: Union[str, List[str], None] = None consumes: Optional[str | list[str]] = None
): ):
def decorator(f): def decorator(f):
@wraps(f) @wraps(f)

View File

@ -1,7 +1,6 @@
from flask import current_app from flask import current_app
from flask_migrate import upgrade from flask_migrate import upgrade
from pathlib import Path from pathlib import Path
from typing import List
from app import db from app import db
from app.models import ( from app.models import (
Corpus, Corpus,
@ -20,7 +19,7 @@ def deploy():
print('Make default directories') print('Make default directories')
base_dir = current_app.config['NOPAQUE_DATA_DIR'] base_dir = current_app.config['NOPAQUE_DATA_DIR']
default_dirs: List[Path] = [ default_dirs: list[Path] = [
base_dir / 'tmp', base_dir / 'tmp',
base_dir / 'users' base_dir / 'users'
] ]

View File

@ -3,7 +3,6 @@ from enum import IntEnum
from flask import current_app, url_for from flask import current_app, url_for
from flask_hashids import HashidMixin from flask_hashids import HashidMixin
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from typing import Union
from pathlib import Path from pathlib import Path
import shutil import shutil
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -25,7 +24,7 @@ class CorpusStatus(IntEnum):
CANCELING_ANALYSIS_SESSION = 9 CANCELING_ANALYSIS_SESSION = 9
@staticmethod @staticmethod
def get(corpus_status: Union['CorpusStatus', int, str]) -> 'CorpusStatus': def get(corpus_status: 'CorpusStatus | int | str') -> 'CorpusStatus':
if isinstance(corpus_status, CorpusStatus): if isinstance(corpus_status, CorpusStatus):
return corpus_status return corpus_status
if isinstance(corpus_status, int): if isinstance(corpus_status, int):

View File

@ -1,6 +1,5 @@
from flask_hashids import HashidMixin from flask_hashids import HashidMixin
from enum import IntEnum from enum import IntEnum
from typing import Union
from app import db from app import db
@ -11,7 +10,7 @@ class CorpusFollowerPermission(IntEnum):
MANAGE_CORPUS = 8 MANAGE_CORPUS = 8
@staticmethod @staticmethod
def get(corpus_follower_permission: Union['CorpusFollowerPermission', int, str]) -> 'CorpusFollowerPermission': def get(corpus_follower_permission: 'CorpusFollowerPermission | int | str') -> 'CorpusFollowerPermission':
if isinstance(corpus_follower_permission, CorpusFollowerPermission): if isinstance(corpus_follower_permission, CorpusFollowerPermission):
return corpus_follower_permission return corpus_follower_permission
if isinstance(corpus_follower_permission, int): if isinstance(corpus_follower_permission, int):
@ -38,16 +37,16 @@ class CorpusFollowerRole(HashidMixin, db.Model):
def __repr__(self): def __repr__(self):
return f'<CorpusFollowerRole {self.name}>' return f'<CorpusFollowerRole {self.name}>'
def has_permission(self, permission: Union[CorpusFollowerPermission, int, str]): def has_permission(self, permission: CorpusFollowerPermission | int | str):
perm = CorpusFollowerPermission.get(permission) perm = CorpusFollowerPermission.get(permission)
return self.permissions & perm.value == perm.value return self.permissions & perm.value == perm.value
def add_permission(self, permission: Union[CorpusFollowerPermission, int, str]): def add_permission(self, permission: CorpusFollowerPermission | int | str):
perm = CorpusFollowerPermission.get(permission) perm = CorpusFollowerPermission.get(permission)
if not self.has_permission(perm): if not self.has_permission(perm):
self.permissions += perm.value self.permissions += perm.value
def remove_permission(self, permission: Union[CorpusFollowerPermission, int, str]): def remove_permission(self, permission: CorpusFollowerPermission | int | str):
perm = CorpusFollowerPermission.get(permission) perm = CorpusFollowerPermission.get(permission)
if self.has_permission(perm): if self.has_permission(perm):
self.permissions -= perm.value self.permissions -= perm.value

View File

@ -3,7 +3,6 @@ from enum import IntEnum
from flask import current_app, url_for from flask import current_app, url_for
from flask_hashids import HashidMixin from flask_hashids import HashidMixin
from time import sleep from time import sleep
from typing import Union
from pathlib import Path from pathlib import Path
import shutil import shutil
from app import db from app import db
@ -21,7 +20,7 @@ class JobStatus(IntEnum):
FAILED = 8 FAILED = 8
@staticmethod @staticmethod
def get(job_status: Union['JobStatus', int, str]) -> 'JobStatus': def get(job_status: 'JobStatus | int | str') -> 'JobStatus':
if isinstance(job_status, JobStatus): if isinstance(job_status, JobStatus):
return job_status return job_status
if isinstance(job_status, int): if isinstance(job_status, int):

View File

@ -1,6 +1,5 @@
from enum import IntEnum from enum import IntEnum
from flask_hashids import HashidMixin from flask_hashids import HashidMixin
from typing import Union
from app import db from app import db
@ -14,7 +13,7 @@ class Permission(IntEnum):
USE_API = 4 USE_API = 4
@staticmethod @staticmethod
def get(permission: Union['Permission', int, str]) -> 'Permission': def get(permission: 'Permission | int | str') -> 'Permission':
if isinstance(permission, Permission): if isinstance(permission, Permission):
return permission return permission
if isinstance(permission, int): if isinstance(permission, int):
@ -38,16 +37,16 @@ class Role(HashidMixin, db.Model):
def __repr__(self): def __repr__(self):
return f'<Role {self.name}>' return f'<Role {self.name}>'
def has_permission(self, permission: Union[Permission, int, str]): def has_permission(self, permission: Permission | int | str):
p = Permission.get(permission) p = Permission.get(permission)
return self.permissions & p.value == p.value return self.permissions & p.value == p.value
def add_permission(self, permission: Union[Permission, int, str]): def add_permission(self, permission: Permission | int | str):
p = Permission.get(permission) p = Permission.get(permission)
if not self.has_permission(p): if not self.has_permission(p):
self.permissions += p.value self.permissions += p.value
def remove_permission(self, permission: Union[Permission, int, str]): def remove_permission(self, permission: Permission | int | str):
p = Permission.get(permission) p = Permission.get(permission)
if self.has_permission(p): if self.has_permission(p):
self.permissions -= p.value self.permissions -= p.value

View File

@ -5,7 +5,6 @@ from flask_hashids import HashidMixin
from flask_login import UserMixin from flask_login import UserMixin
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from pathlib import Path from pathlib import Path
from typing import Union
from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.security import generate_password_hash, check_password_hash
import jwt import jwt
import re import re
@ -26,7 +25,7 @@ class ProfilePrivacySettings(IntEnum):
SHOW_MEMBER_SINCE = 4 SHOW_MEMBER_SINCE = 4
@staticmethod @staticmethod
def get(profile_privacy_setting: Union['ProfilePrivacySettings', int, str]) -> 'ProfilePrivacySettings': def get(profile_privacy_setting: 'ProfilePrivacySettings | int | str') -> 'ProfilePrivacySettings':
if isinstance(profile_privacy_setting, ProfilePrivacySettings): if isinstance(profile_privacy_setting, ProfilePrivacySettings):
return profile_privacy_setting return profile_privacy_setting
if isinstance(profile_privacy_setting, int): if isinstance(profile_privacy_setting, int):
@ -315,7 +314,7 @@ class User(HashidMixin, UserMixin, db.Model):
def has_profile_privacy_setting(self, setting): def has_profile_privacy_setting(self, setting):
s = ProfilePrivacySettings.get(setting) s = ProfilePrivacySettings.get(setting)
return self.profile_privacy_settings & s.value == s.value return self.profile_privacy_settings & s.value == s.value
def add_profile_privacy_setting(self, setting): def add_profile_privacy_setting(self, setting):
s = ProfilePrivacySettings.get(setting) s = ProfilePrivacySettings.get(setting)
if not self.has_profile_privacy_setting(s): if not self.has_profile_privacy_setting(s):
@ -350,7 +349,7 @@ class User(HashidMixin, UserMixin, db.Model):
def is_following_corpus(self, corpus): def is_following_corpus(self, corpus):
return corpus in self.followed_corpora return corpus in self.followed_corpora
def generate_follow_corpus_token(self, corpus_hashid, role_name, expiration=7): def generate_follow_corpus_token(self, corpus_hashid, role_name, expiration=7):
now = datetime.utcnow() now = datetime.utcnow()
payload = { payload = {
@ -366,7 +365,7 @@ class User(HashidMixin, UserMixin, db.Model):
current_app.config['SECRET_KEY'], current_app.config['SECRET_KEY'],
algorithm='HS256' algorithm='HS256'
) )
def follow_corpus_by_token(self, token): def follow_corpus_by_token(self, token):
try: try:
payload = jwt.decode( payload = jwt.decode(

12
wsgi.py
View File

@ -4,17 +4,16 @@ import eventlet
eventlet.monkey_patch() eventlet.monkey_patch()
from flask import Flask # noqa from typing import Any # noqa
from typing import Any, Dict # noqa
from app import create_app, db, scheduler, socketio # noqa from app import create_app, db, scheduler, socketio # noqa
from app import models # noqa from app import models # noqa
app: Flask = create_app() app = create_app()
@app.shell_context_processor @app.shell_context_processor
def make_shell_context() -> Dict[str, Any]: def make_shell_context() -> dict[str, Any]:
''' Adds variables to the shell context. ''' ''' Adds variables to the shell context. '''
return { return {
'db': db, 'db': db,
@ -34,9 +33,8 @@ def make_shell_context() -> Dict[str, Any]:
def main(): def main():
if app.config['NOPAQUE_IS_PRIMARY_INSTANCE']: with app.app_context():
with app.app_context(): scheduler.start()
scheduler.start()
socketio.run(app, host='0.0.0.0') socketio.run(app, host='0.0.0.0')