chariots.base

The Base Module Gathers all the main classes in the Chariots framework that can be subclassed to create custom behaviors:

  • creating new Ops for preprocessing and feature extraction (subclassing BaseOp)

  • supporting new ML frameworks with BaseMLOp

  • creating a custom node (ABTesting, …) with the BaseNode

  • changing the execution behavior of pipelines (Multiprocessing, cluster computing, …) with BaseRunner

  • saving your ops and metadata to a different cloud provider with BaseSaver

  • creating new serialisation formats for datasets and models with BaseSerializer

class chariots.base.BaseOp(op_callbacks: Optional[List[chariots.callbacks._op_callback.OpCallBack]] = None)[source]

Bases: object

The ops are the atomic computation units of the Chariots framework. Whereas a Node represents a slot in a pipeline and the interactions between that spot and the rest of the pipeline, the op will actually be doing the computation.

To subclass the BaseOp class and create a new Op, you need to override the execute method:

>>> class AddOp(BaseOp):
...     number_to_add = 1
...
...     def execute(self, op_input):
...         return op_input + self.number_to_add

and then you can execute the op alone:

>>> AddOp().execute(3)
4

or within a pipeline (that can be deployed)

>>> pipeline = Pipeline([Node(AddOp(), ["__pipeline_input__"], "__pipeline_output__")], "simple_pipeline")
>>> runner.run(pipeline, 3)  # of course you can use a `Chariots` server to serve our pipeline and op(s)
4

The BaseOp class is a versioned class (see the versioning module for more info) so you can use VersionedField with it

>>> class AddOp(BaseOp):
...     number_to_add = VersionedField(3, VersionType.MAJOR)
...
...     def execute(self, op_input):
...         return op_input + self.number_to_add


>>> AddOp.__version__
<Version, major:36d3c, minor: 94e72, patch: 94e72>
>>> AddOp.number_to_add
3

and changing the field will change the version:

>>> class AddOp(BaseOp):
...     number_to_add = VersionedField(4, VersionType.MAJOR)
...
...     def execute(self, op_input):
...         return op_input + self.number_to_add


>>> AddOp.__version__
<Version, major:8ad66, minor: 94e72, patch: 94e72>
Parameters

op_callbacksOpCallbacks objects to change the behavior of the op by executing some action before or after the op’execution

after_execution(args: List[Any], output: Any) → Any[source]

method used to create a one-off (compared to using a callback) custom behavior that gets executed after the the op itself

Parameters
  • args – the arguments that were passed to the op

  • output – the output of the op

property allow_version_change

whether or not this op accepts to be loaded with the wrong version. this is usually False but is useful when loading an op for retraining

before_execution(args: List[Any])[source]

method used to create a one-off (compared to using a callback) custom behavior that gets executed before the the op itself

Parameters

args – the arguments that are going to be passed to the operation

execute(*args, **kwargs)[source]

main method to override. it defines the behavior of the op. In the pipeline the argument of the pipeline will be passed from the node with one argument per input (in the order of the input nodes)

execute_with_all_callbacks(args)[source]

executes the op itself alongside all it’s callbacks (op callbacks and before/after_execution methods)

Parameters

args – the arguments to be passed to the execute method of the op

Returns

the result of the op

property name

the name of the op. this is mainly use to find previous versions and saved ops of this op in the op_store

property op_version

the version the op uses to pass to the pipeline to identify itself. This differs from the __version__ method in that it can add some information besides the class Fields (for instance last training time for ML Ops)

class chariots.base.BaseMLOp(mode: chariots._ml_mode.MLMode, op_callbacks: Optional[List[chariots.callbacks._op_callback.OpCallBack]] = None)[source]

Bases: chariots.ops._loadable_op.LoadableOp

