diff --git a/app/corpora/events.py b/app/corpora/events.py index 323bab74..fd228593 100644 --- a/app/corpora/events.py +++ b/app/corpora/events.py @@ -32,18 +32,18 @@ def init_corpus_analysis(corpus_id): response = {'code': 404, 'desc': None, 'msg': 'Not Found'} socketio.emit('corpus_analysis_init', response, room=request.sid) return - if not (corpus.creator == current_user or current_user.is_administrator()): # noqa + if not (corpus.creator == current_user or current_user.is_administrator()): response = {'code': 403, 'desc': None, 'msg': 'Forbidden'} socketio.emit('corpus_analysis_init', response, room=request.sid) return if corpus.status not in ['prepared', 'start analysis', 'analysing']: - response = {'code': 424, 'desc': 'Corpus status is not "prepared", "start analysis" or "analying"', 'msg': 'Failed Dependency'} # noqa + response = {'code': 424, 'desc': None, 'msg': 'Failed Dependency'} socketio.emit('corpus_analysis_init', response, room=request.sid) return if corpus.status == 'prepared': corpus.status = 'start analysis' db.session.commit() - event = 'user_{}_patch'.format(current_user.id) + event = 'user_{}_patch'.format(corpus.user_id) jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa room = 'user_{}'.format(corpus.user_id) socketio.emit(event, jsonpatch, room=room) @@ -53,6 +53,14 @@ def init_corpus_analysis(corpus_id): def corpus_analysis_session_handler(app, corpus_id, user_id, session_id): + def change_corpus_status(status): + corpus.status = status + db.session.commit() + event = 'user_{}_patch'.format(corpus.user_id) + jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa + room = 'user_{}'.format(corpus.user_id) + socketio.emit(event, jsonpatch, room=room) + with app.app_context(): ''' Setup analysis session ''' corpus = Corpus.query.get(corpus_id) @@ -61,8 +69,10 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id): db.session.refresh(corpus) retry_counter -= 1 if retry_counter == 0: - response = {'code': 408, 'desc': 'Corpus analysis session took to long to start', 'msg': 'Request Timeout'} # noqa + response = {'code': 408, 'desc': None, 'msg': 'Request Timeout'} # noqa socketio.emit('corpus_analysis_init', response, room=request.sid) # noqa + change_corpus_status('stop analysis') + return socketio.sleep(3) client = cqi.CQiClient('cqpserver_{}'.format(corpus_id)) try: @@ -70,10 +80,12 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id): payload = {'code': connect_status, 'msg': cqi.api.specification.lookup[connect_status]} # noqa except cqi.errors.CQiException as e: handle_cqi_exception('corpus_analysis_init', e, session_id) + change_corpus_status('stop analysis') return except gaierror: response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error'} # noqa socketio.emit('corpus_analysis_init', response, room=session_id) + change_corpus_status('stop analysis') return corpus_analysis_clients[session_id] = client if corpus_id in corpus_analysis_sessions: @@ -99,12 +111,7 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id): corpus_analysis_sessions[corpus_id].remove(session_id) if not corpus_analysis_sessions[corpus_id]: corpus_analysis_sessions.pop(corpus_id, None) - corpus.status = 'stop analysis' - db.session.commit() - event = 'user_{}_patch'.format(corpus.user_id) - jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa - room = 'user_{}'.format(corpus.user_id) - socketio.emit(event, jsonpatch, room=room) + change_corpus_status('stop analysis') @socketio.on('corpus_analysis_meta_data') diff --git a/app/corpora/events2.py b/app/corpora/events2.py deleted file mode 100644 index 71f992c4..00000000 --- a/app/corpora/events2.py +++ /dev/null @@ -1,363 +0,0 @@ -from datetime import datetime -from flask import current_app, request -from flask_login import current_user -from socket import gaierror -from .. import db, socketio -from ..decorators import socketio_login_required -from ..events import socketio_sessions -from ..models import Corpus, User -import cqi -import math -import os -import shutil -import logging - - -''' -' A dictionary containing lists of, with corpus ids associated, Socket.IO -' session ids (sid). {: [, ...], ...} -''' -corpus_analysis_sessions = {} -''' -' A dictionary containing Socket.IO session id - CQi client pairs. -' {: CQiClient, ...} -''' -corpus_analysis_clients = {} - - -@socketio.on('corpus_analysis_init') -@socketio_login_required -def init_corpus_analysis(corpus_id): - corpus = Corpus.query.get(corpus_id) - if corpus is None: - response = {'code': 404, 'desc': None, 'msg': 'Not Found'} - socketio.emit('corpus_analysis_init', response, room=request.sid) - return - if not (corpus.creator == current_user or current_user.is_administrator()): # noqa - response = {'code': 403, 'desc': None, 'msg': 'Forbidden'} - socketio.emit('corpus_analysis_init', response, room=request.sid) - return - if corpus.status not in ['prepared', 'start analysis', 'analysing']: - response = {'code': 424, 'desc': 'Corpus status is not "prepared", "start analysis" or "analying"', 'msg': 'Failed Dependency'} # noqa - socketio.emit('corpus_analysis_init', response, room=request.sid) - return - if corpus.status == 'prepared': - corpus.status = 'start analysis' - db.session.commit() - event = 'user_{}_patch'.format(current_user.id) - jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa - room = 'user_{}'.format(corpus.user_id) - socketio.emit(event, jsonpatch, room=room) - socketio.start_background_task(corpus_analysis_session_handler, - current_app._get_current_object(), - corpus_id, current_user.id, request.sid) - - -def corpus_analysis_session_handler(app, corpus_id, user_id, session_id): - with app.app_context(): - ''' Setup analysis session ''' - corpus = Corpus.query.get(corpus_id) - retry_counter = 15 - while corpus.status != 'analysing': - db.session.refresh(corpus) - retry_counter -= 1 - if retry_counter == 0: - response = {'code': 408, 'desc': 'Corpus analysis session took to long to start', 'msg': 'Request Timeout'} # noqa - socketio.emit('corpus_analysis_init', response, room=request.sid) # noqa - socketio.sleep(3) - client = cqi.CQiClient('cqpserver_{}'.format(corpus_id)) - try: - connect_status = client.connect() - payload = {'code': connect_status, 'msg': cqi.api.specification.lookup[connect_status]} # noqa - except cqi.errors.CQiException as e: - payload = {'code': e.code, 'desc': e.description, 'msg': e.name} - response = {'code': 500, 'desc': None, - 'msg': 'Internal Server Error', 'payload': payload} - socketio.emit('corpus_analysis_init', response, room=session_id) - return - except gaierror: - response = {'code': 500, 'desc': None, - 'msg': 'Internal Server Error'} - socketio.emit('corpus_analysis_init', response, room=session_id) - return - corpus_analysis_clients[session_id] = client - if corpus_id in corpus_analysis_sessions: - corpus_analysis_sessions[corpus_id].append(session_id) - else: - corpus_analysis_sessions[corpus_id] = [session_id] - client.status = 'ready' - response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload} - socketio.emit('corpus_analysis_init', response, room=session_id) - ''' Observe analysis session ''' - while session_id in socketio_sessions: - socketio.sleep(3) - ''' Teardown analysis session ''' - if client.status == 'running': - client.status = 'abort' - while client.status != 'ready': - socketio.sleep(0.1) - try: - client.disconnect() - except cqi.errors.CQiException: - pass - corpus_analysis_clients.pop(session_id, None) - corpus_analysis_sessions[corpus_id].remove(session_id) - if not corpus_analysis_sessions[corpus_id]: - corpus_analysis_sessions.pop(corpus_id, None) - corpus.status = 'stop analysis' - db.session.commit() - event = 'user_{}_patch'.format(corpus.user_id) - jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}] # noqa - room = 'user_{}'.format(corpus.user_id) - socketio.emit(event, jsonpatch, room=room) - - -@socketio.on('corpus_analysis_meta_data') -@socketio_login_required -def corpus_analysis_get_meta_data(corpus_id): - # get meta data from db - db_corpus = Corpus.query.get(corpus_id) - metadata = {} - metadata['corpus_name'] = db_corpus.title - metadata['corpus_description'] = db_corpus.description - metadata['corpus_creation_date'] = db_corpus.creation_date.isoformat() - metadata['corpus_last_edited_date'] = \ - db_corpus.last_edited_date.isoformat() - client = corpus_analysis_clients.get(request.sid) - if client is None: - response = {'code': 424, 'desc': 'No client found for this session', - 'msg': 'Failed Dependency'} - socketio.emit('corpus_analysis_meta_data', response, room=request.sid) - return - # check if client is busy or not - if client.status == 'running': - client.status = 'abort' - while client.status != 'ready': - socketio.sleep(0.1) - # get meta data from corpus in cqp server - client.status = 'running' - try: - client_corpus = client.corpora.get('CORPUS') - metadata['corpus_properties'] = client_corpus.attrs['properties'] - metadata['corpus_size_tokens'] = client_corpus.attrs['size'] - - text_attr = client_corpus.structural_attributes.get('text') - struct_attrs = client_corpus.structural_attributes.list( - filters={'part_of': text_attr}) - text_ids = range(0, (text_attr.attrs['size'])) - texts_metadata = {} - for text_id in text_ids: - texts_metadata[text_id] = {} - for struct_attr in struct_attrs: - texts_metadata[text_id][struct_attr.attrs['name'][(len(text_attr.attrs['name']) + 1):]] = struct_attr.values_by_ids(list(range(struct_attr.attrs['size'])))[text_id] # noqa - metadata['corpus_all_texts'] = texts_metadata - metadata['corpus_analysis_date'] = datetime.utcnow().isoformat() - metadata['corpus_cqi_py_protocol_version'] = client.api.version - metadata['corpus_cqi_py_package_version'] = cqi.__version__ - # TODO: make this dynamically - metadata['corpus_cqpserver_version'] = 'CQPserver v3.4.22' - - # write some metadata to the db - db_corpus.current_nr_of_tokens = metadata['corpus_size_tokens'] - db.session.commit() - event = 'user_{}_patch'.format(db_corpus.user_id) - jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/current_nr_of_tokens'.format(db_corpus.id), 'value': db_corpus.current_nr_of_tokens}] # noqa - room = 'user_{}'.format(db_corpus.user_id) - socketio.emit(event, jsonpatch, room=room) - - # emit data - payload = metadata - response = {'code': 200, 'desc': 'Corpus meta data', 'msg': 'OK', - 'payload': payload} - socketio.emit('corpus_analysis_meta_data', response, room=request.sid) - except cqi.errors.CQiException as e: - payload = {'code': e.code, 'desc': e.description, 'msg': e.name} - response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error', - 'payload': payload} - socketio.emit('corpus_analysis_meta_data', response, room=request.sid) - client.status = 'ready' - - -@socketio.on('corpus_analysis_query') -@socketio_login_required -def corpus_analysis_query(query): - client = corpus_analysis_clients.get(request.sid) - if client is None: - response = {'code': 424, 'desc': 'No client found for this session', - 'msg': 'Failed Dependency'} - socketio.emit('corpus_analysis_query', response, room=request.sid) - return - if client.status == 'running': - client.status = 'abort' - while client.status != 'ready': - socketio.sleep(0.1) - client.status = 'running' - try: - corpus = client.corpora.get('CORPUS') - query_status = corpus.query(query) - results = corpus.subcorpora.get('Results') - except cqi.errors.CQiException as e: - client.status = 'ready' - payload = {'code': e.code, 'desc': e.description, 'msg': e.name} - response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error', - 'payload': payload} - socketio.emit('corpus_analysis_query', response, room=request.sid) - return - payload = {'status': query_status, - 'msg': cqi.api.specification.lookup[query_status], - 'match_count': results.attrs['size']} - response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload} - socketio.emit('corpus_analysis_query', response, room=request.sid) - chunk_size = 100 - chunk_start = 0 - context = 50 - progress = 0 - while chunk_start <= results.attrs['size']: - if client.status == 'abort': - break - try: - chunk = results.export(context=context, cutoff=chunk_size, offset=chunk_start) # noqa - except cqi.errors.CQiException as e: - client.status = 'ready' - payload = {'code': e.code, 'desc': e.description, 'msg': e.name} - response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error', - 'payload': payload} - socketio.emit('corpus_analysis_query', response, room=request.sid) - return - if (results.attrs['size'] == 0): - progress = 100 - else: - progress = ((chunk_start + chunk_size) / results.attrs['size']) * 100 # noqa - progress = min(100, int(math.ceil(progress))) - response = {'code': 200, 'desc': None, 'msg': 'OK', - 'payload': {'chunk': chunk, 'progress': progress}} - socketio.emit('corpus_analysis_query_results', response, - room=request.sid) - chunk_start += chunk_size - client.status = 'ready' - - -@socketio.on('corpus_analysis_inspect_match') -@socketio_login_required -def corpus_analysis_inspect_match(payload): - client = corpus_analysis_clients.get(request.sid) - if client is None: - response = {'code': 424, 'desc': 'No client found for this session', - 'msg': 'Failed Dependency'} - socketio.emit('corpus_analysis_inspect_match', response, room=request.sid) # noqa - return - match_id = payload['match_id'] - if client.status == 'running': - client.status = 'abort' - while client.status != 'ready': - socketio.sleep(0.1) - client.status = 'running' - try: - corpus = client.corpora.get('CORPUS') - results = corpus.subcorpora.get('Results') - except cqi.errors.CQiException as e: - client.status = 'ready' - payload = {'code': e.code, 'desc': e.description, 'msg': e.name} - response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error', - 'payload': payload} - socketio.emit('corpus_analysis_inspect_match', response, room=request.sid) # noqa - return - context = 200 - try: - payload = results.export(context=context, cutoff=1, offset=match_id) - except cqi.errors.CQiException as e: - client.status = 'ready' - payload = {'code': e.code, 'desc': e.description, 'msg': e.name} - response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error', - 'payload': payload} - socketio.emit('corpus_analysis_inspect_match', response, room=request.sid) # noqa - return - response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload} - socketio.emit('corpus_analysis_inspect_match', response, room=request.sid) - client.status = 'ready' - - - -@socketio.on('corpus_analysis_get_match_with_full_context') -@socketio_login_required -def corpus_analysis_get_match_with_full_context(payload): - type = payload['type'] - data_indexes = payload['data_indexes'] - first_cpos = payload['first_cpos'] - last_cpos = payload['last_cpos'] - client = corpus_analysis_clients.get(request.sid) - if client is None: - response = {'code': 424, 'desc': 'No client found for this session', - 'msg': 'Failed Dependency'} - socketio.emit('corpus_analysis_get_match_with_full_context', response, - room=request.sid) - return - if client.status == 'running': - client.status = 'abort' - while client.status != 'ready': - socketio.sleep(0.1) - client.status = 'running' - try: - corpus = client.corpora.get('CORPUS') - s = corpus.structural_attributes.get('s') - payload = {} - payload['matches'] = [] - payload['cpos_lookup'] = {} - payload['text_lookup'] = {} - payload['progress'] = 0 - i = 0 - # Send data one match at a time. - for index, f_cpos, l_cpos in zip(data_indexes, first_cpos, last_cpos): - i += 1 - tmp_match = s.export(f_cpos, l_cpos, context=10) - payload['matches'].append(tmp_match['matches'][0]) - payload['cpos_lookup'].update(tmp_match['cpos_lookup']) - payload['text_lookup'].update(tmp_match['text_lookup']) - payload['progress'] = i/len(data_indexes)*100 - response = {'code': 200, - 'desc': None, - 'msg': 'OK', - 'payload': payload, - 'type': type, - 'data_indexes': data_indexes} - socketio.emit('corpus_analysis_get_match_with_full_context', - response, room=request.sid) - payload['matches'] = [] - payload['cpos_lookup'] = {} - payload['text_lookup'] = {} - except cqi.errors.CQiException as e: - payload = {'code': e.code, 'desc': e.description, 'msg': e.name} - response = {'code': 500, - 'desc': None, - 'msg': 'Internal Server Error', - 'payload': payload, - 'type': type, - 'data_indexes': data_indexes} - socketio.emit('corpus_analysis_get_match_with_full_context', - response, - room=request.sid) - client.status = 'ready' - - -@socketio.on('export_corpus') -@socketio_login_required -def export_corpus(corpus_id): - corpus = Corpus.query.get(corpus_id) - if corpus is None: - response = {'code': 404, 'msg': 'Not found'} - socketio.emit('export_corpus', response, room=request.sid) - return - if corpus.status != 'prepared': - response = {'code': 412, 'msg': 'Precondition Failed'} - socketio.emit('export_corpus', response, room=request.sid) - return - # delete old corpus archive if it exists/has been build before - if corpus.archive_file is not None and os.path.isfile(corpus.archive_file): - os.remove(corpus.archive_file) - zip_name = corpus.title - zip_path = os.path.join(current_user.path, 'corpora', zip_name) - corpus.archive_file = os.path.join(corpus.path, zip_name) + '.zip' - db.session.commit() - shutil.make_archive(zip_path, 'zip', corpus.path) - shutil.move(zip_path + '.zip', corpus.archive_file) - socketio.emit('export_corpus_' + str(corpus.id), room=request.sid)