chariots.nodes¶
A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several ouptuts:
>>> train_logistics = Pipeline([
... Node(IrisFullDataSet(), output_nodes=["x", "y"]),
... Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')
you can also link the first and/or the last node of your pipeline to the pipeline input and output:
>>> pred = Pipeline([
... Node(IrisFullDataSet(),input_nodes=['__pipeline_input__'], output_nodes=["x"]),
... Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
-
class
chariots.nodes.
Node
(op: chariots.base._base_op.BaseOp, input_nodes: Optional[List[Union[str, chariots.base._base_nodes.BaseNode]]] = None, output_nodes: Union[List[Union[str, chariots.base._base_nodes.BaseNode]], str, chariots.base._base_nodes.BaseNode] = None)[source]¶ Bases:
chariots.base._base_nodes.BaseNode
Class that handles the interaction between a pipeline and an Op. it handles defining the nodes that are going to be used as the inputs of the op and how the output of the op should be reppresented for the rest of the pipeline.
>>> train_logistics = Pipeline([ ... Node(IrisFullDataSet(), output_nodes=["x", "y"]), ... Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"), ... Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"]) ... ], 'train_logistics')
you can also link the first and/or the last node of your pipeline to the pipeline input and output:
>>> pred = Pipeline([ ... Node(IrisFullDataSet(),input_nodes=['__pipeline_input__'], output_nodes=["x"]), ... Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"), ... Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__']) ... ], 'pred')
- Parameters
op – the op this Node represents
input_nodes – the input_nodes that are going to be used as inputs of the inner op the node, the inputs will be given to the op in the order they are defined in this argument.
output_nodes – a symbolic name for the the output(s) of the op, if the op returns a tuple output_noes should be the same length as said tuple
-
check_version_compatibility
(upstream_node: chariots.base._base_nodes.BaseNode, store_to_look_in: chariots._op_store.OpStore)[source]¶ checks that this node is compatible with a potentially new version of an upstream node`
- Parameters
upstream_node – the upstream node to check for version compatibality with
store_to_look_in – the op_store to look for valid relationships between this node and upstream versions
- Raises
VersionError – when the two nodes are not compatible
-
execute
(params: List[Any], runner: Optional[chariots.base._base_runner.BaseRunner] = None) → Any[source]¶ executes the underlying op on params
- Parameters
runner – runner that can be provided if the node needs one (mostly if node is a pipeline)
params – the inputs of the underlying op
- Raises
ValueError – if the runner is not provided but needed
- Returns
the output of the op
-
property
is_loadable
¶ - Returns
whether or not this node and its inner op can be loaded
-
load_latest_version
(store_to_look_in: chariots._op_store.OpStore) → Optional[chariots.base._base_nodes.BaseNode][source]¶ reloads the latest version of the op this node represents by looking for available versions in the store
- Parameters
store_to_look_in – the store to look for new versions in
- Returns
the reloaded node if any older versions where found in the store otherwise None
-
property
name
¶ the name of the node. by default this will be the name of the underlying op.
-
property
node_version
¶ the version of this node
-
persist
(store: chariots._op_store.OpStore, downstream_nodes: Optional[List[BaseNode]]) → Optional[chariots.versioning._version.Version][source]¶ persists this nodes’s data (usually this means saving the serialized bytes of the inner op of this node (for the Node class
- Parameters
store – the store in which to store the node
downstream_nodes – the node(s) that are going to accept the current version of this node as upstream
-
property
requires_runner
¶ whether or not this node requires a runner to be executed (typically if the inner op is a pipelines)
-
class
chariots.nodes.
DataLoadingNode
(serializer: chariots.base._base_serializer.BaseSerializer, path: str, output_nodes=None, name: Optional[str] = None, saver: Optional[chariots.base._base_saver.BaseSaver] = None)[source]¶ Bases:
chariots.nodes._data_node.DataNode
a node for loading data from the ap’s saver (if used in an app, otherwise use the attach_save method to define this node’s saver).
You can use this node like any other node except that it doesn’t take a input_nodes parameters
>>> load_and_analyse_iris = Pipeline([ ... DataLoadingNode(serializer=CSVSerializer(), path='/train.csv', output_nodes=["train_df"]), ... Node(AnalyseDataSetOp(), input_nodes=["train_df"], output_nodes=["__pipeline_output__"]), ... ], "analyse")
then you can prepare the pipeline (which attaches the saver) and run the pipeline
>>> load_and_analyse_iris.prepare(saver) >>> runner.run(load_and_analyse_iris) Counter({1: 39, 2: 38, 0: 35})
- Parameters
saver – the saver to use for loading or saving data (if not specified at init, you can use the attach_saver method
serializer – the serializer to use to load the dat
path – the path to load the data from
output_nodes – an optional symbolic name for the node to be called by other node. If this node is the output of the pipeline use “pipeline_output” or ReservedNodes.pipeline_output
name – the name of the op
-
execute
(params: List[Any], runner: Optional[chariots.base._base_runner.BaseRunner] = None) → Any[source]¶ executes the computation represented byt this node (loads/saves dataset for dataset nodes, executes underlyin op for Node
- Parameters
params – the inputs provided by the input_nodes
- Returns
the output(s) of the node
-
property
node_version
¶ the version of this node
-
class
chariots.nodes.
DataSavingNode
(serializer: chariots.base._base_serializer.BaseSerializer, path: str, input_nodes: Optional[List[Union[AnyStr, Node]]], name: Optional[str] = None, saver: Optional[chariots.base._base_saver.BaseSaver] = None)[source]¶ Bases:
chariots.nodes._data_node.DataNode
a node for saving data into the app’s Saver (if used in an app, otherwise use the attach_save method to define this node’s saver).
You can use this node like any other node except that it doesn’t take a input_nodes parameters
>>> save_train_test = Pipeline([ ... Node(IrisDF(), output_nodes='df'), ... Node(TrainTestSplit(), input_nodes=['df'], output_nodes=['train_df', 'test_df']), ... DataSavingNode(serializer=CSVSerializer(), path='/train.csv', input_nodes=['train_df']), ... DataSavingNode(serializer=DillSerializer(), path='/test.pkl', input_nodes=['test_df']) ... ], "save")
you can then use the prepare method of the pipeline to attach a saver to our various DataNodes and run the pipeline like any other
>>> save_train_test.prepare(saver) >>> runner.run(save_train_test)
- Parameters
saver – the saver to use for loading or saving data (if not specified at init, you can use the attach_saver method
serializer – the serializer to use to load the dat
path – the path to load the data from
input_nodes – the data that needs to be saved
name – the name of the op
-
execute
(params: List[Any], runner: Optional[chariots.base._base_runner.BaseRunner] = None) → Any[source]¶ executes the computation represented byt this node (loads/saves dataset for dataset nodes, executes underlyin op for Node
- Parameters
params – the inputs provided by the input_nodes
- Returns
the output(s) of the node
-
property
node_version
¶ the version of this node