mirror of
https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
synced 2025-01-25 00:50:35 +00:00
58 lines
2.5 KiB
Python
58 lines
2.5 KiB
Python
from .corpus_utils import CheckCorporaMixin
|
|
from .job_utils import CheckJobsMixin
|
|
from ..import db, socketio
|
|
import docker
|
|
|
|
|
|
class TaskRunner(CheckCorporaMixin, CheckJobsMixin):
|
|
def __init__(self):
|
|
self.docker = docker.from_env()
|
|
self._socketio_message_buffer = {}
|
|
|
|
def run(self):
|
|
self.check_corpora()
|
|
self.check_jobs()
|
|
db.session.commit()
|
|
self.flush_socketio_messages()
|
|
|
|
def buffer_socketio_message(self, event, payload, room,
|
|
msg_id=None, override_policy='replace'):
|
|
if room not in self._socketio_message_buffer:
|
|
self._socketio_message_buffer[room] = {}
|
|
if event not in self._socketio_message_buffer[room]:
|
|
self._socketio_message_buffer[room][event] = {}
|
|
if msg_id is None:
|
|
msg_id = len(self._socketio_message_buffer[room][event].keys())
|
|
if override_policy == 'append':
|
|
if msg_id in self._socketio_message_buffer[room][event]:
|
|
# If the current message value isn't a list, convert it!
|
|
if not isinstance(self._socketio_message_buffer[room][event][msg_id], list): # noqa
|
|
self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]] # noqa
|
|
else:
|
|
self._socketio_message_buffer[room][event][msg_id] = []
|
|
self._socketio_message_buffer[room][event][msg_id].append(payload)
|
|
elif override_policy == 'replace':
|
|
self._socketio_message_buffer[room][event][msg_id] = payload
|
|
else:
|
|
raise Exception('Unknown override policy: {}'.format(override_policy)) # noqa
|
|
return msg_id
|
|
|
|
def buffer_user_patch_operation(self, ressource, patch_operation):
|
|
self.buffer_socketio_message('user_{}_patch'.format(ressource.user_id),
|
|
patch_operation,
|
|
'user_{}'.format(ressource.user_id),
|
|
msg_id='_', override_policy='append')
|
|
|
|
def clear_socketio_message_buffer(self):
|
|
self._socketio_message_buffer = {}
|
|
|
|
def flush_socketio_messages(self):
|
|
for room in self._socketio_message_buffer:
|
|
for event in self._socketio_message_buffer[room]:
|
|
for message in self._socketio_message_buffer[room][event]:
|
|
socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room) # noqa
|
|
self.clear_socketio_message_buffer()
|
|
|
|
|
|
task_runner = TaskRunner()
|