diff --git a/README.md b/README.md index f20e21f..148c8f4 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ ⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines. -📏 Real-time line count: **3,862 lines** (run `bash core_agent_lines.sh` to verify anytime) +📏 Real-time line count: **3,897 lines** (run `bash core_agent_lines.sh` to verify anytime) ## 📢 News @@ -841,6 +841,26 @@ nanobot cron remove +
+Heartbeat (Periodic Tasks) + +The gateway wakes up every 30 minutes and checks `HEARTBEAT.md` in your workspace (`~/.nanobot/workspace/HEARTBEAT.md`). If the file has tasks, the agent executes them and delivers results to your most recently active chat channel. + +**Setup:** edit `~/.nanobot/workspace/HEARTBEAT.md` (created automatically by `nanobot onboard`): + +```markdown +## Periodic Tasks + +- [ ] Check weather forecast and send a summary +- [ ] Scan inbox for urgent emails +``` + +The agent can also manage this file itself — ask it to "add a periodic task" and it will update `HEARTBEAT.md` for you. + +> **Note:** The gateway must be running (`nanobot gateway`) and you must have chatted with the bot at least once so it knows which channel to deliver to. + +
+ ## 🐳 Docker > [!TIP] diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index c5869f3..98c13f2 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -96,14 +96,18 @@ Your workspace is at: {workspace_path} - History log: {workspace_path}/memory/HISTORY.md (grep-searchable) - Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md -IMPORTANT: When responding to direct questions or conversations, reply directly with your text response. -Only use the 'message' tool when you need to send a message to a specific chat channel (like WhatsApp). -For normal conversation, just respond with text - do not call the message tool. +Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel. -Always be helpful, accurate, and concise. Before calling tools, briefly tell the user what you're about to do (one short sentence in the user's language). -If you need to use tools, call them directly — never send a preliminary message like "Let me check" without actually calling a tool. -When remembering something important, write to {workspace_path}/memory/MEMORY.md -To recall past events, grep {workspace_path}/memory/HISTORY.md""" +## Tool Call Guidelines +- Before calling tools, you may briefly state your intent (e.g. "Let me check that"), but NEVER predict or describe the expected result before receiving it. +- Before modifying a file, read it first to confirm its current content. +- Do not assume a file or directory exists — use list_dir or read_file to verify. +- After writing or editing a file, re-read it if accuracy matters. +- If a tool call fails, analyze the error before retrying with a different approach. + +## Memory +- Remember important facts: write to {workspace_path}/memory/MEMORY.md +- Recall past events: grep {workspace_path}/memory/HISTORY.md""" def _load_bootstrap_files(self) -> str: """Load all bootstrap files from workspace.""" diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index b05ba90..8be8e51 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -27,7 +27,7 @@ from nanobot.providers.base import LLMProvider from nanobot.session.manager import Session, SessionManager if TYPE_CHECKING: - from nanobot.config.schema import ExecToolConfig + from nanobot.config.schema import ChannelsConfig, ExecToolConfig from nanobot.cron.service import CronService @@ -49,19 +49,21 @@ class AgentLoop: provider: LLMProvider, workspace: Path, model: str | None = None, - max_iterations: int = 20, - temperature: float = 0.7, + max_iterations: int = 40, + temperature: float = 0.1, max_tokens: int = 4096, - memory_window: int = 50, + memory_window: int = 100, brave_api_key: str | None = None, exec_config: ExecToolConfig | None = None, cron_service: CronService | None = None, restrict_to_workspace: bool = False, session_manager: SessionManager | None = None, mcp_servers: dict | None = None, + channels_config: ChannelsConfig | None = None, ): from nanobot.config.schema import ExecToolConfig self.bus = bus + self.channels_config = channels_config self.provider = provider self.workspace = workspace self.model = model or provider.get_default_model() @@ -172,9 +174,9 @@ class AgentLoop: async def _run_agent_loop( self, initial_messages: list[dict], - on_progress: Callable[[str], Awaitable[None]] | None = None, - ) -> tuple[str | None, list[str]]: - """Run the agent iteration loop. Returns (final_content, tools_used).""" + on_progress: Callable[..., Awaitable[None]] | None = None, + ) -> tuple[str | None, list[str], list[dict]]: + """Run the agent iteration loop. Returns (final_content, tools_used, messages).""" messages = initial_messages iteration = 0 final_content = None @@ -196,8 +198,7 @@ class AgentLoop: clean = self._strip_think(response.content) if clean: await on_progress(clean) - else: - await on_progress(self._tool_hint(response.tool_calls)) + await on_progress(self._tool_hint(response.tool_calls), tool_hint=True) tool_call_dicts = [ { @@ -227,7 +228,14 @@ class AgentLoop: final_content = self._strip_think(response.content) break - return final_content, tools_used + if final_content is None and iteration >= self.max_iterations: + logger.warning("Max iterations ({}) reached", self.max_iterations) + final_content = ( + f"I reached the maximum number of tool call iterations ({self.max_iterations}) " + "without completing the task. You can try breaking the task into smaller steps." + ) + + return final_content, tools_used, messages async def run(self) -> None: """Run the agent loop, processing messages from the bus.""" @@ -300,13 +308,13 @@ class AgentLoop: key = f"{channel}:{chat_id}" session = self.sessions.get_or_create(key) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) + history = session.get_history(max_messages=self.memory_window) messages = self.context.build_messages( - history=session.get_history(max_messages=self.memory_window), + history=history, current_message=msg.content, channel=channel, chat_id=chat_id, ) - final_content, _ = await self._run_agent_loop(messages) - session.add_message("user", f"[System: {msg.sender_id}] {msg.content}") - session.add_message("assistant", final_content or "Background task completed.") + final_content, _, all_msgs = await self._run_agent_loop(messages) + self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) return OutboundMessage(channel=channel, chat_id=chat_id, content=final_content or "Background task completed.") @@ -352,7 +360,8 @@ class AgentLoop: return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands") - if len(session.messages) > self.memory_window and session.key not in self._consolidating: + unconsolidated = len(session.messages) - session.last_consolidated + if (unconsolidated >= self.memory_window and session.key not in self._consolidating): self._consolidating.add(session.key) lock = self._get_consolidation_lock(session.key) @@ -375,21 +384,23 @@ class AgentLoop: if isinstance(message_tool, MessageTool): message_tool.start_turn() + history = session.get_history(max_messages=self.memory_window) initial_messages = self.context.build_messages( - history=session.get_history(max_messages=self.memory_window), + history=history, current_message=msg.content, media=msg.media if msg.media else None, channel=msg.channel, chat_id=msg.chat_id, ) - async def _bus_progress(content: str) -> None: + async def _bus_progress(content: str, *, tool_hint: bool = False) -> None: meta = dict(msg.metadata or {}) meta["_progress"] = True + meta["_tool_hint"] = tool_hint await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta, )) - final_content, tools_used = await self._run_agent_loop( + final_content, _, all_msgs = await self._run_agent_loop( initial_messages, on_progress=on_progress or _bus_progress, ) @@ -399,9 +410,7 @@ class AgentLoop: preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) - session.add_message("user", msg.content) - session.add_message("assistant", final_content, - tools_used=tools_used if tools_used else None) + self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) if message_tool := self.tools.get("message"): @@ -413,6 +422,21 @@ class AgentLoop: metadata=msg.metadata or {}, ) + _TOOL_RESULT_MAX_CHARS = 500 + + def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None: + """Save new-turn messages into session, truncating large tool results.""" + from datetime import datetime + for m in messages[skip:]: + entry = {k: v for k, v in m.items() if k != "reasoning_content"} + if entry.get("role") == "tool" and isinstance(entry.get("content"), str): + content = entry["content"] + if len(content) > self._TOOL_RESULT_MAX_CHARS: + entry["content"] = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)" + entry.setdefault("timestamp", datetime.now().isoformat()) + session.messages.append(entry) + session.updated_at = datetime.now() + async def _consolidate_memory(self, session, archive_all: bool = False) -> bool: """Delegate to MemoryStore.consolidate(). Returns True on success.""" return await MemoryStore(self.workspace).consolidate( diff --git a/nanobot/agent/tools/registry.py b/nanobot/agent/tools/registry.py index d9b33ff..8256a59 100644 --- a/nanobot/agent/tools/registry.py +++ b/nanobot/agent/tools/registry.py @@ -49,17 +49,22 @@ class ToolRegistry: Raises: KeyError: If tool not found. """ + _HINT = "\n\n[Analyze the error above and try a different approach.]" + tool = self._tools.get(name) if not tool: - return f"Error: Tool '{name}' not found" + return f"Error: Tool '{name}' not found. Available: {', '.join(self.tool_names)}" try: errors = tool.validate_params(params) if errors: - return f"Error: Invalid parameters for tool '{name}': " + "; ".join(errors) - return await tool.execute(**params) + return f"Error: Invalid parameters for tool '{name}': " + "; ".join(errors) + _HINT + result = await tool.execute(**params) + if isinstance(result, str) and result.startswith("Error"): + return result + _HINT + return result except Exception as e: - return f"Error executing {name}: {str(e)}" + return f"Error executing {name}: {str(e)}" + _HINT @property def tool_names(self) -> list[str]: diff --git a/nanobot/agent/tools/web.py b/nanobot/agent/tools/web.py index 90cdda8..82eca83 100644 --- a/nanobot/agent/tools/web.py +++ b/nanobot/agent/tools/web.py @@ -58,12 +58,17 @@ class WebSearchTool(Tool): } def __init__(self, api_key: str | None = None, max_results: int = 5): - self.api_key = api_key or os.environ.get("BRAVE_API_KEY", "") + self.api_key = api_key self.max_results = max_results - + async def execute(self, query: str, count: int | None = None, **kwargs: Any) -> str: - if not self.api_key: - return "Error: BRAVE_API_KEY not configured" + api_key = self.api_key or os.environ.get("BRAVE_API_KEY", "") + if not api_key: + return ( + "Error: Brave Search API key not configured. " + "Set it in ~/.nanobot/config.json under tools.web.search.apiKey " + "(or export BRAVE_API_KEY), then restart the gateway." + ) try: n = min(max(count or self.max_results, 1), 10) @@ -71,7 +76,7 @@ class WebSearchTool(Tool): r = await client.get( "https://api.search.brave.com/res/v1/web/search", params={"q": query, "count": n}, - headers={"Accept": "application/json", "X-Subscription-Token": self.api_key}, + headers={"Accept": "application/json", "X-Subscription-Token": api_key}, timeout=10.0 ) r.raise_for_status() diff --git a/nanobot/bus/events.py b/nanobot/bus/events.py index a149e20..a48660d 100644 --- a/nanobot/bus/events.py +++ b/nanobot/bus/events.py @@ -16,11 +16,12 @@ class InboundMessage: timestamp: datetime = field(default_factory=datetime.now) media: list[str] = field(default_factory=list) # Media URLs metadata: dict[str, Any] = field(default_factory=dict) # Channel-specific data + session_key_override: str | None = None # Optional override for thread-scoped sessions @property def session_key(self) -> str: """Unique key for session identification.""" - return f"{self.channel}:{self.chat_id}" + return self.session_key_override or f"{self.channel}:{self.chat_id}" @dataclass diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 3a5a785..3010373 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -89,7 +89,8 @@ class BaseChannel(ABC): chat_id: str, content: str, media: list[str] | None = None, - metadata: dict[str, Any] | None = None + metadata: dict[str, Any] | None = None, + session_key: str | None = None, ) -> None: """ Handle an incoming message from the chat platform. @@ -102,6 +103,7 @@ class BaseChannel(ABC): content: Message text content. media: Optional list of media URLs. metadata: Optional channel-specific metadata. + session_key: Optional session key override (e.g. thread-scoped sessions). """ if not self.is_allowed(sender_id): logger.warning( @@ -117,7 +119,8 @@ class BaseChannel(ABC): chat_id=str(chat_id), content=content, media=media or [], - metadata=metadata or {} + metadata=metadata or {}, + session_key_override=session_key, ) await self.bus.publish_inbound(msg) diff --git a/nanobot/channels/discord.py b/nanobot/channels/discord.py index 1d2d7a6..b9227fb 100644 --- a/nanobot/channels/discord.py +++ b/nanobot/channels/discord.py @@ -285,8 +285,11 @@ class DiscordChannel(BaseChannel): while self._running: try: await self._http.post(url, headers=headers) - except Exception: - pass + except asyncio.CancelledError: + return + except Exception as e: + logger.debug("Discord typing indicator failed for {}: {}", channel_id, e) + return await asyncio.sleep(8) self._typing_tasks[channel_id] = asyncio.create_task(typing_loop()) diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 6fbab04..77b7294 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -193,6 +193,12 @@ class ChannelManager: timeout=1.0 ) + if msg.metadata.get("_progress"): + if msg.metadata.get("_tool_hint") and not self.config.channels.send_tool_hints: + continue + if not msg.metadata.get("_tool_hint") and not self.config.channels.send_progress: + continue + channel = self.channels.get(msg.channel) if channel: try: diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index b0f9bbb..906593b 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -179,6 +179,9 @@ class SlackChannel(BaseChannel): except Exception as e: logger.debug("Slack reactions_add failed: {}", e) + # Thread-scoped session key for channel/group messages + session_key = f"slack:{chat_id}:{thread_ts}" if thread_ts and channel_type != "im" else None + try: await self._handle_message( sender_id=sender_id, @@ -189,8 +192,9 @@ class SlackChannel(BaseChannel): "event": event, "thread_ts": thread_ts, "channel_type": channel_type, - } + }, }, + session_key=session_key, ) except Exception: logger.exception("Error handling Slack message from {}", sender_id) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index f1f9b30..ca71694 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -199,84 +199,34 @@ def onboard(): def _create_workspace_templates(workspace: Path): - """Create default workspace template files.""" - templates = { - "AGENTS.md": """# Agent Instructions + """Create default workspace template files from bundled templates.""" + from importlib.resources import files as pkg_files -You are a helpful AI assistant. Be concise, accurate, and friendly. + templates_dir = pkg_files("nanobot") / "templates" -## Guidelines + for item in templates_dir.iterdir(): + if not item.name.endswith(".md"): + continue + dest = workspace / item.name + if not dest.exists(): + dest.write_text(item.read_text(encoding="utf-8"), encoding="utf-8") + console.print(f" [dim]Created {item.name}[/dim]") -- Always explain what you're doing before taking actions -- Ask for clarification when the request is ambiguous -- Use tools to help accomplish tasks -- Remember important information in memory/MEMORY.md; past events are logged in memory/HISTORY.md -""", - "SOUL.md": """# Soul - -I am nanobot, a lightweight AI assistant. - -## Personality - -- Helpful and friendly -- Concise and to the point -- Curious and eager to learn - -## Values - -- Accuracy over speed -- User privacy and safety -- Transparency in actions -""", - "USER.md": """# User - -Information about the user goes here. - -## Preferences - -- Communication style: (casual/formal) -- Timezone: (your timezone) -- Language: (your preferred language) -""", - } - - for filename, content in templates.items(): - file_path = workspace / filename - if not file_path.exists(): - file_path.write_text(content, encoding="utf-8") - console.print(f" [dim]Created {filename}[/dim]") - - # Create memory directory and MEMORY.md memory_dir = workspace / "memory" memory_dir.mkdir(exist_ok=True) + + memory_template = templates_dir / "memory" / "MEMORY.md" memory_file = memory_dir / "MEMORY.md" if not memory_file.exists(): - memory_file.write_text("""# Long-term Memory - -This file stores important information that should persist across sessions. - -## User Information - -(Important facts about the user) - -## Preferences - -(User preferences learned over time) - -## Important Notes - -(Things to remember) -""", encoding="utf-8") + memory_file.write_text(memory_template.read_text(encoding="utf-8"), encoding="utf-8") console.print(" [dim]Created memory/MEMORY.md[/dim]") - + history_file = memory_dir / "HISTORY.md" if not history_file.exists(): history_file.write_text("", encoding="utf-8") console.print(" [dim]Created memory/HISTORY.md[/dim]") - # Create skills directory for custom user skills - skills_dir = workspace / "skills" - skills_dir.mkdir(exist_ok=True) + (workspace / "skills").mkdir(exist_ok=True) def _make_provider(config: Config): @@ -368,6 +318,7 @@ def gateway( restrict_to_workspace=config.tools.restrict_to_workspace, session_manager=session_manager, mcp_servers=config.tools.mcp_servers, + channels_config=config.channels, ) # Set cron callback (needs agent) @@ -389,20 +340,59 @@ def gateway( return response cron.on_job = on_cron_job - # Create heartbeat service - async def on_heartbeat(prompt: str) -> str: - """Execute heartbeat through the agent.""" - return await agent.process_direct(prompt, session_key="heartbeat") - - heartbeat = HeartbeatService( - workspace=config.workspace_path, - on_heartbeat=on_heartbeat, - interval_s=30 * 60, # 30 minutes - enabled=True - ) - # Create channel manager channels = ChannelManager(config, bus) + + def _pick_heartbeat_target() -> tuple[str, str]: + """Pick a routable channel/chat target for heartbeat-triggered messages.""" + enabled = set(channels.enabled_channels) + # Prefer the most recently updated non-internal session on an enabled channel. + for item in session_manager.list_sessions(): + key = item.get("key") or "" + if ":" not in key: + continue + channel, chat_id = key.split(":", 1) + if channel in {"cli", "system"}: + continue + if channel in enabled and chat_id: + return channel, chat_id + # Fallback keeps prior behavior but remains explicit. + return "cli", "direct" + + # Create heartbeat service + async def on_heartbeat_execute(tasks: str) -> str: + """Phase 2: execute heartbeat tasks through the full agent loop.""" + channel, chat_id = _pick_heartbeat_target() + + async def _silent(*_args, **_kwargs): + pass + + return await agent.process_direct( + tasks, + session_key="heartbeat", + channel=channel, + chat_id=chat_id, + on_progress=_silent, + ) + + async def on_heartbeat_notify(response: str) -> None: + """Deliver a heartbeat response to the user's channel.""" + from nanobot.bus.events import OutboundMessage + channel, chat_id = _pick_heartbeat_target() + if channel == "cli": + return # No external channel available to deliver to + await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) + + hb_cfg = config.gateway.heartbeat + heartbeat = HeartbeatService( + workspace=config.workspace_path, + provider=provider, + model=agent.model, + on_execute=on_heartbeat_execute, + on_notify=on_heartbeat_notify, + interval_s=hb_cfg.interval_s, + enabled=hb_cfg.enabled, + ) if channels.enabled_channels: console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}") @@ -484,6 +474,7 @@ def agent( cron_service=cron, restrict_to_workspace=config.tools.restrict_to_workspace, mcp_servers=config.tools.mcp_servers, + channels_config=config.channels, ) # Show spinner when logs are off (no output to miss); skip when logs are on @@ -494,7 +485,12 @@ def agent( # Animated spinner is safe to use with prompt_toolkit input handling return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots") - async def _cli_progress(content: str) -> None: + async def _cli_progress(content: str, *, tool_hint: bool = False) -> None: + ch = agent_loop.channels_config + if ch and tool_hint and not ch.send_tool_hints: + return + if ch and not tool_hint and not ch.send_progress: + return console.print(f" [dim]↳ {content}[/dim]") if message: @@ -535,7 +531,14 @@ def agent( try: msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) if msg.metadata.get("_progress"): - console.print(f" [dim]↳ {msg.content}[/dim]") + is_tool_hint = msg.metadata.get("_tool_hint", False) + ch = agent_loop.channels_config + if ch and is_tool_hint and not ch.send_tool_hints: + pass + elif ch and not is_tool_hint and not ch.send_progress: + pass + else: + console.print(f" [dim]↳ {msg.content}[/dim]") elif not turn_done.is_set(): if msg.content: turn_response.append(msg.content) @@ -961,6 +964,7 @@ def cron_run( exec_config=config.tools.exec, restrict_to_workspace=config.tools.restrict_to_workspace, mcp_servers=config.tools.mcp_servers, + channels_config=config.channels, ) store_path = get_data_dir() / "cron" / "jobs.json" diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index be36536..215f38d 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -168,6 +168,8 @@ class QQConfig(Base): class ChannelsConfig(Base): """Configuration for chat channels.""" + send_progress: bool = True # stream agent's text progress to the channel + send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…")) whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig) telegram: TelegramConfig = Field(default_factory=TelegramConfig) discord: DiscordConfig = Field(default_factory=DiscordConfig) @@ -185,9 +187,9 @@ class AgentDefaults(Base): workspace: str = "~/.nanobot/workspace" model: str = "anthropic/claude-opus-4-5" max_tokens: int = 8192 - temperature: float = 0.7 - max_tool_iterations: int = 20 - memory_window: int = 50 + temperature: float = 0.1 + max_tool_iterations: int = 40 + memory_window: int = 100 class AgentsConfig(Base): @@ -226,11 +228,19 @@ class ProvidersConfig(Base): github_copilot: ProviderConfig = Field(default_factory=ProviderConfig) # Github Copilot (OAuth) +class HeartbeatConfig(Base): + """Heartbeat service configuration.""" + + enabled: bool = True + interval_s: int = 30 * 60 # 30 minutes + + class GatewayConfig(Base): """Gateway/server configuration.""" host: str = "0.0.0.0" port: int = 18790 + heartbeat: HeartbeatConfig = Field(default_factory=HeartbeatConfig) class WebSearchConfig(Base): diff --git a/nanobot/heartbeat/service.py b/nanobot/heartbeat/service.py index 3c1a6aa..e534017 100644 --- a/nanobot/heartbeat/service.py +++ b/nanobot/heartbeat/service.py @@ -1,92 +1,130 @@ """Heartbeat service - periodic agent wake-up to check for tasks.""" +from __future__ import annotations + import asyncio from pathlib import Path -from typing import Any, Callable, Coroutine +from typing import TYPE_CHECKING, Any, Callable, Coroutine from loguru import logger -# Default interval: 30 minutes -DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60 +if TYPE_CHECKING: + from nanobot.providers.base import LLMProvider -# The prompt sent to agent during heartbeat -HEARTBEAT_PROMPT = """Read HEARTBEAT.md in your workspace (if it exists). -Follow any instructions or tasks listed there. -If nothing needs attention, reply with just: HEARTBEAT_OK""" - -# Token that indicates "nothing to do" -HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK" - - -def _is_heartbeat_empty(content: str | None) -> bool: - """Check if HEARTBEAT.md has no actionable content.""" - if not content: - return True - - # Lines to skip: empty, headers, HTML comments, empty checkboxes - skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"} - - for line in content.split("\n"): - line = line.strip() - if not line or line.startswith("#") or line.startswith("