Merge branch 'main' into pr-1083

This commit is contained in:
Re-bin
2026-02-25 15:50:00 +00:00
19 changed files with 522 additions and 142 deletions

View File

@@ -2,5 +2,5 @@
nanobot - A lightweight AI agent framework
"""
__version__ = "0.1.4"
__version__ = "0.1.4.post2"
__logo__ = "🐈"

View File

@@ -3,6 +3,8 @@
import base64
import mimetypes
import platform
import time
from datetime import datetime
from pathlib import Path
from typing import Any
@@ -72,10 +74,6 @@ Skills with available="false" need dependencies installed first - you can try in
def _get_identity(self) -> str:
"""Get the core identity section."""
from datetime import datetime
import time as _time
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
tz = _time.strftime("%Z") or "UTC"
workspace_path = str(self.workspace.expanduser().resolve())
system = platform.system()
runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
@@ -84,9 +82,6 @@ Skills with available="false" need dependencies installed first - you can try in
You are nanobot, a helpful AI assistant.
## Current Time
{now} ({tz})
## Runtime
{runtime}
@@ -108,6 +103,23 @@ Reply directly with text for conversations. Only use the 'message' tool to send
## Memory
- Remember important facts: write to {workspace_path}/memory/MEMORY.md
- Recall past events: grep {workspace_path}/memory/HISTORY.md"""
@staticmethod
def _inject_runtime_context(
user_content: str | list[dict[str, Any]],
channel: str | None,
chat_id: str | None,
) -> str | list[dict[str, Any]]:
"""Append dynamic runtime context to the tail of the user message."""
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
tz = time.strftime("%Z") or "UTC"
lines = [f"Current Time: {now} ({tz})"]
if channel and chat_id:
lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"]
block = "[Runtime Context]\n" + "\n".join(lines)
if isinstance(user_content, str):
return f"{user_content}\n\n{block}"
return [*user_content, {"type": "text", "text": block}]
def _load_bootstrap_files(self) -> str:
"""Load all bootstrap files from workspace."""
@@ -148,8 +160,6 @@ Reply directly with text for conversations. Only use the 'message' tool to send
# System prompt
system_prompt = self.build_system_prompt(skill_names)
if channel and chat_id:
system_prompt += f"\n\n## Current Session\nChannel: {channel}\nChat ID: {chat_id}"
messages.append({"role": "system", "content": system_prompt})
# History
@@ -157,6 +167,7 @@ Reply directly with text for conversations. Only use the 'message' tool to send
# Current message (with optional image attachments)
user_content = self._build_user_content(current_message, media)
user_content = self._inject_runtime_context(user_content, channel, chat_id)
messages.append({"role": "user", "content": user_content})
return messages

View File

@@ -125,6 +125,13 @@ class MemoryStore:
return False
args = response.tool_calls[0].arguments
# Some providers return arguments as a JSON string instead of dict
if isinstance(args, str):
args = json.loads(args)
if not isinstance(args, dict):
logger.warning("Memory consolidation: unexpected arguments type {}", type(args).__name__)
return False
if entry := args.get("history_entry"):
if not isinstance(entry, str):
entry = json.dumps(entry, ensure_ascii=False)

View File

