Source code for noob.network.loop
import asyncio
import threading
try:
import zmq
from tornado.ioloop import IOLoop
except ImportError as e:
raise ImportError(
"Attempted to import zmq runner, but zmq deps are not installed. install with `noob[zmq]`",
) from e
[docs]
class EventloopMixin:
    """Mixin providing a lazily created zmq context and a tornado IOLoop.

    The IOLoop is run in a background thread by :meth:`start_loop` and asked
    to stop by :meth:`stop_loop`. If the inheriting class defines a ``logger``
    attribute it is used for progress messages; otherwise logging is skipped.
    """

    def __init__(self):
        self._context = None
        self._loop = None
        # Set by ``stop_loop``; the loop thread checks it to decide whether to
        # restart the IOLoop after it returns from ``start()``.
        self._quitting = asyncio.Event()
        self._thread: threading.Thread | None = None

    @property
    def context(self) -> "zmq.Context":
        """The zmq context from ``zmq.Context.instance()``, fetched on first access."""
        if self._context is None:
            self._context = zmq.Context.instance()
        return self._context

    @property
    def loop(self) -> "IOLoop":
        """The tornado IOLoop used by this object, created on first access.

        Once created, the same IOLoop is returned on every later access from
        any thread, without touching the calling thread's asyncio state.
        """
        # Return the cached loop first: creating an asyncio loop on every access
        # would leak unclosed event loops and replace the caller's current loop
        # (e.g. when ``stop_loop`` is called from a different thread).
        if self._loop is not None:
            return self._loop

        # tornado requires an eventloop to be created manually now
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: create one and make it current
            # so that ``IOLoop.current()`` wraps it.
            asyncio.set_event_loop(asyncio.new_event_loop())

        self._loop = IOLoop.current()
        return self._loop

    def start_loop(self) -> None:
        """Run the IOLoop in a new thread and wait briefly for it to start.

        Blocks for up to 5 seconds until a callback scheduled on the loop has
        run. If that does not happen in time a warning is logged and the
        method returns anyway.

        Raises:
            RuntimeWarning: if a loop thread has already been started.
        """
        if self._thread is not None:
            raise RuntimeWarning("Node already started")

        self._quitting.clear()
        _ready = threading.Event()

        def _log(msg: str, level: str = "debug") -> None:
            # ``logger`` is optional on the inheriting class.
            if hasattr(self, "logger"):
                getattr(self.logger, level)(msg)

        def _run() -> None:
            _log("Starting eventloop")
            try:
                # Restart the loop if it stops without ``stop_loop`` being called.
                while not self._quitting.is_set():
                    try:
                        # Runs once the loop is processing callbacks.
                        self.loop.add_callback(_ready.set)
                        self.loop.start()
                    except RuntimeError:
                        # loop already started
                        _log("Eventloop already started, quitting")
                        break
            finally:
                # Always release the "started" marker, even on an unexpected
                # error, so the loop can be started again later.
                self._thread = None
                _log("Eventloop stopped")

        thread = threading.Thread(target=_run)
        self._thread = thread
        thread.start()

        # wait until the loop has started
        if _ready.wait(5):
            _log("Event loop started")
        else:
            _log("Event loop did not signal readiness within 5 seconds", "warning")

    def stop_loop(self) -> None:
        """Ask the running IOLoop to stop; do nothing if no loop thread exists.

        Returns without waiting for the loop thread to finish.
        """
        if self._thread is None:
            return
        self._quitting.set()
        loop = self.loop
        # Schedule the stop on the loop itself rather than calling it directly
        # from this thread.
        loop.add_callback(loop.stop)