fix: remove dead pub/sub code from MessageBus
`subscribe_outbound()`, `dispatch_outbound()`, and `stop()` have zero callers — `ChannelManager._dispatch_outbound()` handles all outbound routing via `consume_outbound()` directly. Remove the dead methods and their unused imports (`Callable`, `Awaitable`, `logger`). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,9 +1,6 @@
|
|||||||
"""Async message queue for decoupled channel-agent communication."""
|
"""Async message queue for decoupled channel-agent communication."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import Callable, Awaitable
|
|
||||||
|
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
from nanobot.bus.events import InboundMessage, OutboundMessage
|
from nanobot.bus.events import InboundMessage, OutboundMessage
|
||||||
|
|
||||||
@@ -11,70 +8,36 @@ from nanobot.bus.events import InboundMessage, OutboundMessage
|
|||||||
class MessageBus:
|
class MessageBus:
|
||||||
"""
|
"""
|
||||||
Async message bus that decouples chat channels from the agent core.
|
Async message bus that decouples chat channels from the agent core.
|
||||||
|
|
||||||
Channels push messages to the inbound queue, and the agent processes
|
Channels push messages to the inbound queue, and the agent processes
|
||||||
them and pushes responses to the outbound queue.
|
them and pushes responses to the outbound queue.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
|
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
|
||||||
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
|
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
|
||||||
self._outbound_subscribers: dict[str, list[Callable[[OutboundMessage], Awaitable[None]]]] = {}
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
async def publish_inbound(self, msg: InboundMessage) -> None:
|
async def publish_inbound(self, msg: InboundMessage) -> None:
|
||||||
"""Publish a message from a channel to the agent."""
|
"""Publish a message from a channel to the agent."""
|
||||||
await self.inbound.put(msg)
|
await self.inbound.put(msg)
|
||||||
|
|
||||||
async def consume_inbound(self) -> InboundMessage:
|
async def consume_inbound(self) -> InboundMessage:
|
||||||
"""Consume the next inbound message (blocks until available)."""
|
"""Consume the next inbound message (blocks until available)."""
|
||||||
return await self.inbound.get()
|
return await self.inbound.get()
|
||||||
|
|
||||||
async def publish_outbound(self, msg: OutboundMessage) -> None:
|
async def publish_outbound(self, msg: OutboundMessage) -> None:
|
||||||
"""Publish a response from the agent to channels."""
|
"""Publish a response from the agent to channels."""
|
||||||
await self.outbound.put(msg)
|
await self.outbound.put(msg)
|
||||||
|
|
||||||
async def consume_outbound(self) -> OutboundMessage:
|
async def consume_outbound(self) -> OutboundMessage:
|
||||||
"""Consume the next outbound message (blocks until available)."""
|
"""Consume the next outbound message (blocks until available)."""
|
||||||
return await self.outbound.get()
|
return await self.outbound.get()
|
||||||
|
|
||||||
def subscribe_outbound(
|
|
||||||
self,
|
|
||||||
channel: str,
|
|
||||||
callback: Callable[[OutboundMessage], Awaitable[None]]
|
|
||||||
) -> None:
|
|
||||||
"""Subscribe to outbound messages for a specific channel."""
|
|
||||||
if channel not in self._outbound_subscribers:
|
|
||||||
self._outbound_subscribers[channel] = []
|
|
||||||
self._outbound_subscribers[channel].append(callback)
|
|
||||||
|
|
||||||
async def dispatch_outbound(self) -> None:
|
|
||||||
"""
|
|
||||||
Dispatch outbound messages to subscribed channels.
|
|
||||||
Run this as a background task.
|
|
||||||
"""
|
|
||||||
self._running = True
|
|
||||||
while self._running:
|
|
||||||
try:
|
|
||||||
msg = await asyncio.wait_for(self.outbound.get(), timeout=1.0)
|
|
||||||
subscribers = self._outbound_subscribers.get(msg.channel, [])
|
|
||||||
for callback in subscribers:
|
|
||||||
try:
|
|
||||||
await callback(msg)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error dispatching to {msg.channel}: {e}")
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
def stop(self) -> None:
|
|
||||||
"""Stop the dispatcher loop."""
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def inbound_size(self) -> int:
|
def inbound_size(self) -> int:
|
||||||
"""Number of pending inbound messages."""
|
"""Number of pending inbound messages."""
|
||||||
return self.inbound.qsize()
|
return self.inbound.qsize()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def outbound_size(self) -> int:
|
def outbound_size(self) -> int:
|
||||||
"""Number of pending outbound messages."""
|
"""Number of pending outbound messages."""
|
||||||
|
|||||||
Reference in New Issue
Block a user