diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 7b9317c..b05ba90 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -7,7 +7,7 @@ import json import re from contextlib import AsyncExitStack from pathlib import Path -from typing import TYPE_CHECKING, Awaitable, Callable +from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger @@ -95,6 +95,8 @@ class AgentLoop: self._mcp_connected = False self._mcp_connecting = False self._consolidating: set[str] = set() # Session keys with consolidation in progress + self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks + self._consolidation_locks: dict[str, asyncio.Lock] = {} self._register_default_tools() def _register_default_tools(self) -> None: @@ -194,7 +196,8 @@ class AgentLoop: clean = self._strip_think(response.content) if clean: await on_progress(clean) - await on_progress(self._tool_hint(response.tool_calls)) + else: + await on_progress(self._tool_hint(response.tool_calls)) tool_call_dicts = [ { @@ -270,6 +273,18 @@ class AgentLoop: self._running = False logger.info("Agent loop stopping") + def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock: + lock = self._consolidation_locks.get(session_key) + if lock is None: + lock = asyncio.Lock() + self._consolidation_locks[session_key] = lock + return lock + + def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None: + """Drop lock entry if no longer in use.""" + if not lock.locked(): + self._consolidation_locks.pop(session_key, None) + async def _process_message( self, msg: InboundMessage, @@ -305,33 +320,55 @@ class AgentLoop: # Slash commands cmd = msg.content.strip().lower() if cmd == "/new": - messages_to_archive = session.messages.copy() + lock = self._get_consolidation_lock(session.key) + self._consolidating.add(session.key) + try: + async with lock: + snapshot = session.messages[session.last_consolidated:] + if snapshot: + temp = Session(key=session.key) + temp.messages = list(snapshot) + if not await self._consolidate_memory(temp, archive_all=True): + return OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", + ) + except Exception: + logger.exception("/new archival failed for {}", session.key) + return OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="Memory archival failed, session not cleared. Please try again.", + ) + finally: + self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) + session.clear() self.sessions.save(session) self.sessions.invalidate(session.key) - - async def _consolidate_and_cleanup(): - temp = Session(key=session.key) - temp.messages = messages_to_archive - await self._consolidate_memory(temp, archive_all=True) - - asyncio.create_task(_consolidate_and_cleanup()) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, - content="New session started. Memory consolidation in progress.") + content="New session started.") if cmd == "/help": return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") if len(session.messages) > self.memory_window and session.key not in self._consolidating: self._consolidating.add(session.key) + lock = self._get_consolidation_lock(session.key) async def _consolidate_and_unlock(): try: - await self._consolidate_memory(session) + async with lock: + await self._consolidate_memory(session) finally: self._consolidating.discard(session.key) + self._prune_consolidation_lock(session.key, lock) + _task = asyncio.current_task() + if _task is not None: + self._consolidation_tasks.discard(_task) - asyncio.create_task(_consolidate_and_unlock()) + _task = asyncio.create_task(_consolidate_and_unlock()) + self._consolidation_tasks.add(_task) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) if message_tool := self.tools.get("message"): @@ -376,9 +413,9 @@ class AgentLoop: metadata=msg.metadata or {}, ) - async def _consolidate_memory(self, session, archive_all: bool = False) -> None: - """Delegate to MemoryStore.consolidate().""" - await MemoryStore(self.workspace).consolidate( + async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: + """Delegate to MemoryStore.consolidate(). Returns True on success.""" + return await MemoryStore(self.workspace).consolidate( session, self.provider, self.model, archive_all=archive_all, memory_window=self.memory_window, ) diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 51abd8f..cdbc49f 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -74,8 +74,11 @@ class MemoryStore: *, archive_all: bool = False, memory_window: int = 50, - ) -> None: - """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call.""" + ) -> bool: + """Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call. + + Returns True on success (including no-op), False on failure. + """ if archive_all: old_messages = session.messages keep_count = 0 @@ -83,12 +86,12 @@ class MemoryStore: else: keep_count = memory_window // 2 if len(session.messages) <= keep_count: - return + return True if len(session.messages) - session.last_consolidated <= 0: - return + return True old_messages = session.messages[session.last_consolidated:-keep_count] if not old_messages: - return + return True logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count) lines = [] @@ -119,7 +122,7 @@ class MemoryStore: if not response.has_tool_calls: logger.warning("Memory consolidation: LLM did not call save_memory, skipping") - return + return False args = response.tool_calls[0].arguments if entry := args.get("history_entry"): @@ -134,5 +137,7 @@ class MemoryStore: session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated) - except Exception as e: - logger.error("Memory consolidation failed: {}", e) + return True + except Exception: + logger.exception("Memory consolidation failed") + return False diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index 9c169e4..b87da11 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -13,8 +13,11 @@ def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | if not p.is_absolute() and workspace: p = workspace / p resolved = p.resolve() - if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())): - raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") + if allowed_dir: + try: + resolved.relative_to(allowed_dir.resolve()) + except ValueError: + raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") return resolved diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py index 1b6f46b..5dc05fb 100644 --- a/nanobot/channels/email.py +++ b/nanobot/channels/email.py @@ -304,7 +304,8 @@ class EmailChannel(BaseChannel): self._processed_uids.add(uid) # mark_seen is the primary dedup; this set is a safety net if len(self._processed_uids) > self._MAX_PROCESSED_UIDS: - self._processed_uids.clear() + # Evict a random half to cap memory; mark_seen is the primary dedup + self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:]) if mark_seen: client.store(imap_id, "+FLAGS", "\\Seen") diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index 4fc1f41..b0f9bbb 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -179,18 +179,21 @@ class SlackChannel(BaseChannel): except Exception as e: logger.debug("Slack reactions_add failed: {}", e) - await self._handle_message( - sender_id=sender_id, - chat_id=chat_id, - content=text, - metadata={ - "slack": { - "event": event, - "thread_ts": thread_ts, - "channel_type": channel_type, - } - }, - ) + try: + await self._handle_message( + sender_id=sender_id, + chat_id=chat_id, + content=text, + metadata={ + "slack": { + "event": event, + "thread_ts": thread_ts, + "channel_type": channel_type, + } + }, + ) + except Exception: + logger.exception("Error handling Slack message from {}", sender_id) def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool: if channel_type == "im": diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 6155463..f1f9b30 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -668,6 +668,33 @@ def channels_status(): slack_config ) + # DingTalk + dt = config.channels.dingtalk + dt_config = f"client_id: {dt.client_id[:10]}..." if dt.client_id else "[dim]not configured[/dim]" + table.add_row( + "DingTalk", + "✓" if dt.enabled else "✗", + dt_config + ) + + # QQ + qq = config.channels.qq + qq_config = f"app_id: {qq.app_id[:10]}..." if qq.app_id else "[dim]not configured[/dim]" + table.add_row( + "QQ", + "✓" if qq.enabled else "✗", + qq_config + ) + + # Email + em = config.channels.email + em_config = em.imap_host if em.imap_host else "[dim]not configured[/dim]" + table.add_row( + "Email", + "✓" if em.enabled else "✗", + em_config + ) + console.print(table) diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 18e23b2..5f23dc2 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -1,6 +1,7 @@ """Session management for conversation history.""" import json +import shutil from pathlib import Path from dataclasses import dataclass, field from datetime import datetime @@ -108,9 +109,11 @@ class SessionManager: if not path.exists(): legacy_path = self._get_legacy_session_path(key) if legacy_path.exists(): - import shutil - shutil.move(str(legacy_path), str(path)) - logger.info("Migrated session {} from legacy path", key) + try: + shutil.move(str(legacy_path), str(path)) + logger.info("Migrated session {} from legacy path", key) + except Exception: + logger.exception("Failed to migrate session {}", key) if not path.exists(): return None diff --git a/tests/test_consolidate_offset.py b/tests/test_consolidate_offset.py index e204733..323519e 100644 --- a/tests/test_consolidate_offset.py +++ b/tests/test_consolidate_offset.py @@ -1,5 +1,8 @@ """Test session management with cache-friendly message handling.""" +import asyncio +from unittest.mock import AsyncMock, MagicMock + import pytest from pathlib import Path from nanobot.session.manager import Session, SessionManager @@ -475,3 +478,351 @@ class TestEmptyAndBoundarySessions: expected_count = 60 - KEEP_COUNT - 10 assert len(old_messages) == expected_count assert_messages_content(old_messages, 10, 34) + + +class TestConsolidationDeduplicationGuard: + """Test that consolidation tasks are deduplicated and serialized.""" + + @pytest.mark.asyncio + async def test_consolidation_guard_prevents_duplicate_tasks(self, tmp_path: Path) -> None: + """Concurrent messages above memory_window spawn only one consolidation task.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + consolidation_calls = 0 + + async def _fake_consolidate(_session, archive_all: bool = False) -> None: + nonlocal consolidation_calls + consolidation_calls += 1 + await asyncio.sleep(0.05) + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await loop._process_message(msg) + await asyncio.sleep(0.1) + + assert consolidation_calls == 1, ( + f"Expected exactly 1 consolidation, got {consolidation_calls}" + ) + + @pytest.mark.asyncio + async def test_new_command_guard_prevents_concurrent_consolidation( + self, tmp_path: Path + ) -> None: + """/new command does not run consolidation concurrently with in-flight consolidation.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + consolidation_calls = 0 + active = 0 + max_active = 0 + + async def _fake_consolidate(_session, archive_all: bool = False) -> None: + nonlocal consolidation_calls, active, max_active + consolidation_calls += 1 + active += 1 + max_active = max(max_active, active) + await asyncio.sleep(0.05) + active -= 1 + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + await loop._process_message(new_msg) + await asyncio.sleep(0.1) + + assert consolidation_calls == 2, ( + f"Expected normal + /new consolidations, got {consolidation_calls}" + ) + assert max_active == 1, ( + f"Expected serialized consolidation, observed concurrency={max_active}" + ) + + @pytest.mark.asyncio + async def test_consolidation_tasks_are_referenced(self, tmp_path: Path) -> None: + """create_task results are tracked in _consolidation_tasks while in flight.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + + async def _slow_consolidate(_session, archive_all: bool = False) -> None: + started.set() + await asyncio.sleep(0.1) + + loop._consolidate_memory = _slow_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + + await started.wait() + assert len(loop._consolidation_tasks) == 1, "Task must be referenced while in-flight" + + await asyncio.sleep(0.15) + assert len(loop._consolidation_tasks) == 0, ( + "Task reference must be removed after completion" + ) + + @pytest.mark.asyncio + async def test_new_waits_for_inflight_consolidation_and_preserves_messages( + self, tmp_path: Path + ) -> None: + """/new waits for in-flight consolidation and archives before clear.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + release = asyncio.Event() + archived_count = 0 + + async def _fake_consolidate(sess, archive_all: bool = False) -> bool: + nonlocal archived_count + if archive_all: + archived_count = len(sess.messages) + return True + started.set() + await release.wait() + return True + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await started.wait() + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + pending_new = asyncio.create_task(loop._process_message(new_msg)) + + await asyncio.sleep(0.02) + assert not pending_new.done(), "/new should wait while consolidation is in-flight" + + release.set() + response = await pending_new + assert response is not None + assert "new session started" in response.content.lower() + assert archived_count > 0, "Expected /new archival to process a non-empty snapshot" + + session_after = loop.sessions.get_or_create("cli:test") + assert session_after.messages == [], "Session should be cleared after successful archival" + + @pytest.mark.asyncio + async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None: + """/new must keep session data if archive step reports failure.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(5): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + before_count = len(session.messages) + + async def _failing_consolidate(sess, archive_all: bool = False) -> bool: + if archive_all: + return False + return True + + loop._consolidate_memory = _failing_consolidate # type: ignore[method-assign] + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + response = await loop._process_message(new_msg) + + assert response is not None + assert "failed" in response.content.lower() + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == before_count, ( + "Session must remain intact when /new archival fails" + ) + + @pytest.mark.asyncio + async def test_new_archives_only_unconsolidated_messages_after_inflight_task( + self, tmp_path: Path + ) -> None: + """/new should archive only messages not yet consolidated by prior task.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(15): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + started = asyncio.Event() + release = asyncio.Event() + archived_count = -1 + + async def _fake_consolidate(sess, archive_all: bool = False) -> bool: + nonlocal archived_count + if archive_all: + archived_count = len(sess.messages) + return True + + started.set() + await release.wait() + sess.last_consolidated = len(sess.messages) - 3 + return True + + loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign] + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello") + await loop._process_message(msg) + await started.wait() + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + pending_new = asyncio.create_task(loop._process_message(new_msg)) + await asyncio.sleep(0.02) + assert not pending_new.done() + + release.set() + response = await pending_new + + assert response is not None + assert "new session started" in response.content.lower() + assert archived_count == 3, ( + f"Expected only unconsolidated tail to archive, got {archived_count}" + ) + + @pytest.mark.asyncio + async def test_new_cleans_up_consolidation_lock_for_invalidated_session( + self, tmp_path: Path + ) -> None: + """/new should remove lock entry for fully invalidated session key.""" + from nanobot.agent.loop import AgentLoop + from nanobot.bus.events import InboundMessage + from nanobot.bus.queue import MessageBus + from nanobot.providers.base import LLMResponse + + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + loop = AgentLoop( + bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10 + ) + + loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + loop.tools.get_definitions = MagicMock(return_value=[]) + + session = loop.sessions.get_or_create("cli:test") + for i in range(3): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + # Ensure lock exists before /new. + _ = loop._get_consolidation_lock(session.key) + assert session.key in loop._consolidation_locks + + async def _ok_consolidate(sess, archive_all: bool = False) -> bool: + return True + + loop._consolidate_memory = _ok_consolidate # type: ignore[method-assign] + + new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + response = await loop._process_message(new_msg) + + assert response is not None + assert "new session started" in response.content.lower() + assert session.key not in loop._consolidation_locks