style(loop): remove formatting-only changes from upstream PR 881
This commit is contained in:
@@ -56,7 +56,6 @@ class AgentLoop:
|
||||
):
|
||||
from nanobot.config.schema import ExecToolConfig
|
||||
from nanobot.cron.service import CronService
|
||||
|
||||
self.bus = bus
|
||||
self.provider = provider
|
||||
self.workspace = workspace
|
||||
@@ -84,16 +83,16 @@ class AgentLoop:
|
||||
exec_config=self.exec_config,
|
||||
restrict_to_workspace=restrict_to_workspace,
|
||||
)
|
||||
|
||||
|
||||
self._running = False
|
||||
self._mcp_servers = mcp_servers or {}
|
||||
self._mcp_stack: AsyncExitStack | None = None
|
||||
self._mcp_connected = False
|
||||
self._consolidating: set[str] = set() # Session keys with consolidation in progress
|
||||
self._consolidation_tasks: set[asyncio.Task] = set() # Keep strong refs for in-flight tasks
|
||||
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
|
||||
self._consolidation_locks: dict[str, asyncio.Lock] = {}
|
||||
self._register_default_tools()
|
||||
|
||||
|
||||
def _register_default_tools(self) -> None:
|
||||
"""Register the default set of tools."""
|
||||
# File tools (workspace for relative paths, restrict if configured)
|
||||
@@ -102,39 +101,36 @@ class AgentLoop:
|
||||
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
||||
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
||||
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
||||
|
||||
|
||||
# Shell tool
|
||||
self.tools.register(
|
||||
ExecTool(
|
||||
working_dir=str(self.workspace),
|
||||
timeout=self.exec_config.timeout,
|
||||
restrict_to_workspace=self.restrict_to_workspace,
|
||||
)
|
||||
)
|
||||
|
||||
self.tools.register(ExecTool(
|
||||
working_dir=str(self.workspace),
|
||||
timeout=self.exec_config.timeout,
|
||||
restrict_to_workspace=self.restrict_to_workspace,
|
||||
))
|
||||
|
||||
# Web tools
|
||||
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
|
||||
self.tools.register(WebFetchTool())
|
||||
|
||||
|
||||
# Message tool
|
||||
message_tool = MessageTool(send_callback=self.bus.publish_outbound)
|
||||
self.tools.register(message_tool)
|
||||
|
||||
|
||||
# Spawn tool (for subagents)
|
||||
spawn_tool = SpawnTool(manager=self.subagents)
|
||||
self.tools.register(spawn_tool)
|
||||
|
||||
|
||||
# Cron tool (for scheduling)
|
||||
if self.cron_service:
|
||||
self.tools.register(CronTool(self.cron_service))
|
||||
|
||||
|
||||
async def _connect_mcp(self) -> None:
|
||||
"""Connect to configured MCP servers (one-time, lazy)."""
|
||||
if self._mcp_connected or not self._mcp_servers:
|
||||
return
|
||||
self._mcp_connected = True
|
||||
from nanobot.agent.tools.mcp import connect_mcp_servers
|
||||
|
||||
self._mcp_stack = AsyncExitStack()
|
||||
await self._mcp_stack.__aenter__()
|
||||
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
|
||||
@@ -163,13 +159,11 @@ class AgentLoop:
|
||||
@staticmethod
|
||||
def _tool_hint(tool_calls: list) -> str:
|
||||
"""Format tool calls as concise hint, e.g. 'web_search("query")'."""
|
||||
|
||||
def _fmt(tc):
|
||||
val = next(iter(tc.arguments.values()), None) if tc.arguments else None
|
||||
if not isinstance(val, str):
|
||||
return tc.name
|
||||
return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")'
|
||||
|
||||
return ", ".join(_fmt(tc) for tc in tool_calls)
|
||||
|
||||
async def _run_agent_loop(
|
||||
@@ -217,15 +211,13 @@ class AgentLoop:
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": tc.name,
|
||||
"arguments": json.dumps(tc.arguments, ensure_ascii=False),
|
||||
},
|
||||
"arguments": json.dumps(tc.arguments, ensure_ascii=False)
|
||||
}
|
||||
}
|
||||
for tc in response.tool_calls
|
||||
]
|
||||
messages = self.context.add_assistant_message(
|
||||
messages,
|
||||
response.content,
|
||||
tool_call_dicts,
|
||||
messages, response.content, tool_call_dicts,
|
||||
reasoning_content=response.reasoning_content,
|
||||
)
|
||||
|
||||
@@ -243,13 +235,9 @@ class AgentLoop:
|
||||
# Give them one retry; don't forward the text to avoid duplicates.
|
||||
if not tools_used and not text_only_retried and final_content:
|
||||
text_only_retried = True
|
||||
logger.debug(
|
||||
"Interim text response (no tools used yet), retrying: {}",
|
||||
final_content[:80],
|
||||
)
|
||||
logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80])
|
||||
messages = self.context.add_assistant_message(
|
||||
messages,
|
||||
response.content,
|
||||
messages, response.content,
|
||||
reasoning_content=response.reasoning_content,
|
||||
)
|
||||
final_content = None
|
||||
@@ -266,23 +254,24 @@ class AgentLoop:
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
|
||||
msg = await asyncio.wait_for(
|
||||
self.bus.consume_inbound(),
|
||||
timeout=1.0
|
||||
)
|
||||
try:
|
||||
response = await self._process_message(msg)
|
||||
if response:
|
||||
await self.bus.publish_outbound(response)
|
||||
except Exception as e:
|
||||
logger.error("Error processing message: {}", e)
|
||||
await self.bus.publish_outbound(
|
||||
OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content=f"Sorry, I encountered an error: {str(e)}",
|
||||
)
|
||||
)
|
||||
await self.bus.publish_outbound(OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content=f"Sorry, I encountered an error: {str(e)}"
|
||||
))
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
|
||||
async def close_mcp(self) -> None:
|
||||
"""Close MCP connections."""
|
||||
if self._mcp_stack:
|
||||
@@ -298,13 +287,12 @@ class AgentLoop:
|
||||
logger.info("Agent loop stopping")
|
||||
|
||||
def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock:
|
||||
"""Return a per-session lock for memory consolidation writers."""
|
||||
lock = self._consolidation_locks.get(session_key)
|
||||
if lock is None:
|
||||
lock = asyncio.Lock()
|
||||
self._consolidation_locks[session_key] = lock
|
||||
return lock
|
||||
|
||||
|
||||
async def _process_message(
|
||||
self,
|
||||
msg: InboundMessage,
|
||||
@@ -313,25 +301,25 @@ class AgentLoop:
|
||||
) -> OutboundMessage | None:
|
||||
"""
|
||||
Process a single inbound message.
|
||||
|
||||
|
||||
Args:
|
||||
msg: The inbound message to process.
|
||||
session_key: Override session key (used by process_direct).
|
||||
on_progress: Optional callback for intermediate output (defaults to bus publish).
|
||||
|
||||
|
||||
Returns:
|
||||
The response message, or None if no response needed.
|
||||
"""
|
||||
# System messages route back via chat_id ("channel:chat_id")
|
||||
if msg.channel == "system":
|
||||
return await self._process_system_message(msg)
|
||||
|
||||
|
||||
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
|
||||
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
|
||||
|
||||
|
||||
key = session_key or msg.session_key
|
||||
session = self.sessions.get_or_create(key)
|
||||
|
||||
|
||||
# Handle slash commands
|
||||
cmd = msg.content.strip().lower()
|
||||
if cmd == "/new":
|
||||
@@ -348,24 +336,18 @@ class AgentLoop:
|
||||
return OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content="Could not start a new session because memory archival failed. Please try again.",
|
||||
content="Could not start a new session because memory archival failed. Please try again."
|
||||
)
|
||||
|
||||
session.clear()
|
||||
self.sessions.save(session)
|
||||
self.sessions.invalidate(session.key)
|
||||
return OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content="New session started. Memory consolidation in progress.",
|
||||
)
|
||||
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
||||
content="New session started. Memory consolidation in progress.")
|
||||
if cmd == "/help":
|
||||
return OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands",
|
||||
)
|
||||
|
||||
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
||||
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
|
||||
|
||||
if len(session.messages) > self.memory_window and session.key not in self._consolidating:
|
||||
self._consolidating.add(session.key)
|
||||
lock = self._get_consolidation_lock(session.key)
|
||||
@@ -376,12 +358,12 @@ class AgentLoop:
|
||||
await self._consolidate_memory(session)
|
||||
finally:
|
||||
self._consolidating.discard(session.key)
|
||||
task = asyncio.current_task()
|
||||
if task is not None:
|
||||
self._consolidation_tasks.discard(task)
|
||||
_task = asyncio.current_task()
|
||||
if _task is not None:
|
||||
self._consolidation_tasks.discard(_task)
|
||||
|
||||
task = asyncio.create_task(_consolidate_and_unlock())
|
||||
self._consolidation_tasks.add(task)
|
||||
_task = asyncio.create_task(_consolidate_and_unlock())
|
||||
self._consolidation_tasks.add(_task)
|
||||
|
||||
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
|
||||
initial_messages = self.context.build_messages(
|
||||
@@ -393,49 +375,42 @@ class AgentLoop:
|
||||
)
|
||||
|
||||
async def _bus_progress(content: str) -> None:
|
||||
await self.bus.publish_outbound(
|
||||
OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content=content,
|
||||
metadata=msg.metadata or {},
|
||||
)
|
||||
)
|
||||
await self.bus.publish_outbound(OutboundMessage(
|
||||
channel=msg.channel, chat_id=msg.chat_id, content=content,
|
||||
metadata=msg.metadata or {},
|
||||
))
|
||||
|
||||
final_content, tools_used = await self._run_agent_loop(
|
||||
initial_messages,
|
||||
on_progress=on_progress or _bus_progress,
|
||||
initial_messages, on_progress=on_progress or _bus_progress,
|
||||
)
|
||||
|
||||
if final_content is None:
|
||||
final_content = "I've completed processing but have no response to give."
|
||||
|
||||
|
||||
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
|
||||
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
|
||||
|
||||
|
||||
session.add_message("user", msg.content)
|
||||
session.add_message(
|
||||
"assistant", final_content, tools_used=tools_used if tools_used else None
|
||||
)
|
||||
session.add_message("assistant", final_content,
|
||||
tools_used=tools_used if tools_used else None)
|
||||
self.sessions.save(session)
|
||||
|
||||
|
||||
return OutboundMessage(
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
content=final_content,
|
||||
metadata=msg.metadata
|
||||
or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
|
||||
metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
|
||||
)
|
||||
|
||||
|
||||
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
|
||||
"""
|
||||
Process a system message (e.g., subagent announce).
|
||||
|
||||
|
||||
The chat_id field contains "original_channel:original_chat_id" to route
|
||||
the response back to the correct destination.
|
||||
"""
|
||||
logger.info("Processing system message from {}", msg.sender_id)
|
||||
|
||||
|
||||
# Parse origin from chat_id (format: "channel:chat_id")
|
||||
if ":" in msg.chat_id:
|
||||
parts = msg.chat_id.split(":", 1)
|
||||
@@ -445,7 +420,7 @@ class AgentLoop:
|
||||
# Fallback
|
||||
origin_channel = "cli"
|
||||
origin_chat_id = msg.chat_id
|
||||
|
||||
|
||||
session_key = f"{origin_channel}:{origin_chat_id}"
|
||||
session = self.sessions.get_or_create(session_key)
|
||||
self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id"))
|
||||
@@ -459,15 +434,17 @@ class AgentLoop:
|
||||
|
||||
if final_content is None:
|
||||
final_content = "Background task completed."
|
||||
|
||||
|
||||
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
|
||||
session.add_message("assistant", final_content)
|
||||
self.sessions.save(session)
|
||||
|
||||
|
||||
return OutboundMessage(
|
||||
channel=origin_channel, chat_id=origin_chat_id, content=final_content
|
||||
channel=origin_channel,
|
||||
chat_id=origin_chat_id,
|
||||
content=final_content
|
||||
)
|
||||
|
||||
|
||||
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
|
||||
"""Consolidate old messages into MEMORY.md + HISTORY.md.
|
||||
|
||||
@@ -480,49 +457,29 @@ class AgentLoop:
|
||||
if archive_all:
|
||||
old_messages = session.messages
|
||||
keep_count = 0
|
||||
logger.info(
|
||||
"Memory consolidation (archive_all): {} total messages archived",
|
||||
len(session.messages),
|
||||
)
|
||||
logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages))
|
||||
else:
|
||||
keep_count = self.memory_window // 2
|
||||
if len(session.messages) <= keep_count:
|
||||
logger.debug(
|
||||
"Session {}: No consolidation needed (messages={}, keep={})",
|
||||
session.key,
|
||||
len(session.messages),
|
||||
keep_count,
|
||||
)
|
||||
logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count)
|
||||
return
|
||||
|
||||
messages_to_process = len(session.messages) - session.last_consolidated
|
||||
if messages_to_process <= 0:
|
||||
logger.debug(
|
||||
"Session {}: No new messages to consolidate (last_consolidated={}, total={})",
|
||||
session.key,
|
||||
session.last_consolidated,
|
||||
len(session.messages),
|
||||
)
|
||||
logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages))
|
||||
return
|
||||
|
||||
old_messages = session.messages[session.last_consolidated : -keep_count]
|
||||
old_messages = session.messages[session.last_consolidated:-keep_count]
|
||||
if not old_messages:
|
||||
return
|
||||
logger.info(
|
||||
"Memory consolidation started: {} total, {} new to consolidate, {} keep",
|
||||
len(session.messages),
|
||||
len(old_messages),
|
||||
keep_count,
|
||||
)
|
||||
logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count)
|
||||
|
||||
lines = []
|
||||
for m in old_messages:
|
||||
if not m.get("content"):
|
||||
continue
|
||||
tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
|
||||
lines.append(
|
||||
f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}"
|
||||
)
|
||||
lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}")
|
||||
conversation = "\n".join(lines)
|
||||
current_memory = memory.read_long_term()
|
||||
|
||||
@@ -551,10 +508,7 @@ Respond with ONLY valid JSON, no markdown fences."""
|
||||
try:
|
||||
response = await self.provider.chat(
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a memory consolidation agent. Respond only with valid JSON.",
|
||||
},
|
||||
{"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."},
|
||||
{"role": "user", "content": prompt},
|
||||
],
|
||||
model=self.model,
|
||||
@@ -567,10 +521,7 @@ Respond with ONLY valid JSON, no markdown fences."""
|
||||
text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
|
||||
result = json_repair.loads(text)
|
||||
if not isinstance(result, dict):
|
||||
logger.warning(
|
||||
"Memory consolidation: unexpected response type, skipping. Response: {}",
|
||||
text[:200],
|
||||
)
|
||||
logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200])
|
||||
return
|
||||
|
||||
if entry := result.get("history_entry"):
|
||||
@@ -589,11 +540,7 @@ Respond with ONLY valid JSON, no markdown fences."""
|
||||
session.last_consolidated = 0
|
||||
else:
|
||||
session.last_consolidated = len(session.messages) - keep_count
|
||||
logger.info(
|
||||
"Memory consolidation done: {} messages, last_consolidated={}",
|
||||
len(session.messages),
|
||||
session.last_consolidated,
|
||||
)
|
||||
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
|
||||
except Exception as e:
|
||||
logger.error("Memory consolidation failed: {}", e)
|
||||
|
||||
@@ -607,21 +554,24 @@ Respond with ONLY valid JSON, no markdown fences."""
|
||||
) -> str:
|
||||
"""
|
||||
Process a message directly (for CLI or cron usage).
|
||||
|
||||
|
||||
Args:
|
||||
content: The message content.
|
||||
session_key: Session identifier (overrides channel:chat_id for session lookup).
|
||||
channel: Source channel (for tool context routing).
|
||||
chat_id: Source chat ID (for tool context routing).
|
||||
on_progress: Optional callback for intermediate output.
|
||||
|
||||
|
||||
Returns:
|
||||
The agent's response.
|
||||
"""
|
||||
await self._connect_mcp()
|
||||
msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content)
|
||||
|
||||
response = await self._process_message(
|
||||
msg, session_key=session_key, on_progress=on_progress
|
||||
msg = InboundMessage(
|
||||
channel=channel,
|
||||
sender_id="user",
|
||||
chat_id=chat_id,
|
||||
content=content
|
||||
)
|
||||
|
||||
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
|
||||
return response.content if response else ""
|
||||
|
||||
Reference in New Issue
Block a user