from datetime import datetime
from flask import current_app, request
from flask_login import current_user
from socket import gaierror
from .. import db, socketio
from ..decorators import socketio_login_required
from ..events import socketio_sessions
from ..models import Corpus, User
import cqi
import math
import os
import shutil
import logging


'''
' A dictionary mapping corpus ids to lists of the Socket.IO session ids (sids)
' that currently analyse the corpus. {<corpus_id>: [<sid>, ...], ...}
'''
corpus_analysis_sessions = {}
'''
' A dictionary mapping Socket.IO session ids to their CQi clients.
' {<sid>: CQiClient, ...}
'''
corpus_analysis_clients = {}


@socketio.on('corpus_analysis_init')
@socketio_login_required
def init_corpus_analysis(corpus_id):
    corpus = Corpus.query.get(corpus_id)
    if corpus is None:
        response = {'code': 404, 'desc': None, 'msg': 'Not Found'}
        socketio.emit('corpus_analysis_init', response, room=request.sid)
        return
    if not (corpus.creator == current_user
            or current_user.is_administrator()):
        response = {'code': 403, 'desc': None, 'msg': 'Forbidden'}
        socketio.emit('corpus_analysis_init', response, room=request.sid)
        return
    if corpus.status not in ['prepared', 'start analysis', 'analysing']:
        response = {'code': 424,
                    'desc': ('Corpus status is not "prepared", '
                             '"start analysis" or "analysing"'),
                    'msg': 'Failed Dependency'}
        socketio.emit('corpus_analysis_init', response, room=request.sid)
        return
    if corpus.status == 'prepared':
        corpus.status = 'start analysis'
        db.session.commit()
        event = 'user_{}_patch'.format(current_user.id)
        jsonpatch = [{'op': 'replace',
                      'path': '/corpora/{}/status'.format(corpus.id),
                      'value': corpus.status}]
        room = 'user_{}'.format(corpus.user_id)
        socketio.emit(event, jsonpatch, room=room)
    socketio.start_background_task(corpus_analysis_session_handler,
                                   current_app._get_current_object(),
                                   corpus_id, current_user.id, request.sid)


def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
    with app.app_context():
        ''' Setup analysis session '''
        corpus = Corpus.query.get(corpus_id)
        retry_counter = 15
        while corpus.status != 'analysing':
            db.session.refresh(corpus)
            retry_counter -= 1
            if retry_counter == 0:
                response = {'code': 408,
                            'desc': ('Corpus analysis session took too long '
                                     'to start'),
                            'msg': 'Request Timeout'}
                # request.sid is not available inside a background task; use
                # the session_id that was handed over and stop retrying.
                socketio.emit('corpus_analysis_init', response,
                              room=session_id)
                return
            socketio.sleep(3)
        client = cqi.CQiClient('cqpserver_{}'.format(corpus_id))
        try:
            connect_status = client.connect()
            payload = {'code': connect_status,
                       'msg': cqi.api.specification.lookup[connect_status]}
        except cqi.errors.CQiException as e:
            handle_cqi_exception('corpus_analysis_init', e, session_id)
            return
        except gaierror:
            response = {'code': 500, 'desc': None,
                        'msg': 'Internal Server Error'}
            socketio.emit('corpus_analysis_init', response, room=session_id)
            return
        corpus_analysis_clients[session_id] = client
        if corpus_id in corpus_analysis_sessions:
            corpus_analysis_sessions[corpus_id].append(session_id)
        else:
            corpus_analysis_sessions[corpus_id] = [session_id]
        client.status = 'ready'
        response = {'code': 200, 'desc': None, 'msg': 'OK',
                    'payload': payload}
        socketio.emit('corpus_analysis_init', response, room=session_id)
        ''' Observe analysis session '''
        while session_id in socketio_sessions:
            socketio.sleep(3)
        ''' Teardown analysis session '''
        if client.status == 'running':
            client.status = 'abort'
            while client.status != 'ready':
                socketio.sleep(0.1)
        try:
            client.disconnect()
        except cqi.errors.CQiException:
            pass
        corpus_analysis_clients.pop(session_id, None)
        corpus_analysis_sessions[corpus_id].remove(session_id)
        if not corpus_analysis_sessions[corpus_id]:
            corpus_analysis_sessions.pop(corpus_id, None)
        corpus.status = 'stop analysis'
        db.session.commit()
        event = 'user_{}_patch'.format(corpus.user_id)
        jsonpatch = [{'op': 'replace',
                      'path': '/corpora/{}/status'.format(corpus.id),
                      'value': corpus.status}]
        room = 'user_{}'.format(corpus.user_id)
        socketio.emit(event, jsonpatch, room=room)


@socketio.on('corpus_analysis_meta_data')
@socketio_login_required
def corpus_analysis_get_meta_data(corpus_id):
    # get meta data from db
    db_corpus = Corpus.query.get(corpus_id)
    metadata = {}
    metadata['corpus_name'] = db_corpus.title
    metadata['corpus_description'] = db_corpus.description
    metadata['corpus_creation_date'] = db_corpus.creation_date.isoformat()
    metadata['corpus_last_edited_date'] = \
        db_corpus.last_edited_date.isoformat()
    client = corpus_analysis_clients.get(request.sid)
    if client is None:
        response = {'code': 424, 'desc': 'No client found for this session',
                    'msg': 'Failed Dependency'}
        socketio.emit('corpus_analysis_meta_data', response,
                      room=request.sid)
        return
    # check if client is busy or not
    if client.status == 'running':
        client.status = 'abort'
        while client.status != 'ready':
            socketio.sleep(0.1)
    # get meta data from corpus in cqp server
    client.status = 'running'
    try:
        client_corpus = client.corpora.get('CORPUS')
        metadata['corpus_properties'] = client_corpus.attrs['properties']
        metadata['corpus_size_tokens'] = client_corpus.attrs['size']
        text_attr = client_corpus.structural_attributes.get('text')
        struct_attrs = client_corpus.structural_attributes.list(
            filters={'part_of': text_attr})
        text_ids = range(0, text_attr.attrs['size'])
        texts_metadata = {}
        for text_id in text_ids:
            texts_metadata[text_id] = {}
            for struct_attr in struct_attrs:
                # Strip the parent attribute prefix from the name,
                # e.g. "text_author" -> "author".
                key = struct_attr.attrs['name'][(len(text_attr.attrs['name']) + 1):]  # noqa
                texts_metadata[text_id][key] = struct_attr.values_by_ids(
                    list(range(struct_attr.attrs['size'])))[text_id]
        metadata['corpus_all_texts'] = texts_metadata
        metadata['corpus_analysis_date'] = datetime.utcnow().isoformat()
        metadata['corpus_cqi_py_protocol_version'] = client.api.version
        metadata['corpus_cqi_py_package_version'] = cqi.__version__
        # TODO: determine this dynamically
        metadata['corpus_cqpserver_version'] = 'CQPserver v3.4.22'
        # write some metadata to the db
        db_corpus.current_nr_of_tokens = metadata['corpus_size_tokens']
        db.session.commit()
        event = 'user_{}_patch'.format(db_corpus.user_id)
        jsonpatch = [{'op': 'replace',
                      'path': '/corpora/{}/current_nr_of_tokens'.format(db_corpus.id),  # noqa
                      'value': db_corpus.current_nr_of_tokens}]
        room = 'user_{}'.format(db_corpus.user_id)
        socketio.emit(event, jsonpatch, room=room)
        # emit data
        payload = metadata
        response = {'code': 200, 'desc': 'Corpus meta data', 'msg': 'OK',
                    'payload': payload}
        socketio.emit('corpus_analysis_meta_data', response,
                      room=request.sid)
    except cqi.errors.CQiException as e:
        payload = {'code': e.code, 'desc': e.description, 'msg': e.name}
        response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error',
                    'payload': payload}
        socketio.emit('corpus_analysis_meta_data', response,
                      room=request.sid)
    client.status = 'ready'


@socketio.on('corpus_analysis_query')
@socketio_login_required
def corpus_analysis_query(query):
    client = corpus_analysis_clients.get(request.sid)
    if client is None:
        response = {'code': 424, 'desc': 'No client found for this session',
                    'msg': 'Failed Dependency'}
        socketio.emit('corpus_analysis_query', response, room=request.sid)
        return
    if client.status == 'running':
        client.status = 'abort'
        while client.status != 'ready':
            socketio.sleep(0.1)
    client.status = 'running'
    try:
        corpus = client.corpora.get('CORPUS')
        query_status = corpus.query(query)
        results = corpus.subcorpora.get('Results')
    except cqi.errors.CQiException as e:
        client.status = 'ready'
        handle_cqi_exception('corpus_analysis_query', e, request.sid)
        return
    payload = {'status': query_status,
               'msg': cqi.api.specification.lookup[query_status],
               'match_count': results.attrs['size']}
    response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload}
    socketio.emit('corpus_analysis_query', response, room=request.sid)
    chunk_size = 100
    chunk_start = 0
    context = 50
    progress = 0
    while chunk_start <= results.attrs['size']:
        if client.status == 'abort':
            break
        try:
            chunk = results.export(context=context, cutoff=chunk_size,
                                   offset=chunk_start)
        except cqi.errors.CQiException as e:
            handle_cqi_exception('corpus_analysis_query', e, request.sid)
            break
        if results.attrs['size'] == 0:
            progress = 100
        else:
            progress = ((chunk_start + chunk_size)
                        / results.attrs['size']) * 100
        progress = min(100, int(math.ceil(progress)))
        payload = {'chunk': chunk, 'progress': progress}
        response = {'code': 200, 'desc': None, 'msg': 'OK',
                    'payload': payload}
        socketio.emit('corpus_analysis_query_results', response,
                      room=request.sid)
        chunk_start += chunk_size
    client.status = 'ready'


@socketio.on('corpus_analysis_get_match_with_full_context')
@socketio_login_required
def corpus_analysis_get_match_with_full_context(payload):
    type = payload['type']
    data_indexes = payload['data_indexes']
    first_cpos = payload['first_cpos']
    last_cpos = payload['last_cpos']
    client = corpus_analysis_clients.get(request.sid)
    if client is None:
        response = {'code': 424, 'desc': 'No client found for this session',
                    'msg': 'Failed Dependency'}
        socketio.emit('corpus_analysis_get_match_with_full_context',
                      response, room=request.sid)
        return
    if client.status == 'running':
        client.status = 'abort'
        while client.status != 'ready':
            socketio.sleep(0.1)
    client.status = 'running'
    try:
        corpus = client.corpora.get('CORPUS')
        s = corpus.structural_attributes.get('s')
    except cqi.errors.CQiException as e:
        handle_cqi_exception(
            'corpus_analysis_get_match_with_full_context', e, request.sid)
        return
    i = 0
    # Send data one match at a time.
    for index, f_cpos, l_cpos in zip(data_indexes, first_cpos, last_cpos):
        i += 1
        matches = []
        # Use two separate dicts; a chained assignment would make both names
        # refer to the same dict object.
        cpos_lookup = {}
        text_lookup = {}
        try:
            tmp = s.export(f_cpos, l_cpos, context=10)
        except cqi.errors.CQiException as e:
            handle_cqi_exception(
                'corpus_analysis_get_match_with_full_context', e, request.sid)
            break
        matches.append(tmp['matches'][0])
        cpos_lookup.update(tmp['cpos_lookup'])
        text_lookup.update(tmp['text_lookup'])
        progress = i / len(data_indexes) * 100
        payload = {'matches': matches, 'progress': progress,
                   'cpos_lookup': cpos_lookup, 'text_lookup': text_lookup}
        response = {'code': 200, 'desc': None, 'msg': 'OK',
                    'payload': payload, 'type': type,
                    'data_indexes': data_indexes}
        socketio.emit('corpus_analysis_get_match_with_full_context',
                      response, room=request.sid)
    client.status = 'ready'


@socketio.on('export_corpus')
@socketio_login_required
def export_corpus(corpus_id):
    corpus = Corpus.query.get(corpus_id)
    if corpus is None:
        response = {'code': 404, 'msg': 'Not Found'}
        socketio.emit('export_corpus', response, room=request.sid)
        return
    if corpus.status != 'prepared':
        response = {'code': 412, 'msg': 'Precondition Failed'}
        socketio.emit('export_corpus', response, room=request.sid)
        return
    # delete the old corpus archive if it has been built before
    if (corpus.archive_file is not None
            and os.path.isfile(corpus.archive_file)):
        os.remove(corpus.archive_file)
    zip_name = corpus.title
    zip_path = os.path.join(current_user.path, 'corpora', zip_name)
    corpus.archive_file = os.path.join(corpus.path, zip_name) + '.zip'
    db.session.commit()
    shutil.make_archive(zip_path, 'zip', corpus.path)
    shutil.move(zip_path + '.zip', corpus.archive_file)
    socketio.emit('export_corpus_' + str(corpus.id), room=request.sid)


def handle_cqi_exception(event, exception, room):
    response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error',
                'payload': {'code': exception.code,
                            'desc': exception.description,
                            'msg': exception.name}}
    socketio.emit(event, response, room=room)
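

# Minimal client-side sketch (not part of this module): how the response
# envelopes emitted above ({'code': ..., 'desc': ..., 'msg': ..,
# 'payload': ...}) could be consumed with the python-socketio package. The
# server URL, the corpus id and the CQP query are placeholders, and the
# connection is assumed to carry an already authenticated session, since the
# handlers above are guarded by socketio_login_required. Kept commented out so
# importing this module stays side-effect free.
#
# import socketio
#
# sio = socketio.Client()
#
# @sio.on('corpus_analysis_init')
# def on_init(response):
#     print('init: {} {}'.format(response['code'], response['msg']))
#
# @sio.on('corpus_analysis_query_results')
# def on_query_results(response):
#     print('progress: {}%'.format(response['payload']['progress']))
#
# sio.connect('http://localhost:5000')
# sio.emit('corpus_analysis_init', 1)
# sio.emit('corpus_analysis_query', '[word="example"]')
# sio.wait()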