zmq¶
Central command pub/sub
each sub-runner has its own set of sockets for publishing and consuming events
use the node_id.signal etc. as basically a feed address
Todo
Currently only IPC is supported, and thus the zmq runner can’t run across machines. Supporting TCP is WIP, it will require some degree of authentication among nodes to prevent arbitrary code execution, since we shouldn’t count on users to properly firewall their runners.
Todo
The socket spawning and event handling is awfully manual here. Leaving it as is because it’s somewhat unlikely we’ll need to generalize it, but otherwise it would be great to standardize socket names and have event handler decorators like:
@on_router(MessageType.sometype)
- class CommandNode(runner_id: str, protocol: str = 'ipc', port: int | None = None)[source]¶
Pub node that controls the state of the other nodes/announces addresses
one PUB socket to distribute commands
one ROUTER socket to receive return messages from runner nodes
one SUB socket to subscribe to all events
The wrapping runner should register callbacks with add_callback to handle incoming messages.
- add_callback(type_: Literal['inbox', 'router'], cb: Callable[[Message], Any]) None[source]¶
Add a callback called for message received - by the inbox: the subscriber that receives all events from node runners - by the router: direct messages sent by node runners to the command node
- await_ready(node_ids: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]]) None[source]¶
Wait until all the node_ids have announced themselves
- on_identify(msg: IdentifyMsg) None[source]¶
- class NodeRunner(spec: NodeSpecification, runner_id: str, command_outbox: str, command_router: str, input_collection: InputCollection, protocol: str = 'ipc')[source]¶
Runner for a single node
DEALER to communicate with command inbox
PUB (outbox) to publish events
SUB (inbox) to subscribe to events from other nodes.
- property depends: tuple[tuple[str, str], ...] | None¶
(node, signal) tuples of the wrapped node’s dependencies
- property status: NodeStatus¶
- classmethod run(spec: NodeSpecification, **kwargs: Any) None[source]¶
Target for multiprocessing.run, init the class and start it!
- update_status(status: NodeStatus) None[source]¶
Update our internal status and announce it to the command node
- on_announce(msg: AnnounceMsg) None[source]¶
Store map, connect to the nodes we depend on
- on_process(msg: ProcessMsg) None[source]¶
Process a single graph iteration
- error(err: Exception) None[source]¶
Capture the error and traceback context from an exception using
traceback.TracebackExceptionand send to command node to re-raise
- class ZMQRunner(tube: Tube, store: ~noob.store.EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[Callable[[Event | MetaEvent], None]] = <factory>, _logger: Logger = None, _runner_id: str | None = None, node_procs: dict[~typing.Annotated[str, ~pydantic.functional_validators.AfterValidator(func=~noob.types._is_identifier), ~pydantic.functional_validators.AfterValidator(func=~noob.types._not_reserved)], ~multiprocessing.Process] = <factory>, command: ~noob.runner.zmq.CommandNode | None = None, quit_timeout: float = 10, _running: ~multiprocessing.synchronize.Event = <factory>, _return_node: ~noob.node.return_.Return | None = None, _init_lock: ~_thread.lock = <factory>, _to_throw: ~noob.network.message.ErrorValue | None = None)[source]¶
A concurrent runner that uses zmq to broker events between nodes running in separate processes
- node_procs: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Process]¶
- command: CommandNode | None = None¶
- quit_timeout: float = 10¶
time in seconds to wait after calling deinit to wait before killing runner processes
- store: EventStore¶