Source code for hydra_router.utils.HydraMQ

# hydra_router/utils/HydraMQ.py
#
#    Hydra Router
#    Author: Nadim-Daniel Ghaznavi
#    Copyright: (c) 2025-2026 Nadim-Daniel Ghaznavi
#    GitHub: https://github.com/NadimGhaznavi/hydra_router
#    Website: https://hydra-router.readthedocs.io/en/latest
#    License: GPL 3.0
#

import asyncio
import sys
import time
from collections.abc import Awaitable, Callable

import zmq
import zmq.asyncio
from zmq.sugar.frame import Frame

from hydra_router.constants.DHydra import (
    DHydra,
    DHydraRouterDef,
    DHydraServerDef,
    DMethod,
    DModule,
)
from hydra_router.constants.DHydraTui import DLabel
from hydra_router.utils.HydraMsg import HydraMsg


def _ensure_bytes(data: bytes | Frame) -> bytes:
    return data.bytes if isinstance(data, Frame) else data


[docs] class HydraMQ: """ Async ZeroMQ client for HydraRouter communication. HydraMQ provides an async DEALER socket client that connects to a HydraRouter instance. It handles message serialization, heartbeats, and connection lifecycle. The client uses a DEALER socket which allows asynchronous bidirectional communication through a ROUTER-based message broker. Example: mq = HydraMQ( router_address="localhost", router_port=5757, id="my-service" ) # Send message msg = HydraMsg( sender=mq.identity, target="other-service", method="ping", payload={"data": "test"} ) await mq.send(msg) # Receive message response = await mq.recv() print(response.payload) # Cleanup await mq.quit() """
[docs] def __init__( self, router_address: str = DHydraRouterDef.HOSTNAME, router_port: int = DHydraRouterDef.PORT, router_hb_port: int = DHydraRouterDef.HEARTBEAT_PORT, identity: str = DModule.HYDRA_MQ, srv_bind_address: str = "*", srv_bind_port: int = DHydraServerDef.PORT, srv_methods: ( dict[str, Callable[[HydraMsg], object | Awaitable[object]]] | None ) = None, ) -> None: """ Initialize HydraMQ client. Args: router_address: Hostname/IP of the HydraRouter router_port: Port number of the HydraRouter id: Base identifier for this client (random suffix added) heartbeat_enabled: Whether to send periodic heartbeats Returns: None """ self.router = router_address self.port = router_port self.hb_port = router_hb_port # Legacy parameters retained for compatibility with existing callers. self.srv_methods = srv_methods or {} # Create async ZeroMQ context and DEALER socket self.ctx = zmq.asyncio.Context() self.socket = self.ctx.socket(zmq.DEALER) self.hb_socket = self.ctx.socket(zmq.DEALER) # Generate unique identity: base-id + random 4-char suffix self.identity = identity # Set ZeroMQ socket identity (must be bytes) self.socket.setsockopt(zmq.IDENTITY, self.identity.encode("utf-8")) self.hb_socket.setsockopt(zmq.IDENTITY, self.identity.encode("utf-8")) # Build router address self.router_addr = f"tcp://{self.router}:{self.port}" self.router_hb_addr = f"tcp://{self.router}:{self.hb_port}" # Asyncio control events self.stop_event = asyncio.Event() self.heartbeat_stop_event = asyncio.Event() # Connect to router self.socket.connect(self.router_addr) self.hb_socket.connect(self.router_hb_addr) # Placeholder for heartbeat task self.heartbeat_task: asyncio.Task[None] | None = None self.srv_task: asyncio.Task[None] | None = None if self.srv_methods: self.srv_stop_event = asyncio.Event() self.srv_pause_event = asyncio.Event() # A float holding time.time() for when the last heartbeat reply was received self._last_heartbeat: float = 0.0 # Flag to determine if start() has been called self._started = False
[docs] async def bg_listen(self) -> None: try: while not self.srv_stop_event.is_set(): # Handle pause if self.srv_pause_event.is_set(): await asyncio.sleep(0.1) continue try: message_data = await asyncio.wait_for( self.socket.recv(copy=True), timeout=DHydra.NETWORK_TIMEOUT ) hydra_msg = HydraMsg.from_json(_ensure_bytes(message_data)) method = hydra_msg.method handler = self.srv_methods.get(method) if handler is not None: result = handler(hydra_msg) if asyncio.iscoroutine(result): await result else: print(f"{DLabel.ERROR}: Unhandled method {method}") except asyncio.TimeoutError: # No message was received, continue... pass except Exception as e: print(f"{DLabel.ERROR}: {e}") except Exception as e: print(f"{DLabel.ERROR}: {e}") sys.exit(1)
[docs] def connected(self) -> bool: if self._last_heartbeat == 0: return False interval = time.time() - self._last_heartbeat if interval > (2 * DHydra.HEARTBEAT_INTERVAL): return False return True
[docs] async def quit(self) -> None: """ Cleanly shutdown the HydraMQ client. Stops heartbeat task, disconnects from router, and cleans up ZeroMQ resources. Returns: None """ # Stop heartbeat task if self.heartbeat_task is not None: self.heartbeat_stop_event.set() await asyncio.sleep(0.1) # Give task time to exit self.heartbeat_task.cancel() try: await self.heartbeat_task except asyncio.CancelledError: pass if self.srv_task is not None: self.srv_stop_event.set() await asyncio.sleep(0.1) self.srv_task.cancel() try: await self.srv_task except asyncio.CancelledError: pass # Disconnect and cleanup try: self.socket.disconnect(self.router_addr) self.socket.close(linger=0) self.hb_socket.disconnect(self.router_hb_addr) self.hb_socket.close(linger=0) finally: self.ctx.term()
[docs] async def recv(self) -> HydraMsg: """ Receive a HydraMsg from the router. Waits for an incoming message, deserializes it, and returns a HydraMsg instance. Args: timeout: Maximum time to wait for a message in seconds Returns: HydraMsg instance Raises: asyncio.TimeoutError: If no message received within timeout zmq.ZMQError: If receive operation fails json.JSONDecodeError: If message is not valid JSON """ # DEALER socket receives single frame from ROUTER # ROUTER sends [client_identity, message], but DEALER # automatically strips the identity, leaving just [message] message_data = None message_data = await asyncio.wait_for( self.socket.recv(copy=True), timeout=DHydra.NETWORK_TIMEOUT ) if message_data is not None: return HydraMsg.from_json(_ensure_bytes(message_data))
[docs] async def send(self, msg: HydraMsg) -> None: """ Send a HydraMsg through the router. Serializes the message to JSON and sends it through the DEALER socket to the connected ROUTER. Args: msg: HydraMsg instance to send Returns: None Raises: zmq.ZMQError: If send operation fails """ # DEALER socket automatically prepends identity when sending to ROUTER await self.socket.send(msg.to_json())
def _split_router_frames( self, frames: list[bytes] ) -> tuple[bytes, bytes, list[bytes]]: """ Returns (sender, payload, routing_prefix) routing_prefix is what you should echo back before payload. """ if len(frames) < 2: raise ValueError(f"Expected >=2 frames, got {len(frames)}") sender = frames[0] if len(frames) >= 3 and frames[1] == b"": return sender, frames[-1], [sender, b""] else: return sender, frames[-1], [sender]
[docs] def start(self): if self._started: return if self.srv_methods and self.srv_task is None: self.srv_task = asyncio.create_task(self.bg_listen()) # Start heartbeat task self.heartbeat_task = asyncio.create_task(self.start_heartbeat_bg()) self._started = True
[docs] def started(self) -> bool: return self._started
[docs] async def start_heartbeat_bg(self) -> None: """ Periodic heartbeat loop to keep connection alive. Sends heartbeat messages to the router at regular intervals to indicate this client is still active. Returns: None """ while not self.heartbeat_stop_event.is_set(): msg = HydraMsg( sender=self.identity, target=DModule.HYDRA_ROUTER, method=DMethod.HEARTBEAT, ) await self.hb_socket.send(msg.to_json()) try: message_data = await asyncio.wait_for( self.hb_socket.recv(copy=True), timeout=DHydra.NETWORK_TIMEOUT ) reply = HydraMsg.from_json(_ensure_bytes(message_data)) if reply.method == DMethod.HEARTBEAT_REPLY: self._last_heartbeat = time.time() except asyncio.TimeoutError: # Just continue and try again pass await asyncio.sleep(DHydra.HEARTBEAT_INTERVAL)