Merge remote main to fix branch

This commit is contained in:
ethanclaw
2026-03-11 13:12:26 +08:00
17 changed files with 813 additions and 428 deletions

View File

@@ -10,7 +10,7 @@ from typing import Any
from nanobot.agent.memory import MemoryStore from nanobot.agent.memory import MemoryStore
from nanobot.agent.skills import SkillsLoader from nanobot.agent.skills import SkillsLoader
from nanobot.utils.helpers import detect_image_mime from nanobot.utils.helpers import build_assistant_message, detect_image_mime
class ContextBuilder: class ContextBuilder:
@@ -182,12 +182,10 @@ Reply directly with text for conversations. Only use the 'message' tool to send
thinking_blocks: list[dict] | None = None, thinking_blocks: list[dict] | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
"""Add an assistant message to the message list.""" """Add an assistant message to the message list."""
msg: dict[str, Any] = {"role": "assistant", "content": content} messages.append(build_assistant_message(
if tool_calls: content,
msg["tool_calls"] = tool_calls tool_calls=tool_calls,
if reasoning_content is not None: reasoning_content=reasoning_content,
msg["reasoning_content"] = reasoning_content thinking_blocks=thinking_blocks,
if thinking_blocks: ))
msg["thinking_blocks"] = thinking_blocks
messages.append(msg)
return messages return messages

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import re import re
import weakref
from contextlib import AsyncExitStack from contextlib import AsyncExitStack
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable from typing import TYPE_CHECKING, Any, Awaitable, Callable
@@ -13,7 +12,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger from loguru import logger
from nanobot.agent.context import ContextBuilder from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore from nanobot.agent.memory import MemoryConsolidator
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
@@ -55,8 +54,8 @@ class AgentLoop:
max_iterations: int = 40, max_iterations: int = 40,
temperature: float = 0.1, temperature: float = 0.1,
max_tokens: int = 4096, max_tokens: int = 4096,
memory_window: int = 100,
reasoning_effort: str | None = None, reasoning_effort: str | None = None,
context_window_tokens: int = 65_536,
brave_api_key: str | None = None, brave_api_key: str | None = None,
web_proxy: str | None = None, web_proxy: str | None = None,
exec_config: ExecToolConfig | None = None, exec_config: ExecToolConfig | None = None,
@@ -75,8 +74,8 @@ class AgentLoop:
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.temperature = temperature self.temperature = temperature
self.max_tokens = max_tokens self.max_tokens = max_tokens
self.memory_window = memory_window
self.reasoning_effort = reasoning_effort self.reasoning_effort = reasoning_effort
self.context_window_tokens = context_window_tokens
self.brave_api_key = brave_api_key self.brave_api_key = brave_api_key
self.web_proxy = web_proxy self.web_proxy = web_proxy
self.exec_config = exec_config or ExecToolConfig() self.exec_config = exec_config or ExecToolConfig()
@@ -105,11 +104,17 @@ class AgentLoop:
self._mcp_stack: AsyncExitStack | None = None self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False self._mcp_connected = False
self._mcp_connecting = False self._mcp_connecting = False
self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._processing_lock = asyncio.Lock() self._processing_lock = asyncio.Lock()
self.memory_consolidator = MemoryConsolidator(
workspace=workspace,
provider=provider,
model=self.model,
sessions=self.sessions,
context_window_tokens=context_window_tokens,
build_messages=self.context.build_messages,
get_tool_definitions=self.tools.get_definitions,
)
self._register_default_tools() self._register_default_tools()
def _register_default_tools(self) -> None: def _register_default_tools(self) -> None:
@@ -182,7 +187,7 @@ class AgentLoop:
initial_messages: list[dict], initial_messages: list[dict],
on_progress: Callable[..., Awaitable[None]] | None = None, on_progress: Callable[..., Awaitable[None]] | None = None,
) -> tuple[str | None, list[str], list[dict]]: ) -> tuple[str | None, list[str], list[dict]]:
"""Run the agent iteration loop. Returns (final_content, tools_used, messages).""" """Run the agent iteration loop."""
messages = initial_messages messages = initial_messages
iteration = 0 iteration = 0
final_content = None final_content = None
@@ -191,9 +196,11 @@ class AgentLoop:
while iteration < self.max_iterations: while iteration < self.max_iterations:
iteration += 1 iteration += 1
tool_defs = self.tools.get_definitions()
response = await self.provider.chat_with_retry( response = await self.provider.chat_with_retry(
messages=messages, messages=messages,
tools=self.tools.get_definitions(), tools=tool_defs,
model=self.model, model=self.model,
temperature=self.temperature, temperature=self.temperature,
max_tokens=self.max_tokens, max_tokens=self.max_tokens,
@@ -341,8 +348,9 @@ class AgentLoop:
logger.info("Processing system message from {}", msg.sender_id) logger.info("Processing system message from {}", msg.sender_id)
key = f"{channel}:{chat_id}" key = f"{channel}:{chat_id}"
session = self.sessions.get_or_create(key) session = self.sessions.get_or_create(key)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
history = session.get_history(max_messages=self.memory_window) history = session.get_history(max_messages=0)
messages = self.context.build_messages( messages = self.context.build_messages(
history=history, history=history,
current_message=msg.content, channel=channel, chat_id=chat_id, current_message=msg.content, channel=channel, chat_id=chat_id,
@@ -350,6 +358,7 @@ class AgentLoop:
final_content, _, all_msgs = await self._run_agent_loop(messages) final_content, _, all_msgs = await self._run_agent_loop(messages)
self._save_turn(session, all_msgs, 1 + len(history)) self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session) self.sessions.save(session)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
return OutboundMessage(channel=channel, chat_id=chat_id, return OutboundMessage(channel=channel, chat_id=chat_id,
content=final_content or "Background task completed.") content=final_content or "Background task completed.")
@@ -362,27 +371,20 @@ class AgentLoop:
# Slash commands # Slash commands
cmd = msg.content.strip().lower() cmd = msg.content.strip().lower()
if cmd == "/new": if cmd == "/new":
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
self._consolidating.add(session.key)
try: try:
async with lock: if not await self.memory_consolidator.archive_unconsolidated(session):
snapshot = session.messages[session.last_consolidated:] return OutboundMessage(
if snapshot: channel=msg.channel,
temp = Session(key=session.key) chat_id=msg.chat_id,
temp.messages = list(snapshot) content="Memory archival failed, session not cleared. Please try again.",
if not await self._consolidate_memory(temp, archive_all=True): )
return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.",
)
except Exception: except Exception:
logger.exception("/new archival failed for {}", session.key) logger.exception("/new archival failed for {}", session.key)
return OutboundMessage( return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, channel=msg.channel,
chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.", content="Memory archival failed, session not cleared. Please try again.",
) )
finally:
self._consolidating.discard(session.key)
session.clear() session.clear()
self.sessions.save(session) self.sessions.save(session)
@@ -393,30 +395,14 @@ class AgentLoop:
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands") content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands")
unconsolidated = len(session.messages) - session.last_consolidated await self.memory_consolidator.maybe_consolidate_by_tokens(session)
if (unconsolidated >= self.memory_window and session.key not in self._consolidating):
self._consolidating.add(session.key)
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
async def _consolidate_and_unlock():
try:
async with lock:
await self._consolidate_memory(session)
finally:
self._consolidating.discard(session.key)
_task = asyncio.current_task()
if _task is not None:
self._consolidation_tasks.discard(_task)
_task = asyncio.create_task(_consolidate_and_unlock())
self._consolidation_tasks.add(_task)
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"): if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool): if isinstance(message_tool, MessageTool):
message_tool.start_turn() message_tool.start_turn()
history = session.get_history(max_messages=self.memory_window) history = session.get_history(max_messages=0)
initial_messages = self.context.build_messages( initial_messages = self.context.build_messages(
history=history, history=history,
current_message=msg.content, current_message=msg.content,
@@ -441,6 +427,7 @@ class AgentLoop:
self._save_turn(session, all_msgs, 1 + len(history)) self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session) self.sessions.save(session)
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn: if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:
return None return None
@@ -487,13 +474,6 @@ class AgentLoop:
session.messages.append(entry) session.messages.append(entry)
session.updated_at = datetime.now() session.updated_at = datetime.now()
async def _consolidate_memory(self, session, archive_all: bool = False) -> bool:
"""Delegate to MemoryStore.consolidate(). Returns True on success."""
return await MemoryStore(self.workspace).consolidate(
session, self.provider, self.model,
archive_all=archive_all, memory_window=self.memory_window,
)
async def process_direct( async def process_direct(
self, self,
content: str, content: str,

View File

@@ -2,17 +2,19 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import json import json
import weakref
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Any, Callable
from loguru import logger from loguru import logger
from nanobot.utils.helpers import ensure_dir from nanobot.utils.helpers import ensure_dir, estimate_message_tokens, estimate_prompt_tokens_chain
if TYPE_CHECKING: if TYPE_CHECKING:
from nanobot.providers.base import LLMProvider from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session from nanobot.session.manager import Session, SessionManager
_SAVE_MEMORY_TOOL = [ _SAVE_MEMORY_TOOL = [
@@ -26,7 +28,7 @@ _SAVE_MEMORY_TOOL = [
"properties": { "properties": {
"history_entry": { "history_entry": {
"type": "string", "type": "string",
"description": "A paragraph (2-5 sentences) summarizing key events/decisions/topics. " "description": "A paragraph summarizing key events/decisions/topics. "
"Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.", "Start with [YYYY-MM-DD HH:MM]. Include detail useful for grep search.",
}, },
"memory_update": { "memory_update": {
@@ -42,6 +44,20 @@ _SAVE_MEMORY_TOOL = [
] ]
def _ensure_text(value: Any) -> str:
"""Normalize tool-call payload values to text for file storage."""
return value if isinstance(value, str) else json.dumps(value, ensure_ascii=False)
def _normalize_save_memory_args(args: Any) -> dict[str, Any] | None:
"""Normalize provider tool-call arguments to the expected dict shape."""
if isinstance(args, str):
args = json.loads(args)
if isinstance(args, list):
return args[0] if args and isinstance(args[0], dict) else None
return args if isinstance(args, dict) else None
class MemoryStore: class MemoryStore:
"""Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log).""" """Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""
@@ -66,40 +82,27 @@ class MemoryStore:
long_term = self.read_long_term() long_term = self.read_long_term()
return f"## Long-term Memory\n{long_term}" if long_term else "" return f"## Long-term Memory\n{long_term}" if long_term else ""
@staticmethod
def _format_messages(messages: list[dict]) -> str:
lines = []
for message in messages:
if not message.get("content"):
continue
tools = f" [tools: {', '.join(message['tools_used'])}]" if message.get("tools_used") else ""
lines.append(
f"[{message.get('timestamp', '?')[:16]}] {message['role'].upper()}{tools}: {message['content']}"
)
return "\n".join(lines)
async def consolidate( async def consolidate(
self, self,
session: Session, messages: list[dict],
provider: LLMProvider, provider: LLMProvider,
model: str, model: str,
*,
archive_all: bool = False,
memory_window: int = 50,
) -> bool: ) -> bool:
"""Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call. """Consolidate the provided message chunk into MEMORY.md + HISTORY.md."""
if not messages:
Returns True on success (including no-op), False on failure. return True
"""
if archive_all:
old_messages = session.messages
keep_count = 0
logger.info("Memory consolidation (archive_all): {} messages", len(session.messages))
else:
keep_count = memory_window // 2
if len(session.messages) <= keep_count:
return True
if len(session.messages) - session.last_consolidated <= 0:
return True
old_messages = session.messages[session.last_consolidated:-keep_count]
if not old_messages:
return True
logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count)
lines = []
for m in old_messages:
if not m.get("content"):
continue
tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
lines.append(f"[{m.get('timestamp', '?')[:16]}] {m['role'].upper()}{tools}: {m['content']}")
current_memory = self.read_long_term() current_memory = self.read_long_term()
prompt = f"""Process this conversation and call the save_memory tool with your consolidation. prompt = f"""Process this conversation and call the save_memory tool with your consolidation.
@@ -108,7 +111,7 @@ class MemoryStore:
{current_memory or "(empty)"} {current_memory or "(empty)"}
## Conversation to Process ## Conversation to Process
{chr(10).join(lines)}""" {self._format_messages(messages)}"""
try: try:
response = await provider.chat_with_retry( response = await provider.chat_with_retry(
@@ -124,34 +127,158 @@ class MemoryStore:
logger.warning("Memory consolidation: LLM did not call save_memory, skipping") logger.warning("Memory consolidation: LLM did not call save_memory, skipping")
return False return False
args = response.tool_calls[0].arguments args = _normalize_save_memory_args(response.tool_calls[0].arguments)
# Some providers return arguments as a JSON string instead of dict if args is None:
if isinstance(args, str): logger.warning("Memory consolidation: unexpected save_memory arguments")
args = json.loads(args)
# Some providers return arguments as a list (handle edge case)
if isinstance(args, list):
if args and isinstance(args[0], dict):
args = args[0]
else:
logger.warning("Memory consolidation: unexpected arguments as empty or non-dict list")
return False
if not isinstance(args, dict):
logger.warning("Memory consolidation: unexpected arguments type {}", type(args).__name__)
return False return False
if entry := args.get("history_entry"): if entry := args.get("history_entry"):
if not isinstance(entry, str): self.append_history(_ensure_text(entry))
entry = json.dumps(entry, ensure_ascii=False)
self.append_history(entry)
if update := args.get("memory_update"): if update := args.get("memory_update"):
if not isinstance(update, str): update = _ensure_text(update)
update = json.dumps(update, ensure_ascii=False)
if update != current_memory: if update != current_memory:
self.write_long_term(update) self.write_long_term(update)
session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count logger.info("Memory consolidation done for {} messages", len(messages))
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
return True return True
except Exception: except Exception:
logger.exception("Memory consolidation failed") logger.exception("Memory consolidation failed")
return False return False
class MemoryConsolidator:
"""Owns consolidation policy, locking, and session offset updates."""
_MAX_CONSOLIDATION_ROUNDS = 5
def __init__(
self,
workspace: Path,
provider: LLMProvider,
model: str,
sessions: SessionManager,
context_window_tokens: int,
build_messages: Callable[..., list[dict[str, Any]]],
get_tool_definitions: Callable[[], list[dict[str, Any]]],
):
self.store = MemoryStore(workspace)
self.provider = provider
self.model = model
self.sessions = sessions
self.context_window_tokens = context_window_tokens
self._build_messages = build_messages
self._get_tool_definitions = get_tool_definitions
self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
def get_lock(self, session_key: str) -> asyncio.Lock:
"""Return the shared consolidation lock for one session."""
return self._locks.setdefault(session_key, asyncio.Lock())
async def consolidate_messages(self, messages: list[dict[str, object]]) -> bool:
"""Archive a selected message chunk into persistent memory."""
return await self.store.consolidate(messages, self.provider, self.model)
def pick_consolidation_boundary(
self,
session: Session,
tokens_to_remove: int,
) -> tuple[int, int] | None:
"""Pick a user-turn boundary that removes enough old prompt tokens."""
start = session.last_consolidated
if start >= len(session.messages) or tokens_to_remove <= 0:
return None
removed_tokens = 0
last_boundary: tuple[int, int] | None = None
for idx in range(start, len(session.messages)):
message = session.messages[idx]
if idx > start and message.get("role") == "user":
last_boundary = (idx, removed_tokens)
if removed_tokens >= tokens_to_remove:
return last_boundary
removed_tokens += estimate_message_tokens(message)
return last_boundary
def estimate_session_prompt_tokens(self, session: Session) -> tuple[int, str]:
"""Estimate current prompt size for the normal session history view."""
history = session.get_history(max_messages=0)
channel, chat_id = (session.key.split(":", 1) if ":" in session.key else (None, None))
probe_messages = self._build_messages(
history=history,
current_message="[token-probe]",
channel=channel,
chat_id=chat_id,
)
return estimate_prompt_tokens_chain(
self.provider,
self.model,
probe_messages,
self._get_tool_definitions(),
)
async def archive_unconsolidated(self, session: Session) -> bool:
"""Archive the full unconsolidated tail for /new-style session rollover."""
lock = self.get_lock(session.key)
async with lock:
snapshot = session.messages[session.last_consolidated:]
if not snapshot:
return True
return await self.consolidate_messages(snapshot)
async def maybe_consolidate_by_tokens(self, session: Session) -> None:
"""Loop: archive old messages until prompt fits within half the context window."""
if not session.messages or self.context_window_tokens <= 0:
return
lock = self.get_lock(session.key)
async with lock:
target = self.context_window_tokens // 2
estimated, source = self.estimate_session_prompt_tokens(session)
if estimated <= 0:
return
if estimated < self.context_window_tokens:
logger.debug(
"Token consolidation idle {}: {}/{} via {}",
session.key,
estimated,
self.context_window_tokens,
source,
)
return
for round_num in range(self._MAX_CONSOLIDATION_ROUNDS):
if estimated <= target:
return
boundary = self.pick_consolidation_boundary(session, max(1, estimated - target))
if boundary is None:
logger.debug(
"Token consolidation: no safe boundary for {} (round {})",
session.key,
round_num,
)
return
end_idx = boundary[0]
chunk = session.messages[session.last_consolidated:end_idx]
if not chunk:
return
logger.info(
"Token consolidation round {} for {}: {}/{} via {}, chunk={} msgs",
round_num,
session.key,
estimated,
self.context_window_tokens,
source,
len(chunk),
)
if not await self.consolidate_messages(chunk):
return
session.last_consolidated = end_idx
self.sessions.save(session)
estimated, source = self.estimate_session_prompt_tokens(session)
if estimated <= 0:
return

View File

@@ -16,6 +16,7 @@ from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.config.schema import ExecToolConfig from nanobot.config.schema import ExecToolConfig
from nanobot.providers.base import LLMProvider from nanobot.providers.base import LLMProvider
from nanobot.utils.helpers import build_assistant_message
class SubagentManager: class SubagentManager:
@@ -133,7 +134,6 @@ class SubagentManager:
) )
if response.has_tool_calls: if response.has_tool_calls:
# Add assistant message with tool calls
tool_call_dicts = [ tool_call_dicts = [
{ {
"id": tc.id, "id": tc.id,
@@ -145,13 +145,12 @@ class SubagentManager:
} }
for tc in response.tool_calls for tc in response.tool_calls
] ]
messages.append({ messages.append(build_assistant_message(
"role": "assistant", response.content or "",
"content": response.content or "", tool_calls=tool_call_dicts,
"tool_calls": tool_call_dicts, reasoning_content=response.reasoning_content,
"reasoning_content": response.reasoning_content, thinking_blocks=response.thinking_blocks,
"thinking_blocks": response.thinking_blocks, ))
})
# Execute tools # Execute tools
for tool_call in response.tool_calls: for tool_call in response.tool_calls:

View File

@@ -57,6 +57,8 @@ class NanobotDingTalkHandler(CallbackHandler):
content = "" content = ""
if chatbot_msg.text: if chatbot_msg.text:
content = chatbot_msg.text.content.strip() content = chatbot_msg.text.content.strip()
elif chatbot_msg.extensions.get("content", {}).get("recognition"):
content = chatbot_msg.extensions["content"]["recognition"].strip()
if not content: if not content:
content = message.data.get("text", {}).get("content", "").strip() content = message.data.get("text", {}).get("content", "").strip()

View File

@@ -191,6 +191,8 @@ def onboard():
save_config(Config()) save_config(Config())
console.print(f"[green]✓[/green] Created config at {config_path}") console.print(f"[green]✓[/green] Created config at {config_path}")
console.print("[dim]Config template now uses `maxTokens` + `contextWindowTokens`; `memoryWindow` is no longer a runtime setting.[/dim]")
# Create workspace # Create workspace
workspace = get_workspace_path() workspace = get_workspace_path()
@@ -283,6 +285,16 @@ def _load_runtime_config(config: str | None = None, workspace: str | None = None
return loaded return loaded
def _print_deprecated_memory_window_notice(config: Config) -> None:
"""Warn when running with old memoryWindow-only config."""
if config.agents.defaults.should_warn_deprecated_memory_window:
console.print(
"[yellow]Hint:[/yellow] Detected deprecated `memoryWindow` without "
"`contextWindowTokens`. `memoryWindow` is ignored; run "
"[cyan]nanobot onboard[/cyan] to refresh your config template."
)
# ============================================================================ # ============================================================================
# Gateway / Server # Gateway / Server
# ============================================================================ # ============================================================================
@@ -310,6 +322,7 @@ def gateway(
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
config = _load_runtime_config(config, workspace) config = _load_runtime_config(config, workspace)
_print_deprecated_memory_window_notice(config)
port = port if port is not None else config.gateway.port port = port if port is not None else config.gateway.port
console.print(f"{__logo__} Starting nanobot gateway on port {port}...") console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
@@ -331,8 +344,8 @@ def gateway(
temperature=config.agents.defaults.temperature, temperature=config.agents.defaults.temperature,
max_tokens=config.agents.defaults.max_tokens, max_tokens=config.agents.defaults.max_tokens,
max_iterations=config.agents.defaults.max_tool_iterations, max_iterations=config.agents.defaults.max_tool_iterations,
memory_window=config.agents.defaults.memory_window,
reasoning_effort=config.agents.defaults.reasoning_effort, reasoning_effort=config.agents.defaults.reasoning_effort,
context_window_tokens=config.agents.defaults.context_window_tokens,
brave_api_key=config.tools.web.search.api_key or None, brave_api_key=config.tools.web.search.api_key or None,
web_proxy=config.tools.web.proxy or None, web_proxy=config.tools.web.proxy or None,
exec_config=config.tools.exec, exec_config=config.tools.exec,
@@ -494,6 +507,7 @@ def agent(
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
config = _load_runtime_config(config, workspace) config = _load_runtime_config(config, workspace)
_print_deprecated_memory_window_notice(config)
sync_workspace_templates(config.workspace_path) sync_workspace_templates(config.workspace_path)
bus = MessageBus() bus = MessageBus()
@@ -516,8 +530,8 @@ def agent(
temperature=config.agents.defaults.temperature, temperature=config.agents.defaults.temperature,
max_tokens=config.agents.defaults.max_tokens, max_tokens=config.agents.defaults.max_tokens,
max_iterations=config.agents.defaults.max_tool_iterations, max_iterations=config.agents.defaults.max_tool_iterations,
memory_window=config.agents.defaults.memory_window,
reasoning_effort=config.agents.defaults.reasoning_effort, reasoning_effort=config.agents.defaults.reasoning_effort,
context_window_tokens=config.agents.defaults.context_window_tokens,
brave_api_key=config.tools.web.search.api_key or None, brave_api_key=config.tools.web.search.api_key or None,
web_proxy=config.tools.web.proxy or None, web_proxy=config.tools.web.proxy or None,
exec_config=config.tools.exec, exec_config=config.tools.exec,

View File

@@ -228,11 +228,18 @@ class AgentDefaults(Base):
"auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection "auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection
) )
max_tokens: int = 8192 max_tokens: int = 8192
context_window_tokens: int = 65_536
temperature: float = 0.1 temperature: float = 0.1
max_tool_iterations: int = 40 max_tool_iterations: int = 40
memory_window: int = 100 # Deprecated compatibility field: accepted from old configs but ignored at runtime.
memory_window: int | None = Field(default=None, exclude=True)
reasoning_effort: str | None = None # low / medium / high — enables LLM thinking mode reasoning_effort: str | None = None # low / medium / high — enables LLM thinking mode
@property
def should_warn_deprecated_memory_window(self) -> bool:
"""Return True when old memoryWindow is present without contextWindowTokens."""
return self.memory_window is not None and "context_window_tokens" not in self.model_fields_set
class AgentsConfig(Base): class AgentsConfig(Base):
"""Agent configuration.""" """Agent configuration."""

View File

@@ -1,8 +1,12 @@
"""Utility functions for nanobot.""" """Utility functions for nanobot."""
import json
import re import re
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any
import tiktoken
def detect_image_mime(data: bytes) -> str | None: def detect_image_mime(data: bytes) -> str | None:
@@ -68,6 +72,104 @@ def split_message(content: str, max_len: int = 2000) -> list[str]:
return chunks return chunks
def build_assistant_message(
content: str | None,
tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None,
thinking_blocks: list[dict] | None = None,
) -> dict[str, Any]:
"""Build a provider-safe assistant message with optional reasoning fields."""
msg: dict[str, Any] = {"role": "assistant", "content": content}
if tool_calls:
msg["tool_calls"] = tool_calls
if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content
if thinking_blocks:
msg["thinking_blocks"] = thinking_blocks
return msg
def estimate_prompt_tokens(
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
) -> int:
"""Estimate prompt tokens with tiktoken."""
try:
enc = tiktoken.get_encoding("cl100k_base")
parts: list[str] = []
for msg in messages:
content = msg.get("content")
if isinstance(content, str):
parts.append(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
txt = part.get("text", "")
if txt:
parts.append(txt)
if tools:
parts.append(json.dumps(tools, ensure_ascii=False))
return len(enc.encode("\n".join(parts)))
except Exception:
return 0
def estimate_message_tokens(message: dict[str, Any]) -> int:
"""Estimate prompt tokens contributed by one persisted message."""
content = message.get("content")
parts: list[str] = []
if isinstance(content, str):
parts.append(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
text = part.get("text", "")
if text:
parts.append(text)
else:
parts.append(json.dumps(part, ensure_ascii=False))
elif content is not None:
parts.append(json.dumps(content, ensure_ascii=False))
for key in ("name", "tool_call_id"):
value = message.get(key)
if isinstance(value, str) and value:
parts.append(value)
if message.get("tool_calls"):
parts.append(json.dumps(message["tool_calls"], ensure_ascii=False))
payload = "\n".join(parts)
if not payload:
return 1
try:
enc = tiktoken.get_encoding("cl100k_base")
return max(1, len(enc.encode(payload)))
except Exception:
return max(1, len(payload) // 4)
def estimate_prompt_tokens_chain(
provider: Any,
model: str | None,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
) -> tuple[int, str]:
"""Estimate prompt tokens via provider counter first, then tiktoken fallback."""
provider_counter = getattr(provider, "estimate_prompt_tokens", None)
if callable(provider_counter):
try:
tokens, source = provider_counter(messages, tools, model)
if isinstance(tokens, (int, float)) and tokens > 0:
return int(tokens), str(source or "provider_counter")
except Exception:
pass
estimated = estimate_prompt_tokens(messages, tools)
if estimated > 0:
return int(estimated), "tiktoken"
return 0, "none"
def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]: def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]:
"""Sync bundled templates to workspace. Only creates missing files.""" """Sync bundled templates to workspace. Only creates missing files."""
from importlib.resources import files as pkg_files from importlib.resources import files as pkg_files
@@ -88,7 +190,7 @@ def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]
added.append(str(dest.relative_to(workspace))) added.append(str(dest.relative_to(workspace)))
for item in tpl.iterdir(): for item in tpl.iterdir():
if item.name.endswith(".md"): if item.name.endswith(".md") and not item.name.startswith("."):
_write(item, workspace / item.name) _write(item, workspace / item.name)
_write(tpl / "memory" / "MEMORY.md", workspace / "memory" / "MEMORY.md") _write(tpl / "memory" / "MEMORY.md", workspace / "memory" / "MEMORY.md")
_write(None, workspace / "memory" / "HISTORY.md") _write(None, workspace / "memory" / "HISTORY.md")

View File

@@ -18,7 +18,7 @@ classifiers = [
dependencies = [ dependencies = [
"typer>=0.20.0,<1.0.0", "typer>=0.20.0,<1.0.0",
"litellm>=1.81.5,<2.0.0", "litellm>=1.82.1,<2.0.0",
"pydantic>=2.12.0,<3.0.0", "pydantic>=2.12.0,<3.0.0",
"pydantic-settings>=2.12.0,<3.0.0", "pydantic-settings>=2.12.0,<3.0.0",
"websockets>=16.0,<17.0", "websockets>=16.0,<17.0",
@@ -44,6 +44,7 @@ dependencies = [
"json-repair>=0.57.0,<1.0.0", "json-repair>=0.57.0,<1.0.0",
"chardet>=3.0.2,<6.0.0", "chardet>=3.0.2,<6.0.0",
"openai>=2.8.0", "openai>=2.8.0",
"tiktoken>=0.12.0,<1.0.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -267,6 +267,16 @@ def test_agent_workspace_override_wins_over_config_workspace(mock_agent_runtime,
assert mock_agent_runtime["agent_loop_cls"].call_args.kwargs["workspace"] == workspace_path assert mock_agent_runtime["agent_loop_cls"].call_args.kwargs["workspace"] == workspace_path
def test_agent_warns_about_deprecated_memory_window(mock_agent_runtime):
    """CLI `agent` warns when the legacy memoryWindow setting is configured."""
    mock_agent_runtime["config"].agents.defaults.memory_window = 100
    result = runner.invoke(app, ["agent", "-m", "hello"])
    assert result.exit_code == 0
    # The warning must name both the deprecated key and its replacement.
    assert "memoryWindow" in result.stdout
    assert "contextWindowTokens" in result.stdout
def test_gateway_uses_workspace_from_config_by_default(monkeypatch, tmp_path: Path) -> None: def test_gateway_uses_workspace_from_config_by_default(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json" config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True) config_file.parent.mkdir(parents=True)
@@ -327,6 +337,29 @@ def test_gateway_workspace_option_overrides_config(monkeypatch, tmp_path: Path)
assert seen["workspace"] == override assert seen["workspace"] == override
assert config.workspace_path == override assert config.workspace_path == override
def test_gateway_warns_about_deprecated_memory_window(monkeypatch, tmp_path: Path) -> None:
    """CLI `gateway` warns when the legacy memoryWindow setting is configured."""
    config_file = tmp_path / "instance" / "config.json"
    config_file.parent.mkdir(parents=True)
    config_file.write_text("{}")
    config = Config()
    config.agents.defaults.memory_window = 100
    monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
    monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
    monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
    # Abort gateway startup right after config handling by raising a sentinel
    # from the provider factory (the throwing generator trick keeps it a lambda).
    monkeypatch.setattr(
        "nanobot.cli.commands._make_provider",
        lambda _config: (_ for _ in ()).throw(_StopGateway("stop")),
    )
    result = runner.invoke(app, ["gateway", "--config", str(config_file)])
    assert isinstance(result.exception, _StopGateway)
    # The deprecation warning is printed before the sentinel interrupts startup.
    assert "memoryWindow" in result.stdout
    assert "contextWindowTokens" in result.stdout
def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None: def test_gateway_uses_config_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None:
config_file = tmp_path / "instance" / "config.json" config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True) config_file.parent.mkdir(parents=True)

View File

@@ -0,0 +1,88 @@
import json
from typer.testing import CliRunner
from nanobot.cli.commands import app
from nanobot.config.loader import load_config, save_config
runner = CliRunner()
def test_load_config_keeps_max_tokens_and_warns_on_legacy_memory_window(tmp_path) -> None:
    """Loading a legacy config keeps maxTokens and flags memoryWindow as deprecated."""
    legacy_payload = {"agents": {"defaults": {"maxTokens": 1234, "memoryWindow": 42}}}
    config_path = tmp_path / "config.json"
    config_path.write_text(json.dumps(legacy_payload), encoding="utf-8")

    config = load_config(config_path)

    defaults = config.agents.defaults
    assert defaults.max_tokens == 1234
    # memoryWindow no longer maps to a setting; the token budget keeps its default.
    assert defaults.context_window_tokens == 65_536
    assert defaults.should_warn_deprecated_memory_window is True
def test_save_config_writes_context_window_tokens_but_not_memory_window(tmp_path) -> None:
    """Round-tripping a legacy config drops memoryWindow and writes contextWindowTokens."""
    legacy_payload = {"agents": {"defaults": {"maxTokens": 2222, "memoryWindow": 30}}}
    config_path = tmp_path / "config.json"
    config_path.write_text(json.dumps(legacy_payload), encoding="utf-8")

    save_config(load_config(config_path), config_path)

    written = json.loads(config_path.read_text(encoding="utf-8"))
    defaults = written["agents"]["defaults"]
    assert defaults["maxTokens"] == 2222
    assert defaults["contextWindowTokens"] == 65_536
    # The deprecated key must not be re-serialized.
    assert "memoryWindow" not in defaults
def test_onboard_refresh_rewrites_legacy_config_template(tmp_path, monkeypatch) -> None:
    """`onboard` over an existing legacy config rewrites it to the new schema."""
    config_path = tmp_path / "config.json"
    workspace = tmp_path / "workspace"
    config_path.write_text(
        json.dumps(
            {
                "agents": {
                    "defaults": {
                        "maxTokens": 3333,
                        "memoryWindow": 50,
                    }
                }
            }
        ),
        encoding="utf-8",
    )
    monkeypatch.setattr("nanobot.config.loader.get_config_path", lambda: config_path)
    monkeypatch.setattr("nanobot.cli.commands.get_workspace_path", lambda: workspace)
    # Answer "n" to the interactive prompt so onboarding only refreshes the config.
    result = runner.invoke(app, ["onboard"], input="n\n")
    assert result.exit_code == 0
    assert "contextWindowTokens" in result.stdout
    saved = json.loads(config_path.read_text(encoding="utf-8"))
    defaults = saved["agents"]["defaults"]
    # User-set values survive; the deprecated key is replaced by the new one.
    assert defaults["maxTokens"] == 3333
    assert defaults["contextWindowTokens"] == 65_536
    assert "memoryWindow" not in defaults

View File

@@ -480,226 +480,35 @@ class TestEmptyAndBoundarySessions:
assert_messages_content(old_messages, 10, 34) assert_messages_content(old_messages, 10, 34)
class TestConsolidationDeduplicationGuard: class TestNewCommandArchival:
"""Test that consolidation tasks are deduplicated and serialized.""" """Test /new archival behavior with the simplified consolidation flow."""
@pytest.mark.asyncio @staticmethod
async def test_consolidation_guard_prevents_duplicate_tasks(self, tmp_path: Path) -> None: def _make_loop(tmp_path: Path):
"""Concurrent messages above memory_window spawn only one consolidation task."""
from nanobot.agent.loop import AgentLoop from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse from nanobot.providers.base import LLMResponse
bus = MessageBus() bus = MessageBus()
provider = MagicMock() provider = MagicMock()
provider.get_default_model.return_value = "test-model" provider.get_default_model.return_value = "test-model"
provider.estimate_prompt_tokens.return_value = (10_000, "test")
loop = AgentLoop( loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 bus=bus,
provider=provider,
workspace=tmp_path,
model="test-model",
context_window_tokens=1,
) )
loop.provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[]) loop.tools.get_definitions = MagicMock(return_value=[])
return loop
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
consolidation_calls = 0
async def _fake_consolidate(_session, archive_all: bool = False) -> None:
nonlocal consolidation_calls
consolidation_calls += 1
await asyncio.sleep(0.05)
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await loop._process_message(msg)
await asyncio.sleep(0.1)
assert consolidation_calls == 1, (
f"Expected exactly 1 consolidation, got {consolidation_calls}"
)
@pytest.mark.asyncio
async def test_new_command_guard_prevents_concurrent_consolidation(
self, tmp_path: Path
) -> None:
"""/new command does not run consolidation concurrently with in-flight consolidation."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
consolidation_calls = 0
active = 0
max_active = 0
async def _fake_consolidate(_session, archive_all: bool = False) -> None:
nonlocal consolidation_calls, active, max_active
consolidation_calls += 1
active += 1
max_active = max(max_active, active)
await asyncio.sleep(0.05)
active -= 1
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
await loop._process_message(new_msg)
await asyncio.sleep(0.1)
assert consolidation_calls == 2, (
f"Expected normal + /new consolidations, got {consolidation_calls}"
)
assert max_active == 1, (
f"Expected serialized consolidation, observed concurrency={max_active}"
)
@pytest.mark.asyncio
async def test_consolidation_tasks_are_referenced(self, tmp_path: Path) -> None:
"""create_task results are tracked in _consolidation_tasks while in flight."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
started = asyncio.Event()
async def _slow_consolidate(_session, archive_all: bool = False) -> None:
started.set()
await asyncio.sleep(0.1)
loop._consolidate_memory = _slow_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await started.wait()
assert len(loop._consolidation_tasks) == 1, "Task must be referenced while in-flight"
await asyncio.sleep(0.15)
assert len(loop._consolidation_tasks) == 0, (
"Task reference must be removed after completion"
)
@pytest.mark.asyncio
async def test_new_waits_for_inflight_consolidation_and_preserves_messages(
self, tmp_path: Path
) -> None:
"""/new waits for in-flight consolidation and archives before clear."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
started = asyncio.Event()
release = asyncio.Event()
archived_count = 0
async def _fake_consolidate(sess, archive_all: bool = False) -> bool:
nonlocal archived_count
if archive_all:
archived_count = len(sess.messages)
return True
started.set()
await release.wait()
return True
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await started.wait()
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
pending_new = asyncio.create_task(loop._process_message(new_msg))
await asyncio.sleep(0.02)
assert not pending_new.done(), "/new should wait while consolidation is in-flight"
release.set()
response = await pending_new
assert response is not None
assert "new session started" in response.content.lower()
assert archived_count > 0, "Expected /new archival to process a non-empty snapshot"
session_after = loop.sessions.get_or_create("cli:test")
assert session_after.messages == [], "Session should be cleared after successful archival"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None: async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None:
"""/new must keep session data if archive step reports failure."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
loop = self._make_loop(tmp_path)
session = loop.sessions.get_or_create("cli:test") session = loop.sessions.get_or_create("cli:test")
for i in range(5): for i in range(5):
session.add_message("user", f"msg{i}") session.add_message("user", f"msg{i}")
@@ -707,111 +516,61 @@ class TestConsolidationDeduplicationGuard:
loop.sessions.save(session) loop.sessions.save(session)
before_count = len(session.messages) before_count = len(session.messages)
async def _failing_consolidate(sess, archive_all: bool = False) -> bool: async def _failing_consolidate(_messages) -> bool:
if archive_all: return False
return False
return True
loop._consolidate_memory = _failing_consolidate # type: ignore[method-assign] loop.memory_consolidator.consolidate_messages = _failing_consolidate # type: ignore[method-assign]
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
response = await loop._process_message(new_msg) response = await loop._process_message(new_msg)
assert response is not None assert response is not None
assert "failed" in response.content.lower() assert "failed" in response.content.lower()
session_after = loop.sessions.get_or_create("cli:test") assert len(loop.sessions.get_or_create("cli:test").messages) == before_count
assert len(session_after.messages) == before_count, (
"Session must remain intact when /new archival fails"
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_new_archives_only_unconsolidated_messages_after_inflight_task( async def test_new_archives_only_unconsolidated_messages(self, tmp_path: Path) -> None:
self, tmp_path: Path
) -> None:
"""/new should archive only messages not yet consolidated by prior task."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
loop = self._make_loop(tmp_path)
session = loop.sessions.get_or_create("cli:test") session = loop.sessions.get_or_create("cli:test")
for i in range(15): for i in range(15):
session.add_message("user", f"msg{i}") session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}") session.add_message("assistant", f"resp{i}")
session.last_consolidated = len(session.messages) - 3
loop.sessions.save(session) loop.sessions.save(session)
started = asyncio.Event()
release = asyncio.Event()
archived_count = -1 archived_count = -1
async def _fake_consolidate(sess, archive_all: bool = False) -> bool: async def _fake_consolidate(messages) -> bool:
nonlocal archived_count nonlocal archived_count
if archive_all: archived_count = len(messages)
archived_count = len(sess.messages)
return True
started.set()
await release.wait()
sess.last_consolidated = len(sess.messages) - 3
return True return True
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] loop.memory_consolidator.consolidate_messages = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await started.wait()
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
pending_new = asyncio.create_task(loop._process_message(new_msg)) response = await loop._process_message(new_msg)
await asyncio.sleep(0.02)
assert not pending_new.done()
release.set()
response = await pending_new
assert response is not None assert response is not None
assert "new session started" in response.content.lower() assert "new session started" in response.content.lower()
assert archived_count == 3, ( assert archived_count == 3
f"Expected only unconsolidated tail to archive, got {archived_count}"
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_new_clears_session_and_responds(self, tmp_path: Path) -> None: async def test_new_clears_session_and_responds(self, tmp_path: Path) -> None:
"""/new clears session and returns confirmation."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
loop = self._make_loop(tmp_path)
session = loop.sessions.get_or_create("cli:test") session = loop.sessions.get_or_create("cli:test")
for i in range(3): for i in range(3):
session.add_message("user", f"msg{i}") session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}") session.add_message("assistant", f"resp{i}")
loop.sessions.save(session) loop.sessions.save(session)
async def _ok_consolidate(sess, archive_all: bool = False) -> bool: async def _ok_consolidate(_messages) -> bool:
return True return True
loop._consolidate_memory = _ok_consolidate # type: ignore[method-assign] loop.memory_consolidator.consolidate_messages = _ok_consolidate # type: ignore[method-assign]
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
response = await loop._process_message(new_msg) response = await loop._process_message(new_msg)

View File

@@ -1,9 +1,11 @@
import asyncio
from types import SimpleNamespace from types import SimpleNamespace
import pytest import pytest
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.dingtalk import DingTalkChannel import nanobot.channels.dingtalk as dingtalk_module
from nanobot.channels.dingtalk import DingTalkChannel, NanobotDingTalkHandler
from nanobot.config.schema import DingTalkConfig from nanobot.config.schema import DingTalkConfig
@@ -64,3 +66,46 @@ async def test_group_send_uses_group_messages_api() -> None:
assert call["url"] == "https://api.dingtalk.com/v1.0/robot/groupMessages/send" assert call["url"] == "https://api.dingtalk.com/v1.0/robot/groupMessages/send"
assert call["json"]["openConversationId"] == "conv123" assert call["json"]["openConversationId"] == "conv123"
assert call["json"]["msgKey"] == "sampleMarkdown" assert call["json"]["msgKey"] == "sampleMarkdown"
@pytest.mark.asyncio
async def test_handler_uses_voice_recognition_text_when_text_is_empty(monkeypatch) -> None:
    """Voice messages with empty text fall back to the ASR recognition transcript."""
    bus = MessageBus()
    channel = DingTalkChannel(
        DingTalkConfig(client_id="app", client_secret="secret", allow_from=["user1"]),
        bus,
    )
    handler = NanobotDingTalkHandler(channel)

    class _FakeChatbotMessage:
        # Mimics the DingTalk SDK message: no plain text, but a transcript
        # under extensions["content"]["recognition"].
        text = None
        extensions = {"content": {"recognition": "voice transcript"}}
        sender_staff_id = "user1"
        sender_id = "fallback-user"
        sender_nick = "Alice"
        message_type = "audio"

        @staticmethod
        def from_dict(_data):
            return _FakeChatbotMessage()

    monkeypatch.setattr(dingtalk_module, "ChatbotMessage", _FakeChatbotMessage)
    monkeypatch.setattr(dingtalk_module, "AckMessage", SimpleNamespace(STATUS_OK="OK"))
    status, body = await handler.process(
        SimpleNamespace(
            data={
                "conversationType": "2",
                "conversationId": "conv123",
                "text": {"content": ""},
            }
        )
    )
    # Let the channel's background task publish the inbound message to the bus.
    await asyncio.gather(*list(channel._background_tasks))
    msg = await bus.consume_inbound()
    assert (status, body) == ("OK", "OK")
    assert msg.content == "voice transcript"
    # sender_staff_id takes priority over sender_id; "2" marks a group chat.
    assert msg.sender_id == "user1"
    assert msg.chat_id == "group:conv123"

View File

@@ -0,0 +1,190 @@
from unittest.mock import AsyncMock, MagicMock
import pytest
from nanobot.agent.loop import AgentLoop
import nanobot.agent.memory as memory_module
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
def _make_loop(tmp_path, *, estimated_tokens: int, context_window_tokens: int) -> AgentLoop:
    """Build an AgentLoop wired to a stub provider with a fixed token estimate."""
    stub_provider = MagicMock()
    stub_provider.get_default_model.return_value = "test-model"
    stub_provider.estimate_prompt_tokens.return_value = (estimated_tokens, "test-counter")
    stub_provider.chat_with_retry = AsyncMock(
        return_value=LLMResponse(content="ok", tool_calls=[])
    )

    agent_loop = AgentLoop(
        bus=MessageBus(),
        provider=stub_provider,
        workspace=tmp_path,
        model="test-model",
        context_window_tokens=context_window_tokens,
    )
    # No real tools: the loop should see an empty tool schema.
    agent_loop.tools.get_definitions = MagicMock(return_value=[])
    return agent_loop
@pytest.mark.asyncio
async def test_prompt_below_threshold_does_not_consolidate(tmp_path) -> None:
    """A prompt estimate under the context window must not trigger consolidation."""
    agent_loop = _make_loop(tmp_path, estimated_tokens=100, context_window_tokens=200)
    consolidate_stub = AsyncMock(return_value=True)
    agent_loop.memory_consolidator.consolidate_messages = consolidate_stub  # type: ignore[method-assign]

    await agent_loop.process_direct("hello", session_key="cli:test")

    consolidate_stub.assert_not_awaited()
@pytest.mark.asyncio
async def test_prompt_above_threshold_triggers_consolidation(tmp_path, monkeypatch) -> None:
    """A prompt estimate above the context window triggers consolidation."""
    loop = _make_loop(tmp_path, estimated_tokens=1000, context_window_tokens=200)
    loop.memory_consolidator.consolidate_messages = AsyncMock(return_value=True)  # type: ignore[method-assign]
    session = loop.sessions.get_or_create("cli:test")
    session.messages = [
        {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"},
        {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"},
        {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"},
    ]
    loop.sessions.save(session)
    # Make each message "cost" 500 tokens so the session clearly exceeds the window.
    monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _message: 500)
    await loop.process_direct("hello", session_key="cli:test")
    assert loop.memory_consolidator.consolidate_messages.await_count >= 1
@pytest.mark.asyncio
async def test_prompt_above_threshold_archives_until_next_user_boundary(tmp_path, monkeypatch) -> None:
    """Consolidation archives whole turns, stopping at a user-message boundary."""
    loop = _make_loop(tmp_path, estimated_tokens=1000, context_window_tokens=200)
    loop.memory_consolidator.consolidate_messages = AsyncMock(return_value=True)  # type: ignore[method-assign]
    session = loop.sessions.get_or_create("cli:test")
    session.messages = [
        {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"},
        {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"},
        {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"},
        {"role": "assistant", "content": "a2", "timestamp": "2026-01-01T00:00:03"},
        {"role": "user", "content": "u3", "timestamp": "2026-01-01T00:00:04"},
    ]
    loop.sessions.save(session)
    # Uniform per-message costs so the boundary choice, not sizing, is under test.
    token_map = {"u1": 120, "a1": 120, "u2": 120, "a2": 120, "u3": 120}
    monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda message: token_map[message["content"]])
    await loop.memory_consolidator.maybe_consolidate_by_tokens(session)
    # The archived chunk ends right before the next user message (u3 stays live).
    archived_chunk = loop.memory_consolidator.consolidate_messages.await_args.args[0]
    assert [message["content"] for message in archived_chunk] == ["u1", "a1", "u2", "a2"]
    assert session.last_consolidated == 4
@pytest.mark.asyncio
async def test_consolidation_loops_until_target_met(tmp_path, monkeypatch) -> None:
    """Verify maybe_consolidate_by_tokens keeps looping until under threshold."""
    loop = _make_loop(tmp_path, estimated_tokens=0, context_window_tokens=200)
    loop.memory_consolidator.consolidate_messages = AsyncMock(return_value=True)  # type: ignore[method-assign]
    session = loop.sessions.get_or_create("cli:test")
    session.messages = [
        {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"},
        {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"},
        {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"},
        {"role": "assistant", "content": "a2", "timestamp": "2026-01-01T00:00:03"},
        {"role": "user", "content": "u3", "timestamp": "2026-01-01T00:00:04"},
        {"role": "assistant", "content": "a3", "timestamp": "2026-01-01T00:00:05"},
        {"role": "user", "content": "u4", "timestamp": "2026-01-01T00:00:06"},
    ]
    loop.sessions.save(session)
    call_count = [0]

    def mock_estimate(_session):
        # Successive estimates shrink 500 -> 300 -> 80, so two rounds should run.
        call_count[0] += 1
        if call_count[0] == 1:
            return (500, "test")
        if call_count[0] == 2:
            return (300, "test")
        return (80, "test")

    loop.memory_consolidator.estimate_session_prompt_tokens = mock_estimate  # type: ignore[method-assign]
    monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100)
    await loop.memory_consolidator.maybe_consolidate_by_tokens(session)
    assert loop.memory_consolidator.consolidate_messages.await_count == 2
    assert session.last_consolidated == 6
@pytest.mark.asyncio
async def test_consolidation_continues_below_trigger_until_half_target(tmp_path, monkeypatch) -> None:
    """Once triggered, consolidation should continue until it drops below half threshold."""
    loop = _make_loop(tmp_path, estimated_tokens=0, context_window_tokens=200)
    loop.memory_consolidator.consolidate_messages = AsyncMock(return_value=True)  # type: ignore[method-assign]
    session = loop.sessions.get_or_create("cli:test")
    session.messages = [
        {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"},
        {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"},
        {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"},
        {"role": "assistant", "content": "a2", "timestamp": "2026-01-01T00:00:03"},
        {"role": "user", "content": "u3", "timestamp": "2026-01-01T00:00:04"},
        {"role": "assistant", "content": "a3", "timestamp": "2026-01-01T00:00:05"},
        {"role": "user", "content": "u4", "timestamp": "2026-01-01T00:00:06"},
    ]
    loop.sessions.save(session)
    call_count = [0]

    def mock_estimate(_session):
        # 150 is under the 200-token trigger but, per the test's premise, still
        # above the half-threshold target, so a second round is expected.
        call_count[0] += 1
        if call_count[0] == 1:
            return (500, "test")
        if call_count[0] == 2:
            return (150, "test")
        return (80, "test")

    loop.memory_consolidator.estimate_session_prompt_tokens = mock_estimate  # type: ignore[method-assign]
    monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100)
    await loop.memory_consolidator.maybe_consolidate_by_tokens(session)
    assert loop.memory_consolidator.consolidate_messages.await_count == 2
    assert session.last_consolidated == 6
@pytest.mark.asyncio
async def test_preflight_consolidation_before_llm_call(tmp_path, monkeypatch) -> None:
    """Verify preflight consolidation runs before the LLM call in process_direct."""
    order: list[str] = []
    loop = _make_loop(tmp_path, estimated_tokens=0, context_window_tokens=200)

    async def track_consolidate(messages):
        # Record ordering instead of doing real archival work.
        order.append("consolidate")
        return True

    loop.memory_consolidator.consolidate_messages = track_consolidate  # type: ignore[method-assign]

    async def track_llm(*args, **kwargs):
        order.append("llm")
        return LLMResponse(content="ok", tool_calls=[])

    loop.provider.chat_with_retry = track_llm
    session = loop.sessions.get_or_create("cli:test")
    session.messages = [
        {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"},
        {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"},
        {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"},
    ]
    loop.sessions.save(session)
    monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 500)
    call_count = [0]

    def mock_estimate(_session):
        # First estimate exceeds the window; later ones are comfortably under it.
        call_count[0] += 1
        return (1000 if call_count[0] <= 1 else 80, "test")

    loop.memory_consolidator.estimate_session_prompt_tokens = mock_estimate  # type: ignore[method-assign]
    await loop.process_direct("hello", session_key="cli:test")
    assert "consolidate" in order
    assert "llm" in order
    assert order.index("consolidate") < order.index("llm")

View File

@@ -7,7 +7,7 @@ tool call response, it should serialize them to JSON instead of raising TypeErro
import json import json
from pathlib import Path from pathlib import Path
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock
import pytest import pytest
@@ -15,15 +15,12 @@ from nanobot.agent.memory import MemoryStore
from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
def _make_session(message_count: int = 30, memory_window: int = 50): def _make_messages(message_count: int = 30):
"""Create a mock session with messages.""" """Create a list of mock messages."""
session = MagicMock() return [
session.messages = [
{"role": "user", "content": f"msg{i}", "timestamp": "2026-01-01 00:00"} {"role": "user", "content": f"msg{i}", "timestamp": "2026-01-01 00:00"}
for i in range(message_count) for i in range(message_count)
] ]
session.last_consolidated = 0
return session
def _make_tool_response(history_entry, memory_update): def _make_tool_response(history_entry, memory_update):
@@ -74,9 +71,9 @@ class TestMemoryConsolidationTypeHandling:
) )
) )
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is True assert result is True
assert store.history_file.exists() assert store.history_file.exists()
@@ -95,9 +92,9 @@ class TestMemoryConsolidationTypeHandling:
) )
) )
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is True assert result is True
assert store.history_file.exists() assert store.history_file.exists()
@@ -131,9 +128,9 @@ class TestMemoryConsolidationTypeHandling:
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is True assert result is True
assert "User discussed testing." in store.history_file.read_text() assert "User discussed testing." in store.history_file.read_text()
@@ -147,22 +144,22 @@ class TestMemoryConsolidationTypeHandling:
return_value=LLMResponse(content="I summarized the conversation.", tool_calls=[]) return_value=LLMResponse(content="I summarized the conversation.", tool_calls=[])
) )
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is False assert result is False
assert not store.history_file.exists() assert not store.history_file.exists()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_skips_when_few_messages(self, tmp_path: Path) -> None: async def test_skips_when_message_chunk_is_empty(self, tmp_path: Path) -> None:
"""Consolidation should be a no-op when messages < keep_count.""" """Consolidation should be a no-op when the selected chunk is empty."""
store = MemoryStore(tmp_path) store = MemoryStore(tmp_path)
provider = AsyncMock() provider = AsyncMock()
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=10) messages: list[dict] = []
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is True assert result is True
provider.chat.assert_not_called() provider.chat.assert_not_called()
@@ -189,9 +186,9 @@ class TestMemoryConsolidationTypeHandling:
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is True assert result is True
assert "User discussed testing." in store.history_file.read_text() assert "User discussed testing." in store.history_file.read_text()
@@ -215,9 +212,9 @@ class TestMemoryConsolidationTypeHandling:
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is False assert result is False
@@ -239,9 +236,9 @@ class TestMemoryConsolidationTypeHandling:
) )
provider.chat = AsyncMock(return_value=response) provider.chat = AsyncMock(return_value=response)
provider.chat_with_retry = provider.chat provider.chat_with_retry = provider.chat
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is False assert result is False
@@ -255,7 +252,7 @@ class TestMemoryConsolidationTypeHandling:
memory_update="# Memory\nUser likes testing.", memory_update="# Memory\nUser likes testing.",
), ),
]) ])
session = _make_session(message_count=60) messages = _make_messages(message_count=60)
delays: list[int] = [] delays: list[int] = []
async def _fake_sleep(delay: int) -> None: async def _fake_sleep(delay: int) -> None:
@@ -263,7 +260,7 @@ class TestMemoryConsolidationTypeHandling:
monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep) monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep)
result = await store.consolidate(session, provider, "test-model", memory_window=50) result = await store.consolidate(messages, provider, "test-model")
assert result is True assert result is True
assert provider.calls == 2 assert provider.calls == 2

View File

@@ -16,7 +16,7 @@ def _make_loop(tmp_path: Path) -> AgentLoop:
bus = MessageBus() bus = MessageBus()
provider = MagicMock() provider = MagicMock()
provider.get_default_model.return_value = "test-model" provider.get_default_model.return_value = "test-model"
return AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10) return AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="test-model")
class TestMessageToolSuppressLogic: class TestMessageToolSuppressLogic:
@@ -33,7 +33,7 @@ class TestMessageToolSuppressLogic:
LLMResponse(content="", tool_calls=[tool_call]), LLMResponse(content="", tool_calls=[tool_call]),
LLMResponse(content="Done", tool_calls=[]), LLMResponse(content="Done", tool_calls=[]),
]) ])
loop.provider.chat = AsyncMock(side_effect=lambda *a, **kw: next(calls)) loop.provider.chat_with_retry = AsyncMock(side_effect=lambda *a, **kw: next(calls))
loop.tools.get_definitions = MagicMock(return_value=[]) loop.tools.get_definitions = MagicMock(return_value=[])
sent: list[OutboundMessage] = [] sent: list[OutboundMessage] = []
@@ -58,7 +58,7 @@ class TestMessageToolSuppressLogic:
LLMResponse(content="", tool_calls=[tool_call]), LLMResponse(content="", tool_calls=[tool_call]),
LLMResponse(content="I've sent the email.", tool_calls=[]), LLMResponse(content="I've sent the email.", tool_calls=[]),
]) ])
loop.provider.chat = AsyncMock(side_effect=lambda *a, **kw: next(calls)) loop.provider.chat_with_retry = AsyncMock(side_effect=lambda *a, **kw: next(calls))
loop.tools.get_definitions = MagicMock(return_value=[]) loop.tools.get_definitions = MagicMock(return_value=[])
sent: list[OutboundMessage] = [] sent: list[OutboundMessage] = []
@@ -77,7 +77,7 @@ class TestMessageToolSuppressLogic:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_not_suppress_when_no_message_tool_used(self, tmp_path: Path) -> None: async def test_not_suppress_when_no_message_tool_used(self, tmp_path: Path) -> None:
loop = _make_loop(tmp_path) loop = _make_loop(tmp_path)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="Hello!", tool_calls=[])) loop.provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="Hello!", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[]) loop.tools.get_definitions = MagicMock(return_value=[])
msg = InboundMessage(channel="feishu", sender_id="user1", chat_id="chat123", content="Hi") msg = InboundMessage(channel="feishu", sender_id="user1", chat_id="chat123", content="Hi")
@@ -98,7 +98,7 @@ class TestMessageToolSuppressLogic:
), ),
LLMResponse(content="Done", tool_calls=[]), LLMResponse(content="Done", tool_calls=[]),
]) ])
loop.provider.chat = AsyncMock(side_effect=lambda *a, **kw: next(calls)) loop.provider.chat_with_retry = AsyncMock(side_effect=lambda *a, **kw: next(calls))
loop.tools.get_definitions = MagicMock(return_value=[]) loop.tools.get_definitions = MagicMock(return_value=[])
loop.tools.execute = AsyncMock(return_value="ok") loop.tools.execute = AsyncMock(return_value="ok")

View File

@@ -165,3 +165,46 @@ class TestSubagentCancellation:
provider.get_default_model.return_value = "test-model" provider.get_default_model.return_value = "test-model"
mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus) mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus)
assert await mgr.cancel_by_session("nonexistent") == 0 assert await mgr.cancel_by_session("nonexistent") == 0
@pytest.mark.asyncio
async def test_subagent_preserves_reasoning_fields_in_tool_turn(self, monkeypatch, tmp_path):
from nanobot.agent.subagent import SubagentManager
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse, ToolCallRequest
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
captured_second_call: list[dict] = []
call_count = {"n": 0}
async def scripted_chat_with_retry(*, messages, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
return LLMResponse(
content="thinking",
tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={})],
reasoning_content="hidden reasoning",
thinking_blocks=[{"type": "thinking", "thinking": "step"}],
)
captured_second_call[:] = messages
return LLMResponse(content="done", tool_calls=[])
provider.chat_with_retry = scripted_chat_with_retry
mgr = SubagentManager(provider=provider, workspace=tmp_path, bus=bus)
async def fake_execute(self, name, arguments):
return "tool result"
monkeypatch.setattr("nanobot.agent.tools.registry.ToolRegistry.execute", fake_execute)
await mgr._run_subagent("sub-1", "do task", "label", {"channel": "test", "chat_id": "c1"})
assistant_messages = [
msg for msg in captured_second_call
if msg.get("role") == "assistant" and msg.get("tool_calls")
]
assert len(assistant_messages) == 1
assert assistant_messages[0]["reasoning_content"] == "hidden reasoning"
assert assistant_messages[0]["thinking_blocks"] == [{"type": "thinking", "thinking": "step"}]