chariots.base
The base module gathers the main classes of the Chariots framework that can be subclassed to create custom behaviors:
- creating new Ops for preprocessing and feature extraction (subclassing BaseOp)
- supporting new ML frameworks with BaseMLOp
- creating a custom node (A/B testing, …) with BaseNode
- changing the execution behavior of pipelines (multiprocessing, cluster computing, …) with BaseRunner
- saving your ops and metadata to a different cloud provider with BaseSaver
- creating new serialization formats for datasets and models with BaseSerializer
class chariots.base.BaseOp(op_callbacks: Optional[List[chariots.callbacks._op_callback.OpCallBack]] = None)
Bases: object
The ops are the atomic computation units of the Chariots framework. Whereas a Node represents a slot in a pipeline and the interactions between that slot and the rest of the pipeline, the op actually does the computation.
To subclass the BaseOp class and create a new Op, you need to override the execute method:
>>> class AddOp(BaseOp):
...     number_to_add = 1
...
...     def execute(self, op_input):
...         return op_input + self.number_to_add
and then you can execute the op alone:
>>> AddOp().execute(3)
4
or within a pipeline (that can be deployed):
>>> pipeline = Pipeline([Node(AddOp(), ["__pipeline_input__"], "__pipeline_output__")], "simple_pipeline")
>>> runner.run(pipeline, 3)  # of course you can use a `Chariots` server to serve the pipeline and op(s)
4
The BaseOp class is a versioned class (see the versioning module for more info), so you can use VersionedField with it:
>>> class AddOp(BaseOp):
...     number_to_add = VersionedField(3, VersionType.MAJOR)
...
...     def execute(self, op_input):
...         return op_input + self.number_to_add
>>> AddOp.__version__
<Version, major:36d3c, minor: 94e72, patch: 94e72>
>>> AddOp.number_to_add
3
and changing the field will change the version:
>>> class AddOp(BaseOp):
...     number_to_add = VersionedField(4, VersionType.MAJOR)
...
...     def execute(self, op_input):
...         return op_input + self.number_to_add
>>> AddOp.__version__
<Version, major:8ad66, minor: 94e72, patch: 94e72>
- Parameters
op_callbacks – OpCallBack objects to change the behavior of the op by executing some action before or after the op's execution
after_execution(args: List[Any], output: Any) → Any
Method used to create a one-off (compared to using a callback) custom behavior that gets executed after the op itself.
- Parameters
args – the arguments that were passed to the op
output – the output of the op
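As an illustration, the before/after_execution pair can implement one-off instrumentation such as timing. The sketch below is stand-alone (it does not import chariots): _OpStandIn is a hypothetical minimal stand-in that mimics the documented call order of the hooks, whereas a real op would simply subclass chariots.base.BaseOp.

```python
import time

class _OpStandIn:
    """Hypothetical minimal stand-in for chariots.base.BaseOp:
    reproduces only the documented hook call order."""
    def before_execution(self, args):
        pass

    def after_execution(self, args, output):
        pass

    def execute_with_all_callbacks(self, args):
        self.before_execution(args)
        output = self.execute(*args)
        self.after_execution(args, output)
        return output

class TimedOp(_OpStandIn):
    """Uses the one-off hooks to record how long each execution took."""
    def before_execution(self, args):
        self._start = time.perf_counter()

    def after_execution(self, args, output):
        self.duration = time.perf_counter() - self._start

    def execute(self, op_input):
        return op_input + 1

op = TimedOp()
result = op.execute_with_all_callbacks([41])  # before -> execute -> after
```

After the call, `op.duration` holds the measured wall-clock time of the execution.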
property allow_version_change
Whether or not this op accepts to be loaded with the wrong version. This is usually False but is useful when loading an op for retraining.
before_execution(args: List[Any])
Method used to create a one-off (compared to using a callback) custom behavior that gets executed before the op itself.
- Parameters
args – the arguments that are going to be passed to the operation
execute(*args, **kwargs)
Main method to override; it defines the behavior of the op. In a pipeline, the arguments will be passed from the node, one argument per input (in the order of the input nodes).
execute_with_all_callbacks(args)
Executes the op itself alongside all its callbacks (op callbacks and the before/after_execution methods).
- Parameters
args – the arguments to be passed to the execute method of the op
- Returns
the result of the op
property name
The name of the op. This is mainly used to find previous and saved versions of this op in the op_store.
property op_version
The version the op passes to the pipeline to identify itself. This differs from __version__ in that it can add information besides the class fields (for instance the last training time for ML ops).
class chariots.base.BaseMLOp(mode: chariots._ml_mode.MLMode, op_callbacks: Optional[List[chariots.callbacks._op_callback.OpCallBack]] = None)
Bases: chariots.ops._loadable_op.LoadableOp
A BaseMLOp is an op designed specifically to be a machine learning model (whether for training or inference). You can initialize the op in three distinct ML modes:
- FIT for training the model
- PREDICT to perform inference
- FIT_PREDICT to do both (train and predict on the same dataset)
The usual workflow is to have a training pipeline and a prediction pipeline, and to:
- execute the training pipeline
- save the training pipeline
- reload the prediction pipeline
- use the prediction pipeline
Here is an example. First, create your pipelines:
>>> train = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train')
>>> pred = Pipeline([
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["__pipeline_input__"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
and then train your pipelines and make some predictions:
>>> client.call_pipeline(train)
>>> client.save_pipeline(train)
>>> client.load_pipeline(pred)
>>> client.call_pipeline(pred, [[1, 2, 3, 4]])
[1]
If you want to create a new MLOp class (to accommodate an unsupported framework, for instance), you need to define:
- how to fit your op, with the fit method
- how to perform inference with your op, with the predict method
- how to initialize a new model, with the _init_model method
and eventually you can change the serializer_cls class attribute to change the serialization format of your model.
- Parameters
op_callbacks – OpCallBack objects to change the behavior of the op by executing some action before or after the op's execution
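To make the three required members concrete, here is a minimal sketch of the fit/predict/_init_model trio. It is stand-alone and does not subclass the real chariots.base.BaseMLOp (so the MLMode machinery, callbacks, and serialization are omitted); MeanRegressorOp is a hypothetical name used only for illustration.

```python
class MeanRegressorOp:
    """Hypothetical sketch of the members a custom MLOp must define.
    A real implementation would subclass chariots.base.BaseMLOp."""
    def __init__(self):
        self._model = self._init_model()

    def _init_model(self):
        # the "model" here is just a stored mean
        return {"mean": None}

    def fit(self, y):
        # fit must not return data (use FIT_PREDICT to also predict)
        self._model["mean"] = sum(y) / len(y)

    def predict(self, x):
        # predicts the fitted mean for every input row
        return [self._model["mean"]] * len(x)

op = MeanRegressorOp()
op.fit([1, 2, 3])
preds = op.predict([10, 20])  # [2.0, 2.0]
```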
property allow_version_change
Whether or not this op accepts to be loaded with the wrong version. This is usually False but is useful when loading an op for retraining.
execute(*args, **kwargs)
Executes the model action that is required (fit, predict or both, depending on the mode the op was initialized with).
abstract fit(*args, **kwargs)
Fits the inner model of the op on the data in args and kwargs. This method must not return any data (use the FIT_PREDICT mode to predict on the same data the op was trained on).
load(serialized_object: bytes)
Receives the serialized bytes of a newer version of this class and sets the internals of the op accordingly.
- Parameters
serialized_object – the serialized bytes of this op (as output by the serialize method)
property mode
The mode this op was instantiated with.
property op_version
The version the op passes to the pipeline to identify itself. This differs from __version__ in that it can add information besides the class fields (for instance the last training time for ML ops).
abstract predict(*args, **kwargs) → Any
The method used to do predictions/inference once the model has been fitted/loaded.
serialize() → bytes
Serializes the object into bytes (to be persisted with a Saver) so it can be reloaded in the future (you must ensure compatibility with the load method).
- Returns
the serialized bytes representing this operation
serializer_cls
Alias of chariots.serializers._dill_serializer.DillSerializer
training_update_version = 'patch'
class chariots.base.BaseRunner
Bases: abc.ABC
A runner defines the execution behavior of a Pipeline. Its main entry point is the run method:
>>> runner.run(is_odd_pipeline, 3)
True
To create a new runner (for instance to execute your pipeline on a cluster), you only have to override the run method and use the Pipeline's class methods (for instance you might want to look at extract_results and execute_node).
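As a sketch of the idea, a sequential runner might look like the following. The example is stand-alone: the toy "pipeline" is just a list of callables, whereas a real runner would subclass chariots.base.BaseRunner and drive an actual Pipeline through its execute_node/extract_results helpers; SequentialRunner is a hypothetical name.

```python
class SequentialRunner:
    """Hypothetical sketch of a runner: executes each node in order,
    threading each output into the next node. A real implementation
    would subclass chariots.base.BaseRunner."""
    def run(self, pipeline, pipeline_input=None):
        value = pipeline_input
        for node in pipeline:
            value = node(value)
        return value

runner = SequentialRunner()
result = runner.run([lambda x: x + 1, lambda x: x * 2], 3)  # (3 + 1) * 2
```

A cluster runner would keep the same run signature but dispatch each node to remote workers instead of looping locally.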
abstract run(pipeline: chariots._pipeline.Pipeline, pipeline_input: Optional[Any] = None)
Runs a pipeline, provides it with the correct input, and extracts the results if any.
- Parameters
pipeline – the pipeline to run
pipeline_input – the input to be given to the pipeline
- Returns
the output of the graph called on the input if applicable
class chariots.base.BaseSaver(root_path: str)
Bases: abc.ABC
Abstraction of a file system used to persist/load assets and ops. This can be used on the actual local file system of the machine the Chariots server is running on, or on a bottomless storage service (not implemented, PRs welcome).
To create a new Saver class you only need to define the save and load behaviors:
- Parameters
root_path – the root path to use when mounting the saver (for instance the base path to use in the file system when using the FileSaver)
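A minimal sketch of the save/load contract, assuming the path semantics documented for this class (caller paths never include root_path and are resolved under it). This stand-alone example keeps everything in a dict instead of subclassing the real chariots.base.BaseSaver; InMemorySaver is a hypothetical name.

```python
import posixpath

class InMemorySaver:
    """Hypothetical dict-backed saver; a real one would subclass
    chariots.base.BaseSaver and hit an actual storage backend."""
    def __init__(self, root_path: str):
        self.root_path = root_path
        self._store = {}

    def _full_path(self, path: str) -> str:
        # resolve the caller's path under the mounted root
        return posixpath.join(self.root_path, path.lstrip("/"))

    def save(self, serialized_object: bytes, path: str) -> bool:
        self._store[self._full_path(path)] = serialized_object
        return True

    def load(self, path: str) -> bytes:
        return self._store[self._full_path(path)]

saver = InMemorySaver("/my/root/path")
saver.save(b"hello", "/foo/bar.txt")  # stored under /my/root/path/foo/bar.txt
loaded = saver.load("/foo/bar.txt")
```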
load(path: str) → bytes
Loads the bytes serialized at a specific path.
- Parameters
path – the path to load the bytes from. You should not include the root_path of the saver in this path: loading /foo/bar.txt on a saver with /my/root/path as root path will load /my/root/path/foo/bar.txt
- Returns
saved bytes
save(serialized_object: bytes, path: str) → bool
Saves bytes to a specific path.
- Parameters
serialized_object – the bytes to persist
path – the path to save the bytes to. You should not include the root_path of the saver in this path: saving to /foo/bar.txt on a saver with /my/root/path as root path will create/update /my/root/path/foo/bar.txt
- Returns
whether or not the object was correctly persisted.
class chariots.base.BaseSerializer
Bases: abc.ABC
Serializers are helper classes for communication and persistence throughout the Chariots framework. They are mostly used by data nodes and MLOps.
For instance, if you want to make a pipeline that downloads the iris dataset, splits it between train and test, and uses two different formats for the train and test sets (please don't …):
>>> save_train_test = Pipeline([
...     Node(IrisDF(), output_nodes='df'),
...     Node(TrainTestSplit(), input_nodes=['df'], output_nodes=['train_df', 'test_df']),
...     DataSavingNode(serializer=CSVSerializer(), path='/train.csv', input_nodes=['train_df']),
...     DataSavingNode(serializer=DillSerializer(), path='/test.pkl', input_nodes=['test_df'])
... ], "save")
For MLOps, if you want to change the default serialization format (for the model to be saved), you will need to change the serializer_cls class attribute.
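As a sketch, a serializer only needs the serialize_object/deserialize_object pair. This stand-alone example uses JSON and does not subclass the real chariots.base.BaseSerializer; JSONSerializer is a hypothetical name.

```python
import json

class JSONSerializer:
    """Hypothetical JSON serializer; a real one would subclass
    chariots.base.BaseSerializer and could then be set as an
    op's serializer_cls."""
    def serialize_object(self, target) -> bytes:
        # turn the target into bytes to be persisted with a Saver
        return json.dumps(target).encode("utf-8")

    def deserialize_object(self, serialized_object: bytes):
        # invert serialize_object
        return json.loads(serialized_object.decode("utf-8"))

ser = JSONSerializer()
payload = ser.serialize_object({"a": 1})
round_trip = ser.deserialize_object(payload)  # {'a': 1}
```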
abstract deserialize_object(serialized_object: bytes) → Any
Returns the deserialized object from serialized bytes (that will be loaded from a saver).
- Parameters
serialized_object – the serialized bytes
- Returns
the deserialized objects
abstract serialize_object(target: Any) → bytes
Serializes the object into bytes (for ML ops the target will be the model itself and not the op; for data ops the target will be the input of the node).
- Parameters
target – the object that will be serialized
- Returns
the bytes of the serialized object
class chariots.base.BaseNode(input_nodes: Optional[List[Union[str, BaseNode]]] = None, output_nodes: Union[List[str], str] = None)
Bases: abc.ABC
A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several outputs:
>>> train_logistics = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')
you can also link the first and/or the last node of your pipeline to the pipeline input and output:
>>> pred = Pipeline([
...     Node(IrisFullDataSet(), input_nodes=['__pipeline_input__'], output_nodes=["x"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
Here we are showing the behavior of nodes using the Node subclass (used with ops).
If you want to create your own node, you will need to define:
- the node_version property that gives the version of the node
- the name property
- the execute method that defines the execution behavior of your custom node
- the load_latest_version method that defines how to load the latest version of this node
- Parameters
input_nodes – the input_nodes on which this node should be executed
output_nodes – an optional symbolic name for the outputs of this node (to be used by downstream nodes in the pipeline). If this node is the output of the pipeline, use __pipeline_output__ or ReservedNodes.pipeline_output. If the output of the node should be split (for different downstream nodes to consume), use a list
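To make the four required members concrete, here is a stand-alone sketch. It does not subclass the real chariots.base.BaseNode, and the string version is only a placeholder for a chariots Version object; ConstantNode is a hypothetical name.

```python
class ConstantNode:
    """Hypothetical sketch of the four members a custom node defines.
    A real node would subclass chariots.base.BaseNode."""
    def __init__(self, value):
        self._value = value

    @property
    def node_version(self):
        return "0.0.1"  # placeholder for a chariots Version object

    @property
    def name(self):
        return "constant_node"

    def execute(self, *params):
        # ignores its inputs and always produces the same output
        return self._value

    def load_latest_version(self, store_to_look_in):
        # nothing to reload for a constant node
        return self

node = ConstantNode(42)
output = node.execute()  # 42
```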
check_version_compatibility(upstream_node: chariots.base._base_nodes.BaseNode, store_to_look_in: chariots._op_store.OpStore)
Checks that this node is compatible with a potentially new version of an upstream node.
- Parameters
upstream_node – the upstream node to check for version compatibility with
store_to_look_in – the op_store to look for valid relationships between this node and upstream versions
- Raises
VersionError – when the two nodes are not compatible
abstract execute(*params) → Any
Executes the computation represented by this node (loads/saves the dataset for dataset nodes, executes the underlying op for Node).
- Parameters
params – the inputs provided by the input_nodes
- Returns
the output(s) of the node
property is_loadable
Whether or not this node can be loaded (this is used by pipelines to know which nodes to load).
abstract load_latest_version(store_to_look_in: chariots._op_store.OpStore) → chariots.base._base_nodes.BaseNode
Reloads the latest available version of this node by looking for all available versions in the OpStore.
- Parameters
store_to_look_in – the store in which to look for new versions and, eventually, for the bytes of serialized ops
- Returns
this node once it has been loaded
abstract property name
The name of the node.
abstract property node_version
The version of this node.
property output_references
The different outputs of this node.
persist(store: chariots._op_store.OpStore, downstream_nodes: Optional[List[BaseNode]]) → chariots.versioning._version.Version
Persists this node's data (for the Node class this usually means saving the serialized bytes of the node's inner op).
- Parameters
store – the store in which to store the node
downstream_nodes – the node(s) that are going to accept the current version of this node as upstream
replace_symbolic_references(symbolic_to_real_node: Mapping[str, NodeReference]) → chariots.base._base_nodes.BaseNode
Replaces all the symbolic references of this node: if an input_node or output_node was defined with a string by the user, this method will try to find the node represented by that string.
- Parameters
symbolic_to_real_node – the mapping of all NodeReference found so far in the pipeline
- Raises
ValueError – if a node with multiple outputs was used directly (object used rather than strings)
- Returns
this node with all its inputs and outputs as NodeReferences rather than strings
property require_saver
Whether or not this node requires a saver to be executed; this is usually True for data nodes.
property requires_runner
Whether or not this node requires a runner to be executed (typically when the inner op is a pipeline).