mirror of
				https://gitlab.ub.uni-bielefeld.de/sfb1288inf/nopaque.git
				synced 2025-11-04 04:12:45 +00:00 
			
		
		
		
	Use enums where appropriate. This commit includes new migrations that are NOT compatible with older nopaque instances.
This commit is contained in:
		@@ -1,5 +1,5 @@
 | 
			
		||||
from app.models import Corpus, CorpusStatus
 | 
			
		||||
from flask import current_app
 | 
			
		||||
from ..models import Corpus
 | 
			
		||||
import docker
 | 
			
		||||
import os
 | 
			
		||||
import shutil
 | 
			
		||||
@@ -8,19 +8,19 @@ import shutil
 | 
			
		||||
class CheckCorporaMixin:
 | 
			
		||||
    def check_corpora(self):
 | 
			
		||||
        corpora = Corpus.query.all()
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == 'submitted'):
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == CorpusStatus.SUBMITTED):  # noqa
 | 
			
		||||
            self.create_build_corpus_service(corpus)
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == 'queued' or x.status == 'running'):  # noqa
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == CorpusStatus.QUEUED or x.status == CorpusStatus.BUILDING):  # noqa
 | 
			
		||||
            self.checkout_build_corpus_service(corpus)
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == 'prepared' and x.num_analysis_sessions > 0):  # noqa
 | 
			
		||||
            corpus.status = 'start analysis'
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == 'analysing' and x.num_analysis_sessions == 0):  # noqa
 | 
			
		||||
            corpus.status = 'stop analysis'
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == 'analysing'):
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == CorpusStatus.BUILT and x.num_analysis_sessions > 0):  # noqa
 | 
			
		||||
            corpus.status = CorpusStatus.STARTING_ANALYSIS_SESSION
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION and x.num_analysis_sessions == 0):  # noqa
 | 
			
		||||
            corpus.status = CorpusStatus.CANCELING_ANALYSIS_SESSION
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == CorpusStatus.RUNNING_ANALYSIS_SESSION):  # noqa
 | 
			
		||||
            self.checkout_analysing_corpus_container(corpus)
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == 'start analysis'):
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == CorpusStatus.STARTING_ANALYSIS_SESSION):  # noqa
 | 
			
		||||
            self.create_cqpserver_container(corpus)
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == 'stop analysis'):
 | 
			
		||||
        for corpus in (x for x in corpora if x.status == CorpusStatus.CANCELING_ANALYSIS_SESSION):  # noqa
 | 
			
		||||
            self.remove_cqpserver_container(corpus)
 | 
			
		||||
 | 
			
		||||
    def create_build_corpus_service(self, corpus):
 | 
			
		||||
@@ -95,7 +95,7 @@ class CheckCorporaMixin:
 | 
			
		||||
                f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
        corpus.status = 'queued'
 | 
			
		||||
        corpus.status = CorpusStatus.QUEUED
 | 
			
		||||
 | 
			
		||||
    def checkout_build_corpus_service(self, corpus):
 | 
			
		||||
        service_name = f'build-corpus_{corpus.id}'
 | 
			
		||||
