from .corpus_utils import CheckCorporaMixin from .job_utils import CheckJobsMixin from ..import db, socketio import docker class TaskRunner(CheckCorporaMixin, CheckJobsMixin): def __init__(self): self.docker = docker.from_env() self._socketio_message_buffer = {} def run(self): self.check_corpora() self.check_jobs() db.session.commit() self.flush_socketio_messages() def buffer_socketio_message(self, event, payload, room, msg_id=None, override_policy='replace'): if room not in self._socketio_message_buffer: self._socketio_message_buffer[room] = {} if event not in self._socketio_message_buffer[room]: self._socketio_message_buffer[room][event] = {} if msg_id is None: msg_id = len(self._socketio_message_buffer[room][event].keys()) if override_policy == 'append': if msg_id in self._socketio_message_buffer[room][event]: # If the current message value isn't a list, convert it! if not isinstance(self._socketio_message_buffer[room][event][msg_id], list): # noqa self._socketio_message_buffer[room][event][msg_id] = [self._socketio_message_buffer[room][event][msg_id]] # noqa else: self._socketio_message_buffer[room][event][msg_id] = [] self._socketio_message_buffer[room][event][msg_id].append(payload) elif override_policy == 'replace': self._socketio_message_buffer[room][event][msg_id] = payload else: raise Exception('Unknown override policy: {}'.format(override_policy)) # noqa return msg_id def buffer_user_patch_operation(self, ressource, patch_operation): self.buffer_socketio_message('user_{}_patch'.format(ressource.user_id), patch_operation, 'user_{}'.format(ressource.user_id), msg_id='_', override_policy='append') def clear_socketio_message_buffer(self): self._socketio_message_buffer = {} def flush_socketio_messages(self): for room in self._socketio_message_buffer: for event in self._socketio_message_buffer[room]: for message in self._socketio_message_buffer[room][event]: socketio.emit(event, self._socketio_message_buffer[room][event][message], room=room) # noqa self.clear_socketio_message_buffer()