# hydra_router/utils/HydraMsg.py
#
# Hydra Router
# Author: Nadim-Daniel Ghaznavi
# Copyright: (c) 2025-2026 Nadim-Daniel Ghaznavi
# GitHub: https://github.com/NadimGhaznavi/hydra_router
# Website: https://hydra-router.readthedocs.io/en/latest
# License: GPL 3.0
import json
from typing import Any
from hydra_router.constants.DHydra import DHydra, DHydraMsg
[docs]
class HydraMsg:
"""
Structured message class for HydraRouter communication protocol.
HydraMsg encapsulates messages for ZeroMQ-based RPC communication.
It handles serialization/deserialization and provides a clean API
for creating and accessing message components.
Internal representation uses Python objects (dict for payload).
Serialization to JSON happens only when converting to wire format.
# Serialize for transmission
json_bytes = msg.to_json()
# Deserialize from received data
msg = HydraMsg.from_json(json_bytes)
"""
[docs]
def __init__(
self,
sender: str,
target: str,
method: str,
payload: dict[str, Any] | None = None,
) -> None:
"""
Initialize a new HydraMsg instance.
Args:
sender: Identifier of the message sender
target: Identifier of the intended message recipient
method: RPC method or action to be performed
payload: Message data as a dictionary (not JSON string)
Returns:
None
"""
self._sender = sender
self._target = target
self._method = method
self._payload = payload if payload is not None else {}
@property
def sender(self) -> str:
"""Get the message sender identifier."""
return self._sender
@sender.setter
def sender(self, value: str) -> None:
"""Set the message sender identifier."""
self._sender = value
@property
def target(self) -> str:
"""Get the message target identifier."""
return self._target
@target.setter
def target(self, value: str) -> None:
"""Set the message target identifier."""
self._target = value
@property
def method(self) -> str:
"""Get the RPC method name."""
return self._method
@method.setter
def method(self, value: str) -> None:
"""Set the RPC method name."""
self._method = value
@property
def payload(self) -> dict[str, Any]:
"""Get the message payload as a dictionary."""
return self._payload
@payload.setter
def payload(self, value: dict[str, Any]) -> None:
"""Set the message payload from a dictionary."""
self._payload = value
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "HydraMsg":
"""
Create a HydraMsg from a dictionary.
Args:
data: Dictionary containing message fields
Returns:
HydraMsg instance
Raises:
KeyError: If required fields are missing
"""
pv = data[DHydraMsg.PROTOCOL_VERSION]
if pv != DHydra.PROTOCOL_VERSION:
raise ValueError(f"Unsupported Hydra protocol version: {pv}")
payload = data.get(DHydraMsg.PAYLOAD) or {}
if not isinstance(payload, dict):
raise TypeError("Payload must be a dict")
return cls(
sender=data[DHydraMsg.SENDER],
target=data[DHydraMsg.TARGET],
method=data[DHydraMsg.METHOD],
payload=payload,
)
[docs]
@classmethod
def from_json(cls, json_data: bytes) -> "HydraMsg":
"""
Create a HydraMsg from JSON bytes.
Args:
json_data: JSON-encoded message as bytes
Returns:
HydraMsg instance
Raises:
json.JSONDecodeError: If json_data is not valid JSON
UnicodeDecodeError: If json_data cannot be decoded as UTF-8
"""
return cls.from_dict(json.loads(json_data.decode("utf-8")))
[docs]
def to_dict(self) -> dict[str, Any]:
"""
Convert message to dictionary representation.
Returns:
Dictionary containing all message fields including version
"""
return {
DHydraMsg.SENDER: self._sender,
DHydraMsg.TARGET: self._target,
DHydraMsg.METHOD: self._method,
DHydraMsg.PAYLOAD: self._payload,
DHydraMsg.PROTOCOL_VERSION: DHydra.PROTOCOL_VERSION,
}
[docs]
def to_json(self) -> bytes:
"""
Convert message to JSON bytes for transmission.
Returns:
JSON-encoded message as UTF-8 bytes ready for ZeroMQ
Raises:
TypeError: If message contains non-serializable objects
"""
return json.dumps(self.to_dict()).encode("utf-8")
[docs]
def __repr__(self) -> str:
"""
Return string representation of message for debugging.
Returns:
String showing message structure
"""
return (
f"HydraMsg:{self._sender}->{self._target}:{self._method}({self._payload})"
)
[docs]
def __str__(self) -> str:
"""
Return human-readable string representation.
Returns:
Formatted string with message details
"""
return f"HydraMsg:{self._sender}->{self._target}:{self._method}()"