@@ -106,7 +106,7 @@ class CheckCorporaMixin:
 | 
			
		||||
                f'Get service "{service_name}" failed '
 | 
			
		||||
                f'due to "docker.errors.NotFound": {e}'
 | 
			
		||||
            )
 | 
			
		||||
            corpus.status = 'failed'
 | 
			
		||||
            corpus.status = CorpusStatus.FAILED
 | 
			
		||||
            return
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
@@ -117,22 +117,22 @@ class CheckCorporaMixin:
 | 
			
		||||
        if not service_tasks:
 | 
			
		||||
            return
 | 
			
		||||
        task_state = service_tasks[0].get('Status').get('State')
 | 
			
		||||
        if corpus.status == 'queued' and task_state != 'pending':
 | 
			
		||||
            corpus.status = 'running'
 | 
			
		||||
        if corpus.status == CorpusStatus.QUEUED and task_state != 'pending':  # noqa
 | 
			
		||||
            corpus.status = CorpusStatus.BUILDING
 | 
			
		||||
            return
 | 
			
		||||
        elif corpus.status == 'running' and task_state == 'complete':
 | 
			
		||||
            corpus.status = 'prepared'
 | 
			
		||||
        elif corpus.status == 'running' and task_state == 'failed':
 | 
			
		||||
            corpus.status = 'failed'
 | 
			
		||||
        elif corpus.status == CorpusStatus.BUILDING and task_state == 'complete':  # noqa
 | 
			
		||||
            corpus.status = CorpusStatus.BUILT
 | 
			
		||||
        elif corpus.status == CorpusStatus.BUILDING and task_state == 'failed':  # noqa
 | 
			
		||||
            corpus.status = CorpusStatus.FAILED
 | 
			
		||||
        else:
 | 
			
		||||
            return
 | 
			
		||||
        # try:
 | 
			
		||||
        #     service.remove()
 | 
			
		||||
        # except docker.errors.APIError as e:
 | 
			
		||||
        #     current_app.logger.error(
 | 
			
		||||
        #         f'Remove service "{service_name}" failed '
 | 
			
		||||
        #         f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
        #     )
 | 
			
		||||
        try:
 | 
			
		||||
            service.remove()
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
                f'Remove service "{service_name}" failed '
 | 
			
		||||
                f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def create_cqpserver_container(self, corpus):
 | 
			
		||||
        ''' # Docker container settings # '''
 | 
			
		||||
@@ -203,7 +203,7 @@ class CheckCorporaMixin:
 | 
			
		||||
                f'Run container "{name}" failed '
 | 
			
		||||
                f'due to "docker.errors.ImageNotFound" error: {e}'
 | 
			
		||||
            )
 | 
			
		||||
            corpus.status = 'failed'
 | 
			
		||||
            corpus.status = CorpusStatus.FAILED
 | 
			
		||||
            return
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
@@ -211,7 +211,7 @@ class CheckCorporaMixin:
 | 
			
		||||
                f'due to "docker.errors.APIError" error: {e}'
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
        corpus.status = 'analysing'
 | 
			
		||||
        corpus.status = CorpusStatus.RUNNING_ANALYSIS_SESSION
 | 
			
		||||
 | 
			
		||||
    def checkout_analysing_corpus_container(self, corpus):
 | 
			
		||||
        container_name = f'cqpserver_{corpus.id}'
 | 
			
		||||
@@ -223,7 +223,7 @@ class CheckCorporaMixin:
 | 
			
		||||
                f'due to "docker.errors.NotFound": {e}'
 | 
			
		||||
            )
 | 
			
		||||
            corpus.num_analysis_sessions = 0
 | 
			
		||||
            corpus.status = 'prepared'
 | 
			
		||||
            corpus.status = CorpusStatus.BUILT
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
                f'Get container "{container_name}" failed '
 | 
			
		||||
@@ -235,7 +235,7 @@ class CheckCorporaMixin:
 | 
			
		||||
        try:
 | 
			
		||||
            container = self.docker.containers.get(container_name)
 | 
			
		||||
        except docker.errors.NotFound:
 | 
			
		||||
            corpus.status = 'prepared'
 | 
			
		||||
            corpus.status = CorpusStatus.BUILT
 | 
			
		||||
            return
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,8 @@
 | 
			
		||||
from app import db
 | 
			
		||||
from app.models import Job, JobResult, JobStatus, TesseractOCRModel
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from flask import current_app
 | 
			
		||||
from werkzeug.utils import secure_filename
 | 
			
		||||
from .. import db
 | 
			
		||||
from ..models import Job, JobResult, TesseractOCRModel
 | 
			
		||||
import docker
 | 
			
		||||
import json
 | 
			
		||||
import os
 | 
			
		||||
@@ -12,11 +12,11 @@ import shutil
 | 
			
		||||
class CheckJobsMixin:
 | 
			
		||||
    def check_jobs(self):
 | 
			
		||||
        jobs = Job.query.all()
 | 
			
		||||
        for job in (x for x in jobs if x.status == 'submitted'):
 | 
			
		||||
        for job in (x for x in jobs if x.status == JobStatus.SUBMITTED):
 | 
			
		||||
            self.create_job_service(job)
 | 
			
		||||
        for job in (x for x in jobs if x.status in ['queued', 'running']):
 | 
			
		||||
        for job in (x for x in jobs if x.status in [JobStatus.QUEUED, JobStatus.RUNNING]):  # noqa
 | 
			
		||||
            self.checkout_job_service(job)
 | 
			
		||||
        for job in (x for x in jobs if x.status == 'canceling'):
 | 
			
		||||
        for job in (x for x in jobs if x.status == JobStatus.CANCELING):
 | 
			
		||||
            self.remove_job_service(job)
 | 
			
		||||
 | 
			
		||||
    def create_job_service(self, job):
 | 
			
		||||
