Source code for noob.state
from typing import Self
from pydantic import BaseModel, Field
from noob.asset import Asset, AssetSpecification
from noob.node.base import Edge
from noob.types import PythonIdentifier
[docs]
class StateSpecification(BaseModel):
"""
Configuration for the assets within a :class:`.State`.
Representation of the yaml-form of a :class:`.State`.
Converted to the runtime-form with :meth:`.State.from_specification`.
Not much, if any validation is performed here on the whole :class:`.State` except
that the assets have the correct fields, ignoring validity of
e.g. type mismatches.
Those require importing and introspecting the specified assets classes,
which should only happen when we try and instantiate the :class:`.State` -
this class is just a carrier for the yaml spec.
"""
assets: dict[str, AssetSpecification] = Field(default_factory=dict)
"""The assets that this :class:`.State` configures"""
[docs]
class State(BaseModel):
"""
A collection of assets storing objects that persist through iterations of the tube.
The target demographics generally include database connections, large arrays and statistics
that traverse multiple processes of the tube.
The :class:`.State` model is a container for a set of assets that are fully instantiated.
It does not handle processing the assets -- that is handled by a TubeRunner.
"""
assets: dict[PythonIdentifier, Asset] = Field(default_factory=dict)
[docs]
@classmethod
def from_specification(cls, spec: StateSpecification) -> Self:
"""
Instantiate a :class:`.State` model from its configuration
Args:
spec (StateSpecification): the :class:`.State` config to instantiate
"""
assets = cls._init_assets(spec)
return cls(assets=assets)
@classmethod
def _init_assets(cls, specs: StateSpecification) -> dict[PythonIdentifier, Asset]:
assets = {spec.id: Asset.from_specification(spec) for spec in specs.assets.values()}
return assets
[docs]
def get(self, signal: str) -> Asset | None:
"""
Get the event with the matching node_id and signal name
Returns the most recent matching event, as for now we assume that
each combination of `node_id` and `signal` is emitted only once per processing cycle,
and we assume processing cycles are independent (and thus our events are cleared)
``None`` in the case that the event has not been emitted
"""
asset = [val for key, val in self.assets.items() if key == signal]
return None if len(asset) == 0 else asset[-1]
[docs]
def collect(self, edges: list[Edge]) -> dict | None:
"""
Gather events into a form that can be consumed by a :meth:`.Node.process` method,
given the collection of inbound edges (usually from :meth:`.Tube.in_edges` ).
If none of the requested events have been emitted, return ``None``.
If all of the requested events have been emitted, return a kwarg-like dict
If some of the requested events are missing but others are present,
return ``None`` for any missing events.
.. todo::
Add an example
"""
args = {}
for edge in edges:
if edge.source_node == "assets":
assert edge.source_signal is not None, (
"Must set signal name when depending on an asset "
"(assets have no generic 'value' signal)"
)
asset = self.get(edge.source_signal)
obj = None if asset is None else asset.obj
args[edge.target_slot] = obj
return None if not args or all(val is None for val in args.values()) else args
[docs]
def clear(self) -> None:
"""
Clear assets.
"""
self.assets.clear()