412 lines
15 KiB
Python
412 lines
15 KiB
Python
"""Test async memory consolidation background task.
|
|
|
|
Tests for the new async background consolidation feature where token-based
|
|
consolidation runs when sessions are idle instead of blocking user interactions.
|
|
"""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.agent.memory import MemoryConsolidator
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.providers.base import LLMResponse
|
|
|
|
|
|
class TestMemoryConsolidatorBackgroundTask:
|
|
"""Tests for the background consolidation task."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_and_stop_background_task(self, tmp_path) -> None:
|
|
"""Test that background task can be started and stopped cleanly."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Start background task
|
|
await consolidator.start_background_task()
|
|
assert consolidator._background_task is not None
|
|
assert not consolidator._stop_event.is_set()
|
|
|
|
# Stop background task
|
|
await consolidator.stop_background_task()
|
|
assert consolidator._background_task is None or consolidator._background_task.done()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_background_loop_checks_idle_sessions(self, tmp_path) -> None:
|
|
"""Test that background loop checks for idle sessions."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
session1 = MagicMock()
|
|
session1.key = "cli:session1"
|
|
session1.messages = [{"role": "user", "content": "msg"}]
|
|
session2 = MagicMock()
|
|
session2.key = "cli:session2"
|
|
session2.messages = []
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[session1, session2])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Mark session1 as recently active (should not consolidate)
|
|
consolidator._session_last_activity["cli:session1"] = asyncio.get_event_loop().time()
|
|
# Leave session2 without activity record (should be considered idle)
|
|
|
|
# Mock maybe_consolidate_by_tokens_async to track calls
|
|
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
|
|
|
|
# Run the background loop with a very short interval for testing
|
|
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
|
|
# Start task and let it run briefly
|
|
await consolidator.start_background_task()
|
|
await asyncio.sleep(0.5)
|
|
await consolidator.stop_background_task()
|
|
|
|
# session2 should have been checked for consolidation (it's idle)
|
|
# session1 should not have been consolidated (recently active)
|
|
assert consolidator.maybe_consolidate_by_tokens_async.await_count >= 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_record_activity_updates_timestamp(self, tmp_path) -> None:
|
|
"""Test that record_activity updates the activity timestamp."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Initially no activity recorded
|
|
assert "cli:test" not in consolidator._session_last_activity
|
|
|
|
# Record activity
|
|
consolidator.record_activity("cli:test")
|
|
assert "cli:test" in consolidator._session_last_activity
|
|
|
|
# Wait a bit and check timestamp changed
|
|
await asyncio.sleep(0.1)
|
|
consolidator.record_activity("cli:test")
|
|
# The timestamp should have updated (though we can't easily verify the exact value)
|
|
assert consolidator._session_last_activity["cli:test"] > 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_maybe_consolidate_by_tokens_schedules_async_task(self, tmp_path) -> None:
|
|
"""Test that maybe_consolidate_by_tokens schedules an async task."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
session = MagicMock()
|
|
session.messages = [{"role": "user", "content": "msg"}]
|
|
session.key = "cli:test"
|
|
session.context_window_tokens = 200
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[session])
|
|
sessions.save = MagicMock()
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Mock the async version to track calls
|
|
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
|
|
|
|
# Call the synchronous method - should schedule a task
|
|
consolidator.maybe_consolidate_by_tokens(session)
|
|
|
|
# The async version should have been scheduled via create_task
|
|
await asyncio.sleep(0.1) # Let the task start
|
|
|
|
|
|
class TestAgentLoopIntegration:
|
|
"""Integration tests for AgentLoop with background consolidation."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_loop_starts_background_task(self, tmp_path) -> None:
|
|
"""Test that run() starts the background consolidation task."""
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(
|
|
bus=bus,
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
model="test-model",
|
|
context_window_tokens=200,
|
|
)
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
|
|
# Start the loop in background
|
|
import asyncio
|
|
run_task = asyncio.create_task(loop.run())
|
|
|
|
# Give it time to start the background task
|
|
await asyncio.sleep(0.3)
|
|
|
|
# Background task should be started
|
|
assert loop.memory_consolidator._background_task is not None
|
|
|
|
# Stop the loop
|
|
await loop.stop()
|
|
await run_task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_loop_stops_background_task(self, tmp_path) -> None:
|
|
"""Test that stop() stops the background consolidation task."""
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
|
|
loop = AgentLoop(
|
|
bus=bus,
|
|
provider=provider,
|
|
workspace=tmp_path,
|
|
model="test-model",
|
|
context_window_tokens=200,
|
|
)
|
|
loop.tools.get_definitions = MagicMock(return_value=[])
|
|
|
|
# Start the loop in background
|
|
run_task = asyncio.create_task(loop.run())
|
|
await asyncio.sleep(0.3)
|
|
|
|
# Stop via async stop method
|
|
await loop.stop()
|
|
|
|
# Background task should be stopped
|
|
assert loop.memory_consolidator._background_task is None or \
|
|
loop.memory_consolidator._background_task.done()
|
|
|
|
|
|
class TestIdleDetection:
|
|
"""Tests for idle session detection logic."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_recently_active_session_not_considered_idle(self, tmp_path) -> None:
|
|
"""Test that recently active sessions are not consolidated."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
session = MagicMock()
|
|
session.key = "cli:active"
|
|
session.messages = [{"role": "user", "content": "msg"}]
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[session])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Mark as recently active (within idle threshold)
|
|
current_time = asyncio.get_event_loop().time()
|
|
consolidator._session_last_activity["cli:active"] = current_time
|
|
|
|
# Mock maybe_consolidate_by_tokens_async to track calls
|
|
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
|
|
|
|
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
|
|
await consolidator.start_background_task()
|
|
# Sleep less than 2 * interval to ensure session remains active
|
|
await asyncio.sleep(0.15)
|
|
await consolidator.stop_background_task()
|
|
|
|
# Should not have been called for recently active session
|
|
assert consolidator.maybe_consolidate_by_tokens_async.await_count == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_idle_session_triggers_consolidation(self, tmp_path) -> None:
|
|
"""Test that idle sessions trigger consolidation."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
session = MagicMock()
|
|
session.key = "cli:idle"
|
|
session.messages = [{"role": "user", "content": "msg"}]
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[session])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Mark as inactive (older than idle threshold)
|
|
current_time = asyncio.get_event_loop().time()
|
|
consolidator._session_last_activity["cli:idle"] = current_time - 10 # 10 seconds ago
|
|
|
|
# Mock maybe_consolidate_by_tokens_async to track calls
|
|
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
|
|
|
|
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
|
|
await consolidator.start_background_task()
|
|
await asyncio.sleep(0.5)
|
|
await consolidator.stop_background_task()
|
|
|
|
# Should have been called for idle session
|
|
assert consolidator.maybe_consolidate_by_tokens_async.await_count >= 1
|
|
|
|
|
|
class TestScheduleConsolidation:
|
|
"""Tests for the schedule consolidation mechanism."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_schedule_consolidation_runs_async_version(self, tmp_path) -> None:
|
|
"""Test that scheduling runs the async version."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
session = MagicMock()
|
|
session.messages = [{"role": "user", "content": "msg"}]
|
|
session.key = "cli:scheduled"
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[session])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Mock the async version to track calls
|
|
consolidator.maybe_consolidate_by_tokens_async = AsyncMock() # type: ignore[method-assign]
|
|
|
|
# Schedule consolidation
|
|
await consolidator._schedule_consolidation(session)
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
assert consolidator.maybe_consolidate_by_tokens_async.await_count >= 1
|
|
|
|
|
|
class TestBackgroundTaskCancellation:
|
|
"""Tests for background task cancellation and error handling."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_background_task_handles_exceptions_gracefully(self, tmp_path) -> None:
|
|
"""Test that exceptions in the loop don't crash it."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Mock maybe_consolidate_by_tokens_async to raise an exception
|
|
consolidator.maybe_consolidate_by_tokens_async = AsyncMock( # type: ignore[method-assign]
|
|
side_effect=Exception("Test exception")
|
|
)
|
|
|
|
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 0.1):
|
|
await consolidator.start_background_task()
|
|
await asyncio.sleep(0.5)
|
|
# Task should still be running despite exceptions
|
|
assert consolidator._background_task is not None
|
|
await consolidator.stop_background_task()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_cancels_running_task(self, tmp_path) -> None:
|
|
"""Test that stop properly cancels a running task."""
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
|
|
|
|
sessions = MagicMock()
|
|
sessions.all = MagicMock(return_value=[])
|
|
|
|
consolidator = MemoryConsolidator(
|
|
workspace=tmp_path,
|
|
provider=provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=200,
|
|
build_messages=lambda **kw: [],
|
|
get_tool_definitions=lambda: [],
|
|
)
|
|
|
|
# Start a task that will sleep for a while
|
|
with patch.object(consolidator, '_IDLE_CHECK_INTERVAL', 10): # Long interval
|
|
await consolidator.start_background_task()
|
|
# Task should be running
|
|
assert consolidator._background_task is not None
|
|
|
|
# Stop should cancel it
|
|
await consolidator.stop_background_task()
|
|
|
|
# Verify task was cancelled or completed
|
|
assert consolidator._background_task is None or \
|
|
consolidator._background_task.done()
|