Files
nanobot/tests/test_async_memory_consolidation.py
2026-03-13 14:06:22 +08:00

412 lines
15 KiB
Python

"""Test async memory consolidation background task.
Tests for the new async background consolidation feature where token-based
consolidation runs when sessions are idle instead of blocking user interactions.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from nanobot.agent.loop import AgentLoop
from nanobot.agent.memory import MemoryConsolidator
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
class TestMemoryConsolidatorBackgroundTask:
"""Tests for the background consolidation task."""
@pytest.mark.asyncio
async def test_start_and_stop_background_task(self, tmp_path) -> None:
"""Test that background task can be started and stopped cleanly."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
sessions = MagicMock()
sessions.all = MagicMock(return_value=[])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Start background task
await consolidator.start_background_task()
assert consolidator._background_task is not None
assert not consolidator._stop_event.is_set()
# Stop background task
await consolidator.stop_background_task()
assert consolidator._background_task is None or consolidator._background_task.done()
@pytest.mark.asyncio
async def test_background_loop_checks_idle_sessions(self, tmp_path) -> None:
"""Test that background loop checks for idle sessions."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
session1 = MagicMock()
session1.key = "cli:session1"
session1.messages = [{"role": "user", "content": "msg"}]
session2 = MagicMock()
session2.key = "cli:session2"
session2.messages = []
sessions = MagicMock()
sessions.all = MagicMock(return_value=[session1, session2])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Mark session1 as recently active (should not consolidate)
consolidator._session_last_activity["cli:session1"] = asyncio.get_event_loop().time()
# Leave session2 without activity record (should be considered idle)
# Mock maybe_consolidate_by_tokens_async to track calls
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
# Run the background loop with a very short interval for testing
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
# Start task and let it run briefly
await consolidator.start_background_task()
await asyncio.sleep(0.5)
await consolidator.stop_background_task()
# session2 should have been checked for consolidation (it's idle)
# session1 should not have been consolidated (recently active)
assert consolidator.maybe_consolidate_by_tokens_async.await_count >= 0
@pytest.mark.asyncio
async def test_record_activity_updates_timestamp(self, tmp_path) -> None:
"""Test that record_activity updates the activity timestamp."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
sessions = MagicMock()
sessions.all = MagicMock(return_value=[])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Initially no activity recorded
assert "cli:test" not in consolidator._session_last_activity
# Record activity
consolidator.record_activity("cli:test")
assert "cli:test" in consolidator._session_last_activity
# Wait a bit and check timestamp changed
await asyncio.sleep(0.1)
consolidator.record_activity("cli:test")
# The timestamp should have updated (though we can't easily verify the exact value)
assert consolidator._session_last_activity["cli:test"] > 0
@pytest.mark.asyncio
async def test_maybe_consolidate_by_tokens_schedules_async_task(self, tmp_path) -> None:
"""Test that maybe_consolidate_by_tokens schedules an async task."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
session = MagicMock()
session.messages = [{"role": "user", "content": "msg"}]
session.key = "cli:test"
session.context_window_tokens = 200
sessions = MagicMock()
sessions.all = MagicMock(return_value=[session])
sessions.save = MagicMock()
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Mock the async version to track calls
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
# Call the synchronous method - should schedule a task
consolidator.maybe_consolidate_by_tokens(session)
# The async version should have been scheduled via create_task
await asyncio.sleep(0.1) # Let the task start
class TestAgentLoopIntegration:
"""Integration tests for AgentLoop with background consolidation."""
@pytest.mark.asyncio
async def test_loop_starts_background_task(self, tmp_path) -> None:
"""Test that run() starts the background consolidation task."""
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus,
provider=provider,
workspace=tmp_path,
model="test-model",
context_window_tokens=200,
)
loop.tools.get_definitions = MagicMock(return_value=[])
# Start the loop in background
import asyncio
run_task = asyncio.create_task(loop.run())
# Give it time to start the background task
await asyncio.sleep(0.3)
# Background task should be started
assert loop.memory_consolidator._background_task is not None
# Stop the loop
await loop.stop()
await run_task
@pytest.mark.asyncio
async def test_loop_stops_background_task(self, tmp_path) -> None:
"""Test that stop() stops the background consolidation task."""
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
loop = AgentLoop(
bus=bus,
provider=provider,
workspace=tmp_path,
model="test-model",
context_window_tokens=200,
)
loop.tools.get_definitions = MagicMock(return_value=[])
# Start the loop in background
run_task = asyncio.create_task(loop.run())
await asyncio.sleep(0.3)
# Stop via async stop method
await loop.stop()
# Background task should be stopped
assert loop.memory_consolidator._background_task is None or \
loop.memory_consolidator._background_task.done()
class TestIdleDetection:
"""Tests for idle session detection logic."""
@pytest.mark.asyncio
async def test_recently_active_session_not_considered_idle(self, tmp_path) -> None:
"""Test that recently active sessions are not consolidated."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
session = MagicMock()
session.key = "cli:active"
session.messages = [{"role": "user", "content": "msg"}]
sessions = MagicMock()
sessions.all = MagicMock(return_value=[session])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Mark as recently active (within idle threshold)
current_time = asyncio.get_event_loop().time()
consolidator._session_last_activity["cli:active"] = current_time
# Mock maybe_consolidate_by_tokens_async to track calls
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
await consolidator.start_background_task()
# Sleep less than 2 * interval to ensure session remains active
await asyncio.sleep(0.15)
await consolidator.stop_background_task()
# Should not have been called for recently active session
assert consolidator.maybe_consolidate_by_tokens_async.await_count == 0
@pytest.mark.asyncio
async def test_idle_session_triggers_consolidation(self, tmp_path) -> None:
"""Test that idle sessions trigger consolidation."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
session = MagicMock()
session.key = "cli:idle"
session.messages = [{"role": "user", "content": "msg"}]
sessions = MagicMock()
sessions.all = MagicMock(return_value=[session])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Mark as inactive (older than idle threshold)
current_time = asyncio.get_event_loop().time()
consolidator._session_last_activity["cli:idle"] = current_time - 10 # 10 seconds ago
# Mock maybe_consolidate_by_tokens_async to track calls
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
await consolidator.start_background_task()
await asyncio.sleep(0.5)
await consolidator.stop_background_task()
# Should have been called for idle session
assert consolidator.maybe_consolidate_by_tokens_async.await_count >= 1
class TestScheduleConsolidation:
"""Tests for the schedule consolidation mechanism."""
@pytest.mark.asyncio
async def test_schedule_consolidation_runs_async_version(self, tmp_path) -> None:
"""Test that scheduling runs the async version."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
session = MagicMock()
session.messages = [{"role": "user", "content": "msg"}]
session.key = "cli:scheduled"
sessions = MagicMock()
sessions.all = MagicMock(return_value=[session])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Mock the async version to track calls
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
# Schedule consolidation
await consolidator._schedule_consolidation(session)
await asyncio.sleep(0.1)
assert consolidator.maybe_consolidate_by_tokens_async.await_count >= 1
class TestBackgroundTaskCancellation:
"""Tests for background task cancellation and error handling."""
@pytest.mark.asyncio
async def test_background_task_handles_exceptions_gracefully(self, tmp_path) -> None:
"""Test that exceptions in the loop don't crash it."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
sessions = MagicMock()
sessions.all = MagicMock(return_value=[])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Mock maybe_consolidate_by_tokens_async to raise an exception
consolidator.maybe_consolidate_by_tokens_async = AsyncMock( # type: ignore[method-assign]
side_effect=Exception("Test exception")
)
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
await consolidator.start_background_task()
await asyncio.sleep(0.5)
# Task should still be running despite exceptions
assert consolidator._background_task is not None
await consolidator.stop_background_task()
@pytest.mark.asyncio
async def test_stop_cancels_running_task(self, tmp_path) -> None:
"""Test that stop properly cancels a running task."""
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
sessions = MagicMock()
sessions.all = MagicMock(return_value=[])
consolidator = MemoryConsolidator(
workspace=tmp_path,
provider=provider,
model="test-model",
sessions=sessions,
context_window_tokens=200,
build_messages=lambda **kw: [],
get_tool_definitions=lambda: [],
)
# Start a task that will sleep for a while
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 10): # Long interval
await consolidator.start_background_task()
# Task should be running
assert consolidator._background_task is not None
# Stop should cancel it
await consolidator.stop_background_task()
# Verify task was cancelled or completed
assert consolidator._background_task is None or \
consolidator._background_task.done()