@@ -69,20 +69,18 @@ async def connect_mcp_servers(
read, write = await stack.enter_async_context(stdio_client(params))
elif cfg.url:
from mcp.client.streamable_http import streamable_http_client
if cfg.headers:
http_client = await stack.enter_async_context(
httpx.AsyncClient(
headers=cfg.headers,
follow_redirects=True
)
)
read, write, _ = await stack.enter_async_context(
streamable_http_client(cfg.url, http_client=http_client)
)
else:
read, write, _ = await stack.enter_async_context(
streamable_http_client(cfg.url)
# Always provide an explicit httpx client so MCP HTTP transport does not
# inherit httpx's default 5s timeout and preempt the higher-level tool timeout.
http_client = await stack.enter_async_context(
httpx.AsyncClient(
headers=cfg.headers or None,
follow_redirects=True,
timeout=None,
)
)
read, write, _ = await stack.enter_async_context(
streamable_http_client(cfg.url, http_client=http_client)
)
else:
logger.warning("MCP server '{}': no command or url configured, skipping", name)
continue

View File

@@ -36,19 +36,7 @@ class ToolRegistry:
return [tool.to_schema() for tool in self._tools.values()]
async def execute(self, name: str, params: dict[str, Any]) -> str:
"""
Execute a tool by name with given parameters.
Args:
name: Tool name.
params: Tool parameters.
Returns:
Tool execution result as string.
Raises:
KeyError: If tool not found.
"""
"""Execute a tool by name with given parameters."""
_HINT = "\n\n[Analyze the error above and try a different approach.]"
tool = self._tools.get(name)

View File

@@ -9,12 +9,7 @@ if TYPE_CHECKING:
class SpawnTool(Tool):
"""
Tool to spawn a subagent for background task execution.
The subagent runs asynchronously and announces its result back
to the main agent when complete.
"""
"""Tool to spawn a subagent for background task execution."""
def __init__(self, manager: "SubagentManager"):
self._manager = manager

View File

@@ -58,12 +58,21 @@ class WebSearchTool(Tool):
}
def __init__(self, api_key: str | None = None, max_results: int = 5):
self.api_key = api_key or os.environ.get("BRAVE_API_KEY", "")
self._init_api_key = api_key
self.max_results = max_results
@property
def api_key(self) -> str:
"""Resolve API key at call time so env/config changes are picked up."""
return self._init_api_key or os.environ.get("BRAVE_API_KEY", "")
async def execute(self, query: str, count: int | None = None, **kwargs: Any) -> str:
if not self.api_key:
return "Error: BRAVE_API_KEY not configured"
return (
"Error: Brave Search API key not configured. "
"Set it in ~/.nanobot/config.json under tools.web.search.apiKey "
"(or export BRAVE_API_KEY), then restart the gateway."
)
try:
n = min(max(count or self.max_results, 1), 10)
@@ -71,7 +80,7 @@ class WebSearchTool(Tool):
r = await client.get(
"https://api.search.brave.com/res/v1/web/search",
params={"q": query, "count": n},
headers={"Accept": "application/json", "X-Subscription-Token": self.api_key},
headers={"Accept": "application/json", "X-Subscription-Token": api_key},
timeout=10.0
)
r.raise_for_status()

View File

@@ -108,11 +108,6 @@ class EmailChannel(BaseChannel):
logger.warning("Skip email send: consent_granted is false")
return
force_send = bool((msg.metadata or {}).get("force_send"))
if not self.config.auto_reply_enabled and not force_send:
logger.info("Skip automatic email reply: auto_reply_enabled is false")
return
if not self.config.smtp_host:
logger.warning("Email channel SMTP host not configured")
return
@@ -122,6 +117,15 @@ class EmailChannel(BaseChannel):
logger.warning("Email channel missing recipient address")
return
# Determine if this is a reply (recipient has sent us an email before)
is_reply = to_addr in self._last_subject_by_chat
force_send = bool((msg.metadata or {}).get("force_send"))
# autoReplyEnabled only controls automatic replies, not proactive sends
if is_reply and not self.config.auto_reply_enabled and not force_send:
logger.info("Skip automatic email reply to {}: auto_reply_enabled is false", to_addr)
return
base_subject = self._last_subject_by_chat.get(to_addr, "nanobot reply")
subject = self._reply_subject(base_subject)
if msg.metadata and isinstance(msg.metadata.get("subject"), str):

View File

@@ -180,21 +180,25 @@ def _extract_element_content(element: dict) -> list[str]:
return parts
def _extract_post_text(content_json: dict) -> str:
"""Extract plain text from Feishu post (rich text) message content.
def _extract_post_content(content_json: dict) -> tuple[str, list[str]]:
"""Extract text and image keys from Feishu post (rich text) message content.
Supports two formats:
1. Direct format: {"title": "...", "content": [...]}
2. Localized format: {"zh_cn": {"title": "...", "content": [...]}}
Returns:
(text, image_keys) - extracted text and list of image keys
"""
def extract_from_lang(lang_content: dict) -> str | None:
def extract_from_lang(lang_content: dict) -> tuple[str | None, list[str]]:
if not isinstance(lang_content, dict):
return None
return None, []
title = lang_content.get("title", "")
content_blocks = lang_content.get("content", [])
if not isinstance(content_blocks, list):
return None
return None, []
text_parts = []
image_keys = []
if title:
text_parts.append(title)
for block in content_blocks:
@@ -209,22 +213,36 @@ def _extract_post_text(content_json: dict) -> str:
text_parts.append(element.get("text", ""))
elif tag == "at":
text_parts.append(f"@{element.get('user_name', 'user')}")
return " ".join(text_parts).strip() if text_parts else None
elif tag == "img":
img_key = element.get("image_key")
if img_key:
image_keys.append(img_key)
text = " ".join(text_parts).strip() if text_parts else None
return text, image_keys
# Try direct format first
if "content" in content_json:
result = extract_from_lang(content_json)
if result:
return result
text, images = extract_from_lang(content_json)
if text or images:
return text or "", images
# Try localized format
for lang_key in ("zh_cn", "en_us", "ja_jp"):
lang_content = content_json.get(lang_key)
result = extract_from_lang(lang_content)
if result:
return result
text, images = extract_from_lang(lang_content)
if text or images:
return text or "", images
return ""
return "", []
def _extract_post_text(content_json: dict) -> str:
"""Extract plain text from Feishu post (rich text) message content.
Legacy wrapper for _extract_post_content, returns only text.
"""
text, _ = _extract_post_content(content_json)
return text
class FeishuChannel(BaseChannel):
@@ -691,9 +709,17 @@ class FeishuChannel(BaseChannel):
content_parts.append(text)
elif msg_type == "post":
text = _extract_post_text(content_json)
text, image_keys = _extract_post_content(content_json)
if text:
content_parts.append(text)
# Download images embedded in post
for img_key in image_keys:
file_path, content_text = await self._download_and_save_media(
"image", {"image_key": img_key}, message_id
)
if file_path:
media_paths.append(file_path)
content_parts.append(content_text)
elif msg_type in ("image", "audio", "file", "media"):
file_path, content_text = await self._download_and_save_media(msg_type, content_json, message_id)

View File

@@ -229,6 +229,11 @@ class SlackChannel(BaseChannel):
return re.sub(rf"<@{re.escape(self._bot_user_id)}>\s*", "", text).strip()
_TABLE_RE = re.compile(r"(?m)^\|.*\|$(?:\n\|[\s:|-]*\|$)(?:\n\|.*\|$)*")
_CODE_FENCE_RE = re.compile(r"```[\s\S]*?```")
_INLINE_CODE_RE = re.compile(r"`[^`]+`")
_LEFTOVER_BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
_LEFTOVER_HEADER_RE = re.compile(r"^#{1,6}\s+(.+)$", re.MULTILINE)
_BARE_URL_RE = re.compile(r"(?<![|<])(https?://\S+)")
@classmethod
def _to_mrkdwn(cls, text: str) -> str:
@@ -236,7 +241,26 @@ class SlackChannel(BaseChannel):
if not text:
return ""
text = cls._TABLE_RE.sub(cls._convert_table, text)
return slackify_markdown(text)
return cls._fixup_mrkdwn(slackify_markdown(text))
@classmethod
def _fixup_mrkdwn(cls, text: str) -> str:
"""Fix markdown artifacts that slackify_markdown misses."""
code_blocks: list[str] = []
def _save_code(m: re.Match) -> str:
code_blocks.append(m.group(0))
return f"\x00CB{len(code_blocks) - 1}\x00"
text = cls._CODE_FENCE_RE.sub(_save_code, text)
text = cls._INLINE_CODE_RE.sub(_save_code, text)
text = cls._LEFTOVER_BOLD_RE.sub(r"*\1*", text)
text = cls._LEFTOVER_HEADER_RE.sub(r"*\1*", text)
text = cls._BARE_URL_RE.sub(lambda m: m.group(0).replace("&amp;", "&"), text)
for i, block in enumerate(code_blocks):
text = text.replace(f"\x00CB{i}\x00", block)
return text
@staticmethod
def _convert_table(match: re.Match) -> str:

View File

@@ -360,19 +360,19 @@ def gateway(
return "cli", "direct"
# Create heartbeat service
async def on_heartbeat(prompt: str) -> str:
"""Execute heartbeat through the agent."""
async def on_heartbeat_execute(tasks: str) -> str:
"""Phase 2: execute heartbeat tasks through the full agent loop."""
channel, chat_id = _pick_heartbeat_target()
async def _silent(*_args, **_kwargs):
pass
return await agent.process_direct(
prompt,
tasks,
session_key="heartbeat",
channel=channel,
chat_id=chat_id,
on_progress=_silent, # suppress: heartbeat should not push progress to external channels
on_progress=_silent,
)
async def on_heartbeat_notify(response: str) -> None:
@@ -383,12 +383,15 @@ def gateway(
return # No external channel available to deliver to
await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response))
hb_cfg = config.gateway.heartbeat
heartbeat = HeartbeatService(
workspace=config.workspace_path,
on_heartbeat=on_heartbeat,
provider=provider,
model=agent.model,
on_execute=on_heartbeat_execute,
on_notify=on_heartbeat_notify,
interval_s=30 * 60, # 30 minutes
enabled=True
interval_s=hb_cfg.interval_s,
enabled=hb_cfg.enabled,
)
if channels.enabled_channels:
@@ -400,7 +403,7 @@ def gateway(
if cron_status["jobs"] > 0:
console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs")
console.print(f"[green]✓[/green] Heartbeat: every 30m")
console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s")
async def run():
try:

View File

@@ -228,11 +228,19 @@ class ProvidersConfig(Base):
github_copilot: ProviderConfig = Field(default_factory=ProviderConfig) # Github Copilot (OAuth)
class HeartbeatConfig(Base):
"""Heartbeat service configuration."""
enabled: bool = True
interval_s: int = 30 * 60 # 30 minutes
class GatewayConfig(Base):
"""Gateway/server configuration."""
host: str = "0.0.0.0"
port: int = 18790
heartbeat: HeartbeatConfig = Field(default_factory=HeartbeatConfig)
class WebSearchConfig(Base):

View File

@@ -1,80 +1,110 @@
"""Heartbeat service - periodic agent wake-up to check for tasks."""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, Callable, Coroutine
from typing import TYPE_CHECKING, Any, Callable, Coroutine
from loguru import logger
# Default interval: 30 minutes
DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60
if TYPE_CHECKING:
from nanobot.providers.base import LLMProvider
# Token the agent replies with when there is nothing to report
HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK"
# The prompt sent to agent during heartbeat
HEARTBEAT_PROMPT = (
"Read HEARTBEAT.md in your workspace and follow any instructions listed there. "
f"If nothing needs attention, reply with exactly: {HEARTBEAT_OK_TOKEN}"
)
def _is_heartbeat_empty(content: str | None) -> bool:
"""Check if HEARTBEAT.md has no actionable content."""
if not content:
return True
# Lines to skip: empty, headers, HTML comments, empty checkboxes
skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"}
for line in content.split("\n"):
line = line.strip()
if not line or line.startswith("#") or line.startswith("<!--") or line in skip_patterns:
continue
return False # Found actionable content
return True
_HEARTBEAT_TOOL = [
{
"type": "function",
"function": {
"name": "heartbeat",
"description": "Report heartbeat decision after reviewing tasks.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["skip", "run"],
"description": "skip = nothing to do, run = has active tasks",
},
"tasks": {
"type": "string",
"description": "Natural-language summary of active tasks (required for run)",
},
},
"required": ["action"],
},
},
}
]
class HeartbeatService:
"""
Periodic heartbeat service that wakes the agent to check for tasks.
The agent reads HEARTBEAT.md from the workspace and executes any tasks
listed there. If it has something to report, the response is forwarded
to the user via on_notify. If nothing needs attention, the agent replies
HEARTBEAT_OK and the response is silently dropped.
Phase 1 (decision): reads HEARTBEAT.md and asks the LLM — via a virtual
tool call — whether there are active tasks. This avoids free-text parsing
and the unreliable HEARTBEAT_OK token.
Phase 2 (execution): only triggered when Phase 1 returns ``run``. The
``on_execute`` callback runs the task through the full agent loop and
returns the result to deliver.
"""
def __init__(
self,
workspace: Path,
on_heartbeat: Callable[[str], Coroutine[Any, Any, str]] | None = None,
provider: LLMProvider,
model: str,
on_execute: Callable[[str], Coroutine[Any, Any, str]] | None = None,
on_notify: Callable[[str], Coroutine[Any, Any, None]] | None = None,
interval_s: int = DEFAULT_HEARTBEAT_INTERVAL_S,
interval_s: int = 30 * 60,
enabled: bool = True,
):
self.workspace = workspace
self.on_heartbeat = on_heartbeat
self.provider = provider
self.model = model
self.on_execute = on_execute
self.on_notify = on_notify
self.interval_s = interval_s
self.enabled = enabled
self._running = False
self._task: asyncio.Task | None = None
@property
def heartbeat_file(self) -> Path:
return self.workspace / "HEARTBEAT.md"
def _read_heartbeat_file(self) -> str | None:
"""Read HEARTBEAT.md content."""
if self.heartbeat_file.exists():
try:
return self.heartbeat_file.read_text(encoding="utf-8")
except Exception:
return None
return None
async def _decide(self, content: str) -> tuple[str, str]:
"""Phase 1: ask LLM to decide skip/run via virtual tool call.
Returns (action, tasks) where action is 'skip' or 'run'.
"""
response = await self.provider.chat(
messages=[
{"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."},
{"role": "user", "content": (
"Review the following HEARTBEAT.md and decide whether there are active tasks.\n\n"
f"{content}"
)},
],
tools=_HEARTBEAT_TOOL,
model=self.model,
)
if not response.has_tool_calls:
return "skip", ""
args = response.tool_calls[0].arguments
return args.get("action", "skip"), args.get("tasks", "")
async def start(self) -> None:
"""Start the heartbeat service."""
if not self.enabled:
@@ -83,18 +113,18 @@ class HeartbeatService:
if self._running:
logger.warning("Heartbeat already running")
return
self._running = True
self._task = asyncio.create_task(self._run_loop())
logger.info("Heartbeat started (every {}s)", self.interval_s)
def stop(self) -> None:
"""Stop the heartbeat service."""
self._running = False
if self._task:
self._task.cancel()
self._task = None
async def _run_loop(self) -> None:
"""Main heartbeat loop."""
while self._running:
@@ -106,32 +136,38 @@ class HeartbeatService:
break
except Exception as e:
logger.error("Heartbeat error: {}", e)
async def _tick(self) -> None:
"""Execute a single heartbeat tick."""
content = self._read_heartbeat_file()
# Skip if HEARTBEAT.md is empty or doesn't exist
if _is_heartbeat_empty(content):
logger.debug("Heartbeat: no tasks (HEARTBEAT.md empty)")
if not content:
logger.debug("Heartbeat: HEARTBEAT.md missing or empty")
return
logger.info("Heartbeat: checking for tasks...")
if self.on_heartbeat:
try:
response = await self.on_heartbeat(HEARTBEAT_PROMPT)
if HEARTBEAT_OK_TOKEN in response.upper():
logger.info("Heartbeat: OK (nothing to report)")
else:
try:
action, tasks = await self._decide(content)
if action != "run":
logger.info("Heartbeat: OK (nothing to report)")
return
logger.info("Heartbeat: tasks found, executing...")
if self.on_execute:
response = await self.on_execute(tasks)
if response and self.on_notify:
logger.info("Heartbeat: completed, delivering response")
if self.on_notify:
await self.on_notify(response)
except Exception:
logger.exception("Heartbeat execution failed")
await self.on_notify(response)
except Exception:
logger.exception("Heartbeat execution failed")
async def trigger_now(self) -> str | None:
"""Manually trigger a heartbeat."""
if self.on_heartbeat:
return await self.on_heartbeat(HEARTBEAT_PROMPT)
return None
content = self._read_heartbeat_file()
if not content:
return None
action, tasks = await self._decide(content)
if action != "run" or not self.on_execute:
return None
return await self.on_execute(tasks)

View File

@@ -12,8 +12,9 @@ from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
from nanobot.providers.registry import find_by_model, find_gateway
# Standard OpenAI chat-completion message keys; extras (e.g. reasoning_content) are stripped for strict providers.
_ALLOWED_MSG_KEYS = frozenset({"role", "content", "tool_calls", "tool_call_id", "name"})
# Standard OpenAI chat-completion message keys plus reasoning_content for
# thinking-enabled models (Kimi k2.5, DeepSeek-R1, etc.).
_ALLOWED_MSG_KEYS = frozenset({"role", "content", "tool_calls", "tool_call_id", "name", "reasoning_content"})
class LiteLLMProvider(LLMProvider):