Merge branch 'main' into pr-962

This commit is contained in:
Re-bin
2026-02-22 17:24:34 +00:00
8 changed files with 472 additions and 42 deletions

View File

@@ -7,7 +7,7 @@ import json
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Awaitable, Callable
from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger
@@ -95,6 +95,8 @@ class AgentLoop:
self._mcp_connected = False
self._mcp_connecting = False
self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: dict[str, asyncio.Lock] = {}
self._register_default_tools()
def _register_default_tools(self) -> None:
@@ -194,7 +196,8 @@ class AgentLoop:
clean = self._strip_think(response.content)
if clean:
await on_progress(clean)
await on_progress(self._tool_hint(response.tool_calls))
else:
await on_progress(self._tool_hint(response.tool_calls))
tool_call_dicts = [
{
@@ -270,6 +273,18 @@ class AgentLoop:
self._running = False
logger.info("Agent loop stopping")
def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock:
lock = self._consolidation_locks.get(session_key)
if lock is None:
lock = asyncio.Lock()
self._consolidation_locks[session_key] = lock
return lock
def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None:
"""Drop lock entry if no longer in use."""
if not lock.locked():
self._consolidation_locks.pop(session_key, None)
async def _process_message(
self,
msg: InboundMessage,
@@ -305,33 +320,55 @@ class AgentLoop:
# Slash commands
cmd = msg.content.strip().lower()
if cmd == "/new":
messages_to_archive = session.messages.copy()
lock = self._get_consolidation_lock(session.key)
self._consolidating.add(session.key)
try:
async with lock:
snapshot = session.messages[session.last_consolidated:]
if snapshot:
temp = Session(key=session.key)
temp.messages = list(snapshot)
if not await self._consolidate_memory(temp, archive_all=True):
return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.",
)
except Exception:
logger.exception("/new archival failed for {}", session.key)
return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Memory archival failed, session not cleared. Please try again.",
)
finally:
self._consolidating.discard(session.key)
self._prune_consolidation_lock(session.key, lock)
session.clear()
self.sessions.save(session)
self.sessions.invalidate(session.key)
async def _consolidate_and_cleanup():
temp = Session(key=session.key)
temp.messages = messages_to_archive
await self._consolidate_memory(temp, archive_all=True)
asyncio.create_task(_consolidate_and_cleanup())
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="New session started. Memory consolidation in progress.")
content="New session started.")
if cmd == "/help":
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
if len(session.messages) > self.memory_window and session.key not in self._consolidating:
self._consolidating.add(session.key)
lock = self._get_consolidation_lock(session.key)
async def _consolidate_and_unlock():
try:
await self._consolidate_memory(session)
async with lock:
await self._consolidate_memory(session)
finally:
self._consolidating.discard(session.key)
self._prune_consolidation_lock(session.key, lock)
_task = asyncio.current_task()
if _task is not None:
self._consolidation_tasks.discard(_task)
asyncio.create_task(_consolidate_and_unlock())
_task = asyncio.create_task(_consolidate_and_unlock())
self._consolidation_tasks.add(_task)
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"):
@@ -376,9 +413,9 @@ class AgentLoop:
metadata=msg.metadata or {},
)
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
"""Delegate to MemoryStore.consolidate()."""
await MemoryStore(self.workspace).consolidate(
async def _consolidate_memory(self, session, archive_all: bool = False) -> bool:
"""Delegate to MemoryStore.consolidate(). Returns True on success."""
return await MemoryStore(self.workspace).consolidate(
session, self.provider, self.model,
archive_all=archive_all, memory_window=self.memory_window,
)

View File

@@ -74,8 +74,11 @@ class MemoryStore:
*,
archive_all: bool = False,
memory_window: int = 50,
) -> None:
"""Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call."""
) -> bool:
"""Consolidate old messages into MEMORY.md + HISTORY.md via LLM tool call.
Returns True on success (including no-op), False on failure.
"""
if archive_all:
old_messages = session.messages
keep_count = 0
@@ -83,12 +86,12 @@ class MemoryStore:
else:
keep_count = memory_window // 2
if len(session.messages) <= keep_count:
return
return True
if len(session.messages) - session.last_consolidated <= 0:
return
return True
old_messages = session.messages[session.last_consolidated:-keep_count]
if not old_messages:
return
return True
logger.info("Memory consolidation: {} to consolidate, {} keep", len(old_messages), keep_count)
lines = []
@@ -119,7 +122,7 @@ class MemoryStore:
if not response.has_tool_calls:
logger.warning("Memory consolidation: LLM did not call save_memory, skipping")
return
return False
args = response.tool_calls[0].arguments
if entry := args.get("history_entry"):
@@ -134,5 +137,7 @@ class MemoryStore:
session.last_consolidated = 0 if archive_all else len(session.messages) - keep_count
logger.info("Memory consolidation done: {} messages, last_consolidated={}", len(session.messages), session.last_consolidated)
except Exception as e:
logger.error("Memory consolidation failed: {}", e)
return True
except Exception:
logger.exception("Memory consolidation failed")
return False

