mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
				synced 2025-11-03 20:02:47 +00:00 
			
		
		
		
	Do not manually emit corpus changes via socket.
This commit is contained in:
		@@ -2,6 +2,7 @@ from datetime import datetime
 | 
			
		||||
from flask import current_app, request
 | 
			
		||||
from flask_login import current_user
 | 
			
		||||
from socket import gaierror
 | 
			
		||||
from werkzeug.utils import secure_filename
 | 
			
		||||
from .. import db, socketio
 | 
			
		||||
from ..decorators import socketio_login_required
 | 
			
		||||
from ..socketio_events import socketio_sessions
 | 
			
		||||
@@ -43,24 +44,12 @@ def init_corpus_analysis(corpus_id):
 | 
			
		||||
    if corpus.status == 'prepared':
 | 
			
		||||
        corpus.status = 'start analysis'
 | 
			
		||||
        db.session.commit()
 | 
			
		||||
        event = 'user_{}_patch'.format(corpus.user_id)
 | 
			
		||||
        jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}]  # noqa
 | 
			
		||||
        room = 'user_{}'.format(corpus.user_id)
 | 
			
		||||
        socketio.emit(event, jsonpatch, room=room)
 | 
			
		||||
    socketio.start_background_task(corpus_analysis_session_handler,
 | 
			
		||||
                                   current_app._get_current_object(),
 | 
			
		||||
                                   corpus_id, current_user.id, request.sid)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
 | 
			
		||||
    def change_corpus_status(status):
 | 
			
		||||
        corpus.status = status
 | 
			
		||||
        db.session.commit()
 | 
			
		||||
        event = 'user_{}_patch'.format(corpus.user_id)
 | 
			
		||||
        jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/status'.format(corpus.id), 'value': corpus.status}]  # noqa
 | 
			
		||||
        room = 'user_{}'.format(corpus.user_id)
 | 
			
		||||
        socketio.emit(event, jsonpatch, room=room)
 | 
			
		||||
 | 
			
		||||
    with app.app_context():
 | 
			
		||||
        ''' Setup analysis session '''
 | 
			
		||||
        corpus = Corpus.query.get(corpus_id)
 | 
			
		||||
@@ -71,7 +60,8 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
 | 
			
		||||
            if retry_counter == 0:
 | 
			
		||||
                response = {'code': 408, 'desc': None, 'msg': 'Request Timeout'}  # noqa
 | 
			
		||||
                socketio.emit('corpus_analysis_init', response, room=request.sid)  # noqa
 | 
			
		||||
                change_corpus_status('stop analysis')
 | 
			
		||||
                corpus.status = 'stop analysis'
 | 
			
		||||
                db.session.commit()
 | 
			
		||||
                return
 | 
			
		||||
            socketio.sleep(3)
 | 
			
		||||
        client = cqi.CQiClient('cqpserver_{}'.format(corpus_id))
 | 
			
		||||
@@ -80,12 +70,14 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
 | 
			
		||||
            payload = {'code': connect_status, 'msg': cqi.api.specification.lookup[connect_status]}  # noqa
 | 
			
		||||
        except cqi.errors.CQiException as e:
 | 
			
		||||
            handle_cqi_exception('corpus_analysis_init', e, session_id)
 | 
			
		||||
            change_corpus_status('stop analysis')
 | 
			
		||||
            corpus.status = 'stop analysis'
 | 
			
		||||
            db.session.commit()
 | 
			
		||||
            return
 | 
			
		||||
        except gaierror:
 | 
			
		||||
            response = {'code': 500, 'desc': None, 'msg': 'Internal Server Error'}  # noqa
 | 
			
		||||
            socketio.emit('corpus_analysis_init', response, room=session_id)
 | 
			
		||||
            change_corpus_status('stop analysis')
 | 
			
		||||
            corpus.status = 'stop analysis'
 | 
			
		||||
            db.session.commit()
 | 
			
		||||
            return
 | 
			
		||||
        corpus_analysis_clients[session_id] = client
 | 
			
		||||
        if corpus_id in corpus_analysis_sessions:
 | 
			
		||||
@@ -102,7 +94,7 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
 | 
			
		||||
        if client.status == 'running':
 | 
			
		||||
            client.status = 'abort'
 | 
			
		||||
            while client.status != 'ready':
 | 
			
		||||
                socketio.sleep(0.1)
 | 
			
		||||
                socketio.sleep(0.3)
 | 
			
		||||
        try:
 | 
			
		||||
            client.disconnect()
 | 
			
		||||
        except cqi.errors.CQiException:
 | 
			
		||||
