chariots.nodes

A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several ouptuts:

>>> train_logistics = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')

you can also link the first and/or the last node of your pipeline to the pipeline input and output:

>>> pred = Pipeline([
...     Node(IrisFullDataSet(),input_nodes=['__pipeline_input__'], output_nodes=["x"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
class chariots.nodes.Node(op: chariots.base._base_op.BaseOp, input_nodes: Optional[List[Union[str, chariots.base._base_nodes.BaseNode]]] = None, output_nodes: Union[List[Union[str, chariots.base._base_nodes.BaseNode]], str, chariots.base._base_nodes.BaseNode] = None)[source]

Bases: chariots.base._base_nodes.BaseNode

Class that handles the interaction between a pipeline and an Op. it handles defining the nodes that are going to be used as the inputs of the op and how the output of the op should be reppresented for the rest of the pipeline.

>>> train_logistics = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')

you can also link the first and/or the last node of your pipeline to the pipeline input and output:

>>> pred = Pipeline([
...     Node(IrisFullDataSet(),input_nodes=['__pipeline_input__'], output_nodes=["x"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
Parameters
  • op – the op this Node represents

  • input_nodes – the input_nodes that are going to be used as inputs of the inner op the node, the inputs will be given to the op in the order they are defined in this argument.

  • output_nodes – a symbolic name for the the output(s) of the op, if the op returns a tuple output_noes should be the same length as said tuple

check_version_compatibility(upstream_node: chariots.base._base_nodes.BaseNode, store_to_look_in: chariots._op_store.OpStore)[source]

checks that this node is compatible with a potentially new version of an upstream node`

Parameters
  • upstream_node – the upstream node to check for version compatibality with

  • store_to_look_in – the op_store to look for valid relationships between this node and upstream versions

Raises

VersionError – when the two nodes are not compatible

execute(params: List[Any], runner: Optional[chariots.base._base_runner.BaseRunner] = None) → Any[source]

executes the underlying op on params

Parameters
  • runner – runner that can be provided if the node needs one (mostly if node is a pipeline)

  • params – the inputs of the underlying op

Raises

ValueError – if the runner is not provided but needed

Returns

the output of the op

property is_loadable
Returns

whether or not this node and its inner op can be loaded

load_latest_version(store_to_look_in: chariots._op_store.OpStore) → Optional[chariots.base._base_nodes.BaseNode][source]

reloads the latest version of the op this node represents by looking for available versions in the store

Parameters

store_to_look_in – the store to look for new versions in

Returns

the reloaded node if any older versions where found in the store otherwise None

property name

the name of the node. by default this will be the name of the underlying op.

property node_version

the version of this node

persist(store: chariots._op_store.OpStore, downstream_nodes: Optional[List[BaseNode]]) → Optional[chariots.versioning._version.Version][source]

persists this nodes’s data (usually this means saving the serialized bytes of the inner op of this node (for the Node class

Parameters
  • store – the store in which to store the node

  • downstream_nodes – the node(s) that are going to accept the current version of this node as upstream

property requires_runner

whether or not this node requires a runner to be executed (typically if the inner op is a pipelines)

class chariots.nodes.DataLoadingNode(serializer: chariots.base._base_serializer.BaseSerializer, path: str, output_nodes=None, name: Optional[str] = None, saver: Optional[chariots.base._base_saver.BaseSaver] = None)[source]

Bases: chariots.nodes._data_node.DataNode

a node for loading data from the ap’s saver (if used in an app, otherwise use the attach_save method to define this node’s saver).

You can use this node like any other node except that it doesn’t take a input_nodes parameters

>>> load_and_analyse_iris = Pipeline([
...     DataLoadingNode(serializer=CSVSerializer(), path='/train.csv', output_nodes=["train_df"]),
...     Node(AnalyseDataSetOp(), input_nodes=["train_df"], output_nodes=["__pipeline_output__"]),
... ], "analyse")

then you can prepare the pipeline (which attaches the saver) and run the pipeline

>>> load_and_analyse_iris.prepare(saver)
>>> runner.run(load_and_analyse_iris)
Counter({1: 39, 2: 38, 0: 35})
Parameters
  • saver – the saver to use for loading or saving data (if not specified at init, you can use the attach_saver method

  • serializer – the serializer to use to load the dat

  • path – the path to load the data from

  • output_nodes – an optional symbolic name for the node to be called by other node. If this node is the output of the pipeline use “pipeline_output” or ReservedNodes.pipeline_output

  • name – the name of the op

execute(params: List[Any], runner: Optional[chariots.base._base_runner.BaseRunner] = None) → Any[source]

executes the computation represented byt this node (loads/saves dataset for dataset nodes, executes underlyin op for Node

Parameters

params – the inputs provided by the input_nodes

Returns

the output(s) of the node

property node_version

the version of this node

class chariots.nodes.DataSavingNode(serializer: chariots.base._base_serializer.BaseSerializer, path: str, input_nodes: Optional[List[Union[AnyStr, Node]]], name: Optional[str] = None, saver: Optional[chariots.base._base_saver.BaseSaver] = None)[source]

Bases: chariots.nodes._data_node.DataNode

a node for saving data into the app’s Saver (if used in an app, otherwise use the attach_save method to define this node’s saver).

You can use this node like any other node except that it doesn’t take a input_nodes parameters

>>> save_train_test = Pipeline([
...     Node(IrisDF(), output_nodes='df'),
...     Node(TrainTestSplit(), input_nodes=['df'], output_nodes=['train_df', 'test_df']),
...     DataSavingNode(serializer=CSVSerializer(), path='/train.csv', input_nodes=['train_df']),
...     DataSavingNode(serializer=DillSerializer(), path='/test.pkl', input_nodes=['test_df'])
... ], "save")

you can then use the prepare method of the pipeline to attach a saver to our various DataNodes and run the pipeline like any other

>>> save_train_test.prepare(saver)
>>> runner.run(save_train_test)
Parameters
  • saver – the saver to use for loading or saving data (if not specified at init, you can use the attach_saver method

  • serializer – the serializer to use to load the dat

  • path – the path to load the data from

  • input_nodes – the data that needs to be saved

  • name – the name of the op

execute(params: List[Any], runner: Optional[chariots.base._base_runner.BaseRunner] = None) → Any[source]

executes the computation represented byt this node (loads/saves dataset for dataset nodes, executes underlyin op for Node

Parameters

params – the inputs provided by the input_nodes

Returns

the output(s) of the node

property node_version

the version of this node

class chariots.nodes.ReservedNodes[source]

Bases: enum.Enum

enum of reserved node names

pipeline_input = '__pipeline_input__'
pipeline_output = '__pipeline_output__'
property reference

the output references of the reserved nodes