Api Docs

class chariots.Pipeline(pipeline_nodes: List[base.BaseNode], name: str, pipeline_callbacks: Optional[List[chariots.callbacks._pipeline_callback.PipelineCallback]] = None)[source]

a pipeline is a collection of linked nodes that have to be executed one on top of each other. pipelines are the main way to use Chariots.

to build a simple pipeline you can do as such:

>>> pipeline = Pipeline([
...     Node(AddOneOp(), input_nodes=["__pipeline_input__"], output_nodes=["added_number"]),
...     Node(IsOddOp(), input_nodes=["added_number"], output_nodes=["__pipeline_output__"])
... ], "simple_pipeline")

here we have just created a very simple pipeline with two nodes, one that adds one to the provided number and one that returns whether or not the resulting number is odd

to use our pipeline, we can either do it manually with a runner:

>>> from chariots.runners import SequentialRunner
>>> runner = SequentialRunner()
>>> runner.run(pipeline=pipeline, pipeline_input=4)
True

you can also as easily deploy your pipeline to a Chariots app (small micro-service to run your pipeline)

>>> from chariots import Chariots
>>> app = Chariots([pipeline], path=app_path, import_name="simple_app")

Once this is done you can deploy your app as a flask app and get the result of the pipeline using a client:

>>> client.call_pipeline(pipeline, 4)
True
Parameters
  • pipeline_nodes – the nodes of the pipeline. each node has to be linked to previous node (or __pipeline_input__). nodes can create branches but the only output remaining has to be __pipeline_output__ (or no ouptut)

  • name – the name of the pipeline. this will be used to create the route at which to query the pipeline in the Chariots app

  • pipeline_callbacks – callbacks to be used with this pipeline (monitoring and logging for instance)

execute(runner: chariots.base._base_runner.BaseRunner, pipeline_input=None)[source]

present for inheritance purposes from the Op Class, this will automatically raise

execute_node(node: chariots.base._base_nodes.BaseNode, intermediate_results: Dict[NodeReference, Any], runner: chariots.base._base_runner.BaseRunner)[source]

executes a node from the pipeline, this method is called by the runners to make the pipeline execute one of it’s node and all necessary callbacks

Parameters
  • node – the node to be executed

  • intermediate_results – the intermediate result to look in in order to fin the node’s inputs

  • runner – a runner to be used in case the node needs a runner to be executed (internal pipeline)

Raises

ValueError – if the output of the node does not correspond to the length of it’s output references

Returns

the final result of the node after the execution

static extract_results(results: Dict[chariots.base._base_nodes.NodeReference, Any]) → Any[source]

extracts the output of a pipeline once all the nodes have been computed. This method is used by runners when once all the nodes are computed in order to check and get the final result to return

Parameters

results – the outputs left unused once the graph has been ran.

Raises

ValueError – if some output was unused once every node is computed and the remaining is not the output of the pipeline

Returns

the final result of the pipeline as needs to be returned to the use

get_pipeline_versions() → Mapping[chariots.base._base_nodes.BaseNode, chariots.versioning._version.Version][source]

returns the versions of every op in the pipeline

Returns

the mapping version for node

load(op_store: chariots._op_store.OpStore)[source]

loads all the latest versions of the nodes in the pipeline if they are compatible from an OpStore. if the latest version is not compatible, it will raise a VersionError

Parameters

op_store – the op store to look for existing versions if any and to load the bytes of said version if possible

Raises

VersionError – if a node is incompatible with one of it’s input. For instance if a node has not been trained on the latest version of it’s input in an inference pipeline

Returns

this pipeline once it has been fully loaded

property name

the name of the pipeline

property node_for_name

utils mapping that has node names in input and the nodes objects in values

property nodes

the nodes of the pipeline

prepare(saver: chariots.base._base_saver.BaseSaver)[source]

prepares the pipeline to be served. This is manly used to attach the correct saver to the nodes that need one (data saving and loading nodes for instance).

Parameters

saver – the saver to attach to all the nodes that need one

save(op_store: chariots._op_store.OpStore)[source]

persists all the nodes (that need saving) in an OpStore. this is used for instance when a training pipeline has been executed and needs to save it’s trained node(s) for the inference pipeline to load them. This method also updates the versions available for the store to serve in the future

Parameters

op_store – the store to persist the nodes and their versions in

