zmq

  • Central command pub/sub

  • Each sub-runner has its own set of sockets for publishing and consuming events

  • Use node_id.signal (and similar) names as feed addresses
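The feed-address idea can be shown roughly as follows. ZMQ SUB sockets filter incoming messages by byte prefix on the first frame (in pyzmq, socket.setsockopt(zmq.SUBSCRIBE, prefix)), so a node_id.signal string can act as a topic. This sketch simulates that prefix matching in plain Python. feed_address and sub_matches are hypothetical helpers, and the sketch assumes the feed address is sent as the topic frame.

```python
def feed_address(node_id: str, signal: str) -> bytes:
    """Hypothetical helper: encode a node's signal as a topic, e.g. b"node_a.value"."""
    return f"{node_id}.{signal}".encode()


def sub_matches(subscription: bytes, topic: bytes) -> bool:
    """Mimic ZMQ SUB filtering: a subscription matches any topic it is a byte-prefix of."""
    return topic.startswith(subscription)


# Subscribing to exactly one signal of one node
assert sub_matches(feed_address("node_a", "value"), b"node_a.value")

# Subscribing to every signal of node_a: keep the trailing dot,
# otherwise b"node_a" would also match topics from a node called "node_ab"
assert sub_matches(b"node_a.", b"node_a.value")
assert not sub_matches(b"node_a.", b"node_ab.value")
assert sub_matches(b"node_a", b"node_ab.value")
```

Note the trailing dot in the per-node subscription. Without it, a subscription for node_a would also receive events from a node named node_ab.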

Todo

Currently only IPC is supported, so the zmq runner can't run across machines. TCP support is a work in progress. It will require some degree of authentication among nodes to prevent arbitrary code execution, since we shouldn't count on users to properly firewall their runners.

Todo

The socket spawning and event handling are awfully manual here. We're leaving them as they are because it's somewhat unlikely we'll need to generalize them. Otherwise, it would be great to standardize socket names and have event handler decorators like:

@on_router(MessageType.sometype)
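The following is a minimal sketch of what such decorators could look like. It assumes handlers are methods tagged when they are defined and collected when the class is created. MessageType, on_router, RouterDispatch, and CommandSketch are hypothetical stand-ins, not the real noob classes.

```python
from enum import Enum
from typing import Any, Callable


class MessageType(str, Enum):
    """Hypothetical stand-in for the real message type enum."""
    identify = "identify"
    status = "status"


def on_router(type_: MessageType) -> Callable[[Callable], Callable]:
    """Mark a method as the handler for one router message type."""
    def decorator(fn: Callable) -> Callable:
        fn._router_type = type_  # tag only; handlers are collected at class creation
        return fn
    return decorator


class RouterDispatch:
    """Mixin that collects @on_router handlers whenever a subclass is defined."""
    _router_handlers: dict[MessageType, str] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        handlers: dict[MessageType, str] = {}
        # Walk the MRO base-first so subclasses override parent handlers
        for klass in reversed(cls.__mro__):
            for name, attr in vars(klass).items():
                type_ = getattr(attr, "_router_type", None)
                if type_ is not None:
                    handlers[type_] = name
        cls._router_handlers = handlers

    def dispatch_router(self, type_: MessageType, payload: Any) -> Any:
        try:
            name = self._router_handlers[type_]
        except KeyError:
            raise ValueError(f"no router handler for {type_!r}") from None
        return getattr(self, name)(payload)


class CommandSketch(RouterDispatch):
    @on_router(MessageType.identify)
    def handle_identify(self, node_id: str) -> str:
        return f"identified {node_id}"
```

Storing method names rather than bound functions keeps the lookup table per class. Dispatch then goes through getattr on the instance, so normal method resolution still applies.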

class CommandNode(runner_id: str, protocol: str = 'ipc', port: int | None = None)

PUB node that controls the state of the other nodes and announces their addresses

  • one PUB socket to distribute commands

  • one ROUTER socket to receive return messages from runner nodes

  • one SUB socket to subscribe to all events

The wrapping runner should register callbacks with add_callback to handle incoming messages.

property pub_address: str

Address the publisher is bound to

property router_address: str

Address the return router is bound to

start() → None
stop() → None
announce() → None
process() → None

Emit a ProcessMsg to process a single round through the graph

add_callback(type_: Literal['inbox', 'router'], cb: Callable[[Message], Any]) → None

Add a callback that is called for each message received by:

  • the inbox: the subscriber that receives all events from node runners

  • the router: the socket that receives direct messages sent by node runners to the command node
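The inbox/router callback split could be kept as one list per message source. This is a sketch only. CallbackRegistry and _fire are hypothetical names, and the real CommandNode may store and invoke its callbacks differently. ZMQRunner's on_event and on_router look like natural candidates for these callbacks, but that pairing is an assumption.

```python
from collections import defaultdict
from typing import Any, Callable, Literal

CallbackType = Literal["inbox", "router"]


class CallbackRegistry:
    """Hypothetical sketch of per-source callback lists."""

    def __init__(self) -> None:
        self._callbacks: dict[str, list[Callable[[Any], Any]]] = defaultdict(list)

    def add_callback(self, type_: CallbackType, cb: Callable[[Any], Any]) -> None:
        # Literal is not enforced at runtime, so check the source name explicitly
        if type_ not in ("inbox", "router"):
            raise ValueError(f"unknown callback type: {type_!r}")
        self._callbacks[type_].append(cb)

    def _fire(self, type_: CallbackType, msg: Any) -> None:
        # Invoke every callback registered for this source, in registration order
        for cb in self._callbacks[type_]:
            cb(msg)
```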

await_ready(node_ids: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]]) → None

Wait until all the node_ids have announced themselves

on_router(msg: list[bytes]) → None
on_inbox(msg: list[bytes]) → None
on_identify(msg: IdentifyMsg) → None
on_status(msg: StatusMsg) → None

class NodeRunner(spec: NodeSpecification, runner_id: str, command_outbox: str, command_router: str, input_collection: InputCollection, protocol: str = 'ipc')

Runner for a single node

  • DEALER to communicate with command inbox

  • PUB (outbox) to publish events

  • SUB (inbox) to subscribe to events from other nodes.

property outbox_address: str
property depends: tuple[tuple[str, str], ...] | None

(node, signal) tuples of the wrapped node’s dependencies

property status: NodeStatus
classmethod run(spec: NodeSpecification, **kwargs: Any) → None

Target for a multiprocessing.Process: initialize the class and start it!
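A classmethod target means the runner object is constructed inside the child process. Its ZMQ context and sockets are therefore created there rather than inherited across the process boundary, since pyzmq contexts should not be shared across fork. The sketch below assumes that pattern. RunnerSketch, SpecSketch, and make_process are hypothetical, and unlike the documented run, this one returns the runner so it can be inspected.

```python
import multiprocessing as mp
from dataclasses import dataclass
from typing import Any


@dataclass
class SpecSketch:
    """Hypothetical stand-in for NodeSpecification."""
    id: str


class RunnerSketch:
    """Hypothetical node runner; only the construct-then-start pattern is shown."""

    def __init__(self, spec: SpecSketch, runner_id: str) -> None:
        self.spec = spec
        self.runner_id = runner_id
        self.started = False

    def start(self) -> None:
        # A real runner would create its zmq.Context and sockets here,
        # i.e. inside the child process, and then enter its event loop.
        self.started = True

    @classmethod
    def run(cls, spec: SpecSketch, **kwargs: Any) -> "RunnerSketch":
        runner = cls(spec, **kwargs)
        runner.start()
        return runner


def make_process(spec: SpecSketch, runner_id: str) -> mp.Process:
    """Build (but do not start) a process whose target is the classmethod."""
    return mp.Process(
        target=RunnerSketch.run,
        args=(spec,),
        kwargs={"runner_id": runner_id},
        daemon=True,
    )
```

Calling proc.start() would launch the child. Only the spec and kwargs cross the process boundary (pickled under the spawn start method), which is another reason to pass plain data rather than live objects.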

await_inputs() → Generator[tuple[list[Any], dict[str, Any], int]]
update_graph(events: list[Event]) → None
publish_events(events: list[Event]) → None
init() → None
deinit() → None
identify() → None

Send the command node an identify message to say we're alive

update_status(status: NodeStatus) → None

Update our internal status and announce it to the command node

start_sockets() → None
init_node() → None
on_dealer(msg: list[bytes]) → None
on_inbox(msg: list[bytes]) → None
on_announce(msg: AnnounceMsg) → None

Store the announced address map and connect to the nodes we depend on

on_event(msg: EventMsg) → None
on_process(msg: ProcessMsg) → None

Process a single graph iteration

on_stop(msg: StopMsg) → None

Stop processing!

error(err: Exception) → None

Capture the error and traceback context from an exception using traceback.TracebackException and send it to the command node to be re-raised
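traceback.TracebackException captures an exception's type, message, and stack so they can be formatted later, which makes its formatted output a convenient thing to send to another process. The sketch below assumes the payload is a plain dict and that the command node re-raises a generic RuntimeError, since the original exception class may not be importable there. capture_error and reraise are hypothetical names.

```python
import traceback
from typing import Any


def capture_error(err: BaseException) -> dict[str, Any]:
    """Turn an exception into a plain, serializable payload for the command node."""
    tbe = traceback.TracebackException.from_exception(err)
    return {
        "type": type(err).__name__,
        "message": str(err),
        # format() yields the "Traceback (most recent call last): ..." text in chunks
        "traceback": "".join(tbe.format()),
    }


def reraise(payload: dict[str, Any]) -> None:
    """Re-raise on the command side, embedding the remote traceback text."""
    raise RuntimeError(
        f"{payload['type']}: {payload['message']}\n\nRemote traceback:\n{payload['traceback']}"
    )
```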

class ZMQRunner(tube: Tube, store: noob.store.EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[Callable[[Event | MetaEvent], None]] = <factory>, _logger: Logger = None, _runner_id: str | None = None, node_procs: dict[typing.Annotated[str, pydantic.functional_validators.AfterValidator(func=noob.types._is_identifier), pydantic.functional_validators.AfterValidator(func=noob.types._not_reserved)], multiprocessing.Process] = <factory>, command: noob.runner.zmq.CommandNode | None = None, quit_timeout: float = 10, _running: multiprocessing.synchronize.Event = <factory>, _return_node: noob.node.return_.Return | None = None, _init_lock: _thread.lock = <factory>, _to_throw: noob.network.message.ErrorValue | None = None)

A concurrent runner that uses zmq to broker events between nodes running in separate processes

node_procs: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Process]
command: CommandNode | None = None
quit_timeout: float = 10

Time in seconds to wait after calling deinit before killing runner processes

store: EventStore
property running: bool
init() → None
deinit() → None
process(**kwargs: Any) → None | dict[str, Any] | Any
on_event(msg: Message) → None
on_router(msg: Message) → None
collect_return(epoch: int | None = None) → Any
enable_node(node_id: str) → None
disable_node(node_id: str) → None