chariots.workers

Module that handles Workers in your Chariots app. Workers allow you to:

  • execute pipelines in parallel

  • execute pipelines asynchronously (without blocking the main server process)

This module also provides a default implementation using RQ

class chariots.workers.BaseWorkerPool[source]

Bases: abc.ABC

BaseWorkerPool is the class you will need to subclass in order to make your own JobQueue system work with Chariots.

In order to do so you will need to create:

  • n_workers: a property that informs Chariots of the total number of workers (available as well as in use)

  • spawn_worker: a method that creates a new worker

  • execute_pipeline_async: a method that executes a pipeline inside one of this Pool’s workers.

  • get_pipeline_response_json_for_id: a method that retrieves the JSON of the PipelineResponse for a job, if available
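The four members above can be sketched as follows. This is a minimal, illustrative in-process pool: threads stand in for a real job-queue backend, a plain callable stands in for a chariots Pipeline, and a dict stands in for the result store, so it is a sketch of the interface rather than a drop-in implementation (real subclasses inherit from chariots.workers.BaseWorkerPool).

```python
import json
import threading
import uuid


class ThreadWorkerPool:
    """Illustrative pool mirroring the BaseWorkerPool interface.

    A real subclass would inherit from chariots.workers.BaseWorkerPool
    and dispatch jobs to an actual job-queue system; here, threads and
    an in-memory dict stand in for both.
    """

    def __init__(self):
        self._workers = 0
        self._results = {}  # job_id -> jsonified response

    @property
    def n_workers(self):
        # total number of workers (available as well as in use)
        return self._workers

    def spawn_worker(self):
        # a real pool would start a worker process here
        self._workers += 1

    def execute_pipeline_async(self, pipeline, pipeline_input, app=None):
        # returns the job id that get_pipeline_response_json_for_id accepts
        job_id = str(uuid.uuid4())

        def run():
            # stand-in: `pipeline` is a plain callable, not a chariots Pipeline
            value = pipeline(pipeline_input)
            self._results[job_id] = json.dumps({'status': 'done', 'value': value})

        thread = threading.Thread(target=run)
        thread.start()
        thread.join()  # joined here for determinism; a real pool would not block
        return job_id

    def get_pipeline_response_json_for_id(self, job_id):
        # results not available yet -> report the job status with a None value
        pending = json.dumps({'status': 'queued', 'value': None})
        return self._results.get(job_id, pending)
```

Usage mirrors the abstract contract: execute_pipeline_async returns an id, and that same id is later handed to get_pipeline_response_json_for_id.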

abstract execute_pipeline_async(pipeline: chariots.Pipeline, pipeline_input: Any, app: chariots.Chariots) → str[source]

method to execute a pipeline inside a worker.

Parameters
  • pipeline – the pipeline that needs to be executed inside a worker

  • pipeline_input – the input to be fed to the pipeline when it gets executed

  • app – the app that this pipeline belongs to

Returns

the id string of the job. This id must match the one that will later be passed to BaseWorkerPool.get_pipeline_response_json_for_id

abstract get_pipeline_response_json_for_id(job_id: str) → str[source]

fetches the results from a pipeline that got executed inside a worker. If the results are not available (not done, execution failed, …), the PipelineResponse returned will have the corresponding job status and a None value

Parameters

job_id – the id of the job to fetch results for (as returned by execute_pipeline_async)

Returns

a jsonified version of the corresponding PipelineResponse
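Because the response may not be ready yet, callers typically poll this method until the job reaches a terminal status. A sketch of such a client-side loop (poll_for_response is an illustrative helper, not part of the chariots API; the 'done'/'failed' status strings come from the JobStatus enum below):

```python
import json
import time


def poll_for_response(pool, job_id, timeout=30.0, interval=0.5):
    """Poll a worker pool until the job reaches a terminal status.

    `pool` is any object exposing get_pipeline_response_json_for_id;
    this helper is illustrative and not part of chariots itself.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        response = json.loads(pool.get_pipeline_response_json_for_id(job_id))
        if response.get('status') in ('done', 'failed'):
            return response
        time.sleep(interval)
    raise TimeoutError(f'job {job_id} did not finish within {timeout}s')
```

The fixed-interval sleep keeps the example simple; a production client might use exponential backoff instead.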

abstract property n_workers

total number of workers in the pool

abstract spawn_worker()[source]

create a new worker in the pool

class chariots.workers.RQWorkerPool(redis_kwargs: Optional[Dict[str, Any]] = None, redis: Optional[redis.client.Redis] = None, queue_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: chariots.workers._base_worker_pool.BaseWorkerPool

a worker pool based on the RQ job queue library. You will need a functioning Redis server to use this. This worker pool allows you to easily parallelize your Chariots app. You can check the how-to guide on workers for more info.

To use an RQWorkerPool with your Chariots app, you can do the following:

>>> from redis import Redis
>>> from chariots import workers
>>> from chariots.pipelines import PipelinesServer
...
...
>>> app = PipelinesServer(
...     my_pipelines,
...     op_store_client=op_store_client,
...     worker_pool=workers.RQWorkerPool(redis=Redis()),
...     use_workers=True,
...     import_name='app'
... )
Parameters
  • redis – the redis connection that will be used by RQ. Overrides any redis_kwargs arguments if present

  • redis_kwargs – keyword arguments to be passed to the Redis class constructor. These will only be used if the redis argument is unset

  • queue_kwargs – additional keyword arguments that will get passed to the rq.Queue object at init. Be aware that the connection and name arguments will be overridden.

__init__(redis_kwargs: Optional[Dict[str, Any]] = None, redis: Optional[redis.client.Redis] = None, queue_kwargs: Optional[Dict[str, Any]] = None)[source]

Initialize self. See help(type(self)) for accurate signature.

execute_pipeline_async(pipeline: chariots.pipelines._pipeline.Pipeline, pipeline_input: Any, app: chariots.pipelines.pipelines_server.PipelinesServer) → str[source]

method to execute a pipeline inside a worker.

Parameters
  • pipeline – the pipeline that needs to be executed inside a worker

  • pipeline_input – the input to be fed to the pipeline when it gets executed

  • app – the app that this pipeline belongs to

Returns

the id string of the job. This id must match the one that will later be passed to BaseWorkerPool.get_pipeline_response_json_for_id

get_pipeline_response_json_for_id(job_id: str) → str[source]

fetches the results from a pipeline that got executed inside a worker. If the results are not available (not done, execution failed, …), the PipelineResponse returned will have the corresponding job status and a None value

Parameters

job_id – the id of the job to fetch results for (as returned by execute_pipeline_async)

Returns

a jsonified version of the corresponding PipelineResponse

property n_workers

total number of workers in the pool

spawn_worker()[source]

create a new worker in the pool

class chariots.workers.JobStatus[source]

Bases: enum.Enum

enum of all the possible states a job can be in.

deferred = 'deferred'
done = 'done'
failed = 'failed'
from_rq = <bound method JobStatus.from_rq of <enum 'JobStatus'>>[source]
queued = 'queued'
running = 'running'
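The from_rq member is a classmethod that converts an RQ job status into a JobStatus. A self-contained sketch of how such a mapping might look; the enum values come from the listing above, but the exact RQ status names and the mapping itself are assumptions based on RQ's documented statuses, not taken from chariots' source:

```python
import enum


class JobStatus(enum.Enum):
    """Illustrative mirror of chariots.workers.JobStatus."""

    deferred = 'deferred'
    queued = 'queued'
    running = 'running'
    done = 'done'
    failed = 'failed'

    @classmethod
    def from_rq(cls, rq_status):
        # assumed mapping from RQ's job statuses; chariots' actual
        # from_rq may handle a different set of statuses
        mapping = {
            'queued': cls.queued,
            'deferred': cls.deferred,
            'started': cls.running,
            'finished': cls.done,
            'failed': cls.failed,
        }
        return mapping[rq_status]
```

Note that RQ's terminal status is named 'finished' while this enum uses 'done', which is why a translation step like from_rq is needed at all.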