Chariots Pipelines
Module that allows you to create Chariots pipelines. Chariots pipelines are constructed from nodes and ops.
class chariots.pipelines.Pipeline(pipeline_nodes: List[nodes.BaseNode], name: str, pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, use_worker: Optional[bool] = None)[source]

A pipeline is a collection of linked nodes that have to be executed one after the other. Pipelines are the main way to use Chariots.

To build a simple pipeline, you can do as such:
>>> pipeline = Pipeline([
...     Node(AddOneOp(), input_nodes=["__pipeline_input__"], output_nodes=["added_number"]),
...     Node(IsOddOp(), input_nodes=["added_number"], output_nodes=["__pipeline_output__"])
... ], "simple_pipeline")
Here we have just created a very simple pipeline with two nodes: one that adds one to the provided number, and one that returns whether or not the resulting number is odd.
To use our pipeline, we can either run it manually with a runner:
>>> from chariots.pipelines.runners import SequentialRunner
>>> runner = SequentialRunner()
>>> runner.run(pipeline=pipeline, pipeline_input=4)
True
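Conceptually, this pipeline is just function composition. As a purely illustrative sketch (the `AddOneOp` and `IsOddOp` ops above are hypothetical example ops, not part of the library), the equivalent plain-Python computation is:

```python
def add_one(x):
    # what the example AddOneOp computes
    return x + 1

def is_odd(x):
    # what the example IsOddOp computes
    return x % 2 == 1

# the pipeline wires is_odd's input to add_one's output,
# so running it with input 4 boils down to:
result = is_odd(add_one(4))
print(result)  # True: 4 + 1 = 5, which is odd
```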
or just as easily deploy it to a Chariots app (a small micro-service that runs your pipeline):
>>> from chariots.pipelines import PipelinesServer
>>> app = PipelinesServer([pipeline], op_store_client=op_store_client, import_name="simple_app")
Once this is done, you can deploy your app as a Flask app and get the result of the pipeline using a client:
>>> client.call_pipeline(pipeline, 4).value
True
- Parameters
pipeline_nodes – the nodes of the pipeline. Each node has to be linked to a previous node (or __pipeline_input__). Nodes can create branches, but the only remaining output has to be __pipeline_output__ (or no output).
name – the name of the pipeline. This will be used to create the route at which to query the pipeline in the Chariots app.
pipeline_callbacks – callbacks to be used with this pipeline (monitoring and logging for instance)
use_worker – whether or not to execute this pipeline in a separate worker (rather than in the main server) by default. If set to False, the setting can still be overridden on an execution-per-execution basis using the client.
__init__(pipeline_nodes: List[nodes.BaseNode], name: str, pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, use_worker: Optional[bool] = None)[source]
execute(runner: base.BaseRunner, pipeline_input=None)[source]

Present for inheritance purposes (from the Op class); calling this method directly will automatically raise.
execute_node(node: chariots.pipelines.nodes._base_nodes.BaseNode, intermediate_results: Dict[NodeReference, Any], runner: chariots.pipelines.runners._base_runner.BaseRunner)[source]

Executes a node of the pipeline. This method is called by the runners to make the pipeline execute one of its nodes along with all the necessary callbacks.
- Parameters
node – the node to be executed
intermediate_results – the intermediate results to look in in order to find the node’s inputs
runner – a runner to be used in case the node needs a runner to be executed (internal pipeline)
- Raises
ValueError – if the output of the node does not correspond to the length of its output references
- Returns
the final result of the node after the execution
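The mechanics described above can be sketched in plain Python. This is an illustrative stand-in, not the actual chariots internals: intermediate results are keyed by output reference, a node's inputs are looked up there, and a mismatch between produced values and output references raises ValueError.

```python
def execute_node(node_fn, input_refs, output_refs, intermediate_results):
    """Illustrative stand-in for Pipeline.execute_node.

    node_fn: the callable wrapped by the node (hypothetical)
    input_refs / output_refs: names of the node's input/output references
    intermediate_results: dict mapping reference name -> computed value
    """
    # find the node's inputs among the previously computed results
    inputs = [intermediate_results[ref] for ref in input_refs]
    outputs = node_fn(*inputs)
    if not isinstance(outputs, tuple):
        outputs = (outputs,)
    # the node must produce exactly one value per output reference
    if len(outputs) != len(output_refs):
        raise ValueError("node output does not match its output references")
    for ref, value in zip(output_refs, outputs):
        intermediate_results[ref] = value
    return outputs[-1]

# replaying the simple_pipeline example by hand:
results = {"__pipeline_input__": 4}
execute_node(lambda x: x + 1, ["__pipeline_input__"], ["added_number"], results)
execute_node(lambda x: x % 2 == 1, ["added_number"], ["__pipeline_output__"], results)
print(results["__pipeline_output__"])  # True
```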
static extract_results(results: Dict[chariots.pipelines.nodes._base_nodes.NodeReference, Any]) → Any[source]

Extracts the output of a pipeline once all the nodes have been computed. This method is used by runners once all the nodes are computed, in order to check and get the final result to return.
- Parameters
results – the outputs left unused once the graph has been run.
- Raises
ValueError – if some output is left unused once every node is computed and the remaining output is not the output of the pipeline
- Returns
the final result of the pipeline as it needs to be returned to the user
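The check described above can be sketched as follows (a hypothetical stand-in, assuming the leftover-results dict shape used here): once every node has run, the only unused output allowed to remain is the pipeline's own output.

```python
def extract_results(results):
    # Illustrative stand-in for Pipeline.extract_results: any leftover
    # output other than __pipeline_output__ means the graph is wired wrong.
    leftover = {ref for ref in results if ref != "__pipeline_output__"}
    if leftover:
        raise ValueError(f"unused outputs left after execution: {sorted(leftover)}")
    # a pipeline with no terminal output simply returns None
    return results.get("__pipeline_output__")

print(extract_results({"__pipeline_output__": True}))  # True
```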
get_all_op_links() → List[Tuple[chariots.pipelines.nodes._base_nodes.BaseNode, chariots.pipelines.nodes._base_nodes.BaseNode]][source]

Gets all the links present in the pipeline.
get_pipeline_versions() → Mapping[chariots.pipelines.nodes._base_nodes.BaseNode, chariots.versioning._version.Version][source]

Returns the versions of every op in the pipeline.
- Returns
a mapping with the version of each node
load(op_store_client: chariots.op_store._op_store_client.OpStoreClient)[source]

Loads the latest version of each node in the pipeline from an OpStore, provided it is compatible. If the latest version is not compatible, this raises a VersionError.
- Parameters
op_store_client – the op store to look for existing versions in, if any, and to load the bytes of said versions from, if possible
- Raises
VersionError – if a node is incompatible with one of its inputs. For instance, if a node has not been trained on the latest version of its input in an inference pipeline.
- Returns
this pipeline once it has been fully loaded
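The compatibility check that load relies on can be sketched as follows. This is purely illustrative (the real version objects and OpStore API differ): the idea is that a saved node remembers the versions of its inputs at training time, and loading fails if an upstream node has moved on since.

```python
class VersionError(Exception):
    # stand-in for chariots.errors.VersionError
    pass

def check_compatible(trained_against, current_input_versions):
    """Raise if a node was trained against input versions that have since changed.

    trained_against / current_input_versions: hypothetical dicts mapping
    upstream node names to opaque version identifiers.
    """
    for name, version in trained_against.items():
        if current_input_versions.get(name) != version:
            raise VersionError(
                f"input {name!r} is now at version {current_input_versions.get(name)!r}, "
                f"but the saved node was trained against {version!r}"
            )

# compatible: the PCA the logistic node was trained on is still the loaded one
check_compatible({"pca": "v1"}, {"pca": "v1"})
# incompatible: pca was retrained, so the downstream node must be retrained too
try:
    check_compatible({"pca": "v1"}, {"pca": "v2"})
except VersionError as err:
    print(err)
```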
property name

The name of the pipeline.
property node_for_name

Utility mapping from node names to the corresponding node objects.
property pipeline_nodes

The nodes of the pipeline.
save(op_store_client: chariots.op_store._op_store_client.OpStoreClient)[source]

Persists all the nodes (that need saving) in an OpStore. This is used, for instance, when a training pipeline has been executed and needs to save its trained node(s) for the inference pipeline to load them. This method also updates the versions available for the store to serve in the future.
- Parameters
op_store_client – the store to persist the nodes and their versions in
class chariots.pipelines.PipelinesServer(app_pipelines: List[chariots.pipelines._pipeline.Pipeline], *args, op_store_client=None, runner: Optional[chariots.pipelines.runners._base_runner.BaseRunner] = None, default_pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, worker_pool: Optional[chariots.workers._base_worker_pool.BaseWorkerPool] = None, use_workers: Optional[bool] = None, **kwargs)[source]

A small Flask application used to rapidly deploy pipelines:
>>> my_app = PipelinesServer(
...     app_pipelines=[is_odd_pipeline],
...     op_store_client=op_store_client,
...     import_name='my_app'
... )
you can then deploy the app as you would any Flask app, using the flask command:

$ flask run
or, if you have used the chariots template, you can use the predefined CLI once the project is installed:
$ my_great_project start
once the app is started, you can use it with the client (which handles creating the requests and serializing to the right format) to query your pipelines:
>>> client.call_pipeline(is_odd_pipeline, 4).value
False
alternatively, you can query the Chariots server directly as you would for any normal micro-service. The server has the following routes:
/pipelines/<pipeline_name>/main
/pipelines/<pipeline_name>/versions
/pipelines/<pipeline_name>/load
/pipelines/<pipeline_name>/save
/pipelines/<pipeline_name>/health_check
for each pipeline that was registered to the Chariots app. It also creates some common routes for all pipelines:
/health_check
/available_pipelines
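The per-pipeline routes listed above follow a simple naming scheme. A small helper (not part of chariots, just an illustration of the scheme) that builds them for a given pipeline name:

```python
def pipeline_routes(pipeline_name):
    # the routes the Chariots server exposes for each registered pipeline
    endpoints = ["main", "versions", "load", "save", "health_check"]
    return [f"/pipelines/{pipeline_name}/{endpoint}" for endpoint in endpoints]

print(pipeline_routes("is_odd_pipeline")[0])  # /pipelines/is_odd_pipeline/main
```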
- Parameters
app_pipelines – the pipelines this app will serve
path – the path to mount the app on (whether on a local or remote saver). For instance, using a LocalFileSaver and ‘/chariots’ means all the information persisted by the Chariots server (past versions, trained models, datasets) will be persisted there
saver_cls – the saver class to use. If None, the FileSaver class is used as the default
runner – the runner to use to run the pipelines. If None, the SequentialRunner is used as the default
default_pipeline_callbacks – pipeline callbacks to be added to every pipeline this app will serve.
worker_pool – worker pool to be used if some jobs are to be executed asynchronously (using a worker master config)
use_workers – whether or not to use workers for all pipeline execution requests (if set to False, you can still choose to use workers on a pipeline-by-pipeline basis)
args – additional positional arguments to be passed to the Flask app
kwargs – additional keywords arguments to be added to the Flask app
__init__(app_pipelines: List[chariots.pipelines._pipeline.Pipeline], *args, op_store_client=None, runner: Optional[chariots.pipelines.runners._base_runner.BaseRunner] = None, default_pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, worker_pool: Optional[chariots.workers._base_worker_pool.BaseWorkerPool] = None, use_workers: Optional[bool] = None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.
class chariots.pipelines.PipelinesClient(backend_url: str = 'http://127.0.0.1:5000')[source]

Client to query/save/load the pipelines served by a (remote) Chariots app.

For instance, if you have built your app as such and deployed it:
>>> train_pca = Pipeline([nodes.Node(IrisXDataSet(), output_nodes=["x"]), nodes.Node(PCAOp(mode=MLMode.FIT),
...                       input_nodes=["x"])], "train_pca")
>>> train_logistic = Pipeline([
...     nodes.Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     nodes.Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     nodes.Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"]),
... ], 'train_logistics')
>>> pred = Pipeline([
...     nodes.Node(PCAOp(MLMode.PREDICT), input_nodes=["__pipeline_input__"], output_nodes="x_transformed"),
...     nodes.Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes="pred"),
...     nodes.Node(FromArray(), input_nodes=['pred'], output_nodes='__pipeline_output__')
... ], "pred")
>>> app = PipelinesServer([train_pca, train_logistic, pred], op_store_client=op_store_client,
...                       import_name="iris_app")
you can then train, save, and load your pipelines remotely from the client:
>>> response = client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(train_logistic)
>>> response = client.call_pipeline(train_logistic)
>>> client.save_pipeline(train_logistic)
>>> client.load_pipeline(pred)
>>> response = client.call_pipeline(pred, [[1, 2, 3, 4]])
>>> response.value
[1]
but if you execute them in the wrong order, the client will propagate the errors that occur on the Chariots server:
>>> response = client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(pred)
Traceback (most recent call last):
    ...
chariots.errors.VersionError: the pipeline you requested cannot be loaded because of version incompatibility
HINT: retrain and save/reload in order to have a loadable version
This example is overkill, as you could use the MLMode.FitPredict flag (not used here in order to demonstrate the situations where a VersionError is raised); this would reduce the amount of saving and loading needed to get to the prediction.
__init__(backend_url: str = 'http://127.0.0.1:5000')[source]

Initialize self. See help(type(self)) for accurate signature.
call_pipeline(pipeline: chariots.pipelines._pipeline.Pipeline, pipeline_input: Optional[Any] = None, use_worker: Optional[bool] = None) → chariots.pipelines.pipelines_server.PipelineResponse

Sends a request to the Chariots server in order to get this pipeline executed remotely on the server.
>>> client.call_pipeline(is_odd_pipeline, 4).value
False
>>> client.call_pipeline(is_odd_pipeline, 5).value
True
If you have a long-lasting pipeline (training, for instance) that you don’t want executed in the HTTP request but asynchronously in a separate worker, you can use the use_worker argument:

here the user gets the output of the pipeline that was executed in our Chariots micro-service
- Parameters
pipeline – the pipeline that needs to be executed in the remote Chariots server
pipeline_input – the input of the pipeline (will be provided to the node with __pipeline_input__ in its input_nodes). If none of the nodes accept a __pipeline_input__ and this is provided, the execution of the pipeline will fail. pipeline_input needs to be JSON serializable.
use_worker – whether or not to execute this request in a separate worker.
- Raises
ValueError – if the pipeline requested is not present in the Chariots app.
ValueError – if the execution of the pipeline fails
- Returns
a PipelineResponse object
fetch_job(job_id: str, pipeline: chariots.pipelines._pipeline.Pipeline) → chariots.pipelines.pipelines_server.PipelineResponse

Fetches a job launched previously.
- Parameters
job_id – the job id that can be found in PipelineResponse.job_id
pipeline – the pipeline object to fetch
- Returns
the updated Pipeline response
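The asynchronous flow described above (call with use_worker=True, then poll with the job id) can be sketched with an in-memory job registry. This is hypothetical, not the chariots implementation; it only shows the submit/fetch handshake shape.

```python
import itertools

class FakeJobRegistry:
    """Illustrative stand-in for the server-side job tracking behind fetch_job."""

    def __init__(self):
        self._jobs = {}
        self._ids = itertools.count()

    def submit(self, fn, *args):
        # in chariots this would enqueue work on the worker pool and return
        # a PipelineResponse carrying the job_id; here we compute eagerly
        job_id = str(next(self._ids))
        self._jobs[job_id] = fn(*args)
        return job_id

    def fetch(self, job_id):
        # analogous to client.fetch_job(job_id, pipeline)
        return self._jobs[job_id]

registry = FakeJobRegistry()
# "submit" the is-odd computation for input 4, then fetch its result by id
job_id = registry.submit(lambda x: (x + 1) % 2 == 1, 4)
print(registry.fetch(job_id))  # True
```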
is_pipeline_loaded(pipeline: chariots.pipelines._pipeline.Pipeline) → bool

Checks whether or not the pipeline has been loaded.
- Parameters
pipeline – the pipeline to check
load_pipeline(pipeline: chariots.pipelines._pipeline.Pipeline)

Reloads all the nodes in a pipeline. This is usually used to load the updates of a node/model in the inference pipeline after the training pipeline(s) have been executed. If the latest version of a saved node is incompatible with the rest of the pipeline, this raises a VersionError.
- Parameters
pipeline – the pipeline to reload
- Raises
VersionError – if there is a version incompatibility between one of the nodes in the pipeline and one of its inputs
pipeline_versions(pipeline: chariots.pipelines._pipeline.Pipeline) → Mapping[str, chariots.versioning._version.Version]

Gets the versions of all the nodes of the pipeline (different from pipeline.get_pipeline_versions, as the client returns the versions loaded/trained on the (remote) Chariots server).
- Parameters
pipeline – the pipeline to get the versions for
- Returns
a mapping with the node names as keys and the version objects as values
save_pipeline(pipeline: chariots.pipelines._pipeline.Pipeline)

Persists the state of the pipeline on the remote Chariots server (usually used to save the nodes that were trained in a training pipeline, so they can be loaded by the inference pipelines).
- Parameters
pipeline – the pipeline to save on the remote server. Beware: any changes made to the local pipeline object will not be persisted; only changes made on the remote version of the pipeline are.