class chariots.Chariots(app_pipelines: List[chariots._pipeline.Pipeline], path, saver_cls: Type[chariots.base._base_saver.BaseSaver] = <class 'chariots.savers._file_saver.FileSaver'>, runner: Optional[chariots.base._base_runner.BaseRunner] = None, default_pipeline_callbacks: Optional[List[chariots.callbacks._pipeline_callback.PipelineCallback]] = None, *args, **kwargs)[source]

small Flask application used to rapidly deploy pipelines:

>>> my_app = Chariots(app_pipelines=[is_odd_pipeline], path=app_path, import_name="my_app")

you can then deploy the app as you would with the flask comand:

$ flask

or if you have used the chariots’ template, you can use the predefined cli once the project is installed:

$ my_great_project start

once the app is started you can use it with the client (that handles creating the requests and serializing to the right format) to query your pipelines:

>>> client.call_pipeline(is_odd_pipeline, 4)
False

alternatively, you can query the Chariots server directly as you would for any normal micro-service. The server has the following routes:

  • /pipelines/<pipeline_name>/main

  • /pipelines/<pipeline_name>/versions

  • /pipelines/<pipeline_name>/load

  • /pipelines/<pipeline_name>/save

  • /pipelines/<pipeline_name>/health_check

for each pipeline that was registered to the Chariots app. It also creates some common routes for all pipelines:

  • /health_check

  • /available_pipelines

Parameters
  • app_pipelines – the pipelines this app will serve

  • path – the path to mount the app on (whether on local or remote saver). for isntance using a LocalFileSaver and ‘/chariots’ will mean all the information persisted by the Chariots server (past versions, trained models, datasets) will be persisted there

  • saver_cls – the saver class to use. if None the FileSaver class will be used as default

  • runner – the runner to use to run the pipelines. If None the SequentialRunner will be used as default

  • default_pipeline_callbacks – pipeline callbacks to be added to every pipeline this app will serve.

  • args – additional positional arguments to be passed to the Flask app

  • kwargs – additional keywords arguments to be added to the Flask app

class chariots.Client(backend_url: str = 'http://127.0.0.1:5000')[source]

Client to query/save/load the pipelines served by a (remote) Chariots app.

for instance if you have built your app as such and deployed it:

>>> train_pca = Pipeline([Node(IrisXDataSet(), output_nodes=["x"]), Node(PCAOp(mode=MLMode.FIT),
...                       input_nodes=["x"])], "train_pca")

>>> train_logistic = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')

>>> pred = Pipeline([
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["__pipeline_input__"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes="__pipeline_output__")
... ], "pred")

>>> app = Chariots([train_pca, train_logistic, pred], app_path, import_name="iris_app")

you can then train save and load your pipelines remotely from the client

>>> client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(train_logistic)
>>> client.call_pipeline(train_logistic)
>>> client.save_pipeline(train_logistic)
>>> client.load_pipeline(pred)
>>> client.call_pipeline(pred, [[1, 2, 3, 4]])
[1]

but if you execute them in the wrong order the client will propagate the errors that occur on the Chariots server

>>> client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(pred)
Traceback (most recent call last):
...
chariots.errors.VersionError: the pipeline you requested cannot be loaded because of version incompatibilityHINT: retrain and save/reload in order to have a loadable version

this example is overkill as you can use MLMode.FitPredict flag (not used here to demonstrate the situations where VersionError will be raised). this would reduce the amount of saving/loading to get to the prediction.

call_pipeline(pipeline: chariots._pipeline.Pipeline, pipeline_input: Optional[Any] = None) → Any

sends a request to the Chariots server in order to get this pipeline executed remotely on the server.

>>> client.call_pipeline(is_odd_pipeline, 4)
False
>>> client.call_pipeline(is_odd_pipeline, 5)
True

here you can get the user gets the output of the pipeline that got executed in our Chariots micro service

Parameters
  • pipeline – the pipeline that needs to be executed in the remote Chariots server

  • pipeline_input – the input of the pipeline (will be provided to the node with __pipeline__input__ in it’s input_nodes). If none of the nodes accept a __pipeline_input__ and this is provided the execution of the pipeline will fail. pipeline_input needs to be JSON serializable

Raises
  • ValueError – if the pipeline requested is not present in the Chariots app.

  • ValueError – if the execution of the pipeline fails

Returns

the result of the pipeline. it needs to be JSON serializable for chariots to be able to pass it through http

