chariots.nodes

A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several outputs:

>>> train_logistics = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')

You can also link the first and/or the last node of your pipeline to the pipeline input and output:

>>> pred = Pipeline([
...     Node(IrisFullDataSet(), input_nodes=['__pipeline_input__'], output_nodes=["x"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
class chariots.pipelines.nodes.Node(op: chariots.pipelines.ops._base_op.BaseOp, input_nodes: Optional[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]]] = None, output_nodes: Union[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]], str, chariots.pipelines.nodes._base_nodes.BaseNode] = None)

Bases: chariots.pipelines.nodes._base_nodes.BaseNode

Class that handles the interaction between a pipeline and an Op. It defines which nodes are used as the inputs of the op and how the output of the op is represented for the rest of the pipeline.

>>> train_logistics = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')

You can also link the first and/or the last node of your pipeline to the pipeline input and output:

>>> pred = Pipeline([
...     Node(IrisFullDataSet(), input_nodes=['__pipeline_input__'], output_nodes=["x"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
Parameters
  • op – the op this Node represents

  • input_nodes – the nodes whose outputs are used as inputs of this node's inner op; the inputs are given to the op in the order they are listed in this argument.

  • output_nodes – a symbolic name for the output(s) of the op; if the op returns a tuple, output_nodes should be the same length as said tuple
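
The ordering and tuple-splitting rules described for input_nodes and output_nodes can be illustrated with a small stand-alone sketch. It does not use chariots: `run_step` and the `results` dictionary are hypothetical stand-ins for the wiring a pipeline performs, assuming outputs are stored under their symbolic names and looked up again by downstream steps.

```python
from typing import Any, Callable, Dict, List, Union


def run_step(
    func: Callable[..., Any],
    results: Dict[str, Any],
    input_nodes: List[str],
    output_nodes: Union[List[str], str, None] = None,
) -> None:
    """Hypothetical helper (not part of chariots): run one step, store outputs by name."""
    # Inputs are passed positionally, in the order listed in input_nodes.
    args = [results[name] for name in input_nodes]
    output = func(*args)
    if output_nodes is None:
        return  # nothing is exposed to downstream steps
    if isinstance(output_nodes, str):
        results[output_nodes] = output  # one symbolic name: output kept whole
        return
    # A list of names: the returned tuple needs one element per name.
    if len(output) != len(output_nodes):
        raise ValueError("output_nodes must match the length of the returned tuple")
    for name, value in zip(output_nodes, output):
        results[name] = value


results: Dict[str, Any] = {}
run_step(lambda: ([1.0, 2.0], [0, 1]), results, [], ["x", "y"])
run_step(lambda x: [v * 10 for v in x], results, ["x"], "x_transformed")
run_step(lambda x_t, y: list(zip(x_t, y)), results, ["x_transformed", "y"], "pairs")
```

The three calls mirror the shape of the train_logistics example: a source step with two named outputs, a transform consuming one of them, and a final step receiving its two inputs in the listed order.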

__init__(op: chariots.pipelines.ops._base_op.BaseOp, input_nodes: Optional[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]]] = None, output_nodes: Union[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]], str, chariots.pipelines.nodes._base_nodes.BaseNode] = None)

Initialize self. See help(type(self)) for accurate signature.

check_version_compatibility(upstream_node: chariots.pipelines.nodes._base_nodes.BaseNode, store_to_look_in: chariots.op_store._op_store_client.OpStoreClient)

checks that this node is compatible with a potentially new version of an upstream node

Parameters
  • upstream_node – the upstream node to check for version compatibility with

  • store_to_look_in – the op_store_client to look for valid relationships between this node and upstream versions

Raises

VersionError – when the two nodes are not compatible

execute(params: List[Any], runner: Optional[chariots.pipelines.runners._base_runner.BaseRunner] = None) → Any

executes the underlying op on params

Parameters
  • runner – runner that can be provided if the node needs one (mostly if node is a pipeline)

  • params – the inputs of the underlying op

Raises

ValueError – if the runner is not provided but needed

Returns

the output of the op
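
The runner contract described for execute can be sketched without chariots. Everything below is a hypothetical stand-in: `NodeSketch` is not the real Node, and the runner is modelled as a plain callable because the BaseRunner interface is not shown on this page.

```python
from typing import Any, Callable, List, Optional

# Hypothetical runner type for this sketch: a callable receiving the inner
# function and its parameters (not the real BaseRunner interface).
RunnerSketch = Callable[[Callable[..., Any], List[Any]], Any]


class NodeSketch:
    """Stand-in node (not chariots' Node) wrapping a plain function."""

    def __init__(self, func: Callable[..., Any], requires_runner: bool = False):
        self.func = func
        self.requires_runner = requires_runner

    def execute(self, params: List[Any], runner: Optional[RunnerSketch] = None) -> Any:
        if self.requires_runner and runner is None:
            # Mirrors the documented contract: a needed but missing runner is an error.
            raise ValueError("this node needs a runner but none was provided")
        if self.requires_runner:
            return runner(self.func, params)
        return self.func(*params)


def sequential_runner(func: Callable[..., Any], params: List[Any]) -> Any:
    """Trivial runner used only for this illustration."""
    return func(*params)


plain = NodeSketch(lambda a, b: a + b)
nested = NodeSketch(lambda a: a * 2, requires_runner=True)
```

Calling `plain.execute([1, 2])` works without a runner, while `nested.execute([4])` raises ValueError unless `runner=sequential_runner` is passed.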

property is_loadable
Returns

whether or not this node and its inner op can be loaded

load_latest_version(store_to_look_in: chariots.op_store._op_store_client.OpStoreClient) → Optional[chariots.pipelines.nodes._base_nodes.BaseNode]

reloads the latest version of the op this node represents by looking for available versions in the store

Parameters

store_to_look_in – the store to look for new versions in

Returns

the reloaded node if any older versions were found in the store, otherwise None
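
The "latest version or None" behaviour can be pictured with a toy lookup. This is only a sketch under loud assumptions: the store is a plain dictionary rather than an OpStoreClient, versions are simplified to integer tuples, and `latest_version` is a hypothetical helper.

```python
from typing import Dict, List, Optional, Tuple

VersionSketch = Tuple[int, int, int]  # simplified stand-in for a version object


def latest_version(
    store: Dict[str, List[VersionSketch]], op_name: str
) -> Optional[VersionSketch]:
    """Hypothetical lookup: newest stored version for op_name, or None if none exist."""
    versions = store.get(op_name, [])
    # Tuples compare element by element, so max() picks the highest version.
    return max(versions) if versions else None


store = {"pcaop": [(0, 1, 0), (0, 2, 0), (0, 1, 5)]}
newest = latest_version(store, "pcaop")
missing = latest_version(store, "logisticop")
```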

property name

the name of the node; by default this is the name of the underlying op

property node_version

the version of this node

persist(store: chariots.op_store._op_store_client.OpStoreClient, downstream_nodes: Optional[List[BaseNode]]) → Optional[chariots.versioning._version.Version]

persists this node's data (for the Node class, this usually means saving the serialized bytes of the inner op)

Parameters
  • store – the store in which to store the node

  • downstream_nodes – the node(s) that are going to accept the current version of this node as upstream

property requires_runner

whether or not this node requires a runner to be executed (typically if the inner op is a pipeline)

class chariots.pipelines.nodes.ReservedNodes

Bases: enum.Enum

enum of reserved node names

pipeline_input = '__pipeline_input__'
pipeline_output = '__pipeline_output__'
property reference

the output references of the reserved nodes
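
As a minimal sketch of how such an enum of reserved names behaves, the stand-in below copies the two documented members and their values. `ReservedNodesSketch` and `is_reserved` are hypothetical names for illustration; the `reference` property of the real class is not reproduced.

```python
from enum import Enum


class ReservedNodesSketch(Enum):
    """Stand-in mirroring the two documented members (not the chariots class)."""

    pipeline_input = '__pipeline_input__'
    pipeline_output = '__pipeline_output__'


def is_reserved(name: str) -> bool:
    """Hypothetical helper: True if a symbolic name collides with a reserved one."""
    return name in {member.value for member in ReservedNodesSketch}


# Standard Enum behaviour: a member can be looked up from its string value.
member = ReservedNodesSketch('__pipeline_output__')
```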

class chariots.pipelines.nodes.BaseNode(input_nodes: Optional[List[Union[str, BaseNode]]] = None, output_nodes: Union[List[str], str] = None)

Bases: abc.ABC

A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several outputs:

>>> train_logistics = Pipeline([
...     Node(IrisFullDataSet(), output_nodes=["x", "y"]),
...     Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')

You can also link the first and/or the last node of your pipeline to the pipeline input and output:

>>> pred = Pipeline([
...     Node(IrisFullDataSet(), input_nodes=['__pipeline_input__'], output_nodes=["x"]),
...     Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
...     Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')

Here we are showing the behavior of nodes using the Node subclass (used with ops).

If you want to create your own Node you will need to define the

  • node_version property that gives the version of the node

  • name property

  • execute method that defines the execution behavior of your custom Node

  • load_latest_version that defines how to load the latest version of this node
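
The four required members listed above can be sketched as follows. To stay self-contained, the example subclasses a simplified stand-in (`SketchBaseNode`) rather than the real BaseNode, so the constructor arguments, `UpperCaseNode`, and the string version are all illustrative assumptions.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchBaseNode(ABC):
    """Simplified stand-in for BaseNode: only the four members to define."""

    @property
    @abstractmethod
    def node_version(self) -> str: ...

    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def execute(self, *params: Any) -> Any: ...

    @abstractmethod
    def load_latest_version(self, store_to_look_in: Any) -> "SketchBaseNode": ...


class UpperCaseNode(SketchBaseNode):
    """Toy custom node that upper-cases its single string input."""

    @property
    def node_version(self) -> str:
        return "0.0.1"  # a plain string here, purely for illustration

    @property
    def name(self) -> str:
        return "upper_case"

    def execute(self, *params: Any) -> Any:
        (text,) = params  # this toy node expects exactly one input
        return text.upper()

    def load_latest_version(self, store_to_look_in: Any) -> "SketchBaseNode":
        return self  # nothing is persisted for this toy node


node = UpperCaseNode()
```

Leaving any of the four members undefined makes the subclass abstract, so instantiating it raises TypeError.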

Parameters
  • input_nodes – the input_nodes on which this node should be executed

  • output_nodes – an optional symbolic name for the outputs of this node (to be used by downstream nodes in the pipeline). If this node is the output of the pipeline, use __pipeline_output__ or ReservedNodes.pipeline_output. If the output of the node should be split (for different downstream nodes to consume), use a list.

__init__(input_nodes: Optional[List[Union[str, BaseNode]]] = None, output_nodes: Union[List[str], str] = None)

Initialize self. See help(type(self)) for accurate signature.

check_version_compatibility(upstream_node: chariots.pipelines.nodes._base_nodes.BaseNode, store_to_look_in: chariots.op_store._op_store_client.OpStoreClient)

checks that this node is compatible with a potentially new version of an upstream node

Parameters
  • upstream_node – the upstream node to check for version compatibility with

  • store_to_look_in – the op_store_client to look for valid relationships between this node and upstream versions

Raises

VersionError – when the two nodes are not compatible

abstract execute(*params) → Any

executes the computation represented by this node (loads/saves the dataset for dataset nodes, executes the underlying op for Node)

Parameters

params – the inputs provided by the input_nodes

Returns

the output(s) of the node

property is_loadable

whether or not this node can be loaded (this is used by pipelines to know which nodes to load)

abstract load_latest_version(store_to_look_in: chariots.op_store._op_store_client.OpStoreClient) → chariots.pipelines.nodes._base_nodes.BaseNode

reloads the latest available version of this node by looking for all available versions in the OpStore

Parameters

store_to_look_in – the store to look for new versions in and, if needed, for the bytes of serialized ops

Returns

this node once it has been loaded

abstract property name

the name of the node

abstract property node_version

the version of this node

property output_references

the different outputs of this node

persist(store: chariots.op_store._op_store_client.OpStoreClient, downstream_nodes: Optional[List[BaseNode]]) → chariots.versioning._version.Version

persists this node's data (for the Node class, this usually means saving the serialized bytes of the inner op)

Parameters
  • store – the store in which to store the node

  • downstream_nodes – the node(s) that are going to accept the current version of this node as upstream

replace_symbolic_references(symbolic_to_real_node: Mapping[str, NodeReference]) → chariots.pipelines.nodes._base_nodes.BaseNode

replaces all the symbolic references of this node: if an input_node or output_node was defined with a string by the user, it will try to find the node represented by this string.

Parameters

symbolic_to_real_node – the mapping of all NodeReference found so far in the pipeline

Raises

ValueError – if a node with multiple outputs was used directly (object used rather than strings)

Returns

this node with all its inputs and outputs as NodeReferences rather than strings
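
The idea of swapping symbolic names for references can be sketched in isolation. `RefSketch` and `resolve_inputs` are hypothetical stand-ins (the real NodeReference class is not shown on this page), and the sketch covers only the name lookup, not the ValueError case documented above.

```python
from dataclasses import dataclass
from typing import List, Mapping


@dataclass(frozen=True)
class RefSketch:
    """Hypothetical stand-in for NodeReference: which node produced which output."""

    node_name: str
    output_name: str


def resolve_inputs(
    input_nodes: List[str], symbolic_to_real_node: Mapping[str, RefSketch]
) -> List[RefSketch]:
    """Replace each symbolic input name with the reference registered so far."""
    # An unknown name raises KeyError in this sketch (an illustrative choice).
    return [symbolic_to_real_node[name] for name in input_nodes]


known = {
    "x": RefSketch("irisfulldataset", "x"),
    "y": RefSketch("irisfulldataset", "y"),
}
resolved = resolve_inputs(["x", "y"], known)
```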

property require_saver

whether or not this node requires a saver to be executed; this is usually True for data nodes

property requires_runner

whether or not this node requires a runner to be executed (typically if the inner op is a pipeline)