Source code for chariots.workers._rq_worker_pool

""""RQ implementation of the Workers API"""
import json
from hashlib import sha1
from typing import Any, Dict, Optional

from redis import Redis
from rq import Queue, Connection, Worker

from .. import pipelines, op_store  # pylint: disable=unused-import; # noqa
from . import BaseWorkerPool, JobStatus


def _inner_pipe_execution(pipeline: 'pipelines.Pipeline', pipeline_input: Any, runner: pipelines.runners.BaseRunner,
                          op_store_client: 'op_store.OpStoreClient'):
    # load the latest versions of the pipeline's ops from the op store
    pipeline.load(op_store_client)
    # run the pipeline and serialize the result so it can be attached to the RQ job
    res = json.dumps(runner.run(pipeline, pipeline_input))
    # persist any ops that were updated during the run back to the op store
    pipeline.save(op_store_client)
    return res
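# A hedged sketch of how the helper above gets used: RQ serializes the function
# and its kwargs, and a worker process later calls it. Assuming a pipeline, a
# runner and an op store client are available (`my_pipeline`, `my_op_store_client`
# and `SequentialRunner` are illustrative names), a worker effectively runs:
#
#     result_json = _inner_pipe_execution(
#         pipeline=my_pipeline,
#         pipeline_input={'value': 42},
#         runner=SequentialRunner(),
#         op_store_client=my_op_store_client,
#     )
#     result = json.loads(result_json)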


class RQWorkerPool(BaseWorkerPool):
    """
    a worker pool based on RQ job queues. You will need a functioning Redis server to use this.

    This worker pool allows you to easily parallelize your Chariots app. You can check
    :doc:`the how to guide on workers<../how_to_guides/workers>` for more info.

    To use an `RQWorkerPool` with your Chariots app, you can do as follows:

    .. testsetup::

        >>> import tempfile
        >>> import shutil
        >>> from chariots.testing import TestOpStoreClient
        >>> app_path = tempfile.mkdtemp()
        >>> my_pipelines = []
        >>> op_store_client = TestOpStoreClient(app_path)

    .. doctest::

        >>> from redis import Redis
        >>> from chariots import workers
        >>> from chariots.pipelines import PipelinesServer
        ...
        ...
        >>> app = PipelinesServer(
        ...     my_pipelines,
        ...     op_store_client=op_store_client,
        ...     worker_pool=workers.RQWorkerPool(redis=Redis()),
        ...     use_workers=True,
        ...     import_name='app'
        ... )

    :param redis: the Redis connection that will be used by RQ. Overrides any `redis_kwargs` argument if present.
    :param redis_kwargs: keyword arguments to be passed to the `Redis` class constructor. These will only be used
                         if the `redis` argument is unset.
    :param queue_kwargs: additional keyword arguments that will get passed to the `rq.Queue` object at init. Be
                         aware that the `connection` and `name` arguments will be overridden.
    """
    def __init__(self, redis_kwargs: Optional[Dict[str, Any]] = None, redis: Optional[Redis] = None,
                 queue_kwargs: Optional[Dict[str, Any]] = None):
        self._queue_name = 'chariots_workers'
        redis_kwargs = redis_kwargs or {}
        self._redis = redis or Redis(**redis_kwargs)
        queue_kwargs = queue_kwargs or {}
        queue_kwargs['connection'] = self._redis
        queue_kwargs['name'] = self._queue_name
        self._queue = Queue(**queue_kwargs)
        self._jobs = {}
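    # A hedged usage sketch (the connection parameters are illustrative): the pool
    # can either reuse an existing Redis connection or build its own from kwargs:
    #
    #     pool = RQWorkerPool(redis=Redis(host='localhost', port=6379))
    #     pool = RQWorkerPool(redis_kwargs={'host': 'localhost', 'port': 6379})
    #
    # If both are given, the `redis` connection wins and `redis_kwargs` is ignored.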
    def spawn_worker(self):
        with Connection(self._redis):
            worker = Worker(self._queue_name)
            worker.work()
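    # Hedged sketch: `Worker.work()` blocks until the worker is stopped, so
    # `spawn_worker` would typically be called from a dedicated process rather
    # than from the serving process, for instance (illustrative only):
    #
    #     import multiprocessing
    #     multiprocessing.Process(target=pool.spawn_worker).start()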
    def execute_pipeline_async(self, pipeline: 'pipelines.Pipeline', pipeline_input: Any,
                               app: pipelines.PipelinesServer) -> str:
        if not self.n_workers:
            raise ValueError('async job requested but it seems no workers are available')
        rq_job = self._queue.enqueue(_inner_pipe_execution, kwargs={
            'pipeline': pipeline,
            'pipeline_input': pipeline_input,
            'runner': app.runner,
            'op_store_client': app.op_store_client
        })
        chariots_job_id = sha1(rq_job.id.encode('utf-8')).hexdigest()
        self._jobs[chariots_job_id] = rq_job
        return chariots_job_id
    def get_pipeline_response_json_for_id(self, job_id: str) -> str:
        rq_job = self._jobs.get(job_id)
        if rq_job is None:
            raise ValueError('job {} was not found, are you sure you submitted it using the workers?'.format(job_id))
        job_status = JobStatus.from_rq(rq_job.get_status())
        return json.dumps(
            pipelines.PipelineResponse(json.loads(rq_job.result) if job_status is JobStatus.done else None,
                                       {}, job_id=job_id, job_status=job_status).json())
    @property
    def n_workers(self):
        return Worker.count(connection=self._redis)
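# End-to-end sketch of the async job lifecycle, assuming a running Redis server
# and a `PipelinesServer` named `app` as in the class docstring above (the call
# sites below are illustrative, not part of this module):
#
#     pool = workers.RQWorkerPool(redis=Redis())
#     multiprocessing.Process(target=pool.spawn_worker).start()
#     job_id = pool.execute_pipeline_async(my_pipeline, {'value': 42}, app)
#     response_json = pool.get_pipeline_response_json_for_id(job_id)  # poll until JobStatus.done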