Chariots Pipelines

Module that allows you to create Chariots pipelines. Chariots pipelines are constructed from nodes and ops.

class chariots.pipelines.Pipeline(pipeline_nodes: List[nodes.BaseNode], name: str, pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, use_worker: Optional[bool] = None)[source]

A pipeline is a collection of linked nodes that are executed one after the other. Pipelines are the main way to use Chariots.

To build a simple pipeline, you can do the following:

>>> pipeline = Pipeline([
...     Node(AddOneOp(), input_nodes=["__pipeline_input__"], output_nodes=["added_number"]),
...     Node(IsOddOp(), input_nodes=["added_number"], output_nodes=["__pipeline_output__"])
... ], "simple_pipeline")

Here we have just created a very simple pipeline with two nodes: one that adds one to the provided number and one that returns whether or not the resulting number is odd.
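For context, the two ops used above are not defined in this snippet. A plain-Python sketch of the behavior they would implement (the names mirror the example; in a real Chariots project each would subclass the library's base op class) could look like:

```python
def add_one(value: int) -> int:
    """Behavior of the hypothetical AddOneOp: add one to its input."""
    return value + 1


def is_odd(value: int) -> bool:
    """Behavior of the hypothetical IsOddOp: report whether the number is odd."""
    return value % 2 == 1


# chained as in the pipeline above: 4 -> 5 -> True
result = is_odd(add_one(4))
```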

To use our pipeline, we can either run it manually with a runner:

>>> from chariots.pipelines.runners import SequentialRunner
>>> runner = SequentialRunner()
>>> runner.run(pipeline=pipeline, pipeline_input=4)
True

You can also just as easily deploy your pipeline to a Chariots app (a small micro-service that runs your pipeline):

>>> from chariots.pipelines import PipelinesServer
>>> app = PipelinesServer([pipeline], op_store_client=op_store_client, import_name="simple_app")

Once this is done, you can deploy your app as a Flask app and get the result of the pipeline using a client:

>>> client.call_pipeline(pipeline, 4).value
True
Parameters
  • pipeline_nodes – the nodes of the pipeline. Each node has to be linked to a previous node (or __pipeline_input__). Nodes can create branches, but the only remaining output has to be __pipeline_output__ (or no output).

  • name – the name of the pipeline. This will be used to create the route at which to query the pipeline in the Chariots app.

  • pipeline_callbacks – callbacks to be used with this pipeline (for monitoring and logging, for instance)

  • use_worker – whether or not to execute this pipeline in a separate worker (rather than in the main server) by default. If set to False, the setting can still be overridden on a per-execution basis using the client.

__init__(pipeline_nodes: List[nodes.BaseNode], name: str, pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, use_worker: Optional[bool] = None)[source]
execute(runner: base.BaseRunner, pipeline_input=None)[source]

Present for inheritance purposes from the Op class; calling this will automatically raise.

execute_node(node: chariots.pipelines.nodes._base_nodes.BaseNode, intermediate_results: Dict[NodeReference, Any], runner: chariots.pipelines.runners._base_runner.BaseRunner)[source]

Executes a node from the pipeline. This method is called by the runners to make the pipeline execute one of its nodes along with all the necessary callbacks.

Parameters
  • node – the node to be executed

  • intermediate_results – the intermediate results to look in in order to find the node’s inputs

  • runner – a runner to be used in case the node needs a runner to be executed (internal pipeline)

Raises

ValueError – if the output of the node does not correspond to the length of its output references

Returns

the final result of the node after the execution
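To illustrate the contract described above, here is a simplified, self-contained sketch of how a sequential runner might drive node execution over a shared intermediate-results mapping. The node model here is a stand-in for illustration only, not the actual chariots internals:

```python
from typing import Any, Callable, Dict, List, Tuple

# a "node" here is simplified to (callable, input names, output names)
Node = Tuple[Callable[..., Any], List[str], List[str]]


def execute_node(node: Node, intermediate_results: Dict[str, Any]) -> None:
    """Read the node's inputs from the shared mapping, run it, write outputs back."""
    func, input_names, output_names = node
    inputs = [intermediate_results.pop(name) for name in input_names]
    outputs = func(*inputs)
    if len(output_names) == 1:
        outputs = [outputs]
    if len(outputs) != len(output_names):
        # mirrors the ValueError described above
        raise ValueError("node output does not match its output references")
    intermediate_results.update(zip(output_names, outputs))


# running the two-node example from the top of this section
results = {"__pipeline_input__": 4}
execute_node((lambda x: x + 1, ["__pipeline_input__"], ["added_number"]), results)
execute_node((lambda x: x % 2 == 1, ["added_number"], ["__pipeline_output__"]), results)
```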

static extract_results(results: Dict[chariots.pipelines.nodes._base_nodes.NodeReference, Any]) → Any[source]

Extracts the output of a pipeline once all the nodes have been computed. This method is used by runners once all the nodes are computed in order to check and retrieve the final result to return.

Parameters

results – the outputs left unused once the graph has been run.

Raises

ValueError – if some output was left unused once every node is computed and the remainder is not the output of the pipeline

Returns

the final result of the pipeline, as it needs to be returned to the user
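A simplified sketch of the check described above (stand-in code, not the actual method): once every node has run, the only leftover output, if any, must be the pipeline output; anything else means part of the graph was left dangling:

```python
from typing import Any, Dict, Optional


def extract_results(results: Dict[str, Any]) -> Optional[Any]:
    """Return the pipeline output, or None if the pipeline produces nothing."""
    if not results:
        return None
    if set(results) != {"__pipeline_output__"}:
        # some node output was never consumed and is not the pipeline output
        raise ValueError(f"unused outputs left after execution: {sorted(results)}")
    return results["__pipeline_output__"]


final = extract_results({"__pipeline_output__": True})
```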

Gets all the links present in the pipeline.

get_pipeline_versions() → Mapping[chariots.pipelines.nodes._base_nodes.BaseNode, chariots.versioning._version.Version][source]

Returns the versions of every op in the pipeline.

Returns

the mapping from node to version

load(op_store_client: chariots.op_store._op_store_client.OpStoreClient)[source]

Loads the latest version of each node in the pipeline from an OpStore, if compatible. If the latest version is not compatible, this will raise a VersionError.

Parameters

op_store_client – the op store in which to look for existing versions (if any) and from which to load the bytes of said versions if possible

Raises

VersionError – if a node is incompatible with one of its inputs, for instance if a node has not been trained on the latest version of its input in an inference pipeline

Returns

this pipeline once it has been fully loaded

property name

the name of the pipeline

property node_for_name

utility mapping with node names as keys and the node objects as values

property pipeline_nodes

the nodes of the pipeline

save(op_store_client: chariots.op_store._op_store_client.OpStoreClient)[source]

Persists all the nodes (that need saving) in an OpStore. This is used, for instance, when a training pipeline has been executed and needs to save its trained node(s) for the inference pipeline to load them. This method also updates the versions available for the store to serve in the future.

Parameters

op_store_client – the store to persist the nodes and their versions in

class chariots.pipelines.PipelinesServer(app_pipelines: List[chariots.pipelines._pipeline.Pipeline], *args, op_store_client=None, runner: Optional[chariots.pipelines.runners._base_runner.BaseRunner] = None, default_pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, worker_pool: Optional[chariots.workers._base_worker_pool.BaseWorkerPool] = None, use_workers: Optional[bool] = None, **kwargs)[source]

Small Flask application used to rapidly deploy pipelines:

>>> my_app = PipelinesServer(
...                          app_pipelines=[is_odd_pipeline],
...                          op_store_client=op_store_client,
...                          import_name='my_app'
...                         )

You can then deploy the app as you would any Flask application, for instance with the flask command:

$ flask run

Or, if you have used the chariots template, you can use the predefined CLI once the project is installed:

$ my_great_project start

Once the app is started, you can use it with the client (which handles creating the requests and serializing to the right format) to query your pipelines:

>>> client.call_pipeline(is_odd_pipeline, 4).value
False

Alternatively, you can query the Chariots server directly as you would any normal micro-service. The server has the following routes:

  • /pipelines/<pipeline_name>/main

  • /pipelines/<pipeline_name>/versions

  • /pipelines/<pipeline_name>/load

  • /pipelines/<pipeline_name>/save

  • /pipelines/<pipeline_name>/health_check

for each pipeline that was registered to the Chariots app. It also creates some routes common to all pipelines:

  • /health_check

  • /available_pipelines
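As an illustration, the per-pipeline routes listed above can be built as plain URL strings. No request is sent here, and the base URL is assumed to be the client's documented default:

```python
# assumed default backend URL, matching PipelinesClient's default
BASE_URL = "http://127.0.0.1:5000"


def pipeline_routes(pipeline_name: str) -> list:
    """Build the per-pipeline routes the server exposes for one pipeline."""
    endpoints = ("main", "versions", "load", "save", "health_check")
    return [f"{BASE_URL}/pipelines/{pipeline_name}/{ep}" for ep in endpoints]


routes = pipeline_routes("is_odd_pipeline")
# routes shared by all pipelines
common_routes = [f"{BASE_URL}/health_check", f"{BASE_URL}/available_pipelines"]
```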

Parameters
  • app_pipelines – the pipelines this app will serve

  • path – the path to mount the app on (whether using a local or remote saver). For instance, using a LocalFileSaver and ‘/chariots’ means all the information persisted by the Chariots server (past versions, trained models, datasets) will be persisted there.

  • saver_cls – the saver class to use. If None, the FileSaver class will be used as the default.

  • runner – the runner to use to run the pipelines. If None, the SequentialRunner will be used as the default.

  • default_pipeline_callbacks – pipeline callbacks to be added to every pipeline this app will serve.

  • worker_pool – worker pool to be used if some jobs are to be executed asynchronously (using a worker master config)

  • use_workers – whether or not to use workers to execute all pipeline execution requests (if set to False, you can still choose to use workers on a pipeline-by-pipeline basis)

  • args – additional positional arguments to be passed to the Flask app

  • kwargs – additional keyword arguments to be passed to the Flask app

__init__(app_pipelines: List[chariots.pipelines._pipeline.Pipeline], *args, op_store_client=None, runner: Optional[chariots.pipelines.runners._base_runner.BaseRunner] = None, default_pipeline_callbacks: Optional[List[chariots.pipelines.callbacks._pipeline_callback.PipelineCallback]] = None, worker_pool: Optional[chariots.workers._base_worker_pool.BaseWorkerPool] = None, use_workers: Optional[bool] = None, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

class chariots.pipelines.PipelinesClient(backend_url: str = 'http://127.0.0.1:5000')[source]

Client to query/save/load the pipelines served by a (remote) Chariots app.

For instance, if you have built your app as such and deployed it:

>>> train_pca = Pipeline([nodes.Node(IrisXDataSet(), output_nodes=["x"]), nodes.Node(PCAOp(mode=MLMode.FIT),
...                       input_nodes=["x"])], "train_pca")

>>> train_logistic = Pipeline([
...     nodes.Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     nodes.Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     nodes.Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"]),
... ], 'train_logistics')

>>> pred = Pipeline([
...     nodes.Node(PCAOp(MLMode.PREDICT), input_nodes=["__pipeline_input__"], output_nodes="x_transformed"),
...     nodes.Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes="pred"),
...     nodes.Node(FromArray(), input_nodes=['pred'], output_nodes='__pipeline_output__')
... ], "pred")

>>> app = PipelinesServer([train_pca, train_logistic, pred], op_store_client=op_store_client,
...                       import_name="iris_app")

You can then train, save, and load your pipelines remotely from the client:

>>> response = client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(train_logistic)
>>> response = client.call_pipeline(train_logistic)
>>> client.save_pipeline(train_logistic)
>>> client.load_pipeline(pred)
>>> response = client.call_pipeline(pred, [[1, 2, 3, 4]])
>>> response.value
[1]

But if you execute them in the wrong order, the client will propagate the errors that occur on the Chariots server:

>>> response = client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(pred)
Traceback (most recent call last):
...
chariots.errors.VersionError: the pipeline you requested cannot be loaded because of version incompatibility. HINT: retrain and save/reload in order to have a loadable version

This example is overkill, as you can use the MLMode.FitPredict flag (not used here in order to demonstrate the situations where VersionError will be raised); this would reduce the amount of saving/loading needed to get to the prediction.

__init__(backend_url: str = 'http://127.0.0.1:5000')[source]

Initialize self. See help(type(self)) for accurate signature.

call_pipeline(pipeline: chariots.pipelines._pipeline.Pipeline, pipeline_input: Optional[Any] = None, use_worker: Optional[bool] = None) → chariots.pipelines.pipelines_server.PipelineResponse

Sends a request to the Chariots server in order to get this pipeline executed remotely on the server.

>>> client.call_pipeline(is_odd_pipeline, 4).value
False
>>> client.call_pipeline(is_odd_pipeline, 5).value
True

If you have a long-lasting pipeline (training, for instance) that you don’t want executed in the HTTP request itself but asynchronously in a separate worker, you can use the use_worker argument; the user then gets the output of the pipeline once it has finished executing in the Chariots micro-service.
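The asynchronous flow can be sketched with stand-in objects. The classes below only mimic the call_pipeline/fetch_job contract described in this section so the polling pattern is visible; they are not the real chariots client or response types:

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeResponse:
    """Stand-in for the real pipeline response: a job id, a status, a value."""
    job_id: str
    job_status: str  # "running" or "done"
    value: Optional[Any] = None


class FakeClient:
    """Stand-in mimicking the call_pipeline/fetch_job contract."""

    def __init__(self) -> None:
        self._polls = 0

    def call_pipeline(self, pipeline, pipeline_input=None, use_worker=False):
        # with use_worker=True, the server would enqueue the job and answer
        # immediately with a job id instead of the pipeline output
        return FakeResponse(job_id="job-1", job_status="running")

    def fetch_job(self, job_id, pipeline):
        # pretend the worker finishes after one extra poll
        self._polls += 1
        if self._polls < 2:
            return FakeResponse(job_id=job_id, job_status="running")
        return FakeResponse(job_id=job_id, job_status="done", value=True)


client = FakeClient()
response = client.call_pipeline("is_odd_pipeline", 4, use_worker=True)
while response.job_status != "done":
    response = client.fetch_job(response.job_id, "is_odd_pipeline")
final_value = response.value
```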

Parameters
  • pipeline – the pipeline that needs to be executed in the remote Chariots server

  • pipeline_input – the input of the pipeline (it will be provided to the node with __pipeline_input__ in its input_nodes). If none of the nodes accept a __pipeline_input__ and this is provided, the execution of the pipeline will fail. pipeline_input needs to be JSON serializable.

  • use_worker – whether or not to execute this request in a separate worker.

Raises
  • ValueError – if the pipeline requested is not present in the Chariots app.

  • ValueError – if the execution of the pipeline fails

Returns

a PipelineResponse object

fetch_job(job_id: str, pipeline: chariots.pipelines._pipeline.Pipeline) → chariots.pipelines.pipelines_server.PipelineResponse

Fetches a job launched previously.

Parameters
  • job_id – the job id that can be found in PipelineResponse.job_id

  • pipeline – the pipeline object the job corresponds to

Returns

the updated Pipeline response

is_pipeline_loaded(pipeline: chariots.pipelines._pipeline.Pipeline) → bool

Checks whether or not the pipeline has been loaded.

Parameters

pipeline – the pipeline to check

load_pipeline(pipeline: chariots.pipelines._pipeline.Pipeline)

Reloads all the nodes in a pipeline. This is usually used to load the updates of a node/model in the inference pipeline after the training pipeline(s) have been executed. If the latest version of a saved node is incompatible with the rest of the pipeline, this will raise a VersionError.

Parameters

pipeline – the pipeline to reload

Raises

VersionError – if there is a version incompatibility between one of the nodes in the pipeline and one of its inputs

pipeline_versions(pipeline: chariots.pipelines._pipeline.Pipeline) → Mapping[str, chariots.versioning._version.Version]

Gets all the versions of the nodes of the pipeline (different from pipeline.get_pipeline_versions, as the client will return the versions of the loaded/trained nodes on the (remote) Chariots server).

Parameters

pipeline – the pipeline to get the versions for

Returns

a mapping with the node names as keys and the version objects as values

save_pipeline(pipeline: chariots.pipelines._pipeline.Pipeline)

Persists the state of the pipeline on the remote Chariots server (usually used for saving the nodes that were trained in a training pipeline so that they can be loaded by the inference pipelines).

Parameters

pipeline – the pipeline to save on the remote server. Beware: any changes made to the pipeline parameter itself will not be persisted (only changes made on the remote version of the pipeline will be).