Merge PR #1102 to replace HEARTBEAT_OK token with virtual tool-call decision
fix(heartbeat): replace HEARTBEAT_OK token with virtual tool-call decision
This commit is contained in:
@@ -360,19 +360,19 @@ def gateway(
|
||||
return "cli", "direct"
|
||||
|
||||
# Create heartbeat service
|
||||
async def on_heartbeat(prompt: str) -> str:
|
||||
"""Execute heartbeat through the agent."""
|
||||
async def on_heartbeat_execute(tasks: str) -> str:
|
||||
"""Phase 2: execute heartbeat tasks through the full agent loop."""
|
||||
channel, chat_id = _pick_heartbeat_target()
|
||||
|
||||
async def _silent(*_args, **_kwargs):
|
||||
pass
|
||||
|
||||
return await agent.process_direct(
|
||||
prompt,
|
||||
tasks,
|
||||
session_key="heartbeat",
|
||||
channel=channel,
|
||||
chat_id=chat_id,
|
||||
on_progress=_silent, # suppress: heartbeat should not push progress to external channels
|
||||
on_progress=_silent,
|
||||
)
|
||||
|
||||
async def on_heartbeat_notify(response: str) -> None:
|
||||
@@ -383,12 +383,15 @@ def gateway(
|
||||
return # No external channel available to deliver to
|
||||
await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response))
|
||||
|
||||
hb_cfg = config.gateway.heartbeat
|
||||
heartbeat = HeartbeatService(
|
||||
workspace=config.workspace_path,
|
||||
on_heartbeat=on_heartbeat,
|
||||
provider=provider,
|
||||
model=agent.model,
|
||||
on_execute=on_heartbeat_execute,
|
||||
on_notify=on_heartbeat_notify,
|
||||
interval_s=30 * 60, # 30 minutes
|
||||
enabled=True
|
||||
interval_s=hb_cfg.interval_s,
|
||||
enabled=hb_cfg.enabled,
|
||||
)
|
||||
|
||||
if channels.enabled_channels:
|
||||
|
||||
@@ -228,11 +228,19 @@ class ProvidersConfig(Base):
|
||||
github_copilot: ProviderConfig = Field(default_factory=ProviderConfig) # Github Copilot (OAuth)
|
||||
|
||||
|
||||
class HeartbeatConfig(Base):
|
||||
"""Heartbeat service configuration."""
|
||||
|
||||
enabled: bool = True
|
||||
interval_s: int = 30 * 60 # 30 minutes
|
||||
|
||||
|
||||
class GatewayConfig(Base):
|
||||
"""Gateway/server configuration."""
|
||||
|
||||
host: str = "0.0.0.0"
|
||||
port: int = 18790
|
||||
heartbeat: HeartbeatConfig = Field(default_factory=HeartbeatConfig)
|
||||
|
||||
|
||||
class WebSearchConfig(Base):
|
||||
|
||||
@@ -1,80 +1,110 @@
|
||||
"""Heartbeat service - periodic agent wake-up to check for tasks."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Coroutine
|
||||
from typing import TYPE_CHECKING, Any, Callable, Coroutine
|
||||
|
||||
from loguru import logger
|
||||
|
||||
# Default interval: 30 minutes
|
||||
DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60
|
||||
if TYPE_CHECKING:
|
||||
from nanobot.providers.base import LLMProvider
|
||||
|
||||
# Token the agent replies with when there is nothing to report
|
||||
HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK"
|
||||
|
||||
# The prompt sent to agent during heartbeat
|
||||
HEARTBEAT_PROMPT = (
|
||||
"Read HEARTBEAT.md in your workspace and follow any instructions listed there. "
|
||||
f"If nothing needs attention, reply with exactly: {HEARTBEAT_OK_TOKEN}"
|
||||
)
|
||||
|
||||
|
||||
def _is_heartbeat_empty(content: str | None) -> bool:
|
||||
"""Check if HEARTBEAT.md has no actionable content."""
|
||||
if not content:
|
||||
return True
|
||||
|
||||
# Lines to skip: empty, headers, HTML comments, empty checkboxes
|
||||
skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"}
|
||||
|
||||
for line in content.split("\n"):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#") or line.startswith("<!--") or line in skip_patterns:
|
||||
continue
|
||||
return False # Found actionable content
|
||||
|
||||
return True
|
||||
_HEARTBEAT_TOOL = [
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "heartbeat",
|
||||
"description": "Report heartbeat decision after reviewing tasks.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["skip", "run"],
|
||||
"description": "skip = nothing to do, run = has active tasks",
|
||||
},
|
||||
"tasks": {
|
||||
"type": "string",
|
||||
"description": "Natural-language summary of active tasks (required for run)",
|
||||
},
|
||||
},
|
||||
"required": ["action"],
|
||||
},
|
||||
},
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
class HeartbeatService:
|
||||
"""
|
||||
Periodic heartbeat service that wakes the agent to check for tasks.
|
||||
|
||||
The agent reads HEARTBEAT.md from the workspace and executes any tasks
|
||||
listed there. If it has something to report, the response is forwarded
|
||||
to the user via on_notify. If nothing needs attention, the agent replies
|
||||
HEARTBEAT_OK and the response is silently dropped.
|
||||
Phase 1 (decision): reads HEARTBEAT.md and asks the LLM — via a virtual
|
||||
tool call — whether there are active tasks. This avoids free-text parsing
|
||||
and the unreliable HEARTBEAT_OK token.
|
||||
|
||||
Phase 2 (execution): only triggered when Phase 1 returns ``run``. The
|
||||
``on_execute`` callback runs the task through the full agent loop and
|
||||
returns the result to deliver.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
workspace: Path,
|
||||
on_heartbeat: Callable[[str], Coroutine[Any, Any, str]] | None = None,
|
||||
provider: LLMProvider,
|
||||
model: str,
|
||||
on_execute: Callable[[str], Coroutine[Any, Any, str]] | None = None,
|
||||
on_notify: Callable[[str], Coroutine[Any, Any, None]] | None = None,
|
||||
interval_s: int = DEFAULT_HEARTBEAT_INTERVAL_S,
|
||||
interval_s: int = 30 * 60,
|
||||
enabled: bool = True,
|
||||
):
|
||||
self.workspace = workspace
|
||||
self.on_heartbeat = on_heartbeat
|
||||
self.provider = provider
|
||||
self.model = model
|
||||
self.on_execute = on_execute
|
||||
self.on_notify = on_notify
|
||||
self.interval_s = interval_s
|
||||
self.enabled = enabled
|
||||
self._running = False
|
||||
self._task: asyncio.Task | None = None
|
||||
|
||||
|
||||
@property
|
||||
def heartbeat_file(self) -> Path:
|
||||
return self.workspace / "HEARTBEAT.md"
|
||||
|
||||
|
||||
def _read_heartbeat_file(self) -> str | None:
|
||||
"""Read HEARTBEAT.md content."""
|
||||
if self.heartbeat_file.exists():
|
||||
try:
|
||||
return self.heartbeat_file.read_text(encoding="utf-8")
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
async def _decide(self, content: str) -> tuple[str, str]:
|
||||
"""Phase 1: ask LLM to decide skip/run via virtual tool call.
|
||||
|
||||
Returns (action, tasks) where action is 'skip' or 'run'.
|
||||
"""
|
||||
response = await self.provider.chat(
|
||||
messages=[
|
||||
{"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."},
|
||||
{"role": "user", "content": (
|
||||
"Review the following HEARTBEAT.md and decide whether there are active tasks.\n\n"
|
||||
f"{content}"
|
||||
)},
|
||||
],
|
||||
tools=_HEARTBEAT_TOOL,
|
||||
model=self.model,
|
||||
)
|
||||
|
||||
if not response.has_tool_calls:
|
||||
return "skip", ""
|
||||
|
||||
args = response.tool_calls[0].arguments
|
||||
return args.get("action", "skip"), args.get("tasks", "")
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the heartbeat service."""
|
||||
if not self.enabled:
|
||||
@@ -83,18 +113,18 @@ class HeartbeatService:
|
||||
if self._running:
|
||||
logger.warning("Heartbeat already running")
|
||||
return
|
||||
|
||||
|
||||
self._running = True
|
||||
self._task = asyncio.create_task(self._run_loop())
|
||||
logger.info("Heartbeat started (every {}s)", self.interval_s)
|
||||
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the heartbeat service."""
|
||||
self._running = False
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
self._task = None
|
||||
|
||||
|
||||
async def _run_loop(self) -> None:
|
||||
"""Main heartbeat loop."""
|
||||
while self._running:
|
||||
@@ -106,32 +136,38 @@ class HeartbeatService:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error("Heartbeat error: {}", e)
|
||||
|
||||
|
||||
async def _tick(self) -> None:
|
||||
"""Execute a single heartbeat tick."""
|
||||
content = self._read_heartbeat_file()
|
||||
|
||||
# Skip if HEARTBEAT.md is empty or doesn't exist
|
||||
if _is_heartbeat_empty(content):
|
||||
logger.debug("Heartbeat: no tasks (HEARTBEAT.md empty)")
|
||||
if not content:
|
||||
logger.debug("Heartbeat: HEARTBEAT.md missing or empty")
|
||||
return
|
||||
|
||||
|
||||
logger.info("Heartbeat: checking for tasks...")
|
||||
|
||||
if self.on_heartbeat:
|
||||
try:
|
||||
response = await self.on_heartbeat(HEARTBEAT_PROMPT)
|
||||
if HEARTBEAT_OK_TOKEN in response.upper():
|
||||
logger.info("Heartbeat: OK (nothing to report)")
|
||||
else:
|
||||
|
||||
try:
|
||||
action, tasks = await self._decide(content)
|
||||
|
||||
if action != "run":
|
||||
logger.info("Heartbeat: OK (nothing to report)")
|
||||
return
|
||||
|
||||
logger.info("Heartbeat: tasks found, executing...")
|
||||
if self.on_execute:
|
||||
response = await self.on_execute(tasks)
|
||||
if response and self.on_notify:
|
||||
logger.info("Heartbeat: completed, delivering response")
|
||||
if self.on_notify:
|
||||
await self.on_notify(response)
|
||||
except Exception:
|
||||
logger.exception("Heartbeat execution failed")
|
||||
|
||||
await self.on_notify(response)
|
||||
except Exception:
|
||||
logger.exception("Heartbeat execution failed")
|
||||
|
||||
async def trigger_now(self) -> str | None:
|
||||
"""Manually trigger a heartbeat."""
|
||||
if self.on_heartbeat:
|
||||
return await self.on_heartbeat(HEARTBEAT_PROMPT)
|
||||
return None
|
||||
content = self._read_heartbeat_file()
|
||||
if not content:
|
||||
return None
|
||||
action, tasks = await self._decide(content)
|
||||
if action != "run" or not self.on_execute:
|
||||
return None
|
||||
return await self.on_execute(tasks)
|
||||
|
||||
Reference in New Issue
Block a user