state¶
- pydantic model StateSpecification[source]¶
Configuration for the assets within a
State.Representation of the yaml-form of a
State. Converted to the runtime-form withState.from_specification().Not much, if any validation is performed here on the whole
Stateexcept that the assets have the correct fields, ignoring validity of e.g. type mismatches. Those require importing and introspecting the specified assets classes, which should only happen when we try and instantiate theState- this class is just a carrier for the yaml spec.Show JSON schema
{ "title": "StateSpecification", "description": "Configuration for the assets within a :class:`.State`.\n\nRepresentation of the yaml-form of a :class:`.State`.\nConverted to the runtime-form with :meth:`.State.from_specification`.\n\nNot much, if any validation is performed here on the whole :class:`.State` except\nthat the assets have the correct fields, ignoring validity of\ne.g. type mismatches.\nThose require importing and introspecting the specified assets classes,\nwhich should only happen when we try and instantiate the :class:`.State` -\nthis class is just a carrier for the yaml spec.", "type": "object", "properties": { "assets": { "additionalProperties": { "$ref": "#/$defs/AssetSpecification" }, "title": "Assets", "type": "object" } }, "$defs": { "AssetScope": { "enum": [ "runner", "process", "node" ], "title": "AssetScope", "type": "string" }, "AssetSpecification": { "description": "Specification for a single asset within a tube .yaml file.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": "Params" }, "depends": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Depends" } }, "required": [ "id", "type", "scope" ], "title": "AssetSpecification", "type": "object" } } }
- field assets: dict[str, AssetSpecification] [Optional]¶
The assets that this
Stateconfigures
- pydantic model State[source]¶
A collection of assets storing objects that persist through iterations of the tube. The target demographics generally include database connections, large arrays and statistics that traverse multiple processes of the tube.
The
Statemodel is a container for a set of assets that are fully instantiated. It does not handle processing the assets – that is handled by a TubeRunner.Show JSON schema
{ "title": "State", "description": "A collection of assets storing objects that persist through iterations of the tube.\nThe target demographics generally include database connections, large arrays and statistics\nthat traverse multiple processes of the tube.\n\nThe :class:`.State` model is a container for a set of assets that are fully instantiated.\nIt does not handle processing the assets -- that is handled by a TubeRunner.", "type": "object", "properties": { "assets": { "additionalProperties": { "$ref": "#/$defs/Asset" }, "title": "Assets", "type": "object" } }, "$defs": { "Asset": { "additionalProperties": false, "description": "An asset within a processing tube.", "properties": { "id": { "title": "Id", "type": "string" }, "spec": { "$ref": "#/$defs/AssetSpecification" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "additionalProperties": true, "title": "Params", "type": "object" }, "obj": { "anyOf": [ {}, { "type": "null" } ], "default": null, "title": "Obj" } }, "required": [ "id", "spec", "scope" ], "title": "Asset", "type": "object" }, "AssetScope": { "enum": [ "runner", "process", "node" ], "title": "AssetScope", "type": "string" }, "AssetSpecification": { "description": "Specification for a single asset within a tube .yaml file.", "properties": { "id": { "title": "Id", "type": "string" }, "type": { "title": "Type", "type": "string" }, "scope": { "$ref": "#/$defs/AssetScope" }, "params": { "anyOf": [ { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": "Params" }, "depends": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Depends" } }, "required": [ "id", "type", "scope" ], "title": "AssetSpecification", "type": "object" } } }
- field assets: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Asset] [Optional]¶
- classmethod from_specification(spec: StateSpecification) Self[source]¶
Instantiate a
Statemodel from its configuration- Parameters:
spec (
StateSpecification) – theStateconfig to instantiate
- collect(edges: list[Edge]) dict | None[source]¶
Gather events into a form that can be consumed by a
Node.process()method, given the collection of inbound edges (usually fromTube.in_edges()).If none of the requested events have been emitted, return
None.If all of the requested events have been emitted, return a kwarg-like dict
If some of the requested events are missing but others are present, return
Nonefor any missing events.Todo
Add an example
- get(signal: str) Asset | None[source]¶
Get the event with the matching node_id and signal name
Returns the most recent matching event, as for now we assume that each combination of node_id and signal is emitted only once per processing cycle, and we assume processing cycles are independent (and thus our events are cleared)
Nonein the case that the event has not been emitted