base

class TubeRunner(tube: ~noob.tube.Tube, store: ~noob.store.EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[~collections.abc.Callable[[~noob.event.Event | ~noob.event.MetaEvent], None]] = <factory>, _logger: ~logging.Logger = None, _runner_id: str | None = None)[source]

Abstract parent class for tube runners.

Tube runners handle calling the nodes and passing the events returned by them to each other. Each runner may do so however it needs to (synchronously, asynchronously, alone or as part of a cluster, etc.) as long as it satisfies this abstract interface.

tube: Tube
store: EventStore
max_iter_loops: int = 100

The max number of times that iter will call process to try and get a result

property runner_id: str
abstractmethod process(**kwargs: Any) None | dict[str, Any] | Any[source]

Process one step of data from each of the sources, passing intermediate data to any subscribed nodes in a chain.

The process method normally does not return anything, except when using the special Return node

Process-scoped input s can be passed as kwargs.

abstractmethod init() None[source]

Start processing data with the tube graph.

Implementations of this method must raise a TubeRunningError if the tube has already been started and is running, (i.e. deinit() has not been called, or the tube has not exhausted itself)

abstractmethod deinit() None[source]

Stop processing data with the tube graph

iter(n: int | None = None) Generator[None | dict[str, Any] | Any, None, None][source]

Treat the runner as an iterable.

Calls TubeRunner.process() until it yields a result (e.g. multiple times in the case of any gather s that change the cardinality of the graph.)

run(n: int | None = None) None | list[None | dict[str, Any] | Any][source]
abstract property running: bool

Whether the tube is currently running

collect_input(node: Node, epoch: int, input: dict | None = None) tuple[list[Any] | None, dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Any] | None][source]

Gather input to give to the passed Node from the TubeRunner.store

Returns:

kwargs to pass to Node.process() if matching events are present dict: empty dict if Node is a Source None: if no input is available

Return type:

dict

abstractmethod collect_return(epoch: int | None = None) None | dict[str, Any] | Any[source]

If any Return nodes are in the tube, gather their return values to return from TubeRunner.process()

Returns:

of the Return sink’s key mapped to the returned value, None: if there are no Return sinks in the tube

Return type:

dict

add_callback(callback: Callable[[Event | MetaEvent], None]) None[source]
abstractmethod enable_node(node_id: str) None[source]

A method for enabling a node during runtime

abstractmethod disable_node(node_id: str) None[source]

A method for disabling a node during runtime

get_context() RunnerContext[source]
inject_context(fn: Callable) Callable[source]

Wrap function in a partial with the runner context injected, if requested