mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2025-01-23 16:20:34 +00:00
Change delete execution
This commit is contained in:
parent
1152417419
commit
bab479db20
@ -2,7 +2,7 @@ from app import db
|
||||
from app.decorators import admin_required
|
||||
from app.models import Role, User
|
||||
from app.tables import AdminUserItem, AdminUserTable
|
||||
from app.utils import background_delete_user
|
||||
from app.background_functions import delete_user_
|
||||
from flask import current_app, flash, redirect, render_template, url_for
|
||||
from flask_login import login_required
|
||||
from . import admin
|
||||
@ -50,10 +50,9 @@ def admin_user_page(user_id):
|
||||
@login_required
|
||||
@admin_required
|
||||
def admin_delete_user(user_id):
|
||||
delete_thread = threading.Thread(
|
||||
target=background_delete_user,
|
||||
args=(current_app._get_current_object(), user_id)
|
||||
)
|
||||
delete_thread = threading.Thread(target=delete_user_,
|
||||
args=(current_app._get_current_object(),
|
||||
user_id))
|
||||
delete_thread.start()
|
||||
flash('User {} has been deleted!'.format(user_id))
|
||||
return redirect(url_for('admin.for_admins_only'))
|
||||
|
25
app/background_functions.py
Normal file
25
app/background_functions.py
Normal file
@ -0,0 +1,25 @@
|
||||
from .models import Corpus, Job, User
|
||||
|
||||
|
||||
def delete_corpus_(app, corpus_id):
|
||||
with app.app_context():
|
||||
corpus = Corpus.query.filter_by(id=corpus_id).first()
|
||||
if corpus is None:
|
||||
raise Exception('Corpus {} not found!'.format(corpus_id))
|
||||
corpus.delete()
|
||||
|
||||
|
||||
def delete_job_(app, job_id):
|
||||
with app.app_context():
|
||||
job = Job.query.filter_by(id=job_id).first()
|
||||
if job is None:
|
||||
raise Exception('Job {} not found!'.format(job_id))
|
||||
job.delete()
|
||||
|
||||
|
||||
def delete_user_(app, user_id):
|
||||
with app.app_context():
|
||||
user = User.query.filter_by(id=user_id).first()
|
||||
if user is None:
|
||||
raise Exception('User {} not found!'.format(user_id))
|
||||
user.delete()
|
@ -5,7 +5,7 @@ from flask import (abort, current_app, flash, redirect, request,
|
||||
from flask_login import current_user, login_required
|
||||
from werkzeug.utils import secure_filename
|
||||
from . import corpora
|
||||
from .background_tasks import (delete_corpus_, delete_corpus_file_,
|
||||
from .background_functions import (delete_corpus_, delete_corpus_file_,
|
||||
edit_corpus_file_)
|
||||
from .forms import (AddCorpusFileForm, AddCorpusForm, EditCorpusFileForm,
|
||||
QueryDownloadForm, QueryForm)
|
||||
@ -28,11 +28,11 @@ def add_corpus():
|
||||
try:
|
||||
os.makedirs(dir)
|
||||
except OSError:
|
||||
flash('OSError!')
|
||||
db.session.remove(corpus)
|
||||
db.session.commit()
|
||||
flash('Corpus added!')
|
||||
return redirect(url_for('corpora.corpus', corpus_id=corpus.id))
|
||||
flash('[ERROR]: Could not add corpus!')
|
||||
corpus.delete()
|
||||
else:
|
||||
flash('Corpus added!')
|
||||
return redirect(url_for('corpora.corpus', corpus_id=corpus.id))
|
||||
return render_template('corpora/add_corpus.html.j2',
|
||||
add_corpus_form=add_corpus_form,
|
||||
title='Add corpus')
|
||||
|
@ -1,5 +1,5 @@
|
||||
from app.models import Job, JobInput, JobResult
|
||||
from app.utils import background_delete_job
|
||||
from app.background_functions import delete_job_
|
||||
from flask import (abort, current_app, flash, redirect, render_template,
|
||||
send_from_directory, url_for)
|
||||
from flask_login import current_user, login_required
|
||||
@ -23,7 +23,7 @@ def delete_job(job_id):
|
||||
job = Job.query.get_or_404(job_id)
|
||||
if not (job.creator == current_user or current_user.is_administrator()):
|
||||
abort(403)
|
||||
delete_thread = threading.Thread(target=background_delete_job,
|
||||
delete_thread = threading.Thread(target=delete_job_,
|
||||
args=(current_app._get_current_object(),
|
||||
job_id))
|
||||
delete_thread.start()
|
||||
|
141
app/models.py
141
app/models.py
@ -3,8 +3,7 @@ from flask import current_app
|
||||
from flask_login import UserMixin, AnonymousUserMixin
|
||||
from itsdangerous import BadSignature, TimedJSONWebSignatureSerializer
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
from . import db
|
||||
from . import login_manager
|
||||
from . import db, logger, login_manager
|
||||
import os
|
||||
import shutil
|
||||
import xml.etree.ElementTree as ET
|
||||
@ -83,12 +82,9 @@ class Role(db.Model):
|
||||
to them. Order of the roles dictionary determines the ID of each role.
|
||||
User hast the ID 1 and Administrator has the ID 2.
|
||||
"""
|
||||
roles = {
|
||||
'User': [Permission.CREATE_JOB],
|
||||
'Administrator': [Permission.ADMIN,
|
||||
Permission.CREATE_JOB,
|
||||
Permission.DELETE_JOB]
|
||||
}
|
||||
roles = {'User': [Permission.CREATE_JOB],
|
||||
'Administrator': [Permission.ADMIN, Permission.CREATE_JOB,
|
||||
Permission.DELETE_JOB]}
|
||||
default_role = 'User'
|
||||
for r in roles:
|
||||
role = Role.query.filter_by(name=r).first()
|
||||
@ -208,17 +204,21 @@ class User(UserMixin, db.Model):
|
||||
"""
|
||||
return self.can(Permission.ADMIN)
|
||||
|
||||
def delete_user(self):
|
||||
def delete(self):
|
||||
"""
|
||||
Delete user from database. Also delete all associated jobs and corpora
|
||||
files.
|
||||
Delete the user and its corpora and jobs from database and filesystem.
|
||||
"""
|
||||
delete_path = os.path.join('/mnt/opaque/', str(self.id))
|
||||
while os.path.exists(delete_path):
|
||||
try:
|
||||
shutil.rmtree(delete_path, ignore_errors=True)
|
||||
except OSError:
|
||||
pass
|
||||
for job in self.jobs:
|
||||
job.delete()
|
||||
for corpus in self.corpora:
|
||||
corpus.delete()
|
||||
path = os.path.join(current_app.config['OPAQUE_STORAGE_DIRECTORY'],
|
||||
str(self.id))
|
||||
try:
|
||||
shutil.rmtree(path)
|
||||
except Exception as e:
|
||||
logger.warning(e)
|
||||
pass
|
||||
db.session.delete(self)
|
||||
db.session.commit()
|
||||
|
||||
@ -246,9 +246,7 @@ class JobInput(db.Model):
|
||||
dir = db.Column(db.String(255))
|
||||
job_id = db.Column(db.Integer, db.ForeignKey('jobs.id'))
|
||||
# Relationships
|
||||
results = db.relationship('JobResult',
|
||||
backref='job_input',
|
||||
lazy='dynamic',
|
||||
results = db.relationship('JobResult', backref='job_input', lazy='dynamic',
|
||||
cascade='save-update, merge, delete')
|
||||
|
||||
def __repr__(self):
|
||||
@ -314,24 +312,44 @@ class Job(db.Model):
|
||||
title = db.Column(db.String(32))
|
||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
||||
# Relationships
|
||||
inputs = db.relationship('JobInput',
|
||||
backref='job',
|
||||
lazy='dynamic',
|
||||
inputs = db.relationship('JobInput', backref='job', lazy='dynamic',
|
||||
cascade='save-update, merge, delete')
|
||||
results = db.relationship('JobResult',
|
||||
backref='job',
|
||||
lazy='dynamic',
|
||||
results = db.relationship('JobResult', backref='job', lazy='dynamic',
|
||||
cascade='save-update, merge, delete')
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super(Job, self).__init__(**kwargs)
|
||||
|
||||
def __repr__(self):
|
||||
"""
|
||||
String representation of the Job. For human readability.
|
||||
"""
|
||||
return '<Job %r>' % self.title
|
||||
|
||||
def delete(self):
|
||||
"""
|
||||
Delete the job and its inputs and outputs from database and filesystem.
|
||||
"""
|
||||
self.status = 'stopping'
|
||||
db.session.commit()
|
||||
while self.status != 'deleted':
|
||||
''' TODO: wait a second '''
|
||||
db.session.refresh(self)
|
||||
path = os.path.join(current_app.config['OPAQUE_STORAGE_DIRECTORY'],
|
||||
str(self.user_id), 'jobs', str(self.id))
|
||||
'''
|
||||
' TODO: Remove this workaround by executing the following command
|
||||
' before service removal.
|
||||
'
|
||||
' docker service update --mount-rm <service>
|
||||
'''
|
||||
while os.path.exists(path):
|
||||
try:
|
||||
shutil.rmtree(path)
|
||||
except Exception as e:
|
||||
''' TODO: Proper exception handling '''
|
||||
logger.warning(e)
|
||||
pass
|
||||
db.session.delete(self)
|
||||
db.session.commit()
|
||||
|
||||
def to_dict(self):
|
||||
return {'id': self.id,
|
||||
'creation_date': self.creation_date.timestamp(),
|
||||
@ -349,34 +367,6 @@ class Job(db.Model):
|
||||
'title': self.title,
|
||||
'user_id': self.user_id}
|
||||
|
||||
def flag_for_stop(self):
|
||||
"""
|
||||
Flag running or failed job (anything that is not completed) with
|
||||
stopping. Opaque daemon will end services flaged with 'stopping'.
|
||||
"""
|
||||
self.status = 'stopping'
|
||||
db.session.commit()
|
||||
|
||||
def delete_job(self):
|
||||
"""
|
||||
Delete job with given job id from database. Also delete associated job
|
||||
files. Contianers are still running for a few seconds after
|
||||
the associated service has been removed. This is the reason for the
|
||||
while loop. The loop checks if the file path to all the job files still
|
||||
exists and removes it again and again till the container did shutdown
|
||||
for good.
|
||||
See: https://docs.docker.com/engine/swarm/swarm-tutorial/delete-service/
|
||||
"""
|
||||
delete_path = os.path.join('/mnt/opaque/', str(self.user_id), 'jobs',
|
||||
str(self.id))
|
||||
while os.path.exists(delete_path):
|
||||
try:
|
||||
shutil.rmtree(delete_path, ignore_errors=True)
|
||||
except OSError:
|
||||
pass
|
||||
db.session.delete(self)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
class CorpusFile(db.Model):
|
||||
"""
|
||||
@ -394,20 +384,20 @@ class CorpusFile(db.Model):
|
||||
|
||||
def delete(self):
|
||||
path = os.path.join(current_app.config['OPAQUE_STORAGE_DIRECTORY'],
|
||||
self.dir,
|
||||
self.filename)
|
||||
self.dir, self.filename)
|
||||
try:
|
||||
os.remove(path)
|
||||
except:
|
||||
return
|
||||
except Exception as e:
|
||||
''' TODO: Proper exception handling '''
|
||||
logger.warning(e)
|
||||
pass
|
||||
self.corpus.status = 'unprepared'
|
||||
db.session.delete(self)
|
||||
db.session.commit()
|
||||
|
||||
def insert_metadata(self):
|
||||
file = os.path.join(current_app.config['OPAQUE_STORAGE_DIRECTORY'],
|
||||
self.dir,
|
||||
self.filename)
|
||||
self.dir, self.filename)
|
||||
element_tree = ET.parse(file)
|
||||
text_node = element_tree.find('text')
|
||||
text_node.set('author', self.author)
|
||||
@ -433,9 +423,7 @@ class Corpus(db.Model):
|
||||
analysis_container_ip = db.Column(db.String(16))
|
||||
analysis_container_name = db.Column(db.String(32))
|
||||
# Relationships
|
||||
files = db.relationship('CorpusFile',
|
||||
backref='corpus',
|
||||
lazy='dynamic',
|
||||
files = db.relationship('CorpusFile', backref='corpus', lazy='dynamic',
|
||||
cascade='save-update, merge, delete')
|
||||
|
||||
def __repr__(self):
|
||||
@ -456,13 +444,20 @@ class Corpus(db.Model):
|
||||
for corpus_file in self.files:
|
||||
corpus_file.delete()
|
||||
path = os.path.join(current_app.config['OPAQUE_STORAGE_DIRECTORY'],
|
||||
str(self.user_id),
|
||||
'corpora',
|
||||
str(self.id))
|
||||
try:
|
||||
shutil.rmtree(path)
|
||||
except:
|
||||
return
|
||||
str(self.user_id), 'corpora', str(self.id))
|
||||
'''
|
||||
' TODO: Remove this workaround by executing the following command
|
||||
' before service removal.
|
||||
'
|
||||
' docker service update --mount-rm <service>
|
||||
'''
|
||||
while os.path.exists(path):
|
||||
try:
|
||||
shutil.rmtree(path)
|
||||
except Exception as e:
|
||||
''' TODO: Proper exception handling '''
|
||||
logger.warning(e)
|
||||
pass
|
||||
db.session.delete(self)
|
||||
db.session.commit()
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
from app import db, logger
|
||||
from app.utils import background_delete_user
|
||||
from app.background_functions import delete_user_
|
||||
from flask import abort, current_app, flash, redirect, render_template, url_for
|
||||
from flask_login import current_user, login_required, logout_user
|
||||
from . import profile
|
||||
@ -94,10 +94,9 @@ def delete_self():
|
||||
"""
|
||||
View to delete yourslef and all associated data.
|
||||
"""
|
||||
delete_thread = threading.Thread(
|
||||
target=background_delete_user,
|
||||
args=(current_app._get_current_object(), current_user.id)
|
||||
)
|
||||
delete_thread = threading.Thread(target=delete_user_,
|
||||
args=(current_app._get_current_object(),
|
||||
current_user.id))
|
||||
delete_thread.start()
|
||||
logout_user()
|
||||
flash('Your account has been deleted!')
|
||||
|
@ -46,9 +46,8 @@ def service(service):
|
||||
try:
|
||||
os.makedirs(absolut_dir)
|
||||
except OSError:
|
||||
flash('[OSError] Could not add job!')
|
||||
db.session.delete(job)
|
||||
db.session.commit()
|
||||
flash('[ERROR]: Could not add job!')
|
||||
job.delete()
|
||||
else:
|
||||
for file in add_job_form.files.data:
|
||||
filename = secure_filename(file.filename)
|
||||
|
54
app/utils.py
54
app/utils.py
@ -1,54 +0,0 @@
|
||||
from . import db, logger
|
||||
from .models import Job, User, Corpus, CorpusFile
|
||||
|
||||
|
||||
'''
|
||||
' A list of background process functions. Functions should be called using the
|
||||
Thread class from the module threading.
|
||||
'''
|
||||
|
||||
|
||||
def background_delete_user(app, current_user_id):
|
||||
with app.app_context():
|
||||
logger.warning('Called by delete_thread.')
|
||||
logger.warning('User id is: {}.'.format(current_user_id))
|
||||
jobs = Job.query.filter_by(user_id=current_user_id).all()
|
||||
corpora = Corpus.query.filter_by(user_id=current_user_id).all()
|
||||
logger.warning('Jobs to delete are: {}'.format(jobs))
|
||||
user = User.query.get_or_404(current_user_id)
|
||||
for job in jobs:
|
||||
job.flag_for_stop()
|
||||
logger.warning('Job status: {}'.format(job.status))
|
||||
deleted = False
|
||||
while deleted is False:
|
||||
logger.warning('Refreshing')
|
||||
db.session.refresh(job)
|
||||
logger.warning('Refreshed')
|
||||
if job.status == 'deleted':
|
||||
logger.warning('Job status is deleted.')
|
||||
job.delete_job()
|
||||
deleted = True
|
||||
logger.warning('Job deletion loop has ended.')
|
||||
for corpus in corpora:
|
||||
corpus.delete_corpus()
|
||||
logger.warning('Corpus deletion loop has ended.')
|
||||
user.delete_user()
|
||||
|
||||
|
||||
def background_delete_job(app, job_id):
|
||||
with app.app_context():
|
||||
logger.warning('Called by delete_thread.')
|
||||
logger.warning('Job id is: {}.'.format(job_id))
|
||||
job = Job.query.filter_by(id=job_id).first()
|
||||
logger.warning('Job object is: {}'.format(job))
|
||||
logger.warning('Job status: {}'.format(job.status))
|
||||
job.flag_for_stop()
|
||||
logger.warning('Job status: {}'.format(job.status))
|
||||
deleted = False
|
||||
while deleted is False:
|
||||
db.session.refresh(job)
|
||||
if job.status == 'deleted':
|
||||
logger.warning('Job status is deleted.')
|
||||
job.delete_job()
|
||||
deleted = True
|
||||
logger.warning('Loop has ended.')
|
Loading…
x
Reference in New Issue
Block a user