diff --git a/README.md b/README.md index cb751ba..f20e21f 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ ⚑️ Delivers core agent functionality in just **~4,000** lines of code β€” **99% smaller** than Clawdbot's 430k+ lines. -πŸ“ Real-time line count: **3,806 lines** (run `bash core_agent_lines.sh` to verify anytime) +πŸ“ Real-time line count: **3,862 lines** (run `bash core_agent_lines.sh` to verify anytime) ## πŸ“’ News @@ -593,7 +593,7 @@ Config file: `~/.nanobot/config.json` | `deepseek` | LLM (DeepSeek direct) | [platform.deepseek.com](https://platform.deepseek.com) | | `groq` | LLM + **Voice transcription** (Whisper) | [console.groq.com](https://console.groq.com) | | `gemini` | LLM (Gemini direct) | [aistudio.google.com](https://aistudio.google.com) | -| `minimax` | LLM (MiniMax direct) | [platform.minimax.io](https://platform.minimax.io) | +| `minimax` | LLM (MiniMax direct) | [platform.minimaxi.com](https://platform.minimaxi.com) | | `aihubmix` | LLM (API gateway, access to all models) | [aihubmix.com](https://aihubmix.com) | | `siliconflow` | LLM (SiliconFlow/η‘…εŸΊζ΅εŠ¨) | [siliconflow.cn](https://siliconflow.cn) | | `volcengine` | LLM (VolcEngine/η«ε±±εΌ•ζ“Ž) | [volcengine.com](https://www.volcengine.com) | @@ -776,6 +776,21 @@ Two transport modes are supported: | **Stdio** | `command` + `args` | Local process via `npx` / `uvx` | | **HTTP** | `url` + `headers` (optional) | Remote endpoint (`https://mcp.example.com/sse`) | +Use `toolTimeout` to override the default 30s per-call timeout for slow servers: + +```json +{ + "tools": { + "mcpServers": { + "my-slow-server": { + "url": "https://example.com/mcp/", + "toolTimeout": 120 + } + } + } +} +``` + MCP tools are automatically discovered and registered on startup. The LLM can use them alongside built-in tools β€” no extra configuration needed. @@ -865,6 +880,59 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot agent -m "Hello!" docker run -v ~/.nanobot:/root/.nanobot --rm nanobot status ``` +## 🐧 Linux Service + +Run the gateway as a systemd user service so it starts automatically and restarts on failure. + +**1. Find the nanobot binary path:** + +```bash +which nanobot # e.g. /home/user/.local/bin/nanobot +``` + +**2. Create the service file** at `~/.config/systemd/user/nanobot-gateway.service` (replace `ExecStart` path if needed): + +```ini +[Unit] +Description=Nanobot Gateway +After=network.target + +[Service] +Type=simple +ExecStart=%h/.local/bin/nanobot gateway +Restart=always +RestartSec=10 +NoNewPrivileges=yes +ProtectSystem=strict +ReadWritePaths=%h + +[Install] +WantedBy=default.target +``` + +**3. Enable and start:** + +```bash +systemctl --user daemon-reload +systemctl --user enable --now nanobot-gateway +``` + +**Common operations:** + +```bash +systemctl --user status nanobot-gateway # check status +systemctl --user restart nanobot-gateway # restart after config changes +journalctl --user -u nanobot-gateway -f # follow logs +``` + +If you edit the `.service` file itself, run `systemctl --user daemon-reload` before restarting. + +> **Note:** User services only run while you are logged in. To keep the gateway running after logout, enable lingering: +> +> ```bash +> loginctl enable-linger $USER +> ``` + ## πŸ“ Project Structure ``` diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 7b9317c..b05ba90 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -7,7 +7,7 @@ import json import re from contextlib import AsyncExitStack from pathlib import Path -from typing import TYPE_CHECKING, Awaitable, Callable +from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger @@ -95,6 +95,8 @@ class AgentLoop: self._mcp_connected = False self._mcp_connecting = False self._consolidating: set[str] = set() # Session keys with consolidation in progress + self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks + self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() def _register_default_tools(self) -> None: @@ -194,7 +196,8 @@ class AgentLoop: clean = self._strip_think(response.content) if clean: await on_progress(clean) - await on_progress(self._tool_hint(response.tool_calls)) + else: + await on_progress(self._tool_hint(response.tool_calls)) tool_call_dicts = [ { @@ -270,6 +273,18 @@ class AgentLoop: self._running = False logger.info("Agent loop stopping") + def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock: + lock = self._consolidation_locks.get(session_key) + if lock is None: + lock = asyncio.Lock() + self._consolidation_locks[session_key] = lock + return lock + + def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None: + """Drop lock entry if no longer in use.""" + if not lock.locked(): + self._consolidation_locks.pop(session_key, None) + async def _process_message( self, msg: InboundMessage, @@ -305,33 +320,55 @@ class AgentLoop: # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": - messages_to_archive = session.messages.copy() + lock = self._get_consolidation_lock(session.key) + self._consolidating.add(session.key) + try: + async with lock: + snapshot = session.messages[session.last_consolidated:] + if snapshot: + temp = Session(key=session.key) + temp.messages = list(snapshot) + if not await self._consolidate_memory(temp, archive_all=True): + return OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", + ) + except Exception: + logger.exception("/new archival failed for {}", session.key) + return OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", + ) + finally: + self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) + session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - - async def _consolidate_and_cleanup(): - temp = Session(key=session.key) - temp.messages = messages_to_archive - await self._consolidate_memory(temp, archive_all=True) - - asyncio.create_task(_consolidate_and_cleanup()) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.") + content="New session started.") if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="🐈 nanobot commands:\n/new β€” Start a new conversation\n/help β€” Show available commands") if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) + lock = self._get_consolidation_lock(session.key) async def _consolidate_and_unlock(): try: - await self._consolidate_memory(session) + async with lock: + await self._consolidate_memory(session) finally: self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) + _task = asyncio.current_task() + if _task is not None: + self._consolidation_tasks.discard(_task) - asyncio.create_task(_consolidate_and_unlock()) + _task = asyncio.create_task(_consolidate_and_unlock()) + self._consolidation_tasks.add(_task) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) if message_tool := self.tools.get("message"): @@ -376,9 +413,9 @@ class AgentLoop: metadata=msg.metadata or {}, ) - async def _consolidate_memory(self, session, archive_all: bool = False) -> None: - """Delegate to MemoryStore.consolidate().""" - await MemoryStore(self.workspace).consolidate( + async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: + """Delegate to MemoryStore.consolidate(). Returns True on success.""" + return await MemoryStore(self.workspace).consolidate( session, self.provider, self.model, archive_all=archive_all, memory_window=self.memory_window, ) diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 51abd8f..cdbc49f 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -74,8 +74,11 @@ class MemoryStore: *, archive_all: bool = False, memory_window: int = 50, - ) -> None: - """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call.""" + ) -> bool: + """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call. + + Returns True on success (including no-op), False on failure. + """ if archive_all: old_messages = session.messages keep_count = 0 @@ -83,12 +86,12 @@ class MemoryStore: else: keep_count = memory_window // 2 if len(session.messages) <= keep_count: - return + return True if len(session.messages) - session.last_consolidated <= 0: - return + return True old_messages = session.messages[session.last_consolidated:-keep_count] if not old_messages: - return + return True logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count) lines = [] @@ -119,7 +122,7 @@ class MemoryStore: if not response.has_tool_calls: logger.warning("Memory consolidation: LLM did not call save_memory, skipping") - return + return False args = response.tool_calls[0].arguments if entry := args.get("history_entry"): @@ -134,5 +137,7 @@ class MemoryStore: session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) - except Exception as e: - logger.error("Memory consolidation failed: {}", e) + return True + except Exception: + logger.exception("Memory consolidation failed") + return False diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index 9c169e4..b87da11 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -13,8 +13,11 @@ def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | if not p.is_absolute() and workspace: p = workspace / p resolved = p.resolve() - if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())): - raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") + if allowed_dir: + try: + resolved.relative_to(allowed_dir.resolve()) + except ValueError: + raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") return resolved diff --git a/nanobot/agent/tools/mcp.py b/nanobot/agent/tools/mcp.py index ad352bf..0257d52 100644 --- a/nanobot/agent/tools/mcp.py +++ b/nanobot/agent/tools/mcp.py @@ -1,5 +1,6 @@ """MCP client: connects to MCP servers and wraps their tools as native nanobot tools.""" +import asyncio from contextlib import AsyncExitStack from typing import Any @@ -13,12 +14,13 @@ from nanobot.agent.tools.registry import ToolRegistry class MCPToolWrapper(Tool): """Wraps a single MCP server tool as a nanobot Tool.""" - def __init__(self, session, server_name: str, tool_def): + def __init__(self, session, server_name: str, tool_def, tool_timeout: int = 30): self._session = session self._original_name = tool_def.name self._name = f"mcp_{server_name}_{tool_def.name}" self._description = tool_def.description or tool_def.name self._parameters = tool_def.inputSchema or {"type": "object", "properties": {}} + self._tool_timeout = tool_timeout @property def name(self) -> str: @@ -34,7 +36,14 @@ class MCPToolWrapper(Tool): async def execute(self, **kwargs: Any) -> str: from mcp import types - result = await self._session.call_tool(self._original_name, arguments=kwargs) + try: + result = await asyncio.wait_for( + self._session.call_tool(self._original_name, arguments=kwargs), + timeout=self._tool_timeout, + ) + except asyncio.TimeoutError: + logger.warning("MCP tool '{}' timed out after {}s", self._name, self._tool_timeout) + return f"(MCP tool call timed out after {self._tool_timeout}s)" parts = [] for block in result.content: if isinstance(block, types.TextContent): @@ -83,7 +92,7 @@ async def connect_mcp_servers( tools = await session.list_tools() for tool_def in tools.tools: - wrapper = MCPToolWrapper(session, name, tool_def) + wrapper = MCPToolWrapper(session, name, tool_def, tool_timeout=cfg.tool_timeout) registry.register(wrapper) logger.debug("MCP: registered tool '{}' from server '{}'", wrapper.name, name) diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index 1b6f46b..5dc05fb 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -304,7 +304,8 @@ class EmailChannel(BaseChannel): self._processed_uids.add(uid) # mark_seen is the primary dedup; this set is a safety net if len(self._processed_uids) > self._MAX_PROCESSED_UIDS: - self._processed_uids.clear() + # Evict a random half to cap memory; mark_seen is the primary dedup + self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:]) if mark_seen: client.store(imap_id, "+FLAGS", "\\Seen") diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 815d853..2d50d74 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -503,18 +503,29 @@ class FeishuChannel(BaseChannel): logger.error("Error downloading image {}: {}", image_key, e) return None, None - def _download_file_sync(self, file_key: str) -> tuple[bytes | None, str | None]: - """Download a file from Feishu by file_key.""" + def _download_file_sync( + self, message_id: str, file_key: str, resource_type: str = "file" + ) -> tuple[bytes | None, str | None]: + """Download a file/audio/media from a Feishu message by message_id and file_key.""" try: - request = GetFileRequest.builder().file_key(file_key).build() - response = self._client.im.v1.file.get(request) + request = ( + GetMessageResourceRequest.builder() + .message_id(message_id) + .file_key(file_key) + .type(resource_type) + .build() + ) + response = self._client.im.v1.message_resource.get(request) if response.success(): - return response.file, response.file_name + file_data = response.file + if hasattr(file_data, "read"): + file_data = file_data.read() + return file_data, response.file_name else: - logger.error("Failed to download file: code={}, msg={}", response.code, response.msg) + logger.error("Failed to download {}: code={}, msg={}", resource_type, response.code, response.msg) return None, None - except Exception as e: - logger.error("Error downloading file {}: {}", file_key, e) + except Exception: + logger.exception("Error downloading {} {}", resource_type, file_key) return None, None async def _download_and_save_media( @@ -544,14 +555,14 @@ class FeishuChannel(BaseChannel): if not filename: filename = f"{image_key[:16]}.jpg" - elif msg_type in ("audio", "file"): + elif msg_type in ("audio", "file", "media"): file_key = content_json.get("file_key") - if file_key: + if file_key and message_id: data, filename = await loop.run_in_executor( - None, self._download_file_sync, file_key + None, self._download_file_sync, message_id, file_key, msg_type ) if not filename: - ext = ".opus" if msg_type == "audio" else "" + ext = {"audio": ".opus", "media": ".mp4"}.get(msg_type, "") filename = f"{file_key[:16]}{ext}" if data and filename: @@ -684,7 +695,7 @@ class FeishuChannel(BaseChannel): if text: content_parts.append(text) - elif msg_type in ("image", "audio", "file"): + elif msg_type in ("image", "audio", "file", "media"): file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id) if file_path: media_paths.append(file_path) diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 16cbfb8..5352a30 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -55,7 +55,6 @@ class QQChannel(BaseChannel): self.config: QQConfig = config self._client: "botpy.Client | None" = None self._processed_ids: deque = deque(maxlen=1000) - self._bot_task: asyncio.Task | None = None async def start(self) -> None: """Start the QQ bot.""" @@ -71,8 +70,8 @@ class QQChannel(BaseChannel): BotClass = _make_bot_class(self) self._client = BotClass() - self._bot_task = asyncio.create_task(self._run_bot()) logger.info("QQ bot started (C2C private message)") + await self._run_bot() async def _run_bot(self) -> None: """Run the bot connection with auto-reconnect.""" @@ -88,11 +87,10 @@ class QQChannel(BaseChannel): async def stop(self) -> None: """Stop the QQ bot.""" self._running = False - if self._bot_task: - self._bot_task.cancel() + if self._client: try: - await self._bot_task - except asyncio.CancelledError: + await self._client.close() + except Exception: pass logger.info("QQ bot stopped") @@ -130,5 +128,5 @@ class QQChannel(BaseChannel): content=content, metadata={"message_id": data.id}, ) - except Exception as e: - logger.error("Error handling QQ message: {}", e) + except Exception: + logger.exception("Error handling QQ message") diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index 4fc1f41..b0f9bbb 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -179,18 +179,21 @@ class SlackChannel(BaseChannel): except Exception as e: logger.debug("Slack reactions_add failed: {}", e) - await self._handle_message( - sender_id=sender_id, - chat_id=chat_id, - content=text, - metadata={ - "slack": { - "event": event, - "thread_ts": thread_ts, - "channel_type": channel_type, - } - }, - ) + try: + await self._handle_message( + sender_id=sender_id, + chat_id=chat_id, + content=text, + metadata={ + "slack": { + "event": event, + "thread_ts": thread_ts, + "channel_type": channel_type, + } + }, + ) + except Exception: + logger.exception("Error handling Slack message from {}", sender_id) def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool: if channel_type == "im": diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index bd602dc..fc9fede 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -261,6 +261,7 @@ class MCPServerConfig(Base): env: dict[str, str] = Field(default_factory=dict) # Stdio: extra env vars url: str = "" # HTTP: streamable HTTP endpoint URL headers: dict[str, str] = Field(default_factory=dict) # HTTP: Custom HTTP Headers + tool_timeout: int = 30 # Seconds before a tool call is cancelled class ToolsConfig(Base): diff --git a/nanobot/providers/base.py b/nanobot/providers/base.py index c69c38b..eb1599a 100644 --- a/nanobot/providers/base.py +++ b/nanobot/providers/base.py @@ -39,6 +39,46 @@ class LLMProvider(ABC): def __init__(self, api_key: str | None = None, api_base: str | None = None): self.api_key = api_key self.api_base = api_base + + @staticmethod + def _sanitize_empty_content(messages: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Replace empty text content that causes provider 400 errors. + + Empty content can appear when MCP tools return nothing. Most providers + reject empty-string content or empty text blocks in list content. + """ + result: list[dict[str, Any]] = [] + for msg in messages: + content = msg.get("content") + + if isinstance(content, str) and not content: + clean = dict(msg) + clean["content"] = None if (msg.get("role") == "assistant" and msg.get("tool_calls")) else "(empty)" + result.append(clean) + continue + + if isinstance(content, list): + filtered = [ + item for item in content + if not ( + isinstance(item, dict) + and item.get("type") in ("text", "input_text", "output_text") + and not item.get("text") + ) + ] + if len(filtered) != len(content): + clean = dict(msg) + if filtered: + clean["content"] = filtered + elif msg.get("role") == "assistant" and msg.get("tool_calls"): + clean["content"] = None + else: + clean["content"] = "(empty)" + result.append(clean) + continue + + result.append(msg) + return result @abstractmethod async def chat( diff --git a/nanobot/providers/custom_provider.py b/nanobot/providers/custom_provider.py index f190ccf..a578d14 100644 --- a/nanobot/providers/custom_provider.py +++ b/nanobot/providers/custom_provider.py @@ -19,8 +19,12 @@ class CustomProvider(LLMProvider): async def chat(self, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, model: str | None = None, max_tokens: int = 4096, temperature: float = 0.7) -> LLMResponse: - kwargs: dict[str, Any] = {"model": model or self.default_model, "messages": messages, - "max_tokens": max(1, max_tokens), "temperature": temperature} + kwargs: dict[str, Any] = { + "model": model or self.default_model, + "messages": self._sanitize_empty_content(messages), + "max_tokens": max(1, max_tokens), + "temperature": temperature, + } if tools: kwargs.update(tools=tools, tool_choice="auto") try: @@ -40,8 +44,9 @@ class CustomProvider(LLMProvider): return LLMResponse( content=msg.content, tool_calls=tool_calls, finish_reason=choice.finish_reason or "stop", usage={"prompt_tokens": u.prompt_tokens, "completion_tokens": u.completion_tokens, "total_tokens": u.total_tokens} if u else {}, - reasoning_content=getattr(msg, "reasoning_content", None), + reasoning_content=getattr(msg, "reasoning_content", None) or None, ) def get_default_model(self) -> str: return self.default_model + diff --git a/nanobot/providers/litellm_provider.py b/nanobot/providers/litellm_provider.py index 58c9ac2..7402a2b 100644 --- a/nanobot/providers/litellm_provider.py +++ b/nanobot/providers/litellm_provider.py @@ -196,7 +196,7 @@ class LiteLLMProvider(LLMProvider): kwargs: dict[str, Any] = { "model": model, - "messages": self._sanitize_messages(messages), + "messages": self._sanitize_messages(self._sanitize_empty_content(messages)), "max_tokens": max_tokens, "temperature": temperature, } @@ -257,7 +257,7 @@ class LiteLLMProvider(LLMProvider): "total_tokens": response.usage.total_tokens, } - reasoning_content = getattr(message, "reasoning_content", None) + reasoning_content = getattr(message, "reasoning_content", None) or None return LLMResponse( content=message.content, diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 18e23b2..5f23dc2 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -1,6 +1,7 @@ """Session management for conversation history.""" import json +import shutil from pathlib import Path from dataclasses import dataclass, field from datetime import datetime @@ -108,9 +109,11 @@ class SessionManager: if not path.exists(): legacy_path = self._get_legacy_session_path(key) if legacy_path.exists(): - import shutil - shutil.move(str(legacy_path), str(path)) - logger.info("Migrated session {} from legacy path", key) + try: + shutil.move(str(legacy_path), str(path)) + logger.info("Migrated session {} from legacy path", key) + except Exception: + logger.exception("Failed to migrate session {}", key) if not path.exists(): return None diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index e204733..323519e 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -1,5 +1,8 @@ """Test session management with cache-friendly message handling.""" +import asyncio +from unittest.mock import AsyncMock, MagicMock + import pytest from pathlib import Path from nanobot.session.manager import Session, SessionManager @@ -475,3 +478,351 @@ class TestEmptyAndBoundarySessions: expected_count = 60 - KEEP_COUNT - 10 assert len(old_messages) == expected_count assert_messages_content(old_messages, 10, 34) + + +class TestConsolidationDeduplicationGuard: + """Test that consolidation tasks are deduplicated and serialized.""" + + @pytest.mark.asyncio + async def test_consolidation_guard_prevents_duplicate_tasks(self, tmp_path: Path) -> None: + """Concurrent messages above memory_window spawn only one consolidation task.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + consolidation_calls = 0 + + async def _fake_consolidate(_session, archive_all: bool = False) -> None: + nonlocal consolidation_calls + consolidation_calls += 1 + await asyncio.sleep(0.05) + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await loop._process_message(msg) + await asyncio.sleep(0.1) + + assert consolidation_calls == 1, ( + f"Expected exactly 1 consolidation, got {consolidation_calls}" + ) + + @pytest.mark.asyncio + async def test_new_command_guard_prevents_concurrent_consolidation( + self, tmp_path: Path + ) -> None: + """/new command does not run consolidation concurrently with in-flight consolidation.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + consolidation_calls = 0 + active = 0 + max_active = 0 + + async def _fake_consolidate(_session, archive_all: bool = False) -> None: + nonlocal consolidation_calls, active, max_active + consolidation_calls += 1 + active += 1 + max_active = max(max_active, active) + await asyncio.sleep(0.05) + active -= 1 + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + await loop._process_message(new_msg) + await asyncio.sleep(0.1) + + assert consolidation_calls == 2, ( + f"Expected normal + /new consolidations, got {consolidation_calls}" + ) + assert max_active == 1, ( + f"Expected serialized consolidation, observed concurrency={max_active}" + ) + + @pytest.mark.asyncio + async def test_consolidation_tasks_are_referenced(self, tmp_path: Path) -> None: + """create_task results are tracked in _consolidation_tasks while in flight.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + + async def _slow_consolidate(_session, archive_all: bool = False) -> None: + started.set() + await asyncio.sleep(0.1) + + loop._consolidate_memory = _slow_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + + await started.wait() + assert len(loop._consolidation_tasks) == 1, "Task must be referenced while in-flight" + + await asyncio.sleep(0.15) + assert len(loop._consolidation_tasks) == 0, ( + "Task reference must be removed after completion" + ) + + @pytest.mark.asyncio + async def test_new_waits_for_inflight_consolidation_and_preserves_messages( + self, tmp_path: Path + ) -> None: + """/new waits for in-flight consolidation and archives before clear.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + release = asyncio.Event() + archived_count = 0 + + async def _fake_consolidate(sess, archive_all: bool = False) -> bool: + nonlocal archived_count + if archive_all: + archived_count = len(sess.messages) + return True + started.set() + await release.wait() + return True + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await started.wait() + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + pending_new = asyncio.create_task(loop._process_message(new_msg)) + + await asyncio.sleep(0.02) + assert not pending_new.done(), "/new should wait while consolidation is in-flight" + + release.set() + response = await pending_new + assert response is not None + assert "new session started" in response.content.lower() + assert archived_count > 0, "Expected /new archival to process a non-empty snapshot" + + session_after = loop.sessions.get_or_create("cli:test") + assert session_after.messages == [], "Session should be cleared after successful archival" + + @pytest.mark.asyncio + async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None: + """/new must keep session data if archive step reports failure.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(5): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + before_count = len(session.messages) + + async def _failing_consolidate(sess, archive_all: bool = False) -> bool: + if archive_all: + return False + return True + + loop._consolidate_memory = _failing_consolidate # type: ignore[method-assign] + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + response = await loop._process_message(new_msg) + + assert response is not None + assert "failed" in response.content.lower() + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == before_count, ( + "Session must remain intact when /new archival fails" + ) + + @pytest.mark.asyncio + async def test_new_archives_only_unconsolidated_messages_after_inflight_task( + self, tmp_path: Path + ) -> None: + """/new should archive only messages not yet consolidated by prior task.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + release = asyncio.Event() + archived_count = -1 + + async def _fake_consolidate(sess, archive_all: bool = False) -> bool: + nonlocal archived_count + if archive_all: + archived_count = len(sess.messages) + return True + + started.set() + await release.wait() + sess.last_consolidated = len(sess.messages) - 3 + return True + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await started.wait() + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + pending_new = asyncio.create_task(loop._process_message(new_msg)) + await asyncio.sleep(0.02) + assert not pending_new.done() + + release.set() + response = await pending_new + + assert response is not None + assert "new session started" in response.content.lower() + assert archived_count == 3, ( + f"Expected only unconsolidated tail to archive, got {archived_count}" + ) + + @pytest.mark.asyncio + async def test_new_cleans_up_consolidation_lock_for_invalidated_session( + self, tmp_path: Path + ) -> None: + """/new should remove lock entry for fully invalidated session key.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(3): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + # Ensure lock exists before /new. + _ = loop._get_consolidation_lock(session.key) + assert session.key in loop._consolidation_locks + + async def _ok_consolidate(sess, archive_all: bool = False) -> bool: + return True + + loop._consolidate_memory = _ok_consolidate # type: ignore[method-assign] + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + response = await loop._process_message(new_msg) + + assert response is not None + assert "new session started" in response.content.lower() + assert session.key not in loop._consolidation_locks