chariots.workers¶
Module that handles Workers in your Chariots app. Workers allow you to:

* execute pipelines in parallel
* execute pipelines asynchronously (without blocking the main server process)
This module also provides a default implementation using RQ.
class chariots.workers.BaseWorkerPool[source]¶
Bases: abc.ABC
BaseWorkerPool is the class you will need to subclass in order to make your own JobQueue system work with Chariots.
In order to do so you will need to create:
n_workers: a property that informs Chariots of the total number of workers (available as well as in use)
spawn_worker: a method that creates a new worker
execute_pipeline_async: a method that executes a pipeline inside one of this Pool’s workers.
get_pipeline_response_json_for_id: a method to retrieve the JSON of the PipelineResponse if available
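A minimal subclass satisfying these four requirements might look like the following sketch, which runs jobs in an in-process thread pool. Note the assumptions: the `BaseWorkerPool` stub, the `'job_status'`/`'value'` JSON layout, and invoking the pipeline as a plain callable are all illustrative, not chariots' actual internals.

```python
import json
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any


class BaseWorkerPool:
    """Stub standing in for chariots.workers.BaseWorkerPool."""


class ThreadWorkerPool(BaseWorkerPool):
    """Hypothetical pool that runs each pipeline in a worker thread."""

    def __init__(self, n_workers: int = 4):
        self._n_workers = n_workers
        self._executor = ThreadPoolExecutor(max_workers=n_workers)
        self._jobs = {}  # job id -> Future

    @property
    def n_workers(self) -> int:
        # total number of workers (available as well as in use)
        return self._n_workers

    def spawn_worker(self):
        # the executor manages its own threads, so nothing to do here
        pass

    def execute_pipeline_async(self, pipeline, pipeline_input: Any, app) -> str:
        # `pipeline` is assumed to be directly callable here; the real
        # chariots execution path goes through the app
        job_id = str(uuid.uuid4())
        self._jobs[job_id] = self._executor.submit(pipeline, pipeline_input)
        return job_id

    def get_pipeline_response_json_for_id(self, job_id: str) -> str:
        future = self._jobs[job_id]
        if not future.done():
            # results not available yet: status only, value is None
            return json.dumps({'job_status': 'running', 'value': None})
        return json.dumps({'job_status': 'done', 'value': future.result()})
```

The id returned by execute_pipeline_async is the only handle the caller keeps, so it must be usable later with get_pipeline_response_json_for_id, as in this sketch.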
abstract execute_pipeline_async(pipeline: chariots.Pipeline, pipeline_input: Any, app: chariots.Chariots) → str[source]¶
Method to execute a pipeline inside a worker.
- Parameters
pipeline – the pipeline that needs to be executed inside a worker
pipeline_input – the input to be fed to the pipeline when it gets executed
app – the app that this pipeline belongs to
- Returns
the id string of the job. This id needs to correspond to the one that will later be passed to BaseWorkerPool.get_pipeline_response_json_for_id
abstract get_pipeline_response_json_for_id(job_id: str) → str[source]¶
Fetches the results of a pipeline that was executed inside a worker. If the results are not available (not done, execution failed, …), the returned PipelineResponse will have the corresponding job status and a None value.
- Parameters
job_id – the id of the job to fetch results for (as returned by execute_pipeline_async)
- Returns
a JSON-serialized version of the corresponding PipelineResponse
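Because the returned JSON carries the job status, a client typically polls this method until the job leaves the running state. A possible polling helper (hypothetical; the `'job_status'`/`'value'` keys are an assumed response layout, not chariots' documented schema):

```python
import json
import time


def wait_for_pipeline(pool, job_id: str, timeout: float = 30.0,
                      poll_interval: float = 0.5) -> dict:
    """Poll a worker pool until the job leaves the 'running' state.

    `pool` is any object exposing get_pipeline_response_json_for_id.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        response = json.loads(pool.get_pipeline_response_json_for_id(job_id))
        if response['job_status'] != 'running':
            # done or failed: either way, stop polling and hand back
            return response
        time.sleep(poll_interval)
    raise TimeoutError(f'job {job_id} did not finish within {timeout}s')
```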
abstract property n_workers¶
Total number of workers in the pool
class chariots.workers.RQWorkerPool(redis_kwargs: Optional[Dict[str, Any]] = None, redis: Optional[redis.client.Redis] = None, queue_kwargs: Optional[Dict[str, Any]] = None)[source]¶
Bases: chariots.workers._base_worker_pool.BaseWorkerPool
A worker pool based on RQ job queues. You will need a functioning Redis instance to use this. This worker pool allows you to easily parallelize your Chariots app. You can check the how-to guide on workers for more information.
To use an RQWorkerPool with your Chariots app, you can do as follows:

>>> from redis import Redis
>>> from chariots import workers
>>> from chariots.pipelines import PipelinesServer
...
>>> app = PipelinesServer(
...     my_pipelines,
...     op_store_client=op_store_client,
...     worker_pool=workers.RQWorkerPool(redis=Redis()),
...     use_workers=True,
...     import_name='app'
... )
- Parameters
redis – the Redis connection that will be used by RQ. Overrides any redis_kwargs arguments if present
redis_kwargs – keyword arguments to be passed to the Redis class constructor. These are only used if the redis argument is unset
queue_kwargs – additional keyword arguments that will get passed to the rq.Queue object at init. Be aware that the connection and name arguments will be overridden
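The precedence between redis and redis_kwargs described above can be sketched as follows. The `Redis` class here is a stub standing in for `redis.client.Redis`, and `resolve_connection` is a hypothetical helper, not chariots' actual code:

```python
from typing import Any, Dict, Optional


class Redis:
    """Stub standing in for redis.client.Redis."""

    def __init__(self, host: str = 'localhost', port: int = 6379,
                 **kwargs: Any):
        self.host = host
        self.port = port


def resolve_connection(redis: Optional[Redis] = None,
                       redis_kwargs: Optional[Dict[str, Any]] = None) -> Redis:
    # an explicit connection overrides any redis_kwargs
    if redis is not None:
        return redis
    # redis_kwargs are only used when no connection was provided
    return Redis(**(redis_kwargs or {}))
```

Passing a ready-made connection is useful when the same Redis client is shared with other parts of the app; redis_kwargs is the convenient path when the pool should own its connection.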
__init__(redis_kwargs: Optional[Dict[str, Any]] = None, redis: Optional[redis.client.Redis] = None, queue_kwargs: Optional[Dict[str, Any]] = None)[source]¶
Initialize self. See help(type(self)) for accurate signature.
execute_pipeline_async(pipeline: chariots.pipelines._pipeline.Pipeline, pipeline_input: Any, app: chariots.pipelines.pipelines_server.PipelinesServer) → str[source]¶
Method to execute a pipeline inside a worker.
- Parameters
pipeline – the pipeline that needs to be executed inside a worker
pipeline_input – the input to be fed to the pipeline when it gets executed
app – the app that this pipeline belongs to
- Returns
the id string of the job. This id needs to correspond to the one that will later be passed to BaseWorkerPool.get_pipeline_response_json_for_id
get_pipeline_response_json_for_id(job_id: str) → str[source]¶
Fetches the results of a pipeline that was executed inside a worker. If the results are not available (not done, execution failed, …), the returned PipelineResponse will have the corresponding job status and a None value.
- Parameters
job_id – the id of the job to fetch results for (as returned by execute_pipeline_async)
- Returns
a JSON-serialized version of the corresponding PipelineResponse
property n_workers¶
Total number of workers in the pool