Source code for chariots.workers._base_worker_pool

"""module that provides base classes for the worker API"""
from abc import abstractmethod, ABC
from enum import Enum
from typing import Any

import rq


class JobStatus(Enum):
    """
    enum of all the possible states a job can be in.
    """
    queued = 'queued'
    running = 'running'
    done = 'done'
    failed = 'failed'
    deferred = 'deferred'

    @classmethod
    def from_rq(cls, status: rq.job.JobStatus) -> 'JobStatus':
        """
        Translates an RQ Job status into a Chariots JobStatus

        :param status: the status reported by RQ for a job. It is compared
                       by equality against the members of `rq.job.JobStatus`
        :return: the matching member of this enum (`QUEUED` -> `queued`,
                 `FINISHED` -> `done`, `FAILED` -> `failed`,
                 `STARTED` -> `running`, `DEFERRED` -> `deferred`)
        :raises ValueError: if `status` matches none of the RQ statuses
                            listed above
        """
        if status == rq.job.JobStatus.QUEUED:  # pylint: disable = no-member
            return cls.queued
        if status == rq.job.JobStatus.FINISHED:  # pylint: disable = no-member
            return cls.done
        if status == rq.job.JobStatus.FAILED:  # pylint: disable = no-member
            return cls.failed
        if status == rq.job.JobStatus.STARTED:  # pylint: disable = no-member
            return cls.running
        # RQ names this member `DEFERRED`. The previous spelling (`DEFFERED`)
        # raised AttributeError here, so deferred jobs were never translated
        # and unknown statuses never reached the ValueError below.
        if status == rq.job.JobStatus.DEFERRED:  # pylint: disable = no-member
            return cls.deferred
        raise ValueError('unknown job status: {}'.format(status))
class BaseWorkerPool(ABC):
    """
    Abstract base to subclass when plugging your own JobQueue system into
    Chariots.

    A concrete pool has to provide the following members:

    * n_workers: a property telling Chariots how many workers the pool holds
      in total (the available ones as well as the ones in use)
    * spawn_worker: a method that creates a new worker
    * execute_pipeline_async: a method that executes a pipeline inside one of
      this pool's workers
    * get_pipeline_response_json_for_id: a method to retrieve the json of the
      `PipelineResponse` if available
    """

    @property
    @abstractmethod
    def n_workers(self):
        """the total number of workers in the pool"""

    @abstractmethod
    def spawn_worker(self):
        """adds a newly created worker to the pool"""

    @abstractmethod
    def execute_pipeline_async(self, pipeline: 'chariots.Pipeline', pipeline_input: Any,
                               app: 'chariots.Chariots') -> str:
        """
        runs a pipeline inside one of the workers.

        :param pipeline: the pipeline to execute inside a worker
        :param pipeline_input: the input fed to the pipeline when it is executed
        :param app: the app the pipeline belongs to

        :return: the id string of the job. This is the id that has to be
                 passed to `BaseWorkerPool.get_pipeline_response_json_for_id`
                 later on
        """

    @abstractmethod
    def get_pipeline_response_json_for_id(self, job_id: str) -> str:
        """
        retrieves the results of a pipeline that was executed inside a worker.

        When the results are not available (not done, execution failed, ...),
        the returned `PipelineResponse` carries the corresponding job status
        and a None value

        :param job_id: the id of the job to fetch results for (as outputted
                       from `execute_pipeline_async`)

        :return: a jsonified version of the corresponding `PipelineResponse`
        """