View File

@@ -13,8 +13,11 @@ def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path |
if not p.is_absolute() and workspace:
p = workspace / p
resolved = p.resolve()
if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())):
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
if allowed_dir:
try:
resolved.relative_to(allowed_dir.resolve())
except ValueError:
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
return resolved

View File

@@ -304,7 +304,8 @@ class EmailChannel(BaseChannel):
self._processed_uids.add(uid)
# mark_seen is the primary dedup; this set is a safety net
if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
self._processed_uids.clear()
# Evict a random half to cap memory; mark_seen is the primary dedup
self._processed_uids = set(list(self._processed_uids)[len(self._processed_uids) // 2:])
if mark_seen:
client.store(imap_id, "+FLAGS", "\\Seen")

View File

@@ -179,18 +179,21 @@ class SlackChannel(BaseChannel):
except Exception as e:
logger.debug("Slack reactions_add failed: {}", e)
await self._handle_message(
sender_id=sender_id,
chat_id=chat_id,
content=text,
metadata={
"slack": {
"event": event,
"thread_ts": thread_ts,
"channel_type": channel_type,
}
},
)
try:
await self._handle_message(
sender_id=sender_id,
chat_id=chat_id,
content=text,
metadata={
"slack": {
"event": event,
"thread_ts": thread_ts,
"channel_type": channel_type,
}
},
)
except Exception:
logger.exception("Error handling Slack message from {}", sender_id)
def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool:
if channel_type == "im":

View File

@@ -668,6 +668,33 @@ def channels_status():
slack_config
)
# DingTalk
dt = config.channels.dingtalk
dt_config = f"client_id: {dt.client_id[:10]}..." if dt.client_id else "[dim]not configured[/dim]"
table.add_row(
"DingTalk",
"" if dt.enabled else "",
dt_config
)
# QQ
qq = config.channels.qq
qq_config = f"app_id: {qq.app_id[:10]}..." if qq.app_id else "[dim]not configured[/dim]"
table.add_row(
"QQ",
"" if qq.enabled else "",
qq_config
)
# Email
em = config.channels.email
em_config = em.imap_host if em.imap_host else "[dim]not configured[/dim]"
table.add_row(
"Email",
"" if em.enabled else "",
em_config
)
console.print(table)

View File

@@ -1,6 +1,7 @@
"""Session management for conversation history."""
import json
import shutil
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime
@@ -108,9 +109,11 @@ class SessionManager:
if not path.exists():
legacy_path = self._get_legacy_session_path(key)
if legacy_path.exists():
import shutil
shutil.move(str(legacy_path), str(path))
logger.info("Migrated session {} from legacy path", key)
try:
shutil.move(str(legacy_path), str(path))
logger.info("Migrated session {} from legacy path", key)
except Exception:
logger.exception("Failed to migrate session {}", key)
if not path.exists():
return None

View File

@@ -1,5 +1,8 @@
"""Test session management with cache-friendly message handling."""
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
from pathlib import Path
from nanobot.session.manager import Session, SessionManager
@@ -475,3 +478,351 @@ class TestEmptyAndBoundarySessions:
expected_count = 60 - KEEP_COUNT - 10
assert len(old_messages) == expected_count
assert_messages_content(old_messages, 10, 34)
class TestConsolidationDeduplicationGuard:
"""Test that consolidation tasks are deduplicated and serialized."""
@pytest.mark.asyncio
async def test_consolidation_guard_prevents_duplicate_tasks(self, tmp_path: Path) -> None:
"""Concurrent messages above memory_window spawn only one consolidation task."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
consolidation_calls = 0
async def _fake_consolidate(_session, archive_all: bool = False) -> None:
nonlocal consolidation_calls
consolidation_calls += 1
await asyncio.sleep(0.05)
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await loop._process_message(msg)
await asyncio.sleep(0.1)
assert consolidation_calls == 1, (
f"Expected exactly 1 consolidation, got {consolidation_calls}"
)
@pytest.mark.asyncio
async def test_new_command_guard_prevents_concurrent_consolidation(
self, tmp_path: Path
) -> None:
"""/new command does not run consolidation concurrently with in-flight consolidation."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
consolidation_calls = 0
active = 0
max_active = 0
async def _fake_consolidate(_session, archive_all: bool = False) -> None:
nonlocal consolidation_calls, active, max_active
consolidation_calls += 1
active += 1
max_active = max(max_active, active)
await asyncio.sleep(0.05)
active -= 1
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
await loop._process_message(new_msg)
await asyncio.sleep(0.1)
assert consolidation_calls == 2, (
f"Expected normal + /new consolidations, got {consolidation_calls}"
)
assert max_active == 1, (
f"Expected serialized consolidation, observed concurrency={max_active}"
)
@pytest.mark.asyncio
async def test_consolidation_tasks_are_referenced(self, tmp_path: Path) -> None:
"""create_task results are tracked in _consolidation_tasks while in flight."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
started = asyncio.Event()
async def _slow_consolidate(_session, archive_all: bool = False) -> None:
started.set()
await asyncio.sleep(0.1)
loop._consolidate_memory = _slow_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await started.wait()
assert len(loop._consolidation_tasks) == 1, "Task must be referenced while in-flight"
await asyncio.sleep(0.15)
assert len(loop._consolidation_tasks) == 0, (
"Task reference must be removed after completion"
)
@pytest.mark.asyncio
async def test_new_waits_for_inflight_consolidation_and_preserves_messages(
self, tmp_path: Path
) -> None:
"""/new waits for in-flight consolidation and archives before clear."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
started = asyncio.Event()
release = asyncio.Event()
archived_count = 0
async def _fake_consolidate(sess, archive_all: bool = False) -> bool:
nonlocal archived_count
if archive_all:
archived_count = len(sess.messages)
return True
started.set()
await release.wait()
return True
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await started.wait()
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
pending_new = asyncio.create_task(loop._process_message(new_msg))
await asyncio.sleep(0.02)
assert not pending_new.done(), "/new should wait while consolidation is in-flight"
release.set()
response = await pending_new
assert response is not None
assert "new session started" in response.content.lower()
assert archived_count > 0, "Expected /new archival to process a non-empty snapshot"
session_after = loop.sessions.get_or_create("cli:test")
assert session_after.messages == [], "Session should be cleared after successful archival"
@pytest.mark.asyncio
async def test_new_does_not_clear_session_when_archive_fails(self, tmp_path: Path) -> None:
"""/new must keep session data if archive step reports failure."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(5):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
before_count = len(session.messages)
async def _failing_consolidate(sess, archive_all: bool = False) -> bool:
if archive_all:
return False
return True
loop._consolidate_memory = _failing_consolidate # type: ignore[method-assign]
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
response = await loop._process_message(new_msg)
assert response is not None
assert "failed" in response.content.lower()
session_after = loop.sessions.get_or_create("cli:test")
assert len(session_after.messages) == before_count, (
"Session must remain intact when /new archival fails"
)
@pytest.mark.asyncio
async def test_new_archives_only_unconsolidated_messages_after_inflight_task(
self, tmp_path: Path
) -> None:
"""/new should archive only messages not yet consolidated by prior task."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(15):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
started = asyncio.Event()
release = asyncio.Event()
archived_count = -1
async def _fake_consolidate(sess, archive_all: bool = False) -> bool:
nonlocal archived_count
if archive_all:
archived_count = len(sess.messages)
return True
started.set()
await release.wait()
sess.last_consolidated = len(sess.messages) - 3
return True
loop._consolidate_memory = _fake_consolidate # type: ignore[method-assign]
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="hello")
await loop._process_message(msg)
await started.wait()
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
pending_new = asyncio.create_task(loop._process_message(new_msg))
await asyncio.sleep(0.02)
assert not pending_new.done()
release.set()
response = await pending_new
assert response is not None
assert "new session started" in response.content.lower()
assert archived_count == 3, (
f"Expected only unconsolidated tail to archive, got {archived_count}"
)
@pytest.mark.asyncio
async def test_new_cleans_up_consolidation_lock_for_invalidated_session(
self, tmp_path: Path
) -> None:
"""/new should remove lock entry for fully invalidated session key."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.events import InboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus, provider=provider, workspace=tmp_path, model="test-model", memory_window=10
)
loop.provider.chat = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
loop.tools.get_definitions = MagicMock(return_value=[])
session = loop.sessions.get_or_create("cli:test")
for i in range(3):
session.add_message("user", f"msg{i}")
session.add_message("assistant", f"resp{i}")
loop.sessions.save(session)
# Ensure lock exists before /new.
_ = loop._get_consolidation_lock(session.key)
assert session.key in loop._consolidation_locks
async def _ok_consolidate(sess, archive_all: bool = False) -> bool:
return True
loop._consolidate_memory = _ok_consolidate # type: ignore[method-assign]
new_msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
response = await loop._process_message(new_msg)
assert response is not None
assert "new session started" in response.content.lower()
assert session.key not in loop._consolidation_locks