API Docs¶
-
class
chariots.
Pipeline
(pipeline_nodes: List[base.BaseNode], name: str, pipeline_callbacks: Optional[List[chariots.callbacks._pipeline_callback.PipelineCallback]] = None)[source]¶ A pipeline is a collection of linked nodes that are executed one after the other. Pipelines are the main way to use Chariots.
To build a simple pipeline:
>>> pipeline = Pipeline([
...     Node(AddOneOp(), input_nodes=["__pipeline_input__"], output_nodes=["added_number"]),
...     Node(IsOddOp(), input_nodes=["added_number"], output_nodes=["__pipeline_output__"])
... ], "simple_pipeline")
This creates a very simple pipeline with two nodes: one adds one to the provided number, and the other returns whether or not the resulting number is odd.
To use the pipeline, you can run it manually with a runner:
>>> from chariots.runners import SequentialRunner
>>> runner = SequentialRunner()
>>> runner.run(pipeline=pipeline, pipeline_input=4)
True
You can just as easily deploy the pipeline to a Chariots app (a small micro-service that runs your pipelines):
>>> from chariots import Chariots
>>> app = Chariots([pipeline], path=app_path, import_name="simple_app")
Once this is done, you can deploy your app as a Flask app and get the result of the pipeline using a client:
>>> client.call_pipeline(pipeline, 4)
True
- Parameters
pipeline_nodes – the nodes of the pipeline. Each node has to be linked to a previous node (or to __pipeline_input__). Nodes can create branches, but the only output remaining at the end has to be __pipeline_output__ (or no output at all)
name – the name of the pipeline. this will be used to create the route at which to query the pipeline in the Chariots app
pipeline_callbacks – callbacks to be used with this pipeline (monitoring and logging for instance)
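The way linked nodes pass values to each other can be illustrated without Chariots itself. The following is a minimal sketch, not the library's implementation: `ToyNode` and `run_sequentially` are hypothetical names, nodes are plain functions, and each node has at most one output. Only the `__pipeline_input__` and `__pipeline_output__` reference names come from the documentation above.

```python
from typing import Any, Callable, Dict, List, Optional

PIPELINE_INPUT = "__pipeline_input__"
PIPELINE_OUTPUT = "__pipeline_output__"


class ToyNode:
    """Hypothetical stand-in for a node: a function plus named links."""

    def __init__(self, func: Callable[..., Any], input_nodes: List[str],
                 output_node: Optional[str] = None):
        self.func = func
        self.input_nodes = input_nodes
        self.output_node = output_node


def run_sequentially(nodes: List[ToyNode], pipeline_input: Any = None) -> Any:
    """Run the nodes in list order, passing results along by reference name."""
    results: Dict[str, Any] = {PIPELINE_INPUT: pipeline_input}
    for node in nodes:
        # every input must have been produced by a previous node
        missing = [name for name in node.input_nodes if name not in results]
        if missing:
            raise ValueError(f"node inputs not computed yet: {missing}")
        output = node.func(*(results[name] for name in node.input_nodes))
        if node.output_node is not None:
            results[node.output_node] = output
    return results.get(PIPELINE_OUTPUT)


# toy equivalent of the two-node "simple_pipeline" example
toy_pipeline = [
    ToyNode(lambda x: x + 1, [PIPELINE_INPUT], "added_number"),
    ToyNode(lambda x: x % 2 == 1, ["added_number"], PIPELINE_OUTPUT),
]
print(run_sequentially(toy_pipeline, 4))  # 4 + 1 = 5, which is odd
```

As in the runner example, an input of 4 yields True because the first node turns it into 5 before the oddness check.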
-
execute
(runner: chariots.base._base_runner.BaseRunner, pipeline_input=None)[source]¶ present for inheritance purposes from the Op class; calling it will automatically raise an error
-
execute_node
(node: chariots.base._base_nodes.BaseNode, intermediate_results: Dict[NodeReference, Any], runner: chariots.base._base_runner.BaseRunner)[source]¶ executes a node from the pipeline. This method is called by the runners to make the pipeline execute one of its nodes along with all the necessary callbacks
- Parameters
node – the node to be executed
intermediate_results – the intermediate results in which to look up the node’s inputs
runner – a runner to be used in case the node needs a runner to be executed (internal pipeline)
- Raises
ValueError – if the number of outputs of the node does not correspond to the number of its output references
- Returns
the final result of the node after the execution
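The length check described under Raises can be pictured as follows. This is a sketch under assumptions: `store_node_outputs` is a hypothetical helper, output references are plain strings rather than NodeReference objects, and the rule that a single reference receives the raw result unchanged is a guess at sensible behaviour rather than documented fact.

```python
from typing import Any, Dict, List


def store_node_outputs(output_refs: List[str], raw_output: Any,
                       intermediate_results: Dict[str, Any]) -> Dict[str, Any]:
    """Store a node's result under its output references (hypothetical helper)."""
    if len(output_refs) == 0:
        # assumption: a node without output references has its result discarded
        outputs = []
    elif len(output_refs) == 1:
        # assumption: a single reference receives the raw result unchanged
        outputs = [raw_output]
    else:
        outputs = list(raw_output)
    if len(outputs) != len(output_refs):
        raise ValueError(
            f"node produced {len(outputs)} outputs for {len(output_refs)} output references"
        )
    intermediate_results.update(zip(output_refs, outputs))
    return intermediate_results
```

A node declared with `output_nodes=["x", "y"]` would therefore need to return exactly two values in this sketch.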
-
static
extract_results
(results: Dict[chariots.base._base_nodes.NodeReference, Any]) → Any[source]¶ extracts the output of a pipeline once all the nodes have been computed. This method is used by runners, once all the nodes are computed, to check and get the final result to return
- Parameters
results – the outputs left unused once the graph has been run.
- Raises
ValueError – if some output was left unused once every node is computed and what remains is not the output of the pipeline
- Returns
the final result of the pipeline, as it needs to be returned to the user
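The rule stated here (only `__pipeline_output__` or nothing may remain) can be sketched in a few lines. The function below is an illustration with a hypothetical name, and it uses string keys instead of NodeReference objects.

```python
from typing import Any, Dict

PIPELINE_OUTPUT = "__pipeline_output__"


def extract_final_result(results: Dict[str, Any]) -> Any:
    """Return the pipeline output from the leftover results (illustrative only)."""
    if not results:
        # a pipeline with no output (a training pipeline, for instance)
        return None
    if set(results) == {PIPELINE_OUTPUT}:
        return results[PIPELINE_OUTPUT]
    unused = sorted(name for name in results if name != PIPELINE_OUTPUT)
    raise ValueError(f"some node outputs were never used: {unused}")
```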
-
get_pipeline_versions
() → Mapping[chariots.base._base_nodes.BaseNode, chariots.versioning._version.Version][source]¶ returns the versions of every op in the pipeline
- Returns
a mapping from each node to its version
-
load
(op_store: chariots._op_store.OpStore)[source]¶ loads the latest versions of all the nodes in the pipeline from an OpStore, provided they are compatible. If the latest version is not compatible, a VersionError is raised
- Parameters
op_store – the op store to look for existing versions if any and to load the bytes of said version if possible
- Raises
VersionError – if a node is incompatible with one of its inputs. For instance, if a node has not been trained on the latest version of its input in an inference pipeline
- Returns
this pipeline once it has been fully loaded
-
property
name
¶ the name of the pipeline
-
property
node_for_name
¶ utility mapping with the node names as keys and the node objects as values
-
property
nodes
¶ the nodes of the pipeline
-
prepare
(saver: chariots.base._base_saver.BaseSaver)[source]¶ prepares the pipeline to be served. This is mainly used to attach the correct saver to the nodes that need one (data saving and loading nodes for instance).
- Parameters
saver – the saver to attach to all the nodes that need one
-
save
(op_store: chariots._op_store.OpStore)[source]¶ persists all the nodes (that need saving) in an OpStore. This is used, for instance, when a training pipeline has been executed and needs to save its trained node(s) for the inference pipeline to load them. This method also updates the versions available for the store to serve in the future
- Parameters
op_store – the store to persist the nodes and their versions in
-
class
chariots.
Chariots
(app_pipelines: List[chariots._pipeline.Pipeline], path, saver_cls: Type[chariots.base._base_saver.BaseSaver] = <class 'chariots.savers._file_saver.FileSaver'>, runner: Optional[chariots.base._base_runner.BaseRunner] = None, default_pipeline_callbacks: Optional[List[chariots.callbacks._pipeline_callback.PipelineCallback]] = None, *args, **kwargs)[source]¶ small Flask application used to rapidly deploy pipelines:
>>> my_app = Chariots(app_pipelines=[is_odd_pipeline], path=app_path, import_name="my_app")
you can then deploy the app as you would with the flask command:
$ flask
or, if you have used the Chariots template, you can use the predefined CLI once the project is installed:
$ my_great_project start
once the app is started you can use it with the client (that handles creating the requests and serializing to the right format) to query your pipelines:
>>> client.call_pipeline(is_odd_pipeline, 4)
False
alternatively, you can query the Chariots server directly as you would for any normal micro-service. The server has the following routes:
/pipelines/<pipeline_name>/main
/pipelines/<pipeline_name>/versions
/pipelines/<pipeline_name>/load
/pipelines/<pipeline_name>/save
/pipelines/<pipeline_name>/health_check
for each pipeline that was registered to the Chariots app. It also creates some common routes for all pipelines:
/health_check
/available_pipelines
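For readers who want to query the server without the client, the route layout above can be turned into full URLs. The helper below is a small sketch: `pipeline_routes` is a hypothetical name, and the default backend URL is borrowed from the Client signature. The HTTP method and payload expected by each route are not stated in this section, so the sketch only builds the addresses.

```python
from typing import Dict

# per-pipeline route suffixes and common routes, as listed in this section
PIPELINE_ACTIONS = ("main", "versions", "load", "save", "health_check")
COMMON_ROUTES = ("/health_check", "/available_pipelines")


def pipeline_routes(pipeline_name: str,
                    backend_url: str = "http://127.0.0.1:5000") -> Dict[str, str]:
    """Map each per-pipeline action to its full URL (hypothetical helper)."""
    base = backend_url.rstrip("/")
    return {
        action: f"{base}/pipelines/{pipeline_name}/{action}"
        for action in PIPELINE_ACTIONS
    }


for action, url in pipeline_routes("simple_pipeline").items():
    print(action, url)
```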
- Parameters
app_pipelines – the pipelines this app will serve
path – the path to mount the app on (whether on local or remote saver). For instance, using a LocalFileSaver and ‘/chariots’ will mean all the information persisted by the Chariots server (past versions, trained models, datasets) will be persisted there
saver_cls – the saver class to use. if None the FileSaver class will be used as default
runner – the runner to use to run the pipelines. If None the SequentialRunner will be used as default
default_pipeline_callbacks – pipeline callbacks to be added to every pipeline this app will serve.
args – additional positional arguments to be passed to the Flask app
kwargs – additional keyword arguments to be passed to the Flask app
-
class
chariots.
Client
(backend_url: str = 'http://127.0.0.1:5000')[source]¶ Client to query/save/load the pipelines served by a (remote) Chariots app.
for instance if you have built your app as such and deployed it:
>>> train_pca = Pipeline([Node(IrisXDataSet(), output_nodes=["x"]), Node(PCAOp(mode=MLMode.FIT),
...                       input_nodes=["x"])], "train_pca")
>>> train_logistic = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')
>>> pred = Pipeline([
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["__pipeline_input__"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes="__pipeline_output__")
... ], "pred")
>>> app = Chariots([train_pca, train_logistic, pred], app_path, import_name="iris_app")
you can then train, save and load your pipelines remotely from the client:
>>> client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(train_logistic)
>>> client.call_pipeline(train_logistic)
>>> client.save_pipeline(train_logistic)
>>> client.load_pipeline(pred)
>>> client.call_pipeline(pred, [[1, 2, 3, 4]])
[1]
but if you execute them in the wrong order the client will propagate the errors that occur on the Chariots server
>>> client.call_pipeline(train_pca)
>>> client.save_pipeline(train_pca)
>>> client.load_pipeline(pred)
Traceback (most recent call last):
...
chariots.errors.VersionError: the pipeline you requested cannot be loaded because of version incompatibilityHINT: retrain and save/reload in order to have a loadable version
This example is overkill, as you could use the MLMode.FitPredict flag (not used here, in order to demonstrate the situations where a VersionError is raised). Doing so would reduce the amount of saving and loading needed to get to the prediction.
-
call_pipeline
(pipeline: chariots._pipeline.Pipeline, pipeline_input: Optional[Any] = None) → Any¶ sends a request to the Chariots server in order to get this pipeline executed remotely on the server.
>>> client.call_pipeline(is_odd_pipeline, 4)
False
>>> client.call_pipeline(is_odd_pipeline, 5)
True
here the user gets the output of the pipeline that was executed in our Chariots micro-service
- Parameters
pipeline – the pipeline that needs to be executed in the remote Chariots server
pipeline_input – the input of the pipeline (will be provided to the node with __pipeline_input__ in its input_nodes). If none of the nodes accept a __pipeline_input__ and this is provided, the execution of the pipeline will fail. pipeline_input needs to be JSON serializable
- Raises
ValueError – if the pipeline requested is not present in the Chariots app.
ValueError – if the execution of the pipeline fails
- Returns
the result of the pipeline. it needs to be JSON serializable for chariots to be able to pass it through http
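Since both the pipeline input and its result have to be JSON serializable, it can be useful to check a value before sending it. The helper below is a sketch using only the standard library; `is_json_serializable` is a hypothetical name and not part of Chariots.

```python
import json
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        # TypeError: unsupported type; ValueError: circular reference
        return False
    return True


print(is_json_serializable([[1, 2, 3, 4]]))  # nested lists of numbers are fine
print(is_json_serializable({1, 2, 3}))       # sets are not supported by json
```

Values such as sets, bytes or arbitrary objects would need to be converted (to lists or strings, for instance) before being used as pipeline_input.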
-
is_pipeline_loaded
(pipeline: chariots._pipeline.Pipeline) → bool¶ checks whether or not the pipeline has been loaded
- Parameters
pipeline – the pipeline to check
-
load_pipeline
(pipeline: chariots._pipeline.Pipeline)¶ reloads all the nodes in a pipeline. this is usually used to load the updates of a node/model in the inference pipeline after the training pipeline(s) have been executed. If the latest version of a saved node is incompatible with the rest of the pipeline, this will raise a VersionError
- Parameters
pipeline – the pipeline to reload
- Raises
VersionError – if there is a version incompatibility between one of the nodes in the pipeline and one of its inputs
-
pipeline_versions
(pipeline: chariots._pipeline.Pipeline) → Mapping[str, chariots.versioning._version.Version]¶ gets all the versions of the nodes of the pipeline (different from pipeline.get_pipeline_versions, as the client returns the versions that are loaded/trained on the (remote) Chariots server)
- Parameters
pipeline – the pipeline to get the versions for
- Returns
a mapping with the node names as keys and the version objects as values
-
save_pipeline
(pipeline: chariots._pipeline.Pipeline)¶ persists the state of the pipeline on the remote Chariots server (usually used for saving the nodes that were trained in a train pipeline in order to load them inside the inference pipelines).
- Parameters
pipeline – the pipeline to save on the remote server. Beware: any changes made to the pipeline param will not be persisted (only changes made to the remote version of the pipeline are)
-
-
class
chariots.
TestClient
(app: chariots._deployment.app.Chariots)[source]¶ mock up of the client to test a full app without having to create a server
-
class
chariots.
OpStore
(saver: chariots.base._base_saver.BaseSaver)[source]¶ Bases:
object
A Chariots OpStore handles the persisting of Ops and their versions as well as the accepted versions of each op’s inputs.
the OpStore persists all this metadata about persisted ops in the /_meta.json file using the saver provided at init
all the serialized ops are saved at /models/<op name>/<version>
The OpStore is mostly used by the Pipelines and the nodes at saving time to:
persist the ops that they have updated
register new versions
register links between different ops and different versions that are valid (for instance, this version of the PCA is valid for this new version of the RandomForest)
and at loading time to:
check latest available version of an op
check if this version is valid with the rest of the pipeline
recover the bytes of the latest version if it is valid
the OpStore identifies ops by their name (usually the snake case of the class name of your op), so changing this name (or changing the class name) might make it hard to recover the metadata and serialized bytes of the ops
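The bookkeeping described above (registering valid links at saving time, checking them at loading time) can be pictured with a toy in-memory structure. This is a sketch under assumptions, not the library's storage format: `ToyOpStore` and `is_loadable` are hypothetical, versions are plain strings instead of Version objects, and treating an unregistered link as not loadable is a choice made for this example.

```python
from typing import Dict, Optional, Set, Tuple


class ToyOpStore:
    """Hypothetical in-memory stand-in for the link metadata of an OpStore."""

    def __init__(self) -> None:
        # (downstream op name, upstream op name) -> accepted upstream versions
        self._links: Dict[Tuple[str, str], Set[str]] = {}

    def register_valid_link(self, downstream_op: str, upstream_op: str,
                            upstream_op_version: str) -> None:
        key = (downstream_op, upstream_op)
        self._links.setdefault(key, set()).add(upstream_op_version)

    def get_validated_links(self, downstream_op_name: str,
                            upstream_op_name: str) -> Optional[Set[str]]:
        return self._links.get((downstream_op_name, upstream_op_name))

    def is_loadable(self, downstream_op: str, upstream_op: str,
                    upstream_latest_version: str) -> bool:
        """Assumption: loading succeeds only if the latest version was whitelisted."""
        validated = self.get_validated_links(downstream_op, upstream_op)
        return validated is not None and upstream_latest_version in validated


store = ToyOpStore()
# the logistic op was trained on version "v1" of the PCA op
store.register_valid_link("logistic_op", "pca_op", "v1")
print(store.is_loadable("logistic_op", "pca_op", "v1"))  # whitelisted version
print(store.is_loadable("logistic_op", "pca_op", "v2"))  # PCA retrained since then
```

The second check mirrors the situation in which a VersionError would be expected: the upstream op has a newer version than any the downstream op was trained against.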
- Parameters
saver – the saver the op_store will use to retrieve its metadata and subsequent ops
-
get_all_versions_of_op
(op: chariots.base._base_op.BaseOp) → Optional[List[chariots.versioning._version.Version]][source]¶ returns all the available versions of an op ever persisted in the OpGraph (or any OpGraph using the same _meta.json)
- Parameters
op – the op whose previously persisted versions should be returned
-
get_op_bytes_for_version
(op: chariots.base._base_op.BaseOp, version: chariots.versioning._version.Version) → bytes[source]¶ loads the persisted bytes of op for a specific version
- Parameters
op – the op that needs to be loaded
version – the version of the op to load
- Returns
the bytes of the op
-
get_validated_links
(downstream_op_name: str, upstream_op_name: str) → Optional[Set[chariots.versioning._version.Version]][source]¶
-
register_valid_link
(downstream_op: Optional[str], upstream_op: str, upstream_op_version: chariots.versioning._version.Version)[source]¶ registers a link between an upstream and a downstream op. This means that in future reloads the downstream op will whitelist this version for this upstream op
- Parameters
downstream_op – the op that needs to whitelist the new version of one of its inputs
upstream_op – the op that is getting whitelisted as one of the inputs of the downstream op
upstream_op_version – the valid version of the op that is getting whitelisted
-
save
()[source]¶ persists all the metadata about ops and versions available in the store using the store’s saver.
The saved metadata can be found at /_meta.json from the saver’s route.
-
save_op_bytes
(op_to_save: chariots.base._base_op.BaseOp, version: chariots.versioning._version.Version, op_bytes: bytes)[source]¶ saves op_bytes of a specific op to the path /models/<op name>/<version>.
the version that is used here is the node version (and not the op_version) as nodes might be able to modify some behaviors of the versioning of their underlying op
- Parameters
op_to_save – the op that needs to be saved (this will not be saved as is - only the bytes)
version – the exact version to be used when persisting
op_bytes – the bytes of the op to save that will be persisted
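The storage layout /models/<op name>/<version> together with the snake-case naming convention mentioned for the OpStore can be sketched as below. Both helpers are hypothetical: the exact snake-casing rule used by Chariots is not given in this section (the regular expression here is one common approach), and the version is shown as a plain string whose format is made up for the example.

```python
import posixpath
import re


def snake_case(class_name: str) -> str:
    """Convert a CamelCase class name to snake_case (one common convention)."""
    # insert an underscore before every capital letter except the first one
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


def op_bytes_path(op_name: str, version: str) -> str:
    """Build the /models/<op name>/<version> path described above."""
    return posixpath.join("/models", op_name, version)


name = snake_case("AddOneOp")
print(name)                          # add_one_op
print(op_bytes_path(name, "1.0.0"))  # /models/add_one_op/1.0.0
```

Because the path is derived from the op name, renaming the op class would point later lookups at a different location, which is why the naming caveat matters.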