Source code for chariots.pipelines.pipelines_server

"""class that handles the backend setup of the Chariots app, to deploy the pipelines in a Flask server"""
import json
from typing import Mapping, Any, List, Optional, Union

from flask import Flask, request


import chariots
from .. import errors, versioning
from . import Pipeline
from . import runners, nodes, callbacks


class PipelineResponse:
    """
    A PipelineResponse represents all the information that is sent from the _deployment when a pipeline is executed.
    """

    def __init__(self, value: Any, versions: Mapping[nodes.BaseNode, versioning.Version], job_id: str,
                 job_status: '_base_worker_pool.JobStatus'):
        self.value = value
        self.versions = versions
        self.job_id = job_id
        self.job_status = job_status

    def json(self) -> Mapping[str, Any]:
        """
        jsonify the response to be passed over http

        :return: the dict representing this response
        """
        return {
            'pipeline_output': self.value,
            'versions': {node.name: str(version) for node, version in self.versions.items()},
            'job_id': self.job_id,
            'job_status': self.job_status.value,
        }
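    # For illustration only: assuming a node named 'odd_node' (hypothetical), a
    # hypothetical version string, and that `JobStatus` members have string values
    # (e.g. `JobStatus.done.value == 'done'`, which the name-based lookup in
    # `from_request` below also relies on), a payload produced by `json()` could
    # look like:
    #
    #     {'pipeline_output': False,
    #      'versions': {'odd_node': '1.0.2'},
    #      'job_id': '9f0c1a2e',
    #      'job_status': 'done'}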

    @classmethod
    def from_request(cls, response_json: Any, query_pipeline: Pipeline) -> 'PipelineResponse':
        """
        builds the PipelineResponse from the json payload received over http and the pipeline that was used in the
        query

        :param response_json: the response json of the call
        :param query_pipeline: the pipeline that was used in the query that generated the response
        :return: the corresponding PipelineResponse
        """
        return cls(
            value=response_json['pipeline_output'],
            versions={query_pipeline.node_for_name[node_name]: versioning.Version.parse(version_string)
                      for node_name, version_string in response_json['versions'].items()},
            job_id=response_json['job_id'],
            job_status=chariots.workers.JobStatus[response_json['job_status']],
        )
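
# A minimal sketch of the intended round trip (`raw_http_body` and `my_pipeline`
# are hypothetical names, not part of the module): the server ships
# `PipelineResponse.json()` over http, and a client holding the same pipeline
# object can rebuild the response:
#
#     >>> payload = json.loads(raw_http_body)
#     >>> response = PipelineResponse.from_request(payload, my_pipeline)
#     >>> response.value == payload['pipeline_output']
#     True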


class PipelinesServer(Flask):  # pylint: disable=too-many-instance-attributes
    """
    small `Flask` application used to rapidly deploy pipelines:

    .. testsetup::

        >>> import tempfile
        >>> import shutil
        >>> from chariots.pipelines import Pipeline, PipelinesServer
        >>> from chariots.testing import TestOpStoreClient
        >>> from chariots._helpers.doc_utils import is_odd_pipeline
        ...
        >>> app_path = tempfile.mkdtemp()
        >>> op_store_client = TestOpStoreClient(app_path)
        >>> op_store_client.server.db.create_all()

    .. doctest::

        >>> my_app = PipelinesServer(
        ...     app_pipelines=[is_odd_pipeline],
        ...     op_store_client=op_store_client,
        ...     import_name='my_app'
        ... )

    you can then deploy the app as you would with the flask command:

    .. code-block:: console

        $ flask

    or if you have used :doc:`the chariots' template <.../template>`, you can use the predefined cli once the
    project is installed:

    .. code-block:: console

        $ my_great_project start

    once the app is started you can use it with the client (that handles creating the requests and serializing
    them to the right format) to query your pipelines:

    .. testsetup::

        >>> from chariots.testing import TestPipelinesClient
        >>> client = TestPipelinesClient(my_app)

    .. doctest::

        >>> client.call_pipeline(is_odd_pipeline, 4).value
        False

    .. testsetup::

        >>> shutil.rmtree(app_path)

    alternatively, you can query the `Chariots` server directly as you would any normal micro-service. The server
    has the following routes:

    - `/pipelines/<pipeline_name>/main`
    - `/pipelines/<pipeline_name>/versions`
    - `/pipelines/<pipeline_name>/load`
    - `/pipelines/<pipeline_name>/save`
    - `/pipelines/<pipeline_name>/health_check`

    for each pipeline that was registered to the `Chariots` app. It also creates some common routes for all
    pipelines:

    - `/health_check`
    - `/available_pipelines`

    :param app_pipelines: the pipelines this app will serve
    :param op_store_client: the op store client used to register the pipelines and to save and reload their ops'
                            versions
    :param runner: the runner to use to run the pipelines. If None, the `SequentialRunner` will be used as default
    :param default_pipeline_callbacks: pipeline callbacks to be added to every pipeline this app will serve
    :param worker_pool: worker pool to be used if some jobs are to be executed asynchronously (using a
                        worker/master config)
    :param use_workers: whether or not to use workers to execute all pipeline execution requests (if set to False,
                        you can still choose to use workers on a pipeline-by-pipeline basis)
    :param args: additional positional arguments to be passed to the Flask app
    :param kwargs: additional keyword arguments to be passed to the Flask app
    """
    def __init__(self, app_pipelines: List[Pipeline], *args, op_store_client=None,
                 runner: Optional[runners.BaseRunner] = None,
                 default_pipeline_callbacks: Optional[List[callbacks.PipelineCallback]] = None,
                 worker_pool: 'Optional[chariots.workers.BaseWorkerPool]' = None,
                 use_workers: Optional[bool] = None,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.runner = runner or runners.SequentialRunner()
        self.op_store_client = op_store_client

        # adding the default pipeline callbacks to all the pipelines of the app
        for pipeline in app_pipelines:
            pipeline.callbacks.extend(default_pipeline_callbacks or [])

        self._pipelines = {pipe.name: pipe for pipe in app_pipelines}
        self._loaded_pipelines = {pipe.name: False for pipe in app_pipelines}
        self._init_pipelines()
        self._build_route()
        self._build_error_handlers()
        self._worker_pool = worker_pool
        self.use_workers = use_workers
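
    # Note on the worker flags: the app-level `use_workers`, the pipeline-level
    # `use_worker` and the per-request `use_worker` are resolved together by
    # `_should_execute_pipeline_async` below: an explicit False at any level forces
    # synchronous execution, otherwise a single True is enough, and all-None means
    # synchronous. A sketch of the resolution:
    #
    #     _should_execute_pipeline_async(None, None, None)  -> False
    #     _should_execute_pipeline_async(True, None, None)  -> True
    #     _should_execute_pipeline_async(True, False, True) -> False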
    def _init_pipelines(self):
        for pipeline_name, pipeline in self._pipelines.items():
            if self.op_store_client.pipeline_exists(pipeline_name):
                self._loaded_pipelines[pipeline_name] = True
                continue
            self.op_store_client.register_new_pipeline(pipeline)
            pipeline.save(self.op_store_client)
            self._loaded_pipelines[pipeline_name] = True

    def _build_error_handlers(self):
        self.register_error_handler(errors.VersionError, lambda error: error.handle())

    @staticmethod
    def _should_execute_pipeline_async(app_worker_config: Union[bool, None],
                                       pipeline_worker_config: Union[bool, None],
                                       request_worker_config: Union[bool, None]) -> bool:
        """
        whether or not a specific pipeline call should be executed asynchronously. If any one of the configs is
        False, the output is False. If some are `None` and at least one is True, the output is True. If all are
        `None`, the output is False.
        """
        should_execute_pipeline_async = False
        for param in [app_worker_config, pipeline_worker_config, request_worker_config]:
            if param is None:
                continue
            if not param:
                return False
            should_execute_pipeline_async = True
        return should_execute_pipeline_async

    def _build_route(self):

        def serve_pipeline(pipeline_name):
            if not self._loaded_pipelines[pipeline_name]:
                raise ValueError('pipeline not loaded, load before execution')
            pipeline = self._pipelines[pipeline_name]
            pipeline_input = request.json.get('pipeline_input') if request.json else None
            if self._should_execute_pipeline_async(app_worker_config=self.use_workers,
                                                   pipeline_worker_config=pipeline.use_worker,
                                                   request_worker_config=request.json.get('use_worker')):
                if self._worker_pool is None:
                    raise ValueError('execution requested using workers, however no WorkerPool was provided '
                                     'at init')
                job_id = self._worker_pool.execute_pipeline_async(pipeline, pipeline_input, self)
                return self._worker_pool.get_pipeline_response_json_for_id(job_id)
            response = PipelineResponse(self.runner.run(pipeline, pipeline_input),
                                        pipeline.get_pipeline_versions(), job_id=None,
                                        job_status=chariots.workers.JobStatus.done)
            return json.dumps(response.json())

        self.add_url_rule('/pipelines/<pipeline_name>/main', 'serve_pipeline', serve_pipeline, methods=['POST'])

        def load_pipeline(pipeline_name):
            self._load_single_pipeline(pipeline_name)
            return json.dumps({})

        self.add_url_rule('/pipelines/<pipeline_name>/load', 'load_pipeline', load_pipeline, methods=['POST'])

        def pipeline_versions(pipeline_name):
            pipeline = self._pipelines[pipeline_name]
            return json.dumps({node.name: str(version)
                               for node, version in pipeline.get_pipeline_versions().items()})

        self.add_url_rule('/pipelines/<pipeline_name>/versions', 'pipeline_versions', pipeline_versions,
                          methods=['POST'])

        def save_pipeline(pipeline_name):
            pipeline = self._pipelines[pipeline_name]
            pipeline.save(self.op_store_client)
            return json.dumps({})

        self.add_url_rule('/pipelines/<pipeline_name>/save', 'save_pipeline', save_pipeline, methods=['POST'])

        def pipeline_health_check(pipeline_name):
            is_loaded = self._loaded_pipelines[pipeline_name]
            return json.dumps({'is_loaded': is_loaded}), 200 if is_loaded else 419

        self.add_url_rule('/pipelines/<pipeline_name>/health_check', 'pipeline_health_check',
                          pipeline_health_check, methods=['GET'])

        def health_check():
            return json.dumps(self._loaded_pipelines)

        self.add_url_rule('/health_check', 'health_check', health_check, methods=['GET'])

        def fetch_job():
            job_id = request.json['job_id']
            if job_id is None:
                raise ValueError('job id is None, results are probably already present')
            return self._worker_pool.get_pipeline_response_json_for_id(job_id)
        self.add_url_rule('/jobs/fetch/', 'fetch_job', fetch_job, methods=['POST'])

        def all_pipelines():
            return json.dumps(list(self._pipelines.keys()))

        self.add_url_rule('/available_pipelines', 'all_pipelines', all_pipelines, methods=['GET'])

    def _load_pipelines(self):
        for pipeline in self._pipelines.values():
            try:
                self._load_single_pipeline(pipeline.name)
            except ValueError:
                continue

    def _load_single_pipeline(self, pipeline_name):
        self._pipelines[pipeline_name].load(self.op_store_client)
        self._loaded_pipelines[pipeline_name] = True
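
# Example of querying a running server over plain http (a sketch, assuming the
# app is served on localhost:5000, the `requests` library is installed, and the
# `is_odd_pipeline` pipeline from the class docstring above is registered):
#
#     >>> import requests
#     >>> res = requests.post(
#     ...     'http://localhost:5000/pipelines/is_odd_pipeline/main',
#     ...     json={'pipeline_input': 4},
#     ... )
#     >>> res.json()['pipeline_output']
#     False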