@@ -111,7 +103,8 @@ def corpus_analysis_session_handler(app, corpus_id, user_id, session_id):
 | 
			
		||||
        corpus_analysis_sessions[corpus_id].remove(session_id)
 | 
			
		||||
        if not corpus_analysis_sessions[corpus_id]:
 | 
			
		||||
            corpus_analysis_sessions.pop(corpus_id, None)
 | 
			
		||||
            change_corpus_status('stop analysis')
 | 
			
		||||
            corpus.status = 'stop analysis'
 | 
			
		||||
            db.session.commit()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@socketio.on('corpus_analysis_meta_data')
 | 
			
		||||
@@ -127,24 +120,23 @@ def corpus_analysis_get_meta_data(corpus_id):
 | 
			
		||||
        db_corpus.last_edited_date.isoformat()
 | 
			
		||||
    client = corpus_analysis_clients.get(request.sid)
 | 
			
		||||
    if client is None:
 | 
			
		||||
        response = {'code': 424, 'desc': 'No client found for this session',
 | 
			
		||||
                    'msg': 'Failed Dependency'}
 | 
			
		||||
        response = {'code': 424, 'desc': None, 'msg': 'Failed Dependency'}
 | 
			
		||||
        socketio.emit('corpus_analysis_meta_data', response, room=request.sid)
 | 
			
		||||
        return
 | 
			
		||||
    # check if client is busy or not
 | 
			
		||||
    if client.status == 'running':
 | 
			
		||||
        client.status = 'abort'
 | 
			
		||||
        while client.status != 'ready':
 | 
			
		||||
            socketio.sleep(0.1)
 | 
			
		||||
            socketio.sleep(0.3)
 | 
			
		||||
    # get meta data from corpus in cqp server
 | 
			
		||||
    client.status = 'running'
 | 
			
		||||
    try:
 | 
			
		||||
        client_corpus = client.corpora.get('CORPUS')
 | 
			
		||||
        metadata['corpus_properties'] = client_corpus.attrs['properties']
 | 
			
		||||
        metadata['corpus_size_tokens'] = client_corpus.attrs['size']
 | 
			
		||||
        cwb_corpus = client.corpora.get('CORPUS')
 | 
			
		||||
        metadata['corpus_properties'] = cwb_corpus.attrs['properties']
 | 
			
		||||
        metadata['corpus_size_tokens'] = cwb_corpus.attrs['size']
 | 
			
		||||
 | 
			
		||||
        text_attr = client_corpus.structural_attributes.get('text')
 | 
			
		||||
        struct_attrs = client_corpus.structural_attributes.list(
 | 
			
		||||
        text_attr = cwb_corpus.structural_attributes.get('text')
 | 
			
		||||
        struct_attrs = cwb_corpus.structural_attributes.list(
 | 
			
		||||
            filters={'part_of': text_attr})
 | 
			
		||||
        text_ids = range(0, (text_attr.attrs['size']))
 | 
			
		||||
        texts_metadata = {}
 | 
			
		||||
@@ -162,15 +154,10 @@ def corpus_analysis_get_meta_data(corpus_id):
 | 
			
		||||
        # write some metadata to the db
 | 
			
		||||
        db_corpus.current_nr_of_tokens = metadata['corpus_size_tokens']
 | 
			
		||||
        db.session.commit()
 | 
			
		||||
        event = 'user_{}_patch'.format(db_corpus.user_id)
 | 
			
		||||
        jsonpatch = [{'op': 'replace', 'path': '/corpora/{}/current_nr_of_tokens'.format(db_corpus.id), 'value': db_corpus.current_nr_of_tokens}]  # noqa
 | 
			
		||||
        room = 'user_{}'.format(db_corpus.user_id)
 | 
			
		||||
        socketio.emit(event, jsonpatch, room=room)
 | 
			
		||||
 | 
			
		||||
        # emit data
 | 
			
		||||
        payload = metadata
 | 
			
		||||
        response = {'code': 200, 'desc': 'Corpus meta data', 'msg': 'OK',
 | 
			
		||||
                    'payload': payload}
 | 
			
		||||
        response = {'code': 200, 'desc': None, 'msg': 'OK', 'payload': payload}
 | 
			
		||||
        socketio.emit('corpus_analysis_meta_data', response, room=request.sid)
 | 
			
		||||
    except cqi.errors.CQiException as e:
 | 
			
		||||
        payload = {'code': e.code, 'desc': e.description, 'msg': e.name}
 | 
			
		||||
@@ -185,14 +172,13 @@ def corpus_analysis_get_meta_data(corpus_id):
 | 
			
		||||
def corpus_analysis_query(query):
 | 
			
		||||
    client = corpus_analysis_clients.get(request.sid)
 | 
			
		||||
    if client is None:
 | 
			
		||||
        response = {'code': 424, 'desc': 'No client found for this session',
 | 
			
		||||
                    'msg': 'Failed Dependency'}
 | 
			
		||||
        response = {'code': 424, 'desc': None, 'msg': 'Failed Dependency'}
 | 
			
		||||
        socketio.emit('corpus_analysis_query', response, room=request.sid)
 | 
			
		||||
        return
 | 
			
		||||
    if client.status == 'running':
 | 
			
		||||
        client.status = 'abort'
 | 
			
		||||
        while client.status != 'ready':
 | 
			
		||||
            socketio.sleep(0.1)
 | 
			
		||||
            socketio.sleep(0.3)
 | 
			
		||||
    client.status = 'running'
 | 
			
		||||
    try:
 | 
			
		||||
        corpus = client.corpora.get('CORPUS')
 | 
			
		||||
@@ -248,7 +234,7 @@ def corpus_analysis_get_match_with_full_context(payload):
 | 
			
		||||
    if client.status == 'running':
 | 
			
		||||
        client.status = 'abort'
 | 
			
		||||
        while client.status != 'ready':
 | 
			
		||||
            socketio.sleep(0.1)
 | 
			
		||||
            socketio.sleep(0.3)
 | 
			
		||||
    client.status = 'running'
 | 
			
		||||
    try:
 | 
			
		||||
        corpus = client.corpora.get('CORPUS')
 | 
			
		||||
@@ -295,13 +281,15 @@ def export_corpus(corpus_id):
 | 
			
		||||
    # delete old corpus archive if it exists/has been build before
 | 
			
		||||
    if corpus.archive_file is not None and os.path.isfile(corpus.archive_file):
 | 
			
		||||
        os.remove(corpus.archive_file)
 | 
			
		||||
    zip_name = corpus.title
 | 
			
		||||
    zip_path = os.path.join(current_user.path, 'corpora', zip_name)
 | 
			
		||||
    corpus.archive_file = os.path.join(corpus.path, zip_name) + '.zip'
 | 
			
		||||
    archive_file_base_name = '[corpus]_' + secure_filename(corpus.title)
 | 
			
		||||
    corpus.archive_file = archive_file_base_name + '.zip'
 | 
			
		||||
    db.session.commit()
 | 
			
		||||
    shutil.make_archive(zip_path, 'zip', corpus.path)
 | 
			
		||||
    shutil.move(zip_path + '.zip', corpus.archive_file)
 | 
			
		||||
    socketio.emit('export_corpus_' + str(corpus.id), room=request.sid)
 | 
			
		||||
    shutil.make_archive(
 | 
			
		||||
        os.path.join(corpus.creator.path, 'corpora', archive_file_base_name),
 | 
			
		||||
        'zip',
 | 
			
		||||
        corpus.path
 | 
			
		||||
    )
 | 
			
		||||
    socketio.emit('export_corpus_{}'.format(corpus.id), room=request.sid)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def handle_cqi_exception(event, exception, room):
 | 
			
		||||
 
 | 
			
		||||
@@ -131,11 +131,12 @@ def download_corpus(corpus_id):
 | 
			
		||||
    corpus = Corpus.query.get_or_404(corpus_id)
 | 
			
		||||
    if not (corpus.creator == current_user or current_user.is_administrator()):
 | 
			
		||||
        abort(403)
 | 
			
		||||
    # TODO: Check what happens here
 | 
			
		||||
    dir = os.path.dirname(corpus.archive_file)
 | 
			
		||||
    filename = os.path.basename(corpus.archive_file)
 | 
			
		||||
    return send_from_directory(as_attachment=True, directory=dir,
 | 
			
		||||
                               filename=filename, mimetype='zip')
 | 
			
		||||
    return send_from_directory(
 | 
			
		||||
        as_attachment=True,
 | 
			
		||||
        directory=os.path.join(corpus.creator.path, 'corpora'),
 | 
			
		||||
        filename=corpus.archive_file,
 | 
			
		||||
        mimetype='zip'
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@corpora.route('/<int:corpus_id>/analyse')
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user