Cleanup eventcode

This commit is contained in:
Patrick Jentsch 2019-11-07 10:44:02 +01:00
parent 414ebc73ff
commit 1b3d40eed3
4 changed files with 63 additions and 89 deletions

View File

@ -28,9 +28,10 @@ def recv_query(message):
def observe_corpus_analysis_connection(app, corpus_id, session_id): def observe_corpus_analysis_connection(app, corpus_id, session_id):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
with app.app_context(): with app.app_context():
cqi_client = CQiClient(host='172.25.0.2') # cqi_client = CQiClient(host='192.168.0.2')
# cqi_client.ctrl_connect('opaque', 'opaque')
while session_id in connected_sessions: while session_id in connected_sessions:
logger.warning(cqi_client.ctrl_ping()) # logger.warning(cqi_client.ctrl_ping())
logger.warning('Run container, run!') logger.warning('Run container, run!')
socketio.sleep(3) socketio.sleep(3)
corpus = Corpus.query.filter_by(id=corpus_id).first() corpus = Corpus.query.filter_by(id=corpus_id).first()

View File

@ -1,7 +1,7 @@
from flask import current_app, request from flask import current_app, request
from flask_login import current_user, login_required from flask_login import current_user, login_required
from . import socketio
from .decorators import admin_required from .decorators import admin_required
from . import db, socketio
from .models import User from .models import User
import json import json
import jsonpatch import jsonpatch
@ -9,8 +9,9 @@ import logging
''' '''
' A list containing session ids of connected Socket.IO sessions. It is used to ' A list containing session ids of connected Socket.IO sessions, to keep track
' determine runtimes of associated background tasks. ' of all connected sessions, which is used to determine the runtimes of
' associated background tasks.
''' '''
connected_sessions = [] connected_sessions = []
@ -19,47 +20,41 @@ connected_sessions = []
@login_required @login_required
def connect(): def connect():
''' '''
' The Socket.IO module creates a session id (sid) on each request. The ' The Socket.IO module creates a session id (sid) for each request.
' initiating client is automatically placed in a room with that sid, which ' On connect the sid is saved in the connected sessions list.
' will be used for further information exchange generated by a background
' task associated with the sid.
''' '''
connected_sessions.append(request.sid) connected_sessions.append(request.sid)
socketio.start_background_task(background_task,
current_app._get_current_object(),
current_user.id,
request.sid)
@socketio.on('disconnect') @socketio.on('disconnect')
@login_required @login_required
def disconnect(): def disconnect():
''' '''
' On disconnect the session id (sid) of the connection gets removed from ' On disconnect the session id gets removed from the connected sessions
' connected sessions list (see above). ' list.
''' '''
connected_sessions.remove(request.sid) connected_sessions.remove(request.sid)
@socketio.on('inspect_user') @socketio.on('subscribe_user_ressources')
@login_required
def subscribe_user_ressources():
socketio.start_background_task(user_ressource_subscription_handler,
current_app._get_current_object(),
current_user.id, request.sid)
@socketio.on('subscribe_foreign_user_ressources')
@login_required @login_required
@admin_required @admin_required
def inspect_user(user_id): def subscribe_foreign_user_ressources(user_id):
''' socketio.start_background_task(user_ressource_subscription_handler,
' The Socket.IO module creates a session id (sid) on each request. The
' initiating admin is automatically placed in a room with that sid, which
' will be used for further information exchange generated by a background
' task associated with the sid. Admin will be placed in that room on emiting
' "conncect_admin".
'''
socketio.start_background_task(background_task,
current_app._get_current_object(), current_app._get_current_object(),
user_id, user_id, request.sid, True)
request.sid,
True)
def background_task(app, user_id, session_id, foreign=False): def user_ressource_subscription_handler(app, user_id, session_id,
foreign=False):
''' '''
' Sends initial corpus and job lists to the client. Afterwards it checks ' Sends initial corpus and job lists to the client. Afterwards it checks
' every 3 seconds if changes to the initial values appeared. If changes are ' every 3 seconds if changes to the initial values appeared. If changes are
@ -67,39 +62,41 @@ def background_task(app, user_id, session_id, foreign=False):
' '
' NOTE: The initial values are send as a init-* events. ' NOTE: The initial values are send as a init-* events.
' The JSON patches are send as update-* events. ' The JSON patches are send as update-* events.
'
' > where '*' is either 'corpora' or 'jobs' ' > where '*' is either 'corpora' or 'jobs'
''' '''
logger = logging.getLogger(__name__)
logger.warning('user_id: {}, foreign: {}'.format(user_id, foreign))
init_events = {'corpora': 'init-foreign-corpora' if foreign
else 'init-corpora',
'jobs': 'init-foreign-jobs' if foreign else 'init-jobs'}
update_events = {'corpora': 'update-foreign-corpora' if foreign
else 'update-corpora',
'jobs': 'update-foreign-jobs' if foreign
else 'update-jobs'}
with app.app_context(): with app.app_context():
user = db.session.query(User).get_or_404(user_id) # Gather current values from database.
''' Get current values from the database. ''' user = User.query.filter_by(id=user_id).first()
corpora = user.corpora_as_dict() corpora = user.corpora_as_dict()
jobs = user.jobs_as_dict() jobs = user.jobs_as_dict()
''' Send initial values. ''' # Send initial values to the user.
socketio.emit('init-foreign-corpora' if foreign else 'init-corpora', socketio.emit(init_events['corpora'], json.dumps(corpora),
json.dumps(corpora),
room=session_id) room=session_id)
socketio.emit('init-foreign-jobs' if foreign else 'init-jobs', socketio.emit(init_events['jobs'], json.dumps(jobs), room=session_id)
json.dumps(jobs),
room=session_id)
''' TODO: Implement maximum runtime for this loop. '''
while session_id in connected_sessions: while session_id in connected_sessions:
''' Get current values from the database ''' # Get new values from the database
new_corpora = user.corpora_as_dict() new_corpora = user.corpora_as_dict()
new_jobs = user.jobs_as_dict() new_jobs = user.jobs_as_dict()
''' Compute JSON patches. ''' # Compute JSON patches.
corpus_patch = jsonpatch.JsonPatch.from_diff(corpora, new_corpora) corpora_patch = jsonpatch.JsonPatch.from_diff(corpora, new_corpora)
jobs_patch = jsonpatch.JsonPatch.from_diff(jobs, new_jobs) jobs_patch = jsonpatch.JsonPatch.from_diff(jobs, new_jobs)
''' In case there are patches, send them to the user. ''' # In case there are patches, send them to the user.
if corpus_patch: if corpora_patch:
socketio.emit('update-foreign-corpora' if foreign else 'update-corpora', socketio.emit(update_events['corpora'],
corpus_patch.to_string(), corpora_patch.to_string(), room=session_id)
room=session_id)
if jobs_patch: if jobs_patch:
socketio.emit('update-foreign-jobs' if foreign else 'update-jobs', socketio.emit(update_events['jobs'], jobs_patch.to_string(),
jobs_patch.to_string(),
room=session_id) room=session_id)
''' Set new values as references for the next iteration. ''' # Set new values as references for the next iteration.
corpora = new_corpora corpora = new_corpora
jobs = new_jobs jobs = new_jobs
socketio.sleep(3) socketio.sleep(3)

