Handle user data streams different 1/2

In the future this will be much more performant!
This commit is contained in:
Patrick Jentsch 2020-04-30 11:28:55 +02:00
parent b7fc804b40
commit e1b07d8719
7 changed files with 83 additions and 121 deletions

View File

@ -1,6 +1,6 @@
from flask import current_app, request
from flask_login import current_user
from . import socketio
from . import db, socketio
from .decorators import socketio_admin_required, socketio_login_required
from .models import User
import json
@ -33,24 +33,24 @@ def disconnect():
connected_sessions.remove(request.sid)
@socketio.on('user_ressources_init')
@socketio.on('user_data_stream_init')
@socketio_login_required
def subscribe_user_ressources():
socketio.start_background_task(user_ressource_session_handler,
def user_data_stream_init():
socketio.start_background_task(user_data_stream,
current_app._get_current_object(),
current_user.id, request.sid)
@socketio.on('foreign_user_ressources_init')
@socketio.on('foreign_user_data_stream_init')
@socketio_login_required
@socketio_admin_required
def subscribe_foreign_user_ressources(user_id):
socketio.start_background_task(user_ressource_session_handler,
def foreign_user_data_stream_init(user_id):
socketio.start_background_task(user_data_stream,
current_app._get_current_object(),
user_id, request.sid, True)
def user_ressource_session_handler(app, user_id, session_id, foreign=False):
def user_data_stream(app, user_id, session_id, foreign=False):
'''
' Sends initial corpus and job lists to the client. Afterwards it checks
' every 3 seconds if changes to the initial values appeared. If changes are
@ -59,37 +59,29 @@ def user_ressource_session_handler(app, user_id, session_id, foreign=False):
' NOTE: The initial values are send as a init events.
' The JSON patches are send as update events.
'''
init_events = \
{'corpora': 'foreign_corpora_init' if foreign else 'corpora_init',
'jobs': 'foreign_jobs_init' if foreign else 'jobs_init'}
update_events = \
{'corpora': 'foreign_corpora_update' if foreign else 'corpora_update',
'jobs': 'foreign_jobs_update' if foreign else 'jobs_update'}
if foreign:
init_event = 'foreign_user_data_stream_init'
update_event = 'foreign_user_data_stream_update'
else:
init_event = 'user_data_stream_init'
update_event = 'user_data_stream_update'
with app.app_context():
# Gather current values from database.
user = User.query.get(user_id)
corpora = {corpus.id: corpus.to_dict() for corpus in user.corpora}
jobs = {job.id: job.to_dict() for job in user.jobs}
user_dict = user.to_dict()
# Send initial values to the user.
socketio.emit(init_events['corpora'], json.dumps(corpora),
room=session_id)
socketio.emit(init_events['jobs'], json.dumps(jobs), room=session_id)
socketio.emit(init_event, json.dumps(user_dict), room=session_id)
while session_id in connected_sessions:
# Get new values from the database
new_corpora = {corpus.id: corpus.to_dict()
for corpus in user.corpora}
new_jobs = {job.id: job.to_dict() for job in user.jobs}
db.session.refresh(user)
new_user_dict = user.to_dict()
# Compute JSON patches.
corpora_patch = jsonpatch.JsonPatch.from_diff(corpora, new_corpora)
jobs_patch = jsonpatch.JsonPatch.from_diff(jobs, new_jobs)
user_patch = jsonpatch.JsonPatch.from_diff(user_dict,
new_user_dict)
# In case there are patches, send them to the user.
if corpora_patch:
socketio.emit(update_events['corpora'],
corpora_patch.to_string(), room=session_id)
if jobs_patch:
socketio.emit(update_events['jobs'], jobs_patch.to_string(),
if user_patch:
socketio.emit(update_event, user_patch.to_string(),
room=session_id)
# Set new values as references for the next iteration.
corpora = new_corpora
jobs = new_jobs
user_dict = new_user_dict
socketio.sleep(3)

View File

@ -132,19 +132,20 @@ class User(UserMixin, db.Model):
def to_dict(self):
return {'id': self.id,
'role_id': self.role_id,
'confirmed': self.confirmed,
'email': self.email,
'last_seen': self.last_seen.timestamp(),
'member_since': self.member_since.timestamp(),
'role_id': self.role_id,
'username': self.username,
'settings': {'dark_mode': self.setting_dark_mode,
'job_status_mail_notifications':
self.setting_job_status_mail_notifications,
'job_status_site_notifications':
self.setting_job_status_site_notifications},
'corpora': [corpus.to_dict() for corpus in self.corpora],
'jobs': [job.to_dict() for job in self.jobs]}
'corpora': {corpus.id: corpus.to_dict()
for corpus in self.corpora},
'jobs': {job.id: job.to_dict() for job in self.jobs}}
def __repr__(self):
"""
@ -371,10 +372,11 @@ class Job(db.Model):
'description': self.description,
'end_date': (self.end_date.timestamp() if self.end_date else
None),
'inputs': [input.to_dict() for input in self.inputs],
'inputs': {input.id: input.to_dict() for input in self.inputs},
'mem_mb': self.mem_mb,
'n_cores': self.n_cores,
'results': [result.to_dict() for result in self.results],
'results': {result.id: result.to_dict()
for result in self.results},
'service': self.service,
'service_args': self.service_args,
'service_version': self.service_version,
@ -455,7 +457,7 @@ class Corpus(db.Model):
'description': self.description,
'status': self.status,
'title': self.title,
'files': [file.to_dict() for file in self.files]}
'files': {file.id: file.to_dict() for file in self.files}}
def delete(self):
for corpus_file in self.files:

View File

@ -29,66 +29,48 @@ nopaque.socket = {};
nopaque.socket.init = function() {
nopaque.socket = io({transports: ['websocket']});
// Add event handlers
nopaque.socket.on("corpora_init", function(msg) {
nopaque.corpora = JSON.parse(msg);
for (let subscriber of nopaque.corporaSubscribers) {subscriber._init(nopaque.corpora);}
nopaque.socket.on("user_data_stream_init", function(msg) {
nopaque.user = JSON.parse(msg);
for (let subscriber of nopaque.corporaSubscribers) {subscriber._init(nopaque.user.corpora);}
for (let subscriber of nopaque.jobsSubscribers) {subscriber._init(nopaque.user.jobs);}
});
nopaque.socket.on("jobs_init", function(msg) {
nopaque.jobs = JSON.parse(msg);
for (let subscriber of nopaque.jobsSubscribers) {subscriber._init(nopaque.jobs);}
});
nopaque.socket.on("corpora_update", function(msg) {
nopaque.socket.on("user_data_stream_update", function(msg) {
var patch;
patch = JSON.parse(msg);
nopaque.corpora = jsonpatch.apply_patch(nopaque.corpora, patch);
for (let subscriber of nopaque.corporaSubscribers) {subscriber._update(patch);}
});
nopaque.socket.on("jobs_update", function(msg) {
var patch;
patch = JSON.parse(msg);
nopaque.jobs = jsonpatch.apply_patch(nopaque.jobs, patch);
if (["all", "end"].includes(nopaque.user.settings.jobStatusSiteNotifications)) {
for (operation of patch) {
/* "/jobId/valueName" -> ["jobId", "valueName"] */
pathArray = operation.path.split("/").slice(1);
nopaque.user = jsonpatch.apply_patch(nopaque.user, patch);
corpora_patch = patch.filter(operation => operation.path.startsWith("/corpora"));
jobs_patch = patch.filter(operation => operation.path.startsWith("/jobs"));
for (let subscriber of nopaque.corporaSubscribers) {subscriber._update(corpora_patch);}
for (let subscriber of nopaque.jobsSubscribers) {subscriber._update(jobs_patch);}
if (["all", "end"].includes(nopaque.user.settings.job_status_site_notifications)) {
for (operation of jobs_patch) {
/* "/jobs/{jobId}/..." -> ["{jobId}", ...] */
pathArray = operation.path.split("/").slice(2);
if (operation.op === "replace" && pathArray[1] === "status") {
if (nopaque.user.settings.jobStatusSiteNotifications === "end" && !["complete", "failed"].includes(operation.value)) {continue;}
nopaque.flash(`[<a href="/jobs/${pathArray[0]}">${nopaque.jobs[pathArray[0]].title}</a>] New status: ${operation.value}`, "job");
if (nopaque.user.settings.job_status_site_notifications === "end" && !["complete", "failed"].includes(operation.value)) {continue;}
nopaque.flash(`[<a href="/jobs/${pathArray[0]}">${nopaque.user.jobs[pathArray[0]].title}</a>] New status: ${operation.value}`, "job");
}
}
}
for (let subscriber of nopaque.jobsSubscribers) {subscriber._update(patch);}
});
nopaque.socket.on("foreign_corpora_init", function(msg) {
nopaque.foreignCorpora = JSON.parse(msg);
for (let subscriber of nopaque.foreignCorporaSubscribers) {subscriber._init(nopaque.foreignCorpora);}
nopaque.socket.on("foreign_user_data_stream_init", function(msg) {
nopaque.foreignUser = JSON.parse(msg);
for (let subscriber of nopaque.foreignCorporaSubscribers) {subscriber._init(nopaque.foreignUser.corpora);}
for (let subscriber of nopaque.foreignJobsSubscribers) {subscriber._init(nopaque.foreignUser.jobs);}
});
nopaque.socket.on("foreign_jobs_init", function(msg) {
nopaque.foreignJobs = JSON.parse(msg);
for (let subscriber of nopaque.foreignJobsSubscribers) {subscriber._init(nopaque.foreignJobs);}
});
nopaque.socket.on("foreign_corpora_update", function(msg) {
nopaque.socket.on("foreign_user_data_stream_update", function(msg) {
var patch;
patch = JSON.parse(msg);
nopaque.foreignCorpora = jsonpatch.apply_patch(nopaque.foreignCorpora, patch);
for (let subscriber of nopaque.foreignCorporaSubscribers) {subscriber._update(patch);}
});
nopaque.socket.on("foreign_jobs_update", function(msg) {
var patch;
patch = JSON.parse(msg);
nopaque.foreignJobs = jsonpatch.apply_patch(nopaque.foreignJobs, patch);
for (let subscriber of nopaque.foreignJobsSubscribers) {subscriber._update(patch);}
nopaque.foreignUser = jsonpatch.apply_patch(nopaque.foreignUser, patch);
corpora_patch = patch.filter(operation => operation.path.startsWith("/corpora"));
jobs_patch = patch.filter(operation => operation.path.startsWith("/jobs"));
for (let subscriber of nopaque.foreignCorporaSubscribers) {subscriber._update(corpora_patch);}
for (let subscriber of nopaque.foreignJobsSubscribers) {subscriber._update(jobs_patch);}
});
}
@ -233,11 +215,6 @@ document.addEventListener("DOMContentLoaded", function() {
flashedMessage = nopaque.flashedMessages.shift();
nopaque.flash(flashedMessage[1], flashedMessage[0]);
}
if (nopaque.user.isAuthenticated) {
if (nopaque.user.settings.darkMode) {
DarkReader.enable({"brightness": 150, "contrast": 100, "sepia": 0});
}
nopaque.socket.init();
nopaque.socket.emit("user_ressources_init");
}
nopaque.socket.init();
nopaque.socket.emit("user_data_stream_init");
});

View File

@ -13,6 +13,7 @@ class RessourceList extends List {
_init(ressources) {
this.clear();
this.addRessources(Object.values(ressources));
this.sort("creation_date", {order: "desc"});
}
@ -22,8 +23,8 @@ class RessourceList extends List {
let item, pathArray;
for (let operation of patch) {
/* "/ressourceId/valueName" -> ["ressourceId", "valueName"] */
pathArray = operation.path.split("/").slice(1);
/* "/{ressourceName}/{ressourceId}/..." -> ["{ressourceId}", "..."] */
pathArray = operation.path.split("/").slice(2);
switch(operation.op) {
case "add":
if (pathArray.includes("results")) {break;}
@ -48,7 +49,6 @@ class RessourceList extends List {
}
}
addRessources(ressources) {
this.add(ressources.map(x => RessourceList.dataMapper[this.type](x)));
}

View File

@ -130,19 +130,19 @@
_init() {
let corpus;
corpus = (this.foreignCorpusFlag ? nopaque.foreignCorpora[this.corpusId]
: nopaque.corpora[this.corpusId]);
corpus = (this.foreignCorpusFlag ? nopaque.foreignUser.corpora[this.corpusId]
: nopaque.user.corpora[this.corpusId]);
// Status
this.setStatus(corpus.status, corpus.files.length);
this.setStatus(corpus.status);
}
_update(patch) {
let pathArray;
for (let operation of patch) {
/* "/corpusId/valueName" -> ["corpusId", "valueName"] */
pathArray = operation.path.split("/").slice(1);
/* "/corpora/{corpusId}/valueName" -> ["{corpusId}", ...] */
pathArray = operation.path.split("/").slice(2);
if (pathArray[0] != this.corpusId) {continue;}
switch(operation.op) {
case "add":
@ -165,7 +165,7 @@
setStatus(status) {
let analyzeElement, buildElement, numFiles, progressIndicatorElement, statusElement;
numFiles = (this.foreignCorpusFlag ? nopaque.foreignCorpora[this.corpusId] : nopaque.corpora[this.corpusId]).files.length;
numFiles = Object.keys((this.foreignCorpusFlag ? nopaque.foreignUser.corpora[this.corpusId] : nopaque.user.corpora[this.corpusId]).files).length;
progressIndicatorElement = document.getElementById("progress-indicator");
if (["queued", "running", "start analysis", "stop analysis"].includes(status)) {

View File

@ -157,9 +157,8 @@
_init() {
let job;
job = (this.foreignJobFlag ? nopaque.foreignJobs[this.jobId]
: nopaque.jobs[this.jobId]);
job = (this.foreignJobFlag ? nopaque.foreignUser.jobs[this.jobId]
: nopaque.user.jobs[this.jobId]);
// End date
this.setEndDate(job.end_date);
// Status
@ -174,8 +173,8 @@
let pathArray;
for (let operation of patch) {
/* "/jobId/valueName" -> ["jobId", "valueName"] */
pathArray = operation.path.split("/").slice(1);
/* "/jobs/{jobId}/..." -> ["{jobId}", ...] */
pathArray = operation.path.split("/").slice(2);
if (pathArray[0] != this.jobId) {continue;}
switch(operation.op) {
case "add":
@ -212,20 +211,15 @@
}
setResults(results) {
let resultsElement;
results.sort(function (a, b) {
var filenameA = a.filename.toUpperCase();
var filenameB = b.filename.toUpperCase();
if (filenameA < filenameB) {
return -1;
}
if (filenameA > filenameB) {
return 1;
}
let resultsArray, resultsElement;
resultsArray = Object.values(results)
resultsArray.sort(function (a, b) {
if (a.filename < b.filename) {return -1;}
if (a.filename > b.filename) {return 1;}
return 0;
});
resultsElement = document.getElementById("results");
for (let result of results) {
for (let result of resultsArray) {
resultsElement.insertAdjacentHTML(
"beforeend",
`<tr>

View File

@ -44,19 +44,16 @@
{% endif %}
<script src="{{ url_for('static', filename='js/JSONPatch.js/jsonpatch.min.js') }}"></script>
<script src="{{ url_for('static', filename='js/Dark_Reader/darkreader.js') }}"></script>
{% if current_user.is_authenticated and current_user.setting_dark_mode %}
<script>
DarkReader.enable({"brightness": 150, "contrast": 100, "sepia": 0});
</script>
{% endif %}
<script src="{{ url_for('static', filename='js/List.js/list.min.js') }}"></script>
<script src="{{ url_for('static', filename='js/Socket.IO/socket.io.slim.js') }}"></script>
<script src="{{ url_for('static', filename='js/nopaque.js') }}"></script>
<script src="{{ url_for('static', filename='js/nopaque.lists.js') }}"></script>
<script>
{% if current_user.is_authenticated %}
nopaque.user.isAuthenticated = true;
nopaque.user.settings.darkMode = {{ current_user.setting_dark_mode|tojson }};
nopaque.user.settings.jobStatusMailNotifications = {{ current_user.setting_job_status_mail_notifications|tojson }};
nopaque.user.settings.jobStatusSiteNotifications = {{ current_user.setting_job_status_site_notifications|tojson }};
{% else %}
nopaque.user.isAuthenticated = false;
{% endif %}
nopaque.flashedMessages = {{ get_flashed_messages(with_categories=True)|tojson }};
</script>
</head>