is_pipeline_loaded(pipeline: chariots._pipeline.Pipeline) → bool

checks whether or not the pipeline has been loaded

Parameters

pipeline – the pipeline to check

load_pipeline(pipeline: chariots._pipeline.Pipeline)

reloads all the nodes in a pipeline. this is usually used to load the updates of a node/model in the inference pipeline after the training pipeline(s) have been executed. If the latest version of a saved node is incompatible with the rest of the pipeline, this will raise a VersionError

Parameters

pipeline – the pipeline to reload

Raises

VersionError – If there is a version incompatibility between one of the nodes in the pipeline and one of it’s inputs

pipeline_versions(pipeline: chariots._pipeline.Pipeline) → Mapping[str, chariots.versioning._version.Version]

gets all the versions of the nodes of the pipeline (different from pipeline.get_pipeline_versions as the client will return the version of the loaded/trained version on the (remote) Chariots server)

Parameters

pipeline – the pipeline to get the versions for

Returns

mapping with the node names in keys and the version object in value

save_pipeline(pipeline: chariots._pipeline.Pipeline)

persists the state of the pipeline on the remote Chariots server (usually used for saving the nodes that were trained in a train pipeline in order to load them inside the inference pipelines).

Parameters

pipeline – the pipeline to save on the remote server. Beware: any changes made to the pipeline param will not be persisted (Only changes made on the remote version of the pipeline)

class chariots.TestClient(app: chariots._deployment.app.Chariots)[source]

mock up of the client to test a full app without having to create a server

class chariots.OpStore(saver: chariots.base._base_saver.BaseSaver)[source]

Bases: object

A Chariots OpStore handles the persisting of Ops and their versions as well as the accepted versions of each op’s inputs.

the OpStore persists all this metadata about persisted ops in the /_meta.json file using the saver provided at init

all the serialized ops are saved at /models/<op name>/<version>

The OpStore is mostly used by the Pipelines and the nodes at saving time to:

  • persist the ops that they have updated

  • register new versions

  • register links between different ops and different versions that are valid (for instance this versions of the PCA is valid for this new version of the RandomForest

and at loading time to:

  • check latest available version of an op

  • check if this version is valid with the rest of the pipeline

  • recover the bytes of the latest version if it is valid

the OpStore identifies op’s by there name (usually a snake case of the Class of your op) so changing this name (or changing the class name) might make it hard to recover the metadata and serialized bytes of the Ops

param saver

the saver the op_store will use to retrieve it’s metadata and subsequent ops

get_all_versions_of_op(op: chariots.base._base_op.BaseOp) → Optional[List[chariots.versioning._version.Version]][source]

returns all the available versions of an op ever persisted in the OpGraph (or any Opgraph using the same _meta.json)

Parameters

op – the op to get the previous persisted versions

get_op_bytes_for_version(op: chariots.base._base_op.BaseOp, version: chariots.versioning._version.Version) → bytes[source]

loads the persisted bytes of op for a specific version

Parameters
  • op – the op that needs to be loaded

  • version – the version of the op to load

Returns

the bytes of the op

registers a link between an upstream and a downstream op. This means that in future relaods the downstream op will whitelist this version for this upstream op

Parameters
  • downstream_op – the op that needs to whitelist one of it’s inputs’ new version

  • upstream_op – the op that is getting whitelisted as one of the inputs of the downstream op

  • upstream_op_version – the valid version of the op that is getting whitelisted

Returns

save()[source]

persists all the metadata about ops and versions available in the store using the store’s saver.

The saved metadata can be found at /_meta.json from the saver’s route.

save_op_bytes(op_to_save: chariots.base._base_op.BaseOp, version: chariots.versioning._version.Version, op_bytes: bytes)[source]

saves op_bytes of a specific op to the path /models/<op name>/<version>.

the version that is used here is the node version (and not the op_version) as nodes might be able to modify some behaviors of the versioning of their underlying op

Parameters
  • op_to_save – the op that needs to be saved (this will not be saved as is - only the bytes)

  • version – the exact version to be used when persisting

  • op_bytes – the bytes of the op to save that will be persisted

class chariots.MLMode[source]

Bases: enum.Enum

mode in which to put the op (prediction of training) enum

FIT = 'fit'
FIT_PREDICT = 'fit_predict'
PREDICT = 'predict'