base¶
- class TubeRunner(tube: ~noob.tube.Tube, store: ~noob.store.EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[~collections.abc.Callable[[~noob.event.Event | ~noob.event.MetaEvent], None]] = <factory>, _logger: ~logging.Logger = None, _runner_id: str | None = None)[source]¶
Abstract parent class for tube runners.
Tube runners handle calling the nodes and passing the events returned by them to each other. Each runner may do so however it needs to (synchronously, asynchronously, alone or as part of a cluster, etc.) as long as it satisfies this abstract interface.
- store: EventStore¶
- max_iter_loops: int = 100¶
The max number of times that iter will call process to try and get a result
- abstractmethod process(**kwargs: Any) None | dict[str, Any] | Any[source]¶
Process one step of data from each of the sources, passing intermediate data to any subscribed nodes in a chain.
The process method normally does not return anything, except when using the special
ReturnnodeProcess-scoped
inputs can be passed as kwargs.
- abstractmethod init() None[source]¶
Start processing data with the tube graph.
Implementations of this method must raise a
TubeRunningErrorif the tube has already been started and is running, (i.e.deinit()has not been called, or the tube has not exhausted itself)
- iter(n: int | None = None) Generator[None | dict[str, Any] | Any, None, None][source]¶
Treat the runner as an iterable.
Calls
TubeRunner.process()until it yields a result (e.g. multiple times in the case of anygathers that change the cardinality of the graph.)
- collect_input(node: Node, epoch: int, input: dict | None = None) tuple[list[Any] | None, dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Any] | None][source]¶
Gather input to give to the passed Node from the
TubeRunner.store- Returns:
kwargs to pass to
Node.process()if matching events are present dict: empty dict if Node is aSourceNone: if no input is available- Return type:
- abstractmethod collect_return(epoch: int | None = None) None | dict[str, Any] | Any[source]¶
If any
Returnnodes are in the tube, gather their return values to return fromTubeRunner.process()
- abstractmethod disable_node(node_id: str) None[source]¶
A method for disabling a node during runtime
- get_context() RunnerContext[source]¶