style(loop): drop formatting-only churn against upstream main

This commit is contained in:
Alexander Minges
2026-02-20 13:57:39 +01:00
parent df022febaf
commit 426ef71ce7

View File

@@ -6,7 +6,7 @@ import json
import json_repair import json_repair
from pathlib import Path from pathlib import Path
import re import re
from typing import Awaitable, Callable from typing import Any, Awaitable, Callable
from loguru import logger from loguru import logger
@@ -56,7 +56,6 @@ class AgentLoop:
): ):
from nanobot.config.schema import ExecToolConfig from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
self.bus = bus self.bus = bus
self.provider = provider self.provider = provider
self.workspace = workspace self.workspace = workspace
@@ -104,13 +103,11 @@ class AgentLoop:
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
# Shell tool # Shell tool
self.tools.register( self.tools.register(ExecTool(
ExecTool(
working_dir=str(self.workspace), working_dir=str(self.workspace),
timeout=self.exec_config.timeout, timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace, restrict_to_workspace=self.restrict_to_workspace,
) ))
)
# Web tools # Web tools
self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebSearchTool(api_key=self.brave_api_key))
@@ -134,7 +131,6 @@ class AgentLoop:
return return
self._mcp_connected = True self._mcp_connected = True
from nanobot.agent.tools.mcp import connect_mcp_servers from nanobot.agent.tools.mcp import connect_mcp_servers
self._mcp_stack = AsyncExitStack() self._mcp_stack = AsyncExitStack()
await self._mcp_stack.__aenter__() await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
@@ -163,13 +159,11 @@ class AgentLoop:
@staticmethod @staticmethod
def _tool_hint(tool_calls: list) -> str: def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint, e.g. 'web_search("query")'.""" """Format tool calls as concise hint, e.g. 'web_search("query")'."""
def _fmt(tc): def _fmt(tc):
val = next(iter(tc.arguments.values()), None) if tc.arguments else None val = next(iter(tc.arguments.values()), None) if tc.arguments else None
if not isinstance(val, str): if not isinstance(val, str):
return tc.name return tc.name
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")' return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls) return ", ".join(_fmt(tc) for tc in tool_calls)
async def _run_agent_loop( async def _run_agent_loop(
@@ -217,15 +211,13 @@ class AgentLoop:
"type": "function", "type": "function",
"function": { "function": {
"name": tc.name, "name": tc.name,
"arguments": json.dumps(tc.arguments, ensure_ascii=False), "arguments": json.dumps(tc.arguments, ensure_ascii=False)
}, }
} }
for tc in response.tool_calls for tc in response.tool_calls
] ]
messages = self.context.add_assistant_message( messages = self.context.add_assistant_message(
messages, messages, response.content, tool_call_dicts,
response.content,
tool_call_dicts,
reasoning_content=response.reasoning_content, reasoning_content=response.reasoning_content,
) )
@@ -243,13 +235,9 @@ class AgentLoop:
# Give them one retry; don't forward the text to avoid duplicates. # Give them one retry; don't forward the text to avoid duplicates.
if not tools_used and not text_only_retried and final_content: if not tools_used and not text_only_retried and final_content:
text_only_retried = True text_only_retried = True
logger.debug( logger.debug("Interim text response (no tools used yet), retrying: {}", final_content[:80])
"Interim text response (no tools used yet), retrying: {}",
final_content[:80],
)
messages = self.context.add_assistant_message( messages = self.context.add_assistant_message(
messages, messages, response.content,
response.content,
reasoning_content=response.reasoning_content, reasoning_content=response.reasoning_content,
) )
final_content = None final_content = None
@@ -266,20 +254,21 @@ class AgentLoop:
while self._running: while self._running:
try: try:
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) msg = await asyncio.wait_for(
self.bus.consume_inbound(),
timeout=1.0
)
try: try:
response = await self._process_message(msg) response = await self._process_message(msg)
if response: if response:
await self.bus.publish_outbound(response) await self.bus.publish_outbound(response)
except Exception as e: except Exception as e:
logger.error("Error processing message: {}", e) logger.error("Error processing message: {}", e)
await self.bus.publish_outbound( await self.bus.publish_outbound(OutboundMessage(
OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
content=f"Sorry, I encountered an error: {str(e)}", content=f"Sorry, I encountered an error: {str(e)}"
) ))
)
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
@@ -343,7 +332,7 @@ class AgentLoop:
cmd = msg.content.strip().lower() cmd = msg.content.strip().lower()
if cmd == "/new": if cmd == "/new":
lock = self._get_consolidation_lock(session.key) lock = self._get_consolidation_lock(session.key)
messages_to_archive = [] messages_to_archive: list[dict[str, Any]] = []
try: try:
async with lock: async with lock:
messages_to_archive = session.messages[session.last_consolidated :].copy() messages_to_archive = session.messages[session.last_consolidated :].copy()
@@ -369,17 +358,11 @@ class AgentLoop:
self.sessions.save(session) self.sessions.save(session)
self.sessions.invalidate(session.key) self.sessions.invalidate(session.key)
self._prune_consolidation_lock(session.key, lock) self._prune_consolidation_lock(session.key, lock)
return OutboundMessage( return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
channel=msg.channel, content="New session started. Memory consolidation in progress.")
chat_id=msg.chat_id,
content="New session started. Memory consolidation in progress.",
)
if cmd == "/help": if cmd == "/help":
return OutboundMessage( return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
channel=msg.channel, content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands",
)
if len(session.messages) > self.memory_window and session.key not in self._consolidating: if len(session.messages) > self.memory_window and session.key not in self._consolidating:
self._consolidating.add(session.key) self._consolidating.add(session.key)
@@ -409,18 +392,13 @@ class AgentLoop:
) )
async def _bus_progress(content: str) -> None: async def _bus_progress(content: str) -> None:
await self.bus.publish_outbound( await self.bus.publish_outbound(OutboundMessage(
OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content,
channel=msg.channel,
chat_id=msg.chat_id,
content=content,
metadata=msg.metadata or {}, metadata=msg.metadata or {},
) ))
)
final_content, tools_used = await self._run_agent_loop( final_content, tools_used = await self._run_agent_loop(
initial_messages, initial_messages, on_progress=on_progress or _bus_progress,
on_progress=on_progress or _bus_progress,
) )
if final_content is None: if final_content is None:
@@ -430,17 +408,15 @@ class AgentLoop:
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
session.add_message("user", msg.content) session.add_message("user", msg.content)
session.add_message( session.add_message("assistant", final_content,
"assistant", final_content, tools_used=tools_used if tools_used else None tools_used=tools_used if tools_used else None)
)
self.sessions.save(session) self.sessions.save(session)
return OutboundMessage( return OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
content=final_content, content=final_content,
metadata=msg.metadata metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
) )
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
@@ -481,7 +457,9 @@ class AgentLoop:
self.sessions.save(session) self.sessions.save(session)
return OutboundMessage( return OutboundMessage(
channel=origin_channel, chat_id=origin_chat_id, content=final_content channel=origin_channel,
chat_id=origin_chat_id,
content=final_content
) )
async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: async def _consolidate_memory(self, session, archive_all: bool = False) -> bool:
@@ -496,49 +474,29 @@ class AgentLoop:
if archive_all: if archive_all:
old_messages = session.messages old_messages = session.messages
keep_count = 0 keep_count = 0
logger.info( logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages))
"Memory consolidation (archive_all): {} total messages archived",
len(session.messages),
)
else: else:
keep_count = self.memory_window // 2 keep_count = self.memory_window // 2
if len(session.messages) <= keep_count: if len(session.messages) <= keep_count:
logger.debug( logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count)
"Session {}: No consolidation needed (messages={}, keep={})",
session.key,
len(session.messages),
keep_count,
)
return True return True
messages_to_process = len(session.messages) - session.last_consolidated messages_to_process = len(session.messages) - session.last_consolidated
if messages_to_process <= 0: if messages_to_process <= 0:
logger.debug( logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages))
"Session {}: No new messages to consolidate (last_consolidated={}, total={})",
session.key,
session.last_consolidated,
len(session.messages),
)
return True return True
old_messages = session.messages[session.last_consolidated : -keep_count] old_messages = session.messages[session.last_consolidated:-keep_count]
if not old_messages: if not old_messages:
return True return True
logger.info( logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count)
"Memory consolidation started: {} total, {} new to consolidate, {} keep",
len(session.messages),
len(old_messages),
keep_count,
)
lines = [] lines = []
for m in old_messages: for m in old_messages:
if not m.get("content"): if not m.get("content"):
continue continue
tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
lines.append( lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}")
f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}"
)
conversation = "\n".join(lines) conversation = "\n".join(lines)
current_memory = memory.read_long_term() current_memory = memory.read_long_term()
@@ -567,10 +525,7 @@ Respond with ONLY valid JSON, no markdown fences."""
try: try:
response = await self.provider.chat( response = await self.provider.chat(
messages=[ messages=[
{ {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."},
"role": "system",
"content": "You are a memory consolidation agent. Respond only with valid JSON.",
},
{"role": "user", "content": prompt}, {"role": "user", "content": prompt},
], ],
model=self.model, model=self.model,
@@ -583,10 +538,7 @@ Respond with ONLY valid JSON, no markdown fences."""
text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
result = json_repair.loads(text) result = json_repair.loads(text)
if not isinstance(result, dict): if not isinstance(result, dict):
logger.warning( logger.warning("Memory consolidation: unexpected response type, skipping. Response: {}", text[:200])
"Memory consolidation: unexpected response type, skipping. Response: {}",
text[:200],
)
return False return False
if entry := result.get("history_entry"): if entry := result.get("history_entry"):
@@ -605,11 +557,7 @@ Respond with ONLY valid JSON, no markdown fences."""
session.last_consolidated = 0 session.last_consolidated = 0
else: else:
session.last_consolidated = len(session.messages) - keep_count session.last_consolidated = len(session.messages) - keep_count
logger.info( logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
"Memory consolidation done: {} messages, last_consolidated={}",
len(session.messages),
session.last_consolidated,
)
return True return True
except Exception as e: except Exception as e:
logger.error("Memory consolidation failed: {}", e) logger.error("Memory consolidation failed: {}", e)
@@ -637,9 +585,12 @@ Respond with ONLY valid JSON, no markdown fences."""
The agent's response. The agent's response.
""" """
await self._connect_mcp() await self._connect_mcp()
msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) msg = InboundMessage(
channel=channel,
response = await self._process_message( sender_id="user",
msg, session_key=session_key, on_progress=on_progress chat_id=chat_id,
content=content
) )
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
return response.content if response else "" return response.content if response else ""