an BaseMLOp are ops designed specifically to be machine learning models (whether for training or inference). You can initialize the op in three distinctive ml mode:

  • FIT for training the model

  • PREDICT to perform inference

  • FIT_PREDICT to do both (train and predict on the same dataset

the usual workflow is to a have a training and a prediction pipeline. and to:

  • execute the training pipeline:

  • save the training pipeline

  • reload the prediction pipeline

  • use the prediction pipeline

here is an example:

first create your pipelines:

>>> train = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train')

>>> pred = Pipeline([
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["__pipeline_input__"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')

and then to train your pipelines and make some predictions:

>>> client.call_pipeline(train)
>>> client.save_pipeline(train)
>>> client.load_pipeline(pred)
>>> client.call_pipeline(pred, [[1, 2, 3, 4]])
[1]

If you want to create a new MLOp class (to accommodate an unsupported framework for instance), you need to define:

  • how to fit your op with the fit method

  • how to perform inference with your op with the predict method

  • define how to initialize a new model with the _init_model method

and eventually you can change the serializer_cls class attribute to change the serialization format of your model

Parameters

op_callbacksOpCallbacks objects to change the behavior of the op by executing some action before or after the op’s execution

property allow_version_change

whether or not this op accepts to be loaded with the wrong version. this is usually False but is useful when loading an op for retraining

execute(*args, **kwargs)[source]

executes the model action that is required (train, test or both depending in what the op was initialized with

abstract fit(*args, **kwargs)[source]

fits the inner model of the op on data (in args and kwargs) this method must not return any data (use the FIT_PREDICT mode to predict on the same data the op was trained on)

load(serialized_object: bytes)[source]

Receives serialize bytes of a newer version of this class and sets the internals of he op accordingly.

Parameters

serialized_object – the serialized bytes of this op (as where outputed by the serialize method

property mode

the mode this op was instantiated with

property op_version

the version the op uses to pass to the pipeline to identify itself. This differs from the __version__ method in that it can add some information besides the class Fields (for instance last training time for ML Ops)

abstract predict(*args, **kwargs) → Any[source]

the method used to do predictions/inference once the model has been fitted/loaded

serialize() → bytes[source]

serializes the object into bytes (to be persisted with a Saver) to be reloaded in the future (you must ensure the compatibility with the load method

Returns

the serialized bytes representing this operation

serializer_cls

alias of chariots.serializers._dill_serializer.DillSerializer

training_update_version = 'patch'
class chariots.base.BaseRunner[source]

Bases: abc.ABC

a runner is used to define the execution behavior of a Pipeline. there main entry point is the run method

>>> runner.run(is_odd_pipeline, 3)
True

To create a new runner (for instance to execute your pipeline on a cluster) you only have to override run method and use the Pipeline’s class methods (for instance you might want to look at extract_results, execute_node)

abstract run(pipeline: chariots._pipeline.Pipeline, pipeline_input: Optional[Any] = None)[source]

runs a pipeline, provides it with the correct input and extracts the results if any

Parameters
  • pipeline – the pipeline to run

  • pipeline_input – the input to be given to the pipeline

Returns

the output of the graph called on the input if applicable

class chariots.base.BaseSaver(root_path: str)[source]

Bases: abc.ABC

abstraction of a file system used to persist/load assets and ops this can be used on the actual local file system of the machine the Chariots server is running or on a bottomless storage service (not implemented, PR welcome)

To create a new Saver class you only need to define the Save and Load behaviors

Parameters

root_path – the root path to use when mounting the saver (for instance the base path to use in the the file system when using the FileSaver)

load(path: str) → bytes[source]

loads the bytes serialized at a specific path

Parameters

path – the path to load the bytes from.You should not include the root_path of the saver in this path: loading to /foo/bar.txt on a saver with /my/root/path as root path will load /my/root/path/foo/bar.txt

Returns

saved bytes

save(serialized_object: bytes, path: str) → bool[source]

saves bytes to a specific path.

Parameters
  • serialized_object – the bytes to persist

  • path – the path to save the bytes to. You should not include the root_path of the saver in this path: saving to /foo/bar.txt on a saver with /my/root/path as root path will create/update /my/root/path/foo/bar.txt

Returns

whether or not the object was correctly serialized.

class chariots.base.BaseSerializer[source]

Bases: abc.ABC

serializers are helper classes for communication and persistence through out the Chariots framework. There mostly used by data nodes and and MLOps.

For instance if you want to make a pipeline that downloads the iris dataset splits it between train and test and use two different formats for the train and test (please don’t …):

>>> save_train_test = Pipeline([
...     Node(IrisDF(), output_nodes='df'),
...     Node(TrainTestSplit(), input_nodes=['df'], output_nodes=['train_df', 'test_df']),
...     DataSavingNode(serializer=CSVSerializer(), path='/train.csv', input_nodes=['train_df']),
...     DataSavingNode(serializer=DillSerializer(), path='/test.pkl', input_nodes=['test_df'])
... ], "save")

fot MLOps if you want to change the default serialization format (for the model to be saved), you will need to change the serializer_cls class attribute

abstract deserialize_object(serialized_object: bytes) → Any[source]

returns the deserialized object from serialized bytes (that will be loaded from a saver)

Parameters

serialized_object – the serialized bytes

Returns

the deserialized objects

abstract serialize_object(target: Any) → bytes[source]

serializes the object into bytes (for ml ops target will be the model itself and not the op, for the data ops the target will be the input of the node )

Parameters

target – the object that will be serialized

Returns

the bytes of the serialized object

class chariots.base.BaseNode(input_nodes: Optional[List[Union[str, BaseNode]]] = None, output_nodes: Union[List[str], str] = None)[source]

Bases: abc.ABC

A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several ouptuts:

>>> train_logistics = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')

you can also link the first and/or the last node of your pipeline to the pipeline input and output:

>>> pred = Pipeline([
...     Node(IrisFullDataSet(),input_nodes=['__pipeline_input__'], output_nodes=["x"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')

Here we are showing the behavior of nodes using the Node subclass (used with ops).

If you want to create your own Node you will need to define the

  • node_version property that gives the version of the node

  • name property

  • execute method that defines the execution behavior of your custom Node

  • load_latest_version that defines how to load the latest version of this node

Parameters
  • input_nodes – the input_nodes on which this node should be executed

  • output_nodes – an optional symbolic name for the outputs of this node (to be used by downstream nodes in the pipeline. If this node is the output of the pipeline use __pipeline_output__ or ReservedNodes.pipeline_output. If the output of the node should be split (for different downstream nodes to consume) use a list

check_version_compatibility(upstream_node: chariots.base._base_nodes.BaseNode, store_to_look_in: chariots._op_store.OpStore)[source]

checks that this node is compatible with a potentially new version of an upstream node`

Parameters
  • upstream_node – the upstream node to check for version compatibality with

  • store_to_look_in – the op_store to look for valid relationships between this node and upstream versions

Raises

VersionError – when the two nodes are not compatible

abstract execute(*params) → Any[source]

executes the computation represented byt this node (loads/saves dataset for dataset nodes, executes underlyin op for Node

Parameters

params – the inputs provided by the input_nodes

Returns

the output(s) of the node

property is_loadable

whether or not this node can be loaded (this is used by pipelined to know which nodes to load

abstract load_latest_version(store_to_look_in: chariots._op_store.OpStore) → chariots.base._base_nodes.BaseNode[source]

reloads the latest available version of thid node by looking for all available versions in the OpStore

Parameters

store_to_look_in – the store to look for new versions and eventually for bytes of serialized ops

Returns

this node once it has been loaded

abstract property name

the name of the node

abstract property node_version

the version of this node

property output_references

the different outputs of this nodes

persist(store: chariots._op_store.OpStore, downstream_nodes: Optional[List[BaseNode]]) → chariots.versioning._version.Version[source]

persists this nodes’s data (usually this means saving the serialized bytes of the inner op of this node (for the Node class

Parameters
  • store – the store in which to store the node

  • downstream_nodes – the node(s) that are going to accept the current version of this node as upstream

replace_symbolic_references(symbolic_to_real_node: Mapping[str, NodeReference]) → chariots.base._base_nodes.BaseNode[source]

replaces all the symbolic references of this node: if an input_node or output_node was defined with a string by the user, it will try to find the node represented by this string.

Parameters

symbolic_to_real_node – the mapping of all NodeReference found so far in the pipeline

Raises

ValueError – if a node with multiple outputs was used directly (object used rather than strings)

Returns

this node with all it’s inputs and outputs as NodeReferences rather than strings

property require_saver

whether or not this node requires a saver to be executed this is usualy True by data nodes

property requires_runner

whether or not this node requires a runner to be executed (typically if the inner op is a pipelines)