Source code for noob.state

from typing import Self

from pydantic import BaseModel, Field

from noob.asset import Asset, AssetSpecification
from noob.node.base import Edge
from noob.types import PythonIdentifier


[docs] class StateSpecification(BaseModel): """ Configuration for the assets within a :class:`.State`. Representation of the yaml-form of a :class:`.State`. Converted to the runtime-form with :meth:`.State.from_specification`. Not much, if any validation is performed here on the whole :class:`.State` except that the assets have the correct fields, ignoring validity of e.g. type mismatches. Those require importing and introspecting the specified assets classes, which should only happen when we try and instantiate the :class:`.State` - this class is just a carrier for the yaml spec. """ assets: dict[str, AssetSpecification] = Field(default_factory=dict) """The assets that this :class:`.State` configures"""
[docs] class State(BaseModel): """ A collection of assets storing objects that persist through iterations of the tube. The target demographics generally include database connections, large arrays and statistics that traverse multiple processes of the tube. The :class:`.State` model is a container for a set of assets that are fully instantiated. It does not handle processing the assets -- that is handled by a TubeRunner. """ assets: dict[PythonIdentifier, Asset] = Field(default_factory=dict)
[docs] @classmethod def from_specification(cls, spec: StateSpecification) -> Self: """ Instantiate a :class:`.State` model from its configuration Args: spec (StateSpecification): the :class:`.State` config to instantiate """ assets = cls._init_assets(spec) return cls(assets=assets)
@classmethod def _init_assets(cls, specs: StateSpecification) -> dict[PythonIdentifier, Asset]: assets = {spec.id: Asset.from_specification(spec) for spec in specs.assets.values()} return assets
[docs] def get(self, signal: str) -> Asset | None: """ Get the event with the matching node_id and signal name Returns the most recent matching event, as for now we assume that each combination of `node_id` and `signal` is emitted only once per processing cycle, and we assume processing cycles are independent (and thus our events are cleared) ``None`` in the case that the event has not been emitted """ asset = [val for key, val in self.assets.items() if key == signal] return None if len(asset) == 0 else asset[-1]
[docs] def collect(self, edges: list[Edge]) -> dict | None: """ Gather events into a form that can be consumed by a :meth:`.Node.process` method, given the collection of inbound edges (usually from :meth:`.Tube.in_edges` ). If none of the requested events have been emitted, return ``None``. If all of the requested events have been emitted, return a kwarg-like dict If some of the requested events are missing but others are present, return ``None`` for any missing events. .. todo:: Add an example """ args = {} for edge in edges: if edge.source_node == "assets": assert edge.source_signal is not None, ( "Must set signal name when depending on an asset " "(assets have no generic 'value' signal)" ) asset = self.get(edge.source_signal) obj = None if asset is None else asset.obj args[edge.target_slot] = obj return None if not args or all(val is None for val in args.values()) else args
[docs] def clear(self) -> None: """ Clear assets. """ self.assets.clear()