Files
nanobot/nanobot/heartbeat/service.py
2026-02-23 13:59:47 +00:00

138 lines
4.6 KiB
Python

"""Heartbeat service - periodic agent wake-up to check for tasks."""
import asyncio
from pathlib import Path
from typing import Any, Callable, Coroutine
from loguru import logger
# Default interval: 30 minutes
DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60
# Token the agent replies with when there is nothing to report
HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK"
# The prompt sent to agent during heartbeat
HEARTBEAT_PROMPT = (
"Read HEARTBEAT.md in your workspace and follow any instructions listed there. "
f"If nothing needs attention, reply with exactly: {HEARTBEAT_OK_TOKEN}"
)
def _is_heartbeat_empty(content: str | None) -> bool:
"""Check if HEARTBEAT.md has no actionable content."""
if not content:
return True
# Lines to skip: empty, headers, HTML comments, empty checkboxes
skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"}
for line in content.split("\n"):
line = line.strip()
if not line or line.startswith("#") or line.startswith("<!--") or line in skip_patterns:
continue
return False # Found actionable content
return True
class HeartbeatService:
"""
Periodic heartbeat service that wakes the agent to check for tasks.
The agent reads HEARTBEAT.md from the workspace and executes any tasks
listed there. If it has something to report, the response is forwarded
to the user via on_notify. If nothing needs attention, the agent replies
HEARTBEAT_OK and the response is silently dropped.
"""
def __init__(
self,
workspace: Path,
on_heartbeat: Callable[[str], Coroutine[Any, Any, str]] | None = None,
on_notify: Callable[[str], Coroutine[Any, Any, None]] | None = None,
interval_s: int = DEFAULT_HEARTBEAT_INTERVAL_S,
enabled: bool = True,
):
self.workspace = workspace
self.on_heartbeat = on_heartbeat
self.on_notify = on_notify
self.interval_s = interval_s
self.enabled = enabled
self._running = False
self._task: asyncio.Task | None = None
@property
def heartbeat_file(self) -> Path:
return self.workspace / "HEARTBEAT.md"
def _read_heartbeat_file(self) -> str | None:
"""Read HEARTBEAT.md content."""
if self.heartbeat_file.exists():
try:
return self.heartbeat_file.read_text(encoding="utf-8")
except Exception:
return None
return None
async def start(self) -> None:
"""Start the heartbeat service."""
if not self.enabled:
logger.info("Heartbeat disabled")
return
if self._running:
logger.warning("Heartbeat already running")
return
self._running = True
self._task = asyncio.create_task(self._run_loop())
logger.info("Heartbeat started (every {}s)", self.interval_s)
def stop(self) -> None:
"""Stop the heartbeat service."""
self._running = False
if self._task:
self._task.cancel()
self._task = None
async def _run_loop(self) -> None:
"""Main heartbeat loop."""
while self._running:
try:
await asyncio.sleep(self.interval_s)
if self._running:
await self._tick()
except asyncio.CancelledError:
break
except Exception as e:
logger.error("Heartbeat error: {}", e)
async def _tick(self) -> None:
"""Execute a single heartbeat tick."""
content = self._read_heartbeat_file()
# Skip if HEARTBEAT.md is empty or doesn't exist
if _is_heartbeat_empty(content):
logger.debug("Heartbeat: no tasks (HEARTBEAT.md empty)")
return
logger.info("Heartbeat: checking for tasks...")
if self.on_heartbeat:
try:
response = await self.on_heartbeat(HEARTBEAT_PROMPT)
if HEARTBEAT_OK_TOKEN in response.upper():
logger.info("Heartbeat: OK (nothing to report)")
else:
logger.info("Heartbeat: completed, delivering response")
if self.on_notify:
await self.on_notify(response)
except Exception:
logger.exception("Heartbeat execution failed")
async def trigger_now(self) -> str | None:
"""Manually trigger a heartbeat."""
if self.on_heartbeat:
return await self.on_heartbeat(HEARTBEAT_PROMPT)
return None