chariots.callbacks

Callbacks are use to change the default behavior of an op or a pipeline in a reusable way, you can create callbacks to log performance or timing check output distribution or what ever you need around the pipeline or the ops execution.

There are two main types of callbacks:

  • operation callbacks that give ou entry points before and after the execution of this specific op

  • pipeline callback that give you entry points before and after the execution of the pipeline and in between each node

the order of execution of the callbacks are as follows:

  • pipeline callbacks’ before_execution

  • pipeline callbacks’ before_node_execution (for each node)

  • op callbacks’ before_execution

  • op’ before_execution method

  • op’s execute method

  • op’s after_execution method

  • op callbacks’ after_execution

  • pipeline callbacks’ after_node_execution

During the pipeline’s execution, the inputs and outputs of the execution are being provided (when applicable), these are provided for information, DO NOT TRY TO MODIFY those (this is undefined behavior)

class chariots.pipelines.callbacks.OpCallBack[source]

Bases: object

an op callback is used to perform specific instructions at certain points around the operation’s execution

to create your own op callback, you need to override either the before_execution or the after_execution method ( or both)

>>> class PrintOpName(OpCallBack):
...
...     def before_execution(self, op: "base.BaseOp", args: List[Any]):
...         print('{} called with {}'.format(op.name, args))
>>> is_even_pipeline = Pipeline([
...     Node(AddOneOp(), input_nodes=['__pipeline_input__'], output_nodes='modified'),
...     Node(IsOddOp(op_callbacks=[PrintOpName()]), input_nodes=['modified'],
...          output_nodes=['__pipeline_output__'])
... ], 'simple_pipeline')
>>> runner.run(is_even_pipeline, 3)
isoddop called with [4]
False
after_execution(callback_op: chariots.pipelines.ops._base_op.BaseOp, args: List[Any], output: Any)[source]

called after the operation has been executed (and after it’s after_execution’s method).

Parameters
  • callback_op – the operation that was executed

  • args – the arguments that were passed to the op

  • output – the output the op produced. DO NOT MODIFY the output reference as it might cause some undefined behavior

before_execution(callback_op: chariots.pipelines.ops._base_op.BaseOp, args: List[Any])[source]

called before the operation is executed (and before the operation’s before_execution’s method).

Parameters
  • callback_op – the operation that is going to be executed

  • args – the list of arguments that are going to be passed to the operation. DO NOT MODIFY those references as this might cause some undefined behavior

class chariots.pipelines.callbacks.PipelineCallback[source]

Bases: object

a pipeline callback is used to define instructions that need to be executed at certain points in the pipeline execution:

  • before the pipeline is ran

  • before each node of the pipeline

  • after each node of the pipeline

  • after the pipeline is ran

to create your own, you need to overide one or more of the before_execution, after_execution, before_node_execution, after_node_execution methods:

>>> class MyPipelineLogger(PipelineCallback):
...
...     def before_execution(self, pipeline: "chariots.Pipeline", args: List[Any]):
...         print('running {}'.format(pipeline))
...
...     def before_node_execution(self, pipeline: "chariots.Pipeline", node: "BaseNode", args: List[Any]):
...         print('running {} for {}'.format(node.name, pipeline.name))
>>> is_even_pipeline = Pipeline([
...     Node(AddOneOp(), input_nodes=['__pipeline_input__'], output_nodes='modified'),
...     Node(IsOddOp(), input_nodes=['modified'],
...          output_nodes=['__pipeline_output__'])
... ], 'simple_pipeline', pipeline_callbacks=[MyPipelineLogger()])
>>> runner.run(is_even_pipeline, 3)
running <OP simple_pipeline>
running addoneop for simple_pipeline
running isoddop for simple_pipeline
False
after_execution(pipeline: chariots.Pipeline, args: List[Any], output: Any)[source]

called after all the nodes of the pipeline have been ran with the pipeline being run and the output of the run

Parameters
  • pipeline – the pipeline being run

  • args – the pipeline input that as given at the beginning of the run

  • output – the output of the pipeline run. DO NOT MODIFY those references as this might cause some undefined behavior

after_node_execution(pipeline: chariots.Pipeline, node: BaseNode, args: List[Any], output: Any)[source]

called after each node is executed. The pipeline the node is in as well as the node are provided alongside the input/output of the node that ran

Parameters
  • pipeline – the pipeline being run

  • node – the node that is about to run

  • args – the arguments that was given to the node

  • output – the output the node produced. . DO NOT MODIFY those references as this might cause some undefined behavior

before_execution(pipeline: chariots.Pipeline, args: List[Any])[source]

called before any node in the pipeline is ran. provides the pipeline that is being run and the pipeline input

Parameters
  • pipeline – the pipeline being ran

  • args – the pipeline inputs. DO NOT MODIFY those references as this might cause some undefined behavior

before_node_execution(pipeline: chariots.Pipeline, node: BaseNode, args: List[Any])[source]

called before each node is executed the pipeline the node is in as well as the node are provided alongside the arguments the node is going to be given

Parameters
  • pipeline – the pipeline being run

  • node – the node that is about to run

  • args – the arguments that are going to be given to the node. DO NOT MODIFY those references as this might cause some undefined behavior