Source code for noob.network.loop

import asyncio
import threading

try:
    import zmq
    from tornado.ioloop import IOLoop
except ImportError as e:
    raise ImportError(
        "Attempted to import zmq runner, but zmq deps are not installed. install with `noob[zmq]`",
    ) from e


class EventloopMixin:
    """Mixin providing a lazily created zmq context and a tornado IOLoop
    that is run in a background thread.

    If the host class defines a ``logger`` attribute it is used for
    diagnostic messages; otherwise logging is silently skipped.
    """

    def __init__(self):
        self._context = None
        self._loop = None
        # Set by ``stop_loop`` to tell the runner thread not to restart the loop.
        self._quitting = asyncio.Event()
        # Thread currently running the loop, or None when no loop is running.
        self._thread: threading.Thread | None = None

    @property
    def context(self) -> zmq.Context:
        """The zmq context, obtained from ``zmq.Context.instance()`` on first access."""
        if self._context is None:
            self._context = zmq.Context.instance()
        return self._context

    @property
    def loop(self) -> IOLoop:
        """The tornado IOLoop, created on first access and cached afterwards.

        The asyncio event loop is only created/installed while the IOLoop is
        being created. Once the IOLoop is cached, accessing this property from
        any thread returns it without touching that thread's asyncio state, so
        repeated accesses do not leak fresh asyncio loops.
        """
        if self._loop is None:
            # tornado requires an eventloop to be created manually now
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # Create a new asyncio event loop for this thread.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            self._loop = IOLoop.current()
        return self._loop

    def start_loop(self) -> None:
        """Start the IOLoop in a new thread and wait up to 5 seconds for it to run.

        Raises:
            RuntimeWarning: if a loop thread is already registered.
        """
        if self._thread is not None:
            raise RuntimeWarning("Node already started")

        self._quitting.clear()
        _ready = threading.Event()

        def _debug(msg: str) -> None:
            # The host class is not required to provide a logger.
            if hasattr(self, "logger"):
                self.logger.debug(msg)

        def _signal_ready() -> None:
            _ready.set()

        def _run() -> None:
            _debug("Starting eventloop")
            try:
                # Restart the loop if it stops without a quit having been requested.
                while not self._quitting.is_set():
                    try:
                        # Runs once the loop is actually spinning.
                        self.loop.add_callback(_signal_ready)
                        self.loop.start()
                    except RuntimeError:
                        # loop already started
                        _debug("Eventloop already started, quitting")
                        break
                _debug("Eventloop stopped")
            finally:
                # Always release the slot, even on an unexpected error, so that
                # ``start_loop`` can be called again.
                self._thread = None

        self._thread = threading.Thread(target=_run)
        self._thread.start()
        # wait until the loop has started
        if _ready.wait(5):
            _debug("Event loop started")
        elif hasattr(self, "logger"):
            self.logger.warning("Event loop did not signal readiness within 5 seconds")

    def stop_loop(self) -> None:
        """Request the running loop to stop; a no-op when no loop thread is registered.

        The stop is scheduled with ``add_callback`` so it executes on the loop's
        own thread. This method does not wait for the thread to finish.
        """
        if self._thread is None:
            return
        self._quitting.set()
        self.loop.add_callback(self.loop.stop)