chariots.nodes
A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several outputs:
>>> train_logistics = Pipeline([
... Node(IrisFullDataSet(), output_nodes=["x", "y"]),
... Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')
You can also link the first and/or the last node of your pipeline to the pipeline input and output:
>>> pred = Pipeline([
... Node(IrisFullDataSet(), input_nodes=['__pipeline_input__'], output_nodes=["x"]),
... Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
-
class
chariots.pipelines.nodes.
Node
(op: chariots.pipelines.ops._base_op.BaseOp, input_nodes: Optional[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]]] = None, output_nodes: Union[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]], str, chariots.pipelines.nodes._base_nodes.BaseNode] = None) Bases:
chariots.pipelines.nodes._base_nodes.BaseNode
Class that handles the interaction between a pipeline and an Op. It defines which nodes are used as the inputs of the op and how the output of the op is represented for the rest of the pipeline.
>>> train_logistics = Pipeline([
... Node(IrisFullDataSet(), output_nodes=["x", "y"]),
... Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')
You can also link the first and/or the last node of your pipeline to the pipeline input and output:
>>> pred = Pipeline([
... Node(IrisFullDataSet(), input_nodes=['__pipeline_input__'], output_nodes=["x"]),
... Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
- Parameters
op – the op this Node represents
input_nodes – the nodes whose outputs are used as inputs of the node's inner op. The inputs are given to the op in the order in which they are defined in this argument.
output_nodes – a symbolic name for the output(s) of the op. If the op returns a tuple, output_nodes should be the same length as that tuple.
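The ordering and length rules above can be illustrated with a small stand-in that does not use chariots itself. The helpers call_op and bind_outputs are hypothetical names introduced here for illustration, not part of the library; they only sketch, under that assumption, how positional inputs and tuple outputs could line up with the names given in input_nodes and output_nodes.

```python
from typing import Any, Callable, Dict, List, Union


def call_op(op: Callable[..., Any], input_nodes: List[str], values: Dict[str, Any]) -> Any:
    """Call `op` with the values named in `input_nodes`, in the order they are listed."""
    return op(*[values[name] for name in input_nodes])


def bind_outputs(output_nodes: Union[str, List[str]], result: Any) -> Dict[str, Any]:
    """Map an op result onto its symbolic output name(s) (hypothetical helper)."""
    names = [output_nodes] if isinstance(output_nodes, str) else list(output_nodes)
    if len(names) == 1:
        # A single name receives the whole result.
        return {names[0]: result}
    if not isinstance(result, tuple) or len(result) != len(names):
        raise ValueError("output_nodes needs one name per element of the returned tuple")
    return dict(zip(names, result))


def load_data():
    """Hypothetical op with two outputs, returned as a tuple."""
    return [[1.0, 2.0], [3.0, 4.0]], [0, 1]


def pair_up(x, y):
    """Hypothetical op with two inputs; argument order follows input_nodes."""
    return list(zip(x, y))


values = bind_outputs(["x", "y"], load_data())
values.update(bind_outputs("pairs", call_op(pair_up, ["x", "y"], values)))
```

Swapping the names in input_nodes to ["y", "x"] would swap the arguments received by pair_up, which is the point of the ordering rule.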
-
__init__
(op: chariots.pipelines.ops._base_op.BaseOp, input_nodes: Optional[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]]] = None, output_nodes: Union[List[Union[str, chariots.pipelines.nodes._base_nodes.BaseNode]], str, chariots.pipelines.nodes._base_nodes.BaseNode] = None) Initialize self. See help(type(self)) for accurate signature.
-
check_version_compatibility
(upstream_node: chariots.pipelines.nodes._base_nodes.BaseNode, store_to_look_in: chariots.op_store._op_store_client.OpStoreClient) checks that this node is compatible with a potentially new version of an upstream node
- Parameters
upstream_node – the upstream node to check for version compatibility with
store_to_look_in – the op_store_client to look for valid relationships between this node and upstream versions
- Raises
VersionError – when the two nodes are not compatible
-
execute
(params: List[Any], runner: Optional[chariots.pipelines.runners._base_runner.BaseRunner] = None) → Any executes the underlying op on params
- Parameters
runner – runner that can be provided if the node needs one (mostly if node is a pipeline)
params – the inputs of the underlying op
- Raises
ValueError – if the runner is not provided but needed
- Returns
the output of the op
-
property
is_loadable
- Returns
whether or not this node and its inner op can be loaded
-
load_latest_version
(store_to_look_in: chariots.op_store._op_store_client.OpStoreClient) → Optional[chariots.pipelines.nodes._base_nodes.BaseNode] reloads the latest version of the op this node represents by looking for available versions in the store
- Parameters
store_to_look_in – the store to look for new versions in
- Returns
the reloaded node if any older versions were found in the store, otherwise None
-
property
name
the name of the node. By default this is the name of the underlying op.
-
property
node_version
the version of this node
-
persist
(store: chariots.op_store._op_store_client.OpStoreClient, downstream_nodes: Optional[List[BaseNode]]) → Optional[chariots.versioning._version.Version] persists this node's data (for the Node class, this usually means saving the serialized bytes of the node's inner op)
- Parameters
store – the store in which to store the node
downstream_nodes – the node(s) that are going to accept the current version of this node as upstream
-
property
requires_runner
whether or not this node requires a runner to be executed (typically if the inner op is a pipeline)
-
class
chariots.pipelines.nodes.
ReservedNodes
Bases:
enum.Enum
enum of reserved node names
-
pipeline_input
= '__pipeline_input__'
-
pipeline_output
= '__pipeline_output__'
-
property
reference
the output references of the reserved nodes
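As a rough illustration of how the two reserved names behave as plain strings, the sketch below defines a stand-in enum with the same member names and values as documented above. ReservedNodesSketch and is_reserved are hypothetical names used only for this example; the real ReservedNodes class also exposes a reference property that is not modelled here.

```python
from enum import Enum


class ReservedNodesSketch(Enum):
    """Stand-in mirroring the two documented members (not the library class)."""
    pipeline_input = '__pipeline_input__'
    pipeline_output = '__pipeline_output__'


def is_reserved(name: str) -> bool:
    """Return True if `name` is one of the reserved symbolic node names."""
    return name in {member.value for member in ReservedNodesSketch}


# The string form and the enum member carry the same value.
same_value = ReservedNodesSketch.pipeline_output.value == '__pipeline_output__'
```

Under this assumption, a user-chosen symbolic name such as "x" is never reserved, while the two dunder-style names are.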
-
-
class
chariots.pipelines.nodes.
BaseNode
(input_nodes: Optional[List[Union[str, BaseNode]]] = None, output_nodes: Union[List[str], str] = None) Bases:
abc.ABC
A node represents a step in a Pipeline. It is linked to one or several inputs and can produce one or several outputs:
>>> train_logistics = Pipeline([
... Node(IrisFullDataSet(), output_nodes=["x", "y"]),
... Node(PCAOp(MLMode.FIT_PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.FIT), input_nodes=["x_transformed", "y"])
... ], 'train_logistics')
You can also link the first and/or the last node of your pipeline to the pipeline input and output:
>>> pred = Pipeline([
... Node(IrisFullDataSet(), input_nodes=['__pipeline_input__'], output_nodes=["x"]),
... Node(PCAOp(MLMode.PREDICT), input_nodes=["x"], output_nodes="x_transformed"),
... Node(LogisticOp(MLMode.PREDICT), input_nodes=["x_transformed"], output_nodes=['__pipeline_output__'])
... ], 'pred')
Here we are showing the behavior of nodes using the Node subclass (used with ops).
If you want to create your own node, you will need to define:
the node_version property, which gives the version of the node
the name property
the execute method, which defines the execution behavior of your custom node
the load_latest_version method, which defines how to load the latest version of this node
- Parameters
input_nodes – the input_nodes on which this node should be executed
output_nodes – an optional symbolic name for the outputs of this node (to be used by downstream nodes in the pipeline). If this node is the output of the pipeline, use __pipeline_output__ or ReservedNodes.pipeline_output. If the output of the node should be split (for different downstream nodes to consume), use a list.
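The four members a custom node has to define can be sketched with a self-contained abstract class. SketchBaseNode and AddNode are hypothetical stand-ins written for this example, not the library's BaseNode; in particular, the version is a plain string here, whereas the real node_version is documented elsewhere as a version object, and the store argument is left untyped.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class SketchBaseNode(ABC):
    """Stand-in declaring the four members listed above as abstract."""

    @property
    @abstractmethod
    def node_version(self) -> str:
        """Version of the node (a plain string in this sketch)."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the node."""

    @abstractmethod
    def execute(self, *params) -> Any:
        """Run the node on the values provided by its input nodes."""

    @abstractmethod
    def load_latest_version(self, store_to_look_in) -> Optional["SketchBaseNode"]:
        """Return the latest version of this node found in the store."""


class AddNode(SketchBaseNode):
    """Hypothetical custom node that sums its inputs."""

    @property
    def node_version(self) -> str:
        return "1.0.0"

    @property
    def name(self) -> str:
        return "add"

    def execute(self, *params) -> Any:
        return sum(params)

    def load_latest_version(self, store_to_look_in) -> Optional[SketchBaseNode]:
        # Nothing to reload in this sketch: the node has no persisted state.
        return self


node = AddNode()
total = node.execute(1, 2, 3)
```

A subclass that omits any of the four members cannot be instantiated, which is how Python's abc machinery enforces the contract in this sketch.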
-
__init__
(input_nodes: Optional[List[Union[str, BaseNode]]] = None, output_nodes: Union[List[str], str] = None) Initialize self. See help(type(self)) for accurate signature.
-
check_version_compatibility
(upstream_node: chariots.pipelines.nodes._base_nodes.BaseNode, store_to_look_in: chariots.op_store._op_store_client.OpStoreClient) checks that this node is compatible with a potentially new version of an upstream node
- Parameters
upstream_node – the upstream node to check for version compatibility with
store_to_look_in – the op_store_client to look for valid relationships between this node and upstream versions
- Raises
VersionError – when the two nodes are not compatible
-
abstract
execute
(*params) → Any executes the computation represented by this node (loads/saves the dataset for dataset nodes, executes the underlying op for Node)
- Parameters
params – the inputs provided by the input_nodes
- Returns
the output(s) of the node
-
property
is_loadable
whether or not this node can be loaded (this is used by pipelines to know which nodes to load)
-
abstract
load_latest_version
(store_to_look_in: chariots.op_store._op_store_client.OpStoreClient) → chariots.pipelines.nodes._base_nodes.BaseNode reloads the latest available version of this node by looking for all available versions in the OpStore
- Parameters
store_to_look_in – the store to look for new versions and, if needed, for the bytes of serialized ops
- Returns
this node once it has been loaded
-
abstract property
name
the name of the node
-
abstract property
node_version
the version of this node
-
property
output_references
the different outputs of this node
-
persist
(store: chariots.op_store._op_store_client.OpStoreClient, downstream_nodes: Optional[List[BaseNode]]) → chariots.versioning._version.Version persists this node's data (for the Node class, this usually means saving the serialized bytes of the node's inner op)
- Parameters
store – the store in which to store the node
downstream_nodes – the node(s) that are going to accept the current version of this node as upstream
-
replace_symbolic_references
(symbolic_to_real_node: Mapping[str, NodeReference]) → chariots.pipelines.nodes._base_nodes.BaseNode replaces all the symbolic references of this node: if an input_node or output_node was defined with a string by the user, it will try to find the node represented by this string.
- Parameters
symbolic_to_real_node – the mapping of all NodeReference found so far in the pipeline
- Raises
ValueError – if a node with multiple outputs was used directly (object used rather than strings)
- Returns
this node with all its inputs and outputs as NodeReferences rather than strings
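The idea of swapping user-supplied strings for real references can be sketched without chariots. RefSketch and resolve_inputs are hypothetical names introduced for this example; the real NodeReference class and the library's resolution logic may differ, and the KeyError raised for an unknown name is a choice made in this sketch, not documented behavior.

```python
from dataclasses import dataclass
from typing import List, Mapping, Union


@dataclass(frozen=True)
class RefSketch:
    """Hypothetical stand-in for NodeReference: a producing node and one of its outputs."""
    node: str
    output: str


def resolve_inputs(
    input_nodes: List[Union[str, RefSketch]],
    symbolic_to_real: Mapping[str, RefSketch],
) -> List[RefSketch]:
    """Replace each symbolic (string) input with the reference it stands for."""
    resolved = []
    for item in input_nodes:
        if isinstance(item, RefSketch):
            # Already a real reference: keep it as is.
            resolved.append(item)
        elif item in symbolic_to_real:
            resolved.append(symbolic_to_real[item])
        else:
            raise KeyError(f"no node produces the symbolic name {item!r}")
    return resolved


# References collected so far while walking the pipeline (made-up names).
known = {
    "x": RefSketch(node="irisfulldataset", output="x"),
    "y": RefSketch(node="irisfulldataset", output="y"),
}
resolved = resolve_inputs(["x", "y"], known)
```

After resolution, every input is a reference object, so downstream code no longer needs to know which symbolic names the user chose.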
-
property
require_saver
whether or not this node requires a saver to be executed. This is usually True for data nodes.
-
property
requires_runner
whether or not this node requires a runner to be executed (typically if the inner op is a pipeline)