Merge pull request #930 to slim down agent loop
refactor: extract memory consolidation to MemoryStore, slim down agent loop
This commit is contained in:
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
|
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
|
||||||
|
|
||||||
📏 Real-time line count: **3,827 lines** (run `bash core_agent_lines.sh` to verify anytime)
|
📏 Real-time line count: **3,806 lines** (run `bash core_agent_lines.sh` to verify anytime)
|
||||||
|
|
||||||
## 📢 News
|
## 📢 News
|
||||||
|
|
||||||
|
|||||||
@@ -99,33 +99,18 @@ class AgentLoop:
|
|||||||
|
|
||||||
def _register_default_tools(self) -> None:
|
def _register_default_tools(self) -> None:
|
||||||
"""Register the default set of tools."""
|
"""Register the default set of tools."""
|
||||||
# File tools (workspace for relative paths, restrict if configured)
|
|
||||||
allowed_dir = self.workspace if self.restrict_to_workspace else None
|
allowed_dir = self.workspace if self.restrict_to_workspace else None
|
||||||
self.tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
|
||||||
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
|
||||||
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
|
||||||
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
|
|
||||||
|
|
||||||
# Shell tool
|
|
||||||
self.tools.register(ExecTool(
|
self.tools.register(ExecTool(
|
||||||
working_dir=str(self.workspace),
|
working_dir=str(self.workspace),
|
||||||
timeout=self.exec_config.timeout,
|
timeout=self.exec_config.timeout,
|
||||||
restrict_to_workspace=self.restrict_to_workspace,
|
restrict_to_workspace=self.restrict_to_workspace,
|
||||||
))
|
))
|
||||||
|
|
||||||
# Web tools
|
|
||||||
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
|
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
|
||||||
self.tools.register(WebFetchTool())
|
self.tools.register(WebFetchTool())
|
||||||
|
self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
|
||||||
# Message tool
|
self.tools.register(SpawnTool(manager=self.subagents))
|
||||||
message_tool = MessageTool(send_callback=self.bus.publish_outbound)
|
|
||||||
self.tools.register(message_tool)
|
|
||||||
|
|
||||||
# Spawn tool (for subagents)
|
|
||||||
spawn_tool = SpawnTool(manager=self.subagents)
|
|
||||||
self.tools.register(spawn_tool)
|
|
||||||
|
|
||||||
# Cron tool (for scheduling)
|
|
||||||
if self.cron_service:
|
if self.cron_service:
|
||||||
self.tools.register(CronTool(self.cron_service))
|
self.tools.register(CronTool(self.cron_service))
|
||||||
|
|
||||||
@@ -187,16 +172,7 @@ class AgentLoop:
|
|||||||
initial_messages: list[dict],
|
initial_messages: list[dict],
|
||||||
on_progress: Callable[[str], Awaitable[None]] | None = None,
|
on_progress: Callable[[str], Awaitable[None]] | None = None,
|
||||||
) -> tuple[str | None, list[str]]:
|
) -> tuple[str | None, list[str]]:
|
||||||
"""
|
"""Run the agent iteration loop. Returns (final_content, tools_used)."""
|
||||||
Run the agent iteration loop.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
initial_messages: Starting messages for the LLM conversation.
|
|
||||||
on_progress: Optional callback to push intermediate content to the user.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Tuple of (final_content, list_of_tools_used).
|
|
||||||
"""
|
|
||||||
messages = initial_messages
|
messages = initial_messages
|
||||||
iteration = 0
|
iteration = 0
|
||||||
final_content = None
|
final_content = None
|
||||||
@@ -297,20 +273,25 @@ class AgentLoop:
|
|||||||
session_key: str | None = None,
|
session_key: str | None = None,
|
||||||
on_progress: Callable[[str], Awaitable[None]] | None = None,
|
on_progress: Callable[[str], Awaitable[None]] | None = None,
|
||||||
) -> OutboundMessage | None:
|
) -> OutboundMessage | None:
|
||||||
"""
|
"""Process a single inbound message and return the response."""
|
||||||
Process a single inbound message.
|
# System messages: parse origin from chat_id ("channel:chat_id")
|
||||||
|
|
||||||
Args:
|
|
||||||
msg: The inbound message to process.
|
|
||||||
session_key: Override session key (used by process_direct).
|
|
||||||
on_progress: Optional callback for intermediate output (defaults to bus publish).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The response message, or None if no response needed.
|
|
||||||
"""
|
|
||||||
# System messages route back via chat_id ("channel:chat_id")
|
|
||||||
if msg.channel == "system":
|
if msg.channel == "system":
|
||||||
return await self._process_system_message(msg)
|
channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id
|
||||||
|
else ("cli", msg.chat_id))
|
||||||
|
logger.info("Processing system message from {}", msg.sender_id)
|
||||||
|
key = f"{channel}:{chat_id}"
|
||||||
|
session = self.sessions.get_or_create(key)
|
||||||
|
self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
|
||||||
|
messages = self.context.build_messages(
|
||||||
|
history=session.get_history(max_messages=self.memory_window),
|
||||||
|
current_message=msg.content, channel=channel, chat_id=chat_id,
|
||||||
|
)
|
||||||
|
final_content, _ = await self._run_agent_loop(messages)
|
||||||
|
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
|
||||||
|
session.add_message("assistant", final_content or "Background task completed.")
|
||||||
|
self.sessions.save(session)
|
||||||
|
return OutboundMessage(channel=channel, chat_id=chat_id,
|
||||||
|
content=final_content or "Background task completed.")
|
||||||
|
|
||||||
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
|
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
|
||||||
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
|
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
|
||||||
@@ -318,19 +299,18 @@ class AgentLoop:
|
|||||||
key = session_key or msg.session_key
|
key = session_key or msg.session_key
|
||||||
session = self.sessions.get_or_create(key)
|
session = self.sessions.get_or_create(key)
|
||||||
|
|
||||||
# Handle slash commands
|
# Slash commands
|
||||||
cmd = msg.content.strip().lower()
|
cmd = msg.content.strip().lower()
|
||||||
if cmd == "/new":
|
if cmd == "/new":
|
||||||
# Capture messages before clearing (avoid race condition with background task)
|
|
||||||
messages_to_archive = session.messages.copy()
|
messages_to_archive = session.messages.copy()
|
||||||
session.clear()
|
session.clear()
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
self.sessions.invalidate(session.key)
|
self.sessions.invalidate(session.key)
|
||||||
|
|
||||||
async def _consolidate_and_cleanup():
|
async def _consolidate_and_cleanup():
|
||||||
temp_session = Session(key=session.key)
|
temp = Session(key=session.key)
|
||||||
temp_session.messages = messages_to_archive
|
temp.messages = messages_to_archive
|
||||||
await self._consolidate_memory(temp_session, archive_all=True)
|
await self._consolidate_memory(temp, archive_all=True)
|
||||||
|
|
||||||
asyncio.create_task(_consolidate_and_cleanup())
|
asyncio.create_task(_consolidate_and_cleanup())
|
||||||
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
|
||||||
@@ -359,16 +339,14 @@ class AgentLoop:
|
|||||||
history=session.get_history(max_messages=self.memory_window),
|
history=session.get_history(max_messages=self.memory_window),
|
||||||
current_message=msg.content,
|
current_message=msg.content,
|
||||||
media=msg.media if msg.media else None,
|
media=msg.media if msg.media else None,
|
||||||
channel=msg.channel,
|
channel=msg.channel, chat_id=msg.chat_id,
|
||||||
chat_id=msg.chat_id,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _bus_progress(content: str) -> None:
|
async def _bus_progress(content: str) -> None:
|
||||||
meta = dict(msg.metadata or {})
|
meta = dict(msg.metadata or {})
|
||||||
meta["_progress"] = True
|
meta["_progress"] = True
|
||||||
await self.bus.publish_outbound(OutboundMessage(
|
await self.bus.publish_outbound(OutboundMessage(
|
||||||
channel=msg.channel, chat_id=msg.chat_id, content=content,
|
channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta,
|
||||||
metadata=meta,
|
|
||||||
))
|
))
|
||||||
|
|
||||||
final_content, tools_used = await self._run_agent_loop(
|
final_content, tools_used = await self._run_agent_loop(
|
||||||
@@ -391,157 +369,16 @@ class AgentLoop:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
return OutboundMessage(
|
return OutboundMessage(
|
||||||
channel=msg.channel,
|
channel=msg.channel, chat_id=msg.chat_id, content=final_content,
|
||||||
chat_id=msg.chat_id,
|
metadata=msg.metadata or {},
|
||||||
content=final_content,
|
|
||||||
metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
|
|
||||||
"""
|
|
||||||
Process a system message (e.g., subagent announce).
|
|
||||||
|
|
||||||
The chat_id field contains "original_channel:original_chat_id" to route
|
|
||||||
the response back to the correct destination.
|
|
||||||
"""
|
|
||||||
logger.info("Processing system message from {}", msg.sender_id)
|
|
||||||
|
|
||||||
# Parse origin from chat_id (format: "channel:chat_id")
|
|
||||||
if ":" in msg.chat_id:
|
|
||||||
parts = msg.chat_id.split(":", 1)
|
|
||||||
origin_channel = parts[0]
|
|
||||||
origin_chat_id = parts[1]
|
|
||||||
else:
|
|
||||||
# Fallback
|
|
||||||
origin_channel = "cli"
|
|
||||||
origin_chat_id = msg.chat_id
|
|
||||||
|
|
||||||
session_key = f"{origin_channel}:{origin_chat_id}"
|
|
||||||
session = self.sessions.get_or_create(session_key)
|
|
||||||
self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id"))
|
|
||||||
initial_messages = self.context.build_messages(
|
|
||||||
history=session.get_history(max_messages=self.memory_window),
|
|
||||||
current_message=msg.content,
|
|
||||||
channel=origin_channel,
|
|
||||||
chat_id=origin_chat_id,
|
|
||||||
)
|
|
||||||
final_content, _ = await self._run_agent_loop(initial_messages)
|
|
||||||
|
|
||||||
if final_content is None:
|
|
||||||
final_content = "Background task completed."
|
|
||||||
|
|
||||||
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
|
|
||||||
session.add_message("assistant", final_content)
|
|
||||||
self.sessions.save(session)
|
|
||||||
|
|
||||||
return OutboundMessage(
|
|
||||||
channel=origin_channel,
|
|
||||||
chat_id=origin_chat_id,
|
|
||||||
content=final_content
|
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
|
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
|
||||||
"""Consolidate old messages into MEMORY.md + HISTORY.md.
|
"""Delegate to MemoryStore.consolidate()."""
|
||||||
|
await MemoryStore(self.workspace).consolidate(
|
||||||
Args:
|
session, self.provider, self.model,
|
||||||
archive_all: If True, clear all messages and reset session (for /new command).
|
archive_all=archive_all, memory_window=self.memory_window,
|
||||||
If False, only write to files without modifying session.
|
)
|
||||||
"""
|
|
||||||
memory = MemoryStore(self.workspace)
|
|
||||||
|
|
||||||
if archive_all:
|
|
||||||
old_messages = session.messages
|
|
||||||
keep_count = 0
|
|
||||||
logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages))
|
|
||||||
else:
|
|
||||||
keep_count = self.memory_window // 2
|
|
||||||
if len(session.messages) <= keep_count:
|
|
||||||
logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count)
|
|
||||||
return
|
|
||||||
|
|
||||||
messages_to_process = len(session.messages) - session.last_consolidated
|
|
||||||
if messages_to_process <= 0:
|
|
||||||
logger.debug("Session {}: No new messages to consolidate (last_consolidated={}, total={})", session.key, session.last_consolidated, len(session.messages))
|
|
||||||
return
|
|
||||||
|
|
||||||
old_messages = session.messages[session.last_consolidated:-keep_count]
|
|
||||||
if not old_messages:
|
|
||||||
return
|
|
||||||
logger.info("Memory consolidation started: {} total, {} new to consolidate, {} keep", len(session.messages), len(old_messages), keep_count)
|
|
||||||
|
|
||||||
lines = []
|
|
||||||
for m in old_messages:
|
|
||||||
if not m.get("content"):
|
|
||||||
continue
|
|
||||||
tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
|
|
||||||
lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}")
|
|
||||||
conversation = "\n".join(lines)
|
|
||||||
current_memory = memory.read_long_term()
|
|
||||||
|
|
||||||
prompt = f"""Process this conversation and call the save_memory tool with your consolidation.
|
|
||||||
|
|
||||||
## Current Long-term Memory
|
|
||||||
{current_memory or "(empty)"}
|
|
||||||
|
|
||||||
## Conversation to Process
|
|
||||||
{conversation}"""
|
|
||||||
|
|
||||||
save_memory_tool = [
|
|
||||||
{
|
|
||||||
"type": "function",
|
|
||||||
"function": {
|
|
||||||
"name": "save_memory",
|
|
||||||
"description": "Save the memory consolidation result to persistent storage.",
|
|
||||||
"parameters": {
|
|
||||||
"type": "object",
|
|
||||||
"properties": {
|
|
||||||
"history_entry": {
|
|
||||||
"type": "string",
|
|
||||||
"description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. Start with a timestamp like [YYYY-MM-DD HH:MM]. Include enough detail to be useful when found by grep search later.",
|
|
||||||
},
|
|
||||||
"memory_update": {
|
|
||||||
"type": "string",
|
|
||||||
"description": "The full updated long-term memory content as a markdown string. Include all existing facts plus any new facts: user location, preferences, personal info, habits, project context, technical decisions, tools/services used. If nothing new, return the existing content unchanged.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"required": ["history_entry", "memory_update"],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
]
|
|
||||||
|
|
||||||
try:
|
|
||||||
response = await self.provider.chat(
|
|
||||||
messages=[
|
|
||||||
{"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."},
|
|
||||||
{"role": "user", "content": prompt},
|
|
||||||
],
|
|
||||||
tools=save_memory_tool,
|
|
||||||
model=self.model,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not response.has_tool_calls:
|
|
||||||
logger.warning("Memory consolidation: LLM did not call save_memory tool, skipping")
|
|
||||||
return
|
|
||||||
|
|
||||||
args = response.tool_calls[0].arguments
|
|
||||||
if entry := args.get("history_entry"):
|
|
||||||
if not isinstance(entry, str):
|
|
||||||
entry = json.dumps(entry, ensure_ascii=False)
|
|
||||||
memory.append_history(entry)
|
|
||||||
if update := args.get("memory_update"):
|
|
||||||
if not isinstance(update, str):
|
|
||||||
update = json.dumps(update, ensure_ascii=False)
|
|
||||||
if update != current_memory:
|
|
||||||
memory.write_long_term(update)
|
|
||||||
|
|
||||||
if archive_all:
|
|
||||||
session.last_consolidated = 0
|
|
||||||
else:
|
|
||||||
session.last_consolidated = len(session.messages) - keep_count
|
|
||||||
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Memory consolidation failed: {}", e)
|
|
||||||
|
|
||||||
async def process_direct(
|
async def process_direct(
|
||||||
self,
|
self,
|
||||||
@@ -551,26 +388,8 @@ class AgentLoop:
|
|||||||
chat_id: str = "direct",
|
chat_id: str = "direct",
|
||||||
on_progress: Callable[[str], Awaitable[None]] | None = None,
|
on_progress: Callable[[str], Awaitable[None]] | None = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""Process a message directly (for CLI or cron usage)."""
|
||||||
Process a message directly (for CLI or cron usage).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
content: The message content.
|
|
||||||
session_key: Session identifier (overrides channel:chat_id for session lookup).
|
|
||||||
channel: Source channel (for tool context routing).
|
|
||||||
chat_id: Source chat ID (for tool context routing).
|
|
||||||
on_progress: Optional callback for intermediate output.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The agent's response.
|
|
||||||
"""
|
|
||||||
await self._connect_mcp()
|
await self._connect_mcp()
|
||||||
msg = InboundMessage(
|
msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content)
|
||||||
channel=channel,
|
|
||||||
sender_id="user",
|
|
||||||
chat_id=chat_id,
|
|
||||||
content=content
|
|
||||||
)
|
|
||||||
|
|
||||||
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
|
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
|
||||||
return response.content if response else ""
|
return response.content if response else ""
|
||||||
|
|||||||
@@ -1,9 +1,46 @@
|
|||||||
"""Memory system for persistent agent memory."""
|
"""Memory system for persistent agent memory."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
from nanobot.utils.helpers import ensure_dir
|
from nanobot.utils.helpers import ensure_dir
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from nanobot.providers.base import LLMProvider
|
||||||
|
from nanobot.session.manager import Session
|
||||||
|
|
||||||
|
|
||||||
|
_SAVE_MEMORY_TOOL = [
|
||||||
|
{
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": "save_memory",
|
||||||
|
"description": "Save the memory consolidation result to persistent storage.",
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"history_entry": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. "
|
||||||
|
"Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.",
|
||||||
|
},
|
||||||
|
"memory_update": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Full updated long-term memory as markdown. Include all existing "
|
||||||
|
"facts plus new ones. Return unchanged if nothing new.",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"required": ["history_entry", "memory_update"],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class MemoryStore:
|
class MemoryStore:
|
||||||
"""Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""
|
"""Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""
|
||||||
@@ -28,3 +65,74 @@ class MemoryStore:
|
|||||||
def get_memory_context(self) -> str:
|
def get_memory_context(self) -> str:
|
||||||
long_term = self.read_long_term()
|
long_term = self.read_long_term()
|
||||||
return f"## Long-term Memory\n{long_term}" if long_term else ""
|
return f"## Long-term Memory\n{long_term}" if long_term else ""
|
||||||
|
|
||||||
|
async def consolidate(
|
||||||
|
self,
|
||||||
|
session: Session,
|
||||||
|
provider: LLMProvider,
|
||||||
|
model: str,
|
||||||
|
*,
|
||||||
|
archive_all: bool = False,
|
||||||
|
memory_window: int = 50,
|
||||||
|
) -> None:
|
||||||
|
"""Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call."""
|
||||||
|
if archive_all:
|
||||||
|
old_messages = session.messages
|
||||||
|
keep_count = 0
|
||||||
|
logger.info("Memory consolidation (archive_all): {} messages", len(session.messages))
|
||||||
|
else:
|
||||||
|
keep_count = memory_window // 2
|
||||||
|
if len(session.messages) <= keep_count:
|
||||||
|
return
|
||||||
|
if len(session.messages) - session.last_consolidated <= 0:
|
||||||
|
return
|
||||||
|
old_messages = session.messages[session.last_consolidated:-keep_count]
|
||||||
|
if not old_messages:
|
||||||
|
return
|
||||||
|
logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count)
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
for m in old_messages:
|
||||||
|
if not m.get("content"):
|
||||||
|
continue
|
||||||
|
tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
|
||||||
|
lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}")
|
||||||
|
|
||||||
|
current_memory = self.read_long_term()
|
||||||
|
prompt = f"""Process this conversation and call the save_memory tool with your consolidation.
|
||||||
|
|
||||||
|
## Current Long-term Memory
|
||||||
|
{current_memory or "(empty)"}
|
||||||
|
|
||||||
|
## Conversation to Process
|
||||||
|
{chr(10).join(lines)}"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = await provider.chat(
|
||||||
|
messages=[
|
||||||
|
{"role": "system", "content": "You are a memory consolidation agent. Call the save_memory tool with your consolidation of the conversation."},
|
||||||
|
{"role": "user", "content": prompt},
|
||||||
|
],
|
||||||
|
tools=_SAVE_MEMORY_TOOL,
|
||||||
|
model=model,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not response.has_tool_calls:
|
||||||
|
logger.warning("Memory consolidation: LLM did not call save_memory, skipping")
|
||||||
|
return
|
||||||
|
|
||||||
|
args = response.tool_calls[0].arguments
|
||||||
|
if entry := args.get("history_entry"):
|
||||||
|
if not isinstance(entry, str):
|
||||||
|
entry = json.dumps(entry, ensure_ascii=False)
|
||||||
|
self.append_history(entry)
|
||||||
|
if update := args.get("memory_update"):
|
||||||
|
if not isinstance(update, str):
|
||||||
|
update = json.dumps(update, ensure_ascii=False)
|
||||||
|
if update != current_memory:
|
||||||
|
self.write_long_term(update)
|
||||||
|
|
||||||
|
session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count
|
||||||
|
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Memory consolidation failed: {}", e)
|
||||||
|
|||||||
Reference in New Issue
Block a user