View File

@ -35,8 +35,7 @@
</div> </div>
</div> </div>
<script> <script>
var selected_user_id = {{selected_user.id|tojson|safe}} socket.emit('subscribe_foreign_user_ressources', {{ selected_user.id }});
socket.emit('inspect_user', selected_user_id);
</script> </script>
<div class="col s12 m6"> <div class="col s12 m6">
<div id="job-foreign-list"> <div id="job-foreign-list">

View File

@ -38,41 +38,29 @@
var jobsSubscribers = []; var jobsSubscribers = [];
var socket = io(); var socket = io();
socket.emit('subscribe_user_ressources');
socket.on('init-corpora', function(msg) { socket.on('init-corpora', function(msg) {
var subscriber;
corpora = JSON.parse(msg); corpora = JSON.parse(msg);
for (subscriber of corporaSubscribers) {subscriber._init(corpora);} for (let subscriber of corporaSubscribers) {subscriber._init(corpora);}
}); });
socket.on('init-jobs', function(msg) { socket.on('init-jobs', function(msg) {
var subscriber;
jobs = JSON.parse(msg); jobs = JSON.parse(msg);
for (subscriber of jobsSubscribers) {subscriber._init(jobs);} for (let subscriber of jobsSubscribers) {subscriber._init(jobs);}
}); });
socket.on('update-corpora', function(msg) { socket.on('update-corpora', function(msg) {
var patch, patchedCorpora, subscriber; var patch;
patch = JSON.parse(msg); patch = JSON.parse(msg);
corpora = jsonpatch.apply_patch(corpora, patch); corpora = jsonpatch.apply_patch(corpora, patch);
for (subscriber of corporaSubscribers) {subscriber._update(patch);} for (let subscriber of corporaSubscribers) {subscriber._update(patch);}
}); });
socket.on('update-jobs', function(msg) { socket.on('update-jobs', function(msg) {
var patch, patchedJobs, subscriber; var patch;
patch = JSON.parse(msg); patch = JSON.parse(msg);
jobs = jsonpatch.apply_patch(jobs, patch); jobs = jsonpatch.apply_patch(jobs, patch);
for (subscriber of jobsSubscribers) {subscriber._update(patch);} for (let subscriber of jobsSubscribers) {subscriber._update(patch);}
}); });
socket.on('message', function(msg) { socket.on('message', function(msg) {
console.log(msg); console.log(msg);
}); });
@ -83,38 +71,27 @@
var foreignJobs; var foreignJobs;
var foreignJobsSubscribers = []; var foreignJobsSubscribers = [];
socket.on('init-foreign-corpora', function(msg) { socket.on('init-foreign-corpora', function(msg) {
var subscriber;
foreignCorpora = JSON.parse(msg); foreignCorpora = JSON.parse(msg);
for (subscriber of foreignCorporaSubscribers) {subscriber._init(foreignCorpora);} for (let subscriber of foreignCorporaSubscribers) {subscriber._init(foreignCorpora);}
}); });
socket.on('init-foreign-jobs', function(msg) { socket.on('init-foreign-jobs', function(msg) {
var subscriber;
foreignJobs = JSON.parse(msg); foreignJobs = JSON.parse(msg);
for (subscriber of foreignJobsSubscribers) {subscriber._init(foreignJobs);} for (let subscriber of foreignJobsSubscribers) {subscriber._init(foreignJobs);}
}); });
socket.on('update-foreign-corpora', function(msg) { socket.on('update-foreign-corpora', function(msg) {
var patch, patchedCorpora, subscriber; var patch;
patch = JSON.parse(msg); patch = JSON.parse(msg);
foreignCorpora = jsonpatch.apply_patch(foreignCorpora, patch); foreignCorpora = jsonpatch.apply_patch(foreignCorpora, patch);
for (subscriber of foreignCorporaSubscribers) {subscriber._update(patch);} for (let subscriber of foreignCorporaSubscribers) {subscriber._update(patch);}
}); });
socket.on('update-foreign-jobs', function(msg) { socket.on('update-foreign-jobs', function(msg) {
var patch, patchedJobs, subscriber; var patch;
patch = JSON.parse(msg); patch = JSON.parse(msg);
foreignJobs = jsonpatch.apply_patch(foreignJobs, patch); foreignJobs = jsonpatch.apply_patch(foreignJobs, patch);
for (subscriber of foreignJobsSubscribers) {subscriber._update(patch);} for (let subscriber of foreignJobsSubscribers) {subscriber._update(patch);}
}); });
</script> </script>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/> <meta name="viewport" content="width=device-width, initial-scale=1.0"/>