"""Synchronous tube runner (``noob.runner.sync``)."""
from __future__ import annotations
from dataclasses import dataclass
from threading import Event as ThreadEvent
from typing import Any
from noob.input import InputScope
from noob.node import Return
from noob.runner.base import TubeRunner
from noob.types import ReturnNodeType
@dataclass
class SynchronousRunner(TubeRunner):
    """
    Simple, synchronous tube runner.

    Just run the nodes in topological order and return from return nodes.
    Each call to :meth:`process` adds one epoch to the tube's scheduler and
    runs ready nodes on the calling thread until none are left ready.
    """

    def __post_init__(self):
        super().__post_init__()
        # Set once nodes/assets have been initialized; cleared by ``deinit``
        # (and by a failed ``init``).
        self._running = ThreadEvent()

    def init(self) -> None:
        """
        Start processing data with the tube graph.

        Initializes every enabled node, then every asset, calling each
        ``init`` through :meth:`inject_context`. Calling this while already
        running is a no-op.

        If any ``init`` raises, the runner is marked as not running again
        before the exception propagates. A later call then re-runs
        initialization for all nodes and assets, instead of leaving a
        half-initialized tube flagged as running.
        """
        # TODO: lock for re-entry
        if self._running.is_set():
            # fine! already initialized
            return
        self._running.set()
        try:
            for node in self.tube.enabled_nodes.values():
                self.inject_context(node.init)()
            for asset in self.tube.state.assets.values():
                self.inject_context(asset.init)()
        except BaseException:
            # Otherwise ``process`` would see the flag and skip ``init`` forever.
            self._running.clear()
            raise

    def deinit(self) -> None:
        """
        Stop all nodes processing.

        Calls ``deinit`` on every enabled node, then on every asset. The
        running flag is cleared even if one of those calls raises, so the
        next :meth:`process` call re-initializes the tube.
        """
        # TODO: lock to ensure we've been started
        try:
            for node in self.tube.enabled_nodes.values():
                node.deinit()
            for asset in self.tube.state.assets.values():
                asset.deinit()
        finally:
            self._running.clear()

    @property
    def running(self) -> bool:
        """Whether the tube is currently running"""
        return self._running.is_set()

    def process(self, **kwargs: Any) -> ReturnNodeType:
        """
        Iterate through nodes in topological order,
        calling their process method and passing events as they are emitted.

        Process-scoped ``input`` s can be passed as kwargs; they are validated
        against the tube's input collection. The runner is initialized first
        if it is not running. ``self.store`` is cleared before the new epoch
        is added.

        Returns:
            The result of :meth:`collect_return` once no more nodes are ready.
        """
        if not self._running.is_set():
            self.init()
        # Named ``process_input`` rather than ``input`` to avoid shadowing the builtin.
        process_input = self.tube.input_collection.validate_input(InputScope.process, kwargs)
        self.store.clear()
        scheduler = self.tube.scheduler
        scheduler.add_epoch()

        while scheduler.is_active():
            ready = scheduler.get_ready()
            if not ready:
                scheduler.end_epoch()
                break
            for node_info in ready:
                node_id, epoch = node_info["value"], node_info["epoch"]
                if node_id in ("assets", "input"):
                    # graph autogenerates "assets" node if something depends on it;
                    # there is nothing to run for these, so just mark them done.
                    scheduler.done(epoch, node_id)
                    continue
                node = self.tube.nodes[node_id]
                if not node.enabled:
                    # nodes can be in the graph while disabled if something else depends on them
                    # NOTE(review): the node is not marked done here, so presumably its
                    # dependents never become ready in this epoch - confirm.
                    # FIXME when we stop using the builtin graphlib
                    continue
                maybe_args, maybe_kwargs = self.collect_input(node, epoch, process_input)
                # need to eventually distinguish "still waiting" vs "there is none"
                # (separate names so the ``kwargs`` parameter is not shadowed)
                node_args = [] if maybe_args is None else maybe_args
                node_kwargs = {} if maybe_kwargs is None else maybe_kwargs
                value = node.process(*node_args, **node_kwargs)
                # take the value from state first. if it's taken by an asset,
                # the value is converted to its id, and returned again.
                events = self.store.add_value(node.signals, value, node_id, epoch)
                if events is None:
                    # nothing was stored, so there are no events to dispatch
                    continue
                all_events = scheduler.update(events)
                self._call_callbacks(all_events)
                self._logger.debug("Node %s emitted %s in epoch %s", node_id, value, epoch)
        return self.collect_return()

    def enable_node(self, node_id: str) -> None:
        """
        Initialize the node ``node_id`` and enable it in the tube.

        The node's ``init`` is called through :meth:`inject_context`, the same
        way :meth:`init` initializes nodes.
        """
        self.inject_context(self.tube.nodes[node_id].init)()
        self.tube.enable_node(node_id)

    def disable_node(self, node_id: str) -> None:
        """Deinitialize the node ``node_id`` and disable it in the tube."""
        self.tube.nodes[node_id].deinit()
        self.tube.disable_node(node_id)

    def collect_return(self, epoch: int | None = None) -> ReturnNodeType:
        """
        The return node holds values from a single epoch, get and transform them.

        Args:
            epoch: Must be ``None``; the sync runner only keeps one epoch.

        Returns:
            The value of the first enabled :class:`Return` node, fetched with
            ``keep=False``, or ``None`` if the tube has no enabled return node.

        Raises:
            ValueError: If ``epoch`` is not ``None``.
        """
        if epoch is not None:
            raise ValueError("Sync runner only stores a single epoch at a time")
        ret_node = next(
            (n for n in self.tube.enabled_nodes.values() if isinstance(n, Return)),
            None,
        )
        if ret_node is None:
            return None
        return ret_node.get(keep=False)