161 lines
4.1 KiB
Python
161 lines
4.1 KiB
Python
import asyncio
|
|
|
|
import pytest
|
|
|
|
from nanobot.heartbeat.service import HeartbeatService
|
|
from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
|
|
|
|
|
|
class DummyProvider(LLMProvider):
    """Scripted LLM provider for tests: replays canned responses in order."""

    def __init__(self, responses: list[LLMResponse]):
        super().__init__()
        # Own a mutable copy so callers' lists are never consumed.
        self._responses = list(responses)
        # Number of chat() invocations observed, inspected by tests.
        self.calls = 0

    async def chat(self, *args, **kwargs) -> LLMResponse:
        """Return the next scripted response, or an empty one when exhausted."""
        self.calls += 1
        if not self._responses:
            return LLMResponse(content="", tool_calls=[])
        return self._responses.pop(0)

    def get_default_model(self) -> str:
        """Model name this fake pretends to serve."""
        return "test-model"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_is_idempotent(tmp_path) -> None:
    """Calling start() twice must reuse the already-running background task."""
    provider = DummyProvider([])
    service = HeartbeatService(
        workspace=tmp_path,
        provider=provider,
        model="openai/gpt-4o-mini",
        interval_s=9999,  # interval long enough that the loop never ticks here
        enabled=True,
    )

    await service.start()
    original_task = service._task
    # Second start() should be a no-op rather than spawning a new task.
    await service.start()

    assert service._task is original_task

    service.stop()
    # Yield control once so the cancelled task gets a chance to unwind.
    await asyncio.sleep(0)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_decide_returns_skip_when_no_tool_call(tmp_path) -> None:
    """A plain-text reply with no tool call is interpreted as a 'skip' decision."""
    provider = DummyProvider([LLMResponse(content="no tool call", tool_calls=[])])
    service = HeartbeatService(
        workspace=tmp_path,
        provider=provider,
        model="openai/gpt-4o-mini",
    )

    action, tasks = await service._decide("heartbeat content")

    assert (action, tasks) == ("skip", "")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_trigger_now_executes_when_decision_is_run(tmp_path) -> None:
    """A 'run' decision forwards the tasks to on_execute and returns its result."""
    (tmp_path / "HEARTBEAT.md").write_text("- [ ] do thing", encoding="utf-8")

    run_call = ToolCallRequest(
        id="hb_1",
        name="heartbeat",
        arguments={"action": "run", "tasks": "check open tasks"},
    )
    provider = DummyProvider([LLMResponse(content="", tool_calls=[run_call])])

    # Record every tasks string the callback receives.
    recorded: list[str] = []

    async def _on_execute(tasks: str) -> str:
        recorded.append(tasks)
        return "done"

    service = HeartbeatService(
        workspace=tmp_path,
        provider=provider,
        model="openai/gpt-4o-mini",
        on_execute=_on_execute,
    )

    result = await service.trigger_now()

    assert result == "done"
    assert recorded == ["check open tasks"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_trigger_now_returns_none_when_decision_is_skip(tmp_path) -> None:
    """A 'skip' decision short-circuits trigger_now(): no execution, None result."""
    (tmp_path / "HEARTBEAT.md").write_text("- [ ] do thing", encoding="utf-8")

    skip_call = ToolCallRequest(
        id="hb_1",
        name="heartbeat",
        arguments={"action": "skip"},
    )
    provider = DummyProvider([LLMResponse(content="", tool_calls=[skip_call])])

    # Would echo its input if it ever ran; the skip path must not call it.
    async def _on_execute(tasks: str) -> str:
        return tasks

    service = HeartbeatService(
        workspace=tmp_path,
        provider=provider,
        model="openai/gpt-4o-mini",
        on_execute=_on_execute,
    )

    result = await service.trigger_now()
    assert result is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_decide_retries_transient_error_then_succeeds(tmp_path, monkeypatch) -> None:
    """A transient provider error (rate limit) is retried once after a backoff sleep."""
    run_call = ToolCallRequest(
        id="hb_1",
        name="heartbeat",
        arguments={"action": "run", "tasks": "check open tasks"},
    )
    provider = DummyProvider([
        # First attempt: transient failure the service should retry.
        LLMResponse(content="429 rate limit", finish_reason="error"),
        # Second attempt: a proper tool-call decision.
        LLMResponse(content="", tool_calls=[run_call]),
    ])

    recorded_delays: list[int] = []

    async def _fake_sleep(delay: int) -> None:
        # Capture the backoff delay instead of actually waiting.
        recorded_delays.append(delay)

    monkeypatch.setattr(asyncio, "sleep", _fake_sleep)

    service = HeartbeatService(
        workspace=tmp_path,
        provider=provider,
        model="openai/gpt-4o-mini",
    )

    action, tasks = await service._decide("heartbeat content")

    assert (action, tasks) == ("run", "check open tasks")
    assert provider.calls == 2
    assert recorded_delays == [1]
|