state

pydantic model StateSpecification[source]

Configuration for the assets within a State.

Representation of the yaml-form of a State. Converted to the runtime-form with State.from_specification().

Little, if any, validation of the State as a whole is performed here. This class only checks that each asset has the correct fields, and does not detect problems such as type mismatches. Detecting those requires importing and introspecting the specified asset classes, which should only happen when the State is instantiated. This class is just a carrier for the yaml spec.

JSON schema:
{
   "title": "StateSpecification",
   "description": "Configuration for the assets within a :class:`.State`.\n\nRepresentation of the yaml-form of a :class:`.State`.\nConverted to the runtime-form with :meth:`.State.from_specification`.\n\nNot much, if any validation is performed here on the whole :class:`.State` except\nthat the assets have the correct fields, ignoring validity of\ne.g. type mismatches.\nThose require importing and introspecting the specified assets classes,\nwhich should only happen when we try and instantiate the :class:`.State` -\nthis class is just a carrier for the yaml spec.",
   "type": "object",
   "properties": {
      "assets": {
         "additionalProperties": {
            "$ref": "#/$defs/AssetSpecification"
         },
         "title": "Assets",
         "type": "object"
      }
   },
   "$defs": {
      "AssetScope": {
         "enum": [
            "runner",
            "process",
            "node"
         ],
         "title": "AssetScope",
         "type": "string"
      },
      "AssetSpecification": {
         "description": "Specification for a single asset within a tube .yaml file.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "type": {
               "title": "Type",
               "type": "string"
            },
            "scope": {
               "$ref": "#/$defs/AssetScope"
            },
            "params": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Params"
            },
            "depends": {
               "anyOf": [
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Depends"
            }
         },
         "required": [
            "id",
            "type",
            "scope"
         ],
         "title": "AssetSpecification",
         "type": "object"
      }
   }
}

Fields:
field assets: dict[str, AssetSpecification] [Optional]

The assets that this State configures.
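
As a rough illustration of the shape described by the schema above, the sketch below uses plain dicts in place of the parsed yaml and performs the same kind of field-level check: required fields and scope values only, with no importing of asset classes. The helper `check_asset_fields`, the asset names, and the `my_package.*` type strings are hypothetical and not part of the library.

```python
# Stand-in for the parsed yaml form of a state specification.
# Field names follow the JSON schema above; all values are made up.
REQUIRED = ("id", "type", "scope")
SCOPES = {"runner", "process", "node"}


def check_asset_fields(spec: dict) -> list[str]:
    """Return field-level problems without importing any asset class."""
    problems = []
    for key, asset in spec.get("assets", {}).items():
        for name in REQUIRED:
            if name not in asset:
                problems.append(f"{key}: missing required field {name!r}")
        if "scope" in asset and asset["scope"] not in SCOPES:
            problems.append(f"{key}: unknown scope {asset['scope']!r}")
    return problems


state_spec = {
    "assets": {
        # `params` is optional and defaults to null in the schema.
        "db": {"id": "db", "type": "my_package.Database", "scope": "runner",
               "params": {"url": "sqlite://"}},
        "stats": {"id": "stats", "type": "my_package.RunningStats", "scope": "process"},
    }
}

# Missing `type` and an unknown scope: both are field-level errors.
bad_spec = {"assets": {"db": {"id": "db", "scope": "cluster"}}}

print(check_asset_fields(state_spec))
print(check_asset_fields(bad_spec))
```

Note that a nonexistent class in `type` would pass this check, which mirrors the point above: such errors can only surface once the State is instantiated.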

pydantic model State[source]

A collection of assets storing objects that persist through iterations of the tube. Typical assets include database connections, large arrays, and statistics that span multiple processes of the tube.

The State model is a container for a set of assets that are fully instantiated. It does not handle processing the assets – that is handled by a TubeRunner.

JSON schema:
{
   "title": "State",
   "description": "A collection of assets storing objects that persist through iterations of the tube.\nThe target demographics generally include database connections, large arrays and statistics\nthat traverse multiple processes of the tube.\n\nThe :class:`.State` model is a container for a set of assets that are fully instantiated.\nIt does not handle processing the assets -- that is handled by a TubeRunner.",
   "type": "object",
   "properties": {
      "assets": {
         "additionalProperties": {
            "$ref": "#/$defs/Asset"
         },
         "title": "Assets",
         "type": "object"
      }
   },
   "$defs": {
      "Asset": {
         "additionalProperties": false,
         "description": "An asset within a processing tube.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "spec": {
               "$ref": "#/$defs/AssetSpecification"
            },
            "scope": {
               "$ref": "#/$defs/AssetScope"
            },
            "params": {
               "additionalProperties": true,
               "title": "Params",
               "type": "object"
            },
            "obj": {
               "anyOf": [
                  {},
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Obj"
            }
         },
         "required": [
            "id",
            "spec",
            "scope"
         ],
         "title": "Asset",
         "type": "object"
      },
      "AssetScope": {
         "enum": [
            "runner",
            "process",
            "node"
         ],
         "title": "AssetScope",
         "type": "string"
      },
      "AssetSpecification": {
         "description": "Specification for a single asset within a tube .yaml file.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "type": {
               "title": "Type",
               "type": "string"
            },
            "scope": {
               "$ref": "#/$defs/AssetScope"
            },
            "params": {
               "anyOf": [
                  {
                     "additionalProperties": true,
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Params"
            },
            "depends": {
               "anyOf": [
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Depends"
            }
         },
         "required": [
            "id",
            "type",
            "scope"
         ],
         "title": "AssetSpecification",
         "type": "object"
      }
   }
}

Fields:
field assets: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Asset] [Optional]

classmethod from_specification(spec: StateSpecification) → Self[source]

Instantiate a State model from its configuration.

Parameters:

spec (StateSpecification) – the State config to instantiate
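
The sketch below illustrates the general idea of turning a specification entry into a live object, under the assumption that an asset's `type` string is a dotted import path and that `params` are passed as keyword arguments. Neither assumption is confirmed by this page, and the helpers `resolve` and `instantiate` are hypothetical; this is not the body of `from_specification()`.

```python
import importlib
from typing import Any


def resolve(dotted_path: str) -> type:
    """Import `package.module.ClassName` and return the class (assumed format)."""
    module_name, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def instantiate(asset_spec: dict) -> Any:
    """Build the object for one asset entry; `params` may be absent or None."""
    cls = resolve(asset_spec["type"])
    params = asset_spec.get("params") or {}
    return cls(**params)


# A standard-library class stands in for a real asset class here.
counter = instantiate(
    {"id": "counts", "type": "collections.Counter", "scope": "runner", "params": {"a": 2}}
)
print(counter)
```

Deferring the import to this step is what lets the specification be loaded and checked without the asset classes being importable.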

clear() → None[source]

Clear assets.

collect(edges: list[Edge]) → dict | None[source]

Gather events into a form that can be consumed by a Node.process() method, given the collection of inbound edges (usually from Tube.in_edges()).

If none of the requested events have been emitted, return None.

If all of the requested events have been emitted, return a kwarg-like dict.

If some of the requested events are missing but others are present, return the dict with None in place of each missing event.

Todo

Add an example

get(signal: str) → Asset | None[source]

Get the event with the matching node_id and signal name.

Returns the most recent matching event. For now, we assume that each combination of node_id and signal is emitted only once per processing cycle, and that processing cycles are independent (and thus our events are cleared).

Returns None if the event has not been emitted.