From ab026c513162580443e1c386f13539b088aab770 Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sat, 21 Feb 2026 08:14:46 +0000 Subject: [PATCH] refactor: extract memory consolidation to MemoryStore, slim down AgentLoop --- README.md | 2 +- nanobot/agent/loop.py | 257 ++++++---------------------------------- nanobot/agent/memory.py | 108 +++++++++++++++++ 3 files changed, 147 insertions(+), 220 deletions(-) diff --git a/README.md b/README.md index 68ad5a9..c5fd908 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ ⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines. -📏 Real-time line count: **3,827 lines** (run `bash core_agent_lines.sh` to verify anytime) +📏 Real-time line count: **3,806 lines** (run `bash core_agent_lines.sh` to verify anytime) ## 📢 News diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 325c1ac..50f6ec2 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -99,33 +99,18 @@ class AgentLoop: def _register_default_tools(self) -> None: """Register the default set of tools.""" - # File tools (workspace for relative paths, restrict if configured) allowed_dir = self.workspace if self.restrict_to_workspace else None - self.tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) - self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) - self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir)) - self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir)) - - # Shell tool + for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool): + self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir)) self.tools.register(ExecTool( working_dir=str(self.workspace), timeout=self.exec_config.timeout, restrict_to_workspace=self.restrict_to_workspace, )) - - # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) self.tools.register(WebFetchTool()) - - # Message tool - message_tool = MessageTool(send_callback=self.bus.publish_outbound) - self.tools.register(message_tool) - - # Spawn tool (for subagents) - spawn_tool = SpawnTool(manager=self.subagents) - self.tools.register(spawn_tool) - - # Cron tool (for scheduling) + self.tools.register(MessageTool(send_callback=self.bus.publish_outbound)) + self.tools.register(SpawnTool(manager=self.subagents)) if self.cron_service: self.tools.register(CronTool(self.cron_service)) @@ -187,16 +172,7 @@ class AgentLoop: initial_messages: list[dict], on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> tuple[str | None, list[str]]: - """ - Run the agent iteration loop. - - Args: - initial_messages: Starting messages for the LLM conversation. - on_progress: Optional callback to push intermediate content to the user. - - Returns: - Tuple of (final_content, list_of_tools_used). - """ + """Run the agent iteration loop. Returns (final_content, tools_used).""" messages = initial_messages iteration = 0 final_content = None @@ -297,20 +273,25 @@ class AgentLoop: session_key: str | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> OutboundMessage | None: - """ - Process a single inbound message. - - Args: - msg: The inbound message to process. - session_key: Override session key (used by process_direct). - on_progress: Optional callback for intermediate output (defaults to bus publish). - - Returns: - The response message, or None if no response needed. - """ - # System messages route back via chat_id ("channel:chat_id") + """Process a single inbound message and return the response.""" + # System messages: parse origin from chat_id ("channel:chat_id") if msg.channel == "system": - return await self._process_system_message(msg) + channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id + else ("cli", msg.chat_id)) + logger.info("Processing system message from {}", msg.sender_id) + key = f"{channel}:{chat_id}" + session = self.sessions.get_or_create(key) + self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) + messages = self.context.build_messages( + history=session.get_history(max_messages=self.memory_window), + current_message=msg.content, channel=channel, chat_id=chat_id, + ) + final_content, _ = await self._run_agent_loop(messages) + session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") + session.add_message("assistant", final_content or "Background task completed.") + self.sessions.save(session) + return OutboundMessage(channel=channel, chat_id=chat_id, + content=final_content or "Background task completed.") preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) @@ -318,19 +299,18 @@ class AgentLoop: key = session_key or msg.session_key session = self.sessions.get_or_create(key) - # Handle slash commands + # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": - # Capture messages before clearing (avoid race condition with background task) messages_to_archive = session.messages.copy() session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) async def _consolidate_and_cleanup(): - temp_session = Session(key=session.key) - temp_session.messages = messages_to_archive - await self._consolidate_memory(temp_session, archive_all=True) + temp = Session(key=session.key) + temp.messages = messages_to_archive + await self._consolidate_memory(temp, archive_all=True) asyncio.create_task(_consolidate_and_cleanup()) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, @@ -359,16 +339,14 @@ class AgentLoop: history=session.get_history(max_messages=self.memory_window), current_message=msg.content, media=msg.media if msg.media else None, - channel=msg.channel, - chat_id=msg.chat_id, + channel=msg.channel, chat_id=msg.chat_id, ) async def _bus_progress(content: str) -> None: meta = dict(msg.metadata or {}) meta["_progress"] = True await self.bus.publish_outbound(OutboundMessage( - channel=msg.channel, chat_id=msg.chat_id, content=content, - metadata=meta, + channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta, )) final_content, tools_used = await self._run_agent_loop( @@ -391,157 +369,16 @@ class AgentLoop: return None return OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=final_content, - metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts) - ) - - async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: - """ - Process a system message (e.g., subagent announce). - - The chat_id field contains "original_channel:original_chat_id" to route - the response back to the correct destination. - """ - logger.info("Processing system message from {}", msg.sender_id) - - # Parse origin from chat_id (format: "channel:chat_id") - if ":" in msg.chat_id: - parts = msg.chat_id.split(":", 1) - origin_channel = parts[0] - origin_chat_id = parts[1] - else: - # Fallback - origin_channel = "cli" - origin_chat_id = msg.chat_id - - session_key = f"{origin_channel}:{origin_chat_id}" - session = self.sessions.get_or_create(session_key) - self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id")) - initial_messages = self.context.build_messages( - history=session.get_history(max_messages=self.memory_window), - current_message=msg.content, - channel=origin_channel, - chat_id=origin_chat_id, - ) - final_content, _ = await self._run_agent_loop(initial_messages) - - if final_content is None: - final_content = "Background task completed." - - session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") - session.add_message("assistant", final_content) - self.sessions.save(session) - - return OutboundMessage( - channel=origin_channel, - chat_id=origin_chat_id, - content=final_content + channel=msg.channel, chat_id=msg.chat_id, content=final_content, + metadata=msg.metadata or {}, ) async def _consolidate_memory(self, session, archive_all: bool = False) -> None: - """Consolidate old messages into MEMORY.md + HISTORY.md. - - Args: - archive_all: If True, clear all messages and reset session (for /new command). - If False, only write to files without modifying session. - """ - memory = MemoryStore(self.workspace) - - if archive_all: - old_messages = session.messages - keep_count = 0 - logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages)) - else: - keep_count = self.memory_window // 2 - if len(session.messages) <= keep_count: - logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count) - return - - messages_to_process = len(session.messages) - session.last_consolidated - if messages_to_process <= 0: - logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages)) - return - - old_messages = session.messages[session.last_consolidated:-keep_count] - if not old_messages: - return - logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count) - - lines = [] - for m in old_messages: - if not m.get("content"): - continue - tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" - lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") - conversation = "\n".join(lines) - current_memory = memory.read_long_term() - - prompt = f"""Process this conversation and call the save_memory tool with your consolidation. - -## Current Long-term Memory -{current_memory or "(empty)"} - -## Conversation to Process -{conversation}""" - - save_memory_tool = [ - { - "type": "function", - "function": { - "name": "save_memory", - "description": "Save the memory consolidation result to persistent storage.", - "parameters": { - "type": "object", - "properties": { - "history_entry": { - "type": "string", - "description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. Start with a timestamp like [YYYY-MM-DD HH:MM]. Include enough detail to be useful when found by grep search later.", - }, - "memory_update": { - "type": "string", - "description": "The full updated long-term memory content as a markdown string. Include all existing facts plus any new facts: user location, preferences, personal info, habits, project context, technical decisions, tools/services used. If nothing new, return the existing content unchanged.", - }, - }, - "required": ["history_entry", "memory_update"], - }, - }, - } - ] - - try: - response = await self.provider.chat( - messages=[ - {"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."}, - {"role": "user", "content": prompt}, - ], - tools=save_memory_tool, - model=self.model, - ) - - if not response.has_tool_calls: - logger.warning("Memory consolidation: LLM did not call save_memory tool, skipping") - return - - args = response.tool_calls[0].arguments - if entry := args.get("history_entry"): - if not isinstance(entry, str): - entry = json.dumps(entry, ensure_ascii=False) - memory.append_history(entry) - if update := args.get("memory_update"): - if not isinstance(update, str): - update = json.dumps(update, ensure_ascii=False) - if update != current_memory: - memory.write_long_term(update) - - if archive_all: - session.last_consolidated = 0 - else: - session.last_consolidated = len(session.messages) - keep_count - logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) - except Exception as e: - logger.error("Memory consolidation failed: {}", e) + """Delegate to MemoryStore.consolidate().""" + await MemoryStore(self.workspace).consolidate( + session, self.provider, self.model, + archive_all=archive_all, memory_window=self.memory_window, + ) async def process_direct( self, @@ -551,26 +388,8 @@ class AgentLoop: chat_id: str = "direct", on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> str: - """ - Process a message directly (for CLI or cron usage). - - Args: - content: The message content. - session_key: Session identifier (overrides channel:chat_id for session lookup). - channel: Source channel (for tool context routing). - chat_id: Source chat ID (for tool context routing). - on_progress: Optional callback for intermediate output. - - Returns: - The agent's response. - """ + """Process a message directly (for CLI or cron usage).""" await self._connect_mcp() - msg = InboundMessage( - channel=channel, - sender_id="user", - chat_id=chat_id, - content=content - ) - + msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else "" diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 29477c4..51abd8f 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -1,9 +1,46 @@ """Memory system for persistent agent memory.""" +from __future__ import annotations + +import json from pathlib import Path +from typing import TYPE_CHECKING + +from loguru import logger from nanobot.utils.helpers import ensure_dir +if TYPE_CHECKING: + from nanobot.providers.base import LLMProvider + from nanobot.session.manager import Session + + +_SAVE_MEMORY_TOOL = [ + { + "type": "function", + "function": { + "name": "save_memory", + "description": "Save the memory consolidation result to persistent storage.", + "parameters": { + "type": "object", + "properties": { + "history_entry": { + "type": "string", + "description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. " + "Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.", + }, + "memory_update": { + "type": "string", + "description": "Full updated long-term memory as markdown. Include all existing " + "facts plus new ones. Return unchanged if nothing new.", + }, + }, + "required": ["history_entry", "memory_update"], + }, + }, + } +] + class MemoryStore: """Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log).""" @@ -28,3 +65,74 @@ class MemoryStore: def get_memory_context(self) -> str: long_term = self.read_long_term() return f"## Long-term Memory\n{long_term}" if long_term else "" + + async def consolidate( + self, + session: Session, + provider: LLMProvider, + model: str, + *, + archive_all: bool = False, + memory_window: int = 50, + ) -> None: + """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call.""" + if archive_all: + old_messages = session.messages + keep_count = 0 + logger.info("Memory consolidation (archive_all): {} messages", len(session.messages)) + else: + keep_count = memory_window // 2 + if len(session.messages) <= keep_count: + return + if len(session.messages) - session.last_consolidated <= 0: + return + old_messages = session.messages[session.last_consolidated:-keep_count] + if not old_messages: + return + logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count) + + lines = [] + for m in old_messages: + if not m.get("content"): + continue + tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else "" + lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}") + + current_memory = self.read_long_term() + prompt = f"""Process this conversation and call the save_memory tool with your consolidation. + +## Current Long-term Memory +{current_memory or "(empty)"} + +## Conversation to Process +{chr(10).join(lines)}""" + + try: + response = await provider.chat( + messages=[ + {"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."}, + {"role": "user", "content": prompt}, + ], + tools=_SAVE_MEMORY_TOOL, + model=model, + ) + + if not response.has_tool_calls: + logger.warning("Memory consolidation: LLM did not call save_memory, skipping") + return + + args = response.tool_calls[0].arguments + if entry := args.get("history_entry"): + if not isinstance(entry, str): + entry = json.dumps(entry, ensure_ascii=False) + self.append_history(entry) + if update := args.get("memory_update"): + if not isinstance(update, str): + update = json.dumps(update, ensure_ascii=False) + if update != current_memory: + self.write_long_term(update) + + session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count + logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) + except Exception as e: + logger.error("Memory consolidation failed: {}", e)