@@ -74,7 +74,7 @@ class CheckJobsMixin:
 | 
			
		||||
            service_args = json.loads(job.service_args)
 | 
			
		||||
            model = TesseractOCRModel.query.get(service_args['model'])
 | 
			
		||||
            if model is None:
 | 
			
		||||
                job.status = 'failed'
 | 
			
		||||
                job.status = JobStatus.FAILED
 | 
			
		||||
                return
 | 
			
		||||
            models_mount_source = model.path
 | 
			
		||||
            models_mount_target = f'/usr/local/share/tessdata/{model.filename}'
 | 
			
		||||
@@ -122,7 +122,7 @@ class CheckJobsMixin:
 | 
			
		||||
                f'due to "docker.errors.APIError": {e}'
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
        job.status = 'queued'
 | 
			
		||||
        job.status = JobStatus.QUEUED
 | 
			
		||||
 | 
			
		||||
    def checkout_job_service(self, job):
 | 
			
		||||
        service_name = f'job_{job.id}'
 | 
			
		||||
@@ -133,7 +133,7 @@ class CheckJobsMixin:
 | 
			
		||||
                f'Get service "{service_name}" failed '
 | 
			
		||||
                f'due to "docker.errors.NotFound": {e}'
 | 
			
		||||
            )
 | 
			
		||||
            job.status = 'failed'
 | 
			
		||||
            job.status = JobStatus.FAILED
 | 
			
		||||
            return
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
@@ -145,11 +145,11 @@ class CheckJobsMixin:
 | 
			
		||||
        if not service_tasks:
 | 
			
		||||
            return
 | 
			
		||||
        task_state = service_tasks[0].get('Status').get('State')
 | 
			
		||||
        if job.status == 'queued' and task_state != 'pending':
 | 
			
		||||
            job.status = 'running'
 | 
			
		||||
        if job.status == JobStatus.QUEUED and task_state != 'pending':
 | 
			
		||||
            job.status = JobStatus.RUNNING
 | 
			
		||||
            return
 | 
			
		||||
        elif job.status == 'running' and task_state == 'complete':
 | 
			
		||||
            job.status = 'complete'
 | 
			
		||||
        elif job.status == JobStatus.RUNNING and task_state == 'complete':  # noqa
 | 
			
		||||
            job.status = JobStatus.COMPLETED
 | 
			
		||||
            results_dir = os.path.join(job.path, 'results')
 | 
			
		||||
            with open(os.path.join(results_dir, 'outputs.json')) as f:
 | 
			
		||||
                outputs = json.load(f)
 | 
			
		||||
@@ -169,8 +169,8 @@ class CheckJobsMixin:
 | 
			
		||||
                    os.path.join(results_dir, output['file']),
 | 
			
		||||
                    job_result.path
 | 
			
		||||
                )
 | 
			
		||||
        elif job.status == 'running' and task_state == 'failed':
 | 
			
		||||
            job.status = 'failed'
 | 
			
		||||
        elif job.status == JobStatus.RUNNING and task_state == 'failed':
 | 
			
		||||
            job.status = JobStatus.FAILED
 | 
			
		||||
        else:
 | 
			
		||||
            return
 | 
			
		||||
        job.end_date = datetime.utcnow()
 | 
			
		||||
@@ -187,7 +187,7 @@ class CheckJobsMixin:
 | 
			
		||||
        try:
 | 
			
		||||
            service = self.docker.services.get(service_name)
 | 
			
		||||
        except docker.errors.NotFound:
 | 
			
		||||
            job.status = 'canceled'
 | 
			
		||||
            job.status = JobStatus.CANCELED
 | 
			
		||||
            return
 | 
			
		||||
        except docker.errors.APIError as e:
 | 
			
		||||
            current_app.logger.error(
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user