Merge remote-tracking branch 'origin/main' into pr-1206

This commit is contained in:
Re-bin
2026-02-27 02:17:04 +00:00
16 changed files with 2412 additions and 196 deletions

View File

@@ -138,12 +138,13 @@ Add or merge these **two parts** into your config (other options have defaults).
}
```
*Set your model*:
*Set your model* (optionally pin a provider — defaults to auto-detection):
```json
{
"agents": {
"defaults": {
"model": "anthropic/claude-opus-4-5"
"model": "anthropic/claude-opus-4-5",
"provider": "openrouter"
}
}
}
@@ -308,6 +309,72 @@ nanobot gateway
</details>
<details>
<summary><b>Matrix (Element)</b></summary>
Install Matrix dependencies first:
```bash
pip install nanobot-ai[matrix]
```
**1. Create/choose a Matrix account**
- Create or reuse a Matrix account on your homeserver (for example `matrix.org`).
- Confirm you can log in with Element.
**2. Get credentials**
- You need:
- `userId` (example: `@nanobot:matrix.org`)
- `accessToken`
- `deviceId` (recommended so sync tokens can be restored across restarts)
- You can obtain these from your homeserver login API (`/_matrix/client/v3/login`) or from your client's advanced session settings.
**3. Configure**
```json
{
"channels": {
"matrix": {
"enabled": true,
"homeserver": "https://matrix.org",
"userId": "@nanobot:matrix.org",
"accessToken": "syt_xxx",
"deviceId": "NANOBOT01",
"e2eeEnabled": true,
"allowFrom": [],
"groupPolicy": "open",
"groupAllowFrom": [],
"allowRoomMentions": false,
"maxMediaBytes": 20971520
}
}
}
```
> Keep a persistent `matrix-store` and stable `deviceId` — encrypted session state is lost if these change across restarts.
| Option | Description |
|--------|-------------|
| `allowFrom` | User IDs allowed to interact. Empty = all senders. |
| `groupPolicy` | `open` (default), `mention`, or `allowlist`. |
| `groupAllowFrom` | Room allowlist (used when policy is `allowlist`). |
| `allowRoomMentions` | Accept `@room` mentions in mention mode. |
| `e2eeEnabled` | E2EE support (default `true`). Set `false` for plaintext-only. |
| `maxMediaBytes` | Max attachment size (default `20MB`). Set `0` to block all media. |
**4. Run**
```bash
nanobot gateway
```
</details>
<details>
<summary><b>WhatsApp</b></summary>
@@ -807,6 +874,7 @@ MCP tools are automatically discovered and registered on startup. The LLM can us
| Option | Default | Description |
|--------|---------|-------------|
| `tools.restrictToWorkspace` | `false` | When `true`, restricts **all** agent tools (shell, file read/write/edit, list) to the workspace directory. Prevents path traversal and out-of-scope access. |
| `tools.exec.pathAppend` | `""` | Extra directories to append to `PATH` when running shell commands (e.g. `/usr/sbin` for `ufw`). |
| `channels.*.allowFrom` | `[]` (allow all) | Whitelist of user IDs. Empty = allow everyone; non-empty = only listed users can interact. |

View File

@@ -13,14 +13,10 @@ from nanobot.agent.skills import SkillsLoader
class ContextBuilder:
"""
Builds the context (system prompt + messages) for the agent.
Assembles bootstrap files, memory, skills, and conversation history
into a coherent prompt for the LLM.
"""
"""Builds the context (system prompt + messages) for the agent."""
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
def __init__(self, workspace: Path):
self.workspace = workspace
@@ -28,39 +24,23 @@ class ContextBuilder:
self.skills = SkillsLoader(workspace)
def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
"""
Build the system prompt from bootstrap files, memory, and skills.
Args:
skill_names: Optional list of skills to include.
Returns:
Complete system prompt.
"""
parts = []
# Core identity
parts.append(self._get_identity())
# Bootstrap files
"""Build the system prompt from identity, bootstrap files, memory, and skills."""
parts = [self._get_identity()]
bootstrap = self._load_bootstrap_files()
if bootstrap:
parts.append(bootstrap)
# Memory context
memory = self.memory.get_memory_context()
if memory:
parts.append(f"# Memory\n\n{memory}")
# Skills - progressive loading
# 1. Always-loaded skills: include full content
always_skills = self.skills.get_always_skills()
if always_skills:
always_content = self.skills.load_skills_for_context(always_skills)
if always_content:
parts.append(f"# Active Skills\n\n{always_content}")
# 2. Available skills: only show summary (agent uses read_file to load)
skills_summary = self.skills.build_skills_summary()
if skills_summary:
parts.append(f"""# Skills
@@ -69,7 +49,7 @@ The following skills extend your capabilities. To use a skill, read its SKILL.md
Skills with available="false" need dependencies installed first - you can try installing them with apt/brew.
{skills_summary}""")
return "\n\n---\n\n".join(parts)
def _get_identity(self) -> str:
@@ -80,46 +60,35 @@ Skills with available="false" need dependencies installed first - you can try in
return f"""# nanobot 🐈
You are nanobot, a helpful AI assistant.
You are nanobot, a helpful AI assistant.
## Runtime
{runtime}
## Workspace
Your workspace is at: {workspace_path}
- Long-term memory: {workspace_path}/memory/MEMORY.md
- Long-term memory: {workspace_path}/memory/MEMORY.md (write important facts here)
- History log: {workspace_path}/memory/HISTORY.md (grep-searchable)
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel.
## Tool Call Guidelines
- Before calling tools, you may briefly state your intent (e.g. "Let me check that"), but NEVER predict or describe the expected result before receiving it.
- Before modifying a file, read it first to confirm its current content.
- Do not assume a file or directory exists — use list_dir or read_file to verify.
## nanobot Guidelines
- State intent before tool calls, but NEVER predict or claim results before receiving them.
- Before modifying a file, read it first. Do not assume files or directories exist.
- After writing or editing a file, re-read it if accuracy matters.
- If a tool call fails, analyze the error before retrying with a different approach.
- Ask for clarification when the request is ambiguous.
## Memory
- Remember important facts: write to {workspace_path}/memory/MEMORY.md
- Recall past events: grep {workspace_path}/memory/HISTORY.md"""
Reply directly with text for conversations. Only use the 'message' tool to send to a specific chat channel."""
@staticmethod
def _inject_runtime_context(
user_content: str | list[dict[str, Any]],
channel: str | None,
chat_id: str | None,
) -> str | list[dict[str, Any]]:
"""Append dynamic runtime context to the tail of the user message."""
def _build_runtime_context(channel: str | None, chat_id: str | None) -> str:
"""Build untrusted runtime metadata block for injection before the user message."""
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
tz = time.strftime("%Z") or "UTC"
lines = [f"Current Time: {now} ({tz})"]
if channel and chat_id:
lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"]
block = "[Runtime Context]\n" + "\n".join(lines)
if isinstance(user_content, str):
return f"{user_content}\n\n{block}"
return [*user_content, {"type": "text", "text": block}]
return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines)
def _load_bootstrap_files(self) -> str:
"""Load all bootstrap files from workspace."""
@@ -142,35 +111,13 @@ Reply directly with text for conversations. Only use the 'message' tool to send
channel: str | None = None,
chat_id: str | None = None,
) -> list[dict[str, Any]]:
"""
Build the complete message list for an LLM call.
Args:
history: Previous conversation messages.
current_message: The new user message.
skill_names: Optional skills to include.
media: Optional list of local file paths for images/media.
channel: Current channel (telegram, feishu, etc.).
chat_id: Current chat/user ID.
Returns:
List of messages including system prompt.
"""
messages = []
# System prompt
system_prompt = self.build_system_prompt(skill_names)
messages.append({"role": "system", "content": system_prompt})
# History
messages.extend(history)
# Current message (with optional image attachments)
user_content = self._build_user_content(current_message, media)
user_content = self._inject_runtime_context(user_content, channel, chat_id)
messages.append({"role": "user", "content": user_content})
return messages
"""Build the complete message list for an LLM call."""
return [
{"role": "system", "content": self.build_system_prompt(skill_names)},
*history,
{"role": "user", "content": self._build_runtime_context(channel, chat_id)},
{"role": "user", "content": self._build_user_content(current_message, media)},
]
def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]:
"""Build user message content with optional base64-encoded images."""
@@ -191,63 +138,24 @@ Reply directly with text for conversations. Only use the 'message' tool to send
return images + [{"type": "text", "text": text}]
def add_tool_result(
self,
messages: list[dict[str, Any]],
tool_call_id: str,
tool_name: str,
result: str
self, messages: list[dict[str, Any]],
tool_call_id: str, tool_name: str, result: str,
) -> list[dict[str, Any]]:
"""
Add a tool result to the message list.
Args:
messages: Current message list.
tool_call_id: ID of the tool call.
tool_name: Name of the tool.
result: Tool execution result.
Returns:
Updated message list.
"""
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": tool_name,
"content": result
})
"""Add a tool result to the message list."""
messages.append({"role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result})
return messages
def add_assistant_message(
self,
messages: list[dict[str, Any]],
self, messages: list[dict[str, Any]],
content: str | None,
tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None,
) -> list[dict[str, Any]]:
"""
Add an assistant message to the message list.
Args:
messages: Current message list.
content: Message content.
tool_calls: Optional tool calls.
reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.).
Returns:
Updated message list.
"""
msg: dict[str, Any] = {"role": "assistant"}
# Always include content — some providers (e.g. StepFun) reject
# assistant messages that omit the key entirely.
msg["content"] = content
"""Add an assistant message to the message list."""
msg: dict[str, Any] = {"role": "assistant", "content": content}
if tool_calls:
msg["tool_calls"] = tool_calls
# Include reasoning content when provided (required by some thinking models)
if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content
messages.append(msg)
return messages

View File

@@ -99,6 +99,8 @@ class AgentLoop:
self._consolidating: set[str] = set() # Session keys with consolidation in progress
self._consolidation_tasks: set[asyncio.Task] = set() # Strong refs to in-flight tasks
self._consolidation_locks: dict[str, asyncio.Lock] = {}
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._processing_lock = asyncio.Lock()
self._register_default_tools()
def _register_default_tools(self) -> None:
@@ -110,6 +112,7 @@ class AgentLoop:
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace,
path_append=self.exec_config.path_append,
))
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
self.tools.register(WebFetchTool())
@@ -225,7 +228,11 @@ class AgentLoop:
messages, tool_call.id, tool_call.name, result
)
else:
final_content = self._strip_think(response.content)
clean = self._strip_think(response.content)
messages = self.context.add_assistant_message(
messages, clean, reasoning_content=response.reasoning_content,
)
final_content = clean
break
if final_content is None and iteration >= self.max_iterations:
@@ -238,35 +245,62 @@ class AgentLoop:
return final_content, tools_used, messages
async def run(self) -> None:
"""Run the agent loop, processing messages from the bus."""
"""Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
self._running = True
await self._connect_mcp()
logger.info("Agent loop started")
while self._running:
try:
msg = await asyncio.wait_for(
self.bus.consume_inbound(),
timeout=1.0
)
try:
response = await self._process_message(msg)
if response is not None:
await self.bus.publish_outbound(response)
elif msg.channel == "cli":
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content="", metadata=msg.metadata or {},
))
except Exception as e:
logger.error("Error processing message: {}", e)
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=f"Sorry, I encountered an error: {str(e)}"
))
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
except asyncio.TimeoutError:
continue
if msg.content.strip().lower() == "/stop":
await self._handle_stop(msg)
else:
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(msg.session_key, []).append(task)
task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)
async def _handle_stop(self, msg: InboundMessage) -> None:
"""Cancel all active tasks and subagents for the session."""
tasks = self._active_tasks.pop(msg.session_key, [])
cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
for t in tasks:
try:
await t
except (asyncio.CancelledError, Exception):
pass
sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
total = cancelled + sub_cancelled
content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop."
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
))
async def _dispatch(self, msg: InboundMessage) -> None:
"""Process a message under the global lock."""
async with self._processing_lock:
try:
response = await self._process_message(msg)
if response is not None:
await self.bus.publish_outbound(response)
elif msg.channel == "cli":
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="", metadata=msg.metadata or {},
))
except asyncio.CancelledError:
logger.info("Task cancelled for session {}", msg.session_key)
raise
except Exception:
logger.exception("Error processing message for session {}", msg.session_key)
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id,
content="Sorry, I encountered an error.",
))
async def close_mcp(self) -> None:
"""Close MCP connections."""
if self._mcp_stack:
@@ -358,7 +392,7 @@ class AgentLoop:
content="New session started.")
if cmd == "/help":
return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")
content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands")
unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and session.key not in self._consolidating):
@@ -442,6 +476,14 @@ class AgentLoop:
content = entry["content"]
if len(content) > self._TOOL_RESULT_MAX_CHARS:
entry["content"] = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)"
if entry.get("role") == "user" and isinstance(entry.get("content"), list):
entry["content"] = [
{"type": "text", "text": "[image]"} if (
c.get("type") == "image_url"
and c.get("image_url", {}).get("url", "").startswith("data:image/")
) else c
for c in entry["content"]
]
entry.setdefault("timestamp", datetime.now().isoformat())
session.messages.append(entry)
session.updated_at = datetime.now()

View File

@@ -18,13 +18,7 @@ from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
class SubagentManager:
"""
Manages background subagent execution.
Subagents are lightweight agent instances that run in the background
to handle specific tasks. They share the same LLM provider but have
isolated context and a focused system prompt.
"""
"""Manages background subagent execution."""
def __init__(
self,
@@ -49,6 +43,7 @@ class SubagentManager:
self.exec_config = exec_config or ExecToolConfig()
self.restrict_to_workspace = restrict_to_workspace
self._running_tasks: dict[str, asyncio.Task[None]] = {}
self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...}
async def spawn(
self,
@@ -56,35 +51,28 @@ class SubagentManager:
label: str | None = None,
origin_channel: str = "cli",
origin_chat_id: str = "direct",
session_key: str | None = None,
) -> str:
"""
Spawn a subagent to execute a task in the background.
Args:
task: The task description for the subagent.
label: Optional human-readable label for the task.
origin_channel: The channel to announce results to.
origin_chat_id: The chat ID to announce results to.
Returns:
Status message indicating the subagent was started.
"""
"""Spawn a subagent to execute a task in the background."""
task_id = str(uuid.uuid4())[:8]
display_label = label or task[:30] + ("..." if len(task) > 30 else "")
origin = {
"channel": origin_channel,
"chat_id": origin_chat_id,
}
# Create background task
origin = {"channel": origin_channel, "chat_id": origin_chat_id}
bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin)
)
self._running_tasks[task_id] = bg_task
# Cleanup when done
bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None))
if session_key:
self._session_tasks.setdefault(session_key, set()).add(task_id)
def _cleanup(_: asyncio.Task) -> None:
self._running_tasks.pop(task_id, None)
if session_key and (ids := self._session_tasks.get(session_key)):
ids.discard(task_id)
if not ids:
del self._session_tasks[session_key]
bg_task.add_done_callback(_cleanup)
logger.info("Spawned subagent [{}]: {}", task_id, display_label)
return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."
@@ -111,6 +99,7 @@ class SubagentManager:
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace,
path_append=self.exec_config.path_append,
))
tools.register(WebSearchTool(api_key=self.brave_api_key))
tools.register(WebFetchTool())
@@ -252,6 +241,16 @@ Skills are available at: {self.workspace}/skills/ (read SKILL.md files as needed
When you have completed the task, provide a clear summary of your findings or actions."""
async def cancel_by_session(self, session_key: str) -> int:
"""Cancel all subagents for the given session. Returns count cancelled."""
tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, [])
if tid in self._running_tasks and not self._running_tasks[tid].done()]
for t in tasks:
t.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
return len(tasks)
def get_running_count(self) -> int:
"""Return the number of currently running subagents."""
return len(self._running_tasks)

View File

@@ -19,6 +19,7 @@ class ExecTool(Tool):
deny_patterns: list[str] | None = None,
allow_patterns: list[str] | None = None,
restrict_to_workspace: bool = False,
path_append: str = "",
):
self.timeout = timeout
self.working_dir = working_dir
@@ -35,6 +36,7 @@ class ExecTool(Tool):
]
self.allow_patterns = allow_patterns or []
self.restrict_to_workspace = restrict_to_workspace
self.path_append = path_append
@property
def name(self) -> str:
@@ -67,12 +69,17 @@ class ExecTool(Tool):
if guard_error:
return guard_error
env = os.environ.copy()
if self.path_append:
env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=env,
)
try:

View File

@@ -15,11 +15,13 @@ class SpawnTool(Tool):
self._manager = manager
self._origin_channel = "cli"
self._origin_chat_id = "direct"
self._session_key = "cli:direct"
def set_context(self, channel: str, chat_id: str) -> None:
"""Set the origin context for subagent announcements."""
self._origin_channel = channel
self._origin_chat_id = chat_id
self._session_key = f"{channel}:{chat_id}"
@property
def name(self) -> str:
@@ -57,4 +59,5 @@ class SpawnTool(Tool):
label=label,
origin_channel=self._origin_channel,
origin_chat_id=self._origin_chat_id,
session_key=self._session_key,
)

682
nanobot/channels/matrix.py Normal file
View File

@@ -0,0 +1,682 @@
"""Matrix (Element) channel — inbound sync + outbound message/media delivery."""
import asyncio
import logging
import mimetypes
from pathlib import Path
from typing import Any, TypeAlias
from loguru import logger
try:
import nh3
from mistune import create_markdown
from nio import (
AsyncClient, AsyncClientConfig, ContentRepositoryConfigError,
DownloadError, InviteEvent, JoinError, MatrixRoom, MemoryDownloadResponse,
RoomEncryptedMedia, RoomMessage, RoomMessageMedia, RoomMessageText,
RoomSendError, RoomTypingError, SyncError, UploadError,
)
from nio.crypto.attachments import decrypt_attachment
from nio.exceptions import EncryptionError
except ImportError as e:
raise ImportError(
"Matrix dependencies not installed. Run: pip install nanobot-ai[matrix]"
) from e
from nanobot.bus.events import OutboundMessage
from nanobot.channels.base import BaseChannel
from nanobot.config.loader import get_data_dir
from nanobot.utils.helpers import safe_filename
TYPING_NOTICE_TIMEOUT_MS = 30_000
# Must stay below TYPING_NOTICE_TIMEOUT_MS so the indicator doesn't expire mid-processing.
TYPING_KEEPALIVE_INTERVAL_MS = 20_000
MATRIX_HTML_FORMAT = "org.matrix.custom.html"
_ATTACH_MARKER = "[attachment: {}]"
_ATTACH_TOO_LARGE = "[attachment: {} - too large]"
_ATTACH_FAILED = "[attachment: {} - download failed]"
_ATTACH_UPLOAD_FAILED = "[attachment: {} - upload failed]"
_DEFAULT_ATTACH_NAME = "attachment"
_MSGTYPE_MAP = {"m.image": "image", "m.audio": "audio", "m.video": "video", "m.file": "file"}
MATRIX_MEDIA_EVENT_FILTER = (RoomMessageMedia, RoomEncryptedMedia)
MatrixMediaEvent: TypeAlias = RoomMessageMedia | RoomEncryptedMedia
MATRIX_MARKDOWN = create_markdown(
escape=True,
plugins=["table", "strikethrough", "url", "superscript", "subscript"],
)
MATRIX_ALLOWED_HTML_TAGS = {
"p", "a", "strong", "em", "del", "code", "pre", "blockquote",
"ul", "ol", "li", "h1", "h2", "h3", "h4", "h5", "h6",
"hr", "br", "table", "thead", "tbody", "tr", "th", "td",
"caption", "sup", "sub", "img",
}
MATRIX_ALLOWED_HTML_ATTRIBUTES: dict[str, set[str]] = {
"a": {"href"}, "code": {"class"}, "ol": {"start"},
"img": {"src", "alt", "title", "width", "height"},
}
MATRIX_ALLOWED_URL_SCHEMES = {"https", "http", "matrix", "mailto", "mxc"}
def _filter_matrix_html_attribute(tag: str, attr: str, value: str) -> str | None:
"""Filter attribute values to a safe Matrix-compatible subset."""
if tag == "a" and attr == "href":
return value if value.lower().startswith(("https://", "http://", "matrix:", "mailto:")) else None
if tag == "img" and attr == "src":
return value if value.lower().startswith("mxc://") else None
if tag == "code" and attr == "class":
classes = [c for c in value.split() if c.startswith("language-") and not c.startswith("language-_")]
return " ".join(classes) if classes else None
return value
MATRIX_HTML_CLEANER = nh3.Cleaner(
tags=MATRIX_ALLOWED_HTML_TAGS,
attributes=MATRIX_ALLOWED_HTML_ATTRIBUTES,
attribute_filter=_filter_matrix_html_attribute,
url_schemes=MATRIX_ALLOWED_URL_SCHEMES,
strip_comments=True,
link_rel="noopener noreferrer",
)
def _render_markdown_html(text: str) -> str | None:
"""Render markdown to sanitized HTML; returns None for plain text."""
try:
formatted = MATRIX_HTML_CLEANER.clean(MATRIX_MARKDOWN(text)).strip()
except Exception:
return None
if not formatted:
return None
# Skip formatted_body for plain <p>text</p> to keep payload minimal.
if formatted.startswith("<p>") and formatted.endswith("</p>"):
inner = formatted[3:-4]
if "<" not in inner and ">" not in inner:
return None
return formatted
def _build_matrix_text_content(text: str) -> dict[str, object]:
"""Build Matrix m.text payload with optional HTML formatted_body."""
content: dict[str, object] = {"msgtype": "m.text", "body": text, "m.mentions": {}}
if html := _render_markdown_html(text):
content["format"] = MATRIX_HTML_FORMAT
content["formatted_body"] = html
return content
class _NioLoguruHandler(logging.Handler):
"""Route matrix-nio stdlib logs into Loguru."""
def emit(self, record: logging.LogRecord) -> None:
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
frame, depth = logging.currentframe(), 2
while frame and frame.f_code.co_filename == logging.__file__:
frame, depth = frame.f_back, depth + 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
def _configure_nio_logging_bridge() -> None:
"""Bridge matrix-nio logs to Loguru (idempotent)."""
nio_logger = logging.getLogger("nio")
if not any(isinstance(h, _NioLoguruHandler) for h in nio_logger.handlers):
nio_logger.handlers = [_NioLoguruHandler()]
nio_logger.propagate = False
class MatrixChannel(BaseChannel):
"""Matrix (Element) channel using long-polling sync."""
name = "matrix"
def __init__(self, config: Any, bus, *, restrict_to_workspace: bool = False,
workspace: Path | None = None):
super().__init__(config, bus)
self.client: AsyncClient | None = None
self._sync_task: asyncio.Task | None = None
self._typing_tasks: dict[str, asyncio.Task] = {}
self._restrict_to_workspace = restrict_to_workspace
self._workspace = workspace.expanduser().resolve() if workspace else None
self._server_upload_limit_bytes: int | None = None
self._server_upload_limit_checked = False
async def start(self) -> None:
"""Start Matrix client and begin sync loop."""
self._running = True
_configure_nio_logging_bridge()
store_path = get_data_dir() / "matrix-store"
store_path.mkdir(parents=True, exist_ok=True)
self.client = AsyncClient(
homeserver=self.config.homeserver, user=self.config.user_id,
store_path=store_path,
config=AsyncClientConfig(store_sync_tokens=True, encryption_enabled=self.config.e2ee_enabled),
)
self.client.user_id = self.config.user_id
self.client.access_token = self.config.access_token
self.client.device_id = self.config.device_id
self._register_event_callbacks()
self._register_response_callbacks()
if not self.config.e2ee_enabled:
logger.warning("Matrix E2EE disabled; encrypted rooms may be undecryptable.")
if self.config.device_id:
try:
self.client.load_store()
except Exception:
logger.exception("Matrix store load failed; restart may replay recent messages.")
else:
logger.warning("Matrix device_id empty; restart may replay recent messages.")
self._sync_task = asyncio.create_task(self._sync_loop())
async def stop(self) -> None:
"""Stop the Matrix channel with graceful sync shutdown."""
self._running = False
for room_id in list(self._typing_tasks):
await self._stop_typing_keepalive(room_id, clear_typing=False)
if self.client:
self.client.stop_sync_forever()
if self._sync_task:
try:
await asyncio.wait_for(asyncio.shield(self._sync_task),
timeout=self.config.sync_stop_grace_seconds)
except (asyncio.TimeoutError, asyncio.CancelledError):
self._sync_task.cancel()
try:
await self._sync_task
except asyncio.CancelledError:
pass
if self.client:
await self.client.close()
def _is_workspace_path_allowed(self, path: Path) -> bool:
"""Check path is inside workspace (when restriction enabled)."""
if not self._restrict_to_workspace or not self._workspace:
return True
try:
path.resolve(strict=False).relative_to(self._workspace)
return True
except ValueError:
return False
def _collect_outbound_media_candidates(self, media: list[str]) -> list[Path]:
"""Deduplicate and resolve outbound attachment paths."""
seen: set[str] = set()
candidates: list[Path] = []
for raw in media:
if not isinstance(raw, str) or not raw.strip():
continue
path = Path(raw.strip()).expanduser()
try:
key = str(path.resolve(strict=False))
except OSError:
key = str(path)
if key not in seen:
seen.add(key)
candidates.append(path)
return candidates
@staticmethod
def _build_outbound_attachment_content(
*, filename: str, mime: str, size_bytes: int,
mxc_url: str, encryption_info: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build Matrix content payload for an uploaded file/image/audio/video."""
prefix = mime.split("/")[0]
msgtype = {"image": "m.image", "audio": "m.audio", "video": "m.video"}.get(prefix, "m.file")
content: dict[str, Any] = {
"msgtype": msgtype, "body": filename, "filename": filename,
"info": {"mimetype": mime, "size": size_bytes}, "m.mentions": {},
}
if encryption_info:
content["file"] = {**encryption_info, "url": mxc_url}
else:
content["url"] = mxc_url
return content
def _is_encrypted_room(self, room_id: str) -> bool:
if not self.client:
return False
room = getattr(self.client, "rooms", {}).get(room_id)
return bool(getattr(room, "encrypted", False))
async def _send_room_content(self, room_id: str, content: dict[str, Any]) -> None:
"""Send m.room.message with E2EE options."""
if not self.client:
return
kwargs: dict[str, Any] = {"room_id": room_id, "message_type": "m.room.message", "content": content}
if self.config.e2ee_enabled:
kwargs["ignore_unverified_devices"] = True
await self.client.room_send(**kwargs)
async def _resolve_server_upload_limit_bytes(self) -> int | None:
"""Query homeserver upload limit once per channel lifecycle."""
if self._server_upload_limit_checked:
return self._server_upload_limit_bytes
self._server_upload_limit_checked = True
if not self.client:
return None
try:
response = await self.client.content_repository_config()
except Exception:
return None
upload_size = getattr(response, "upload_size", None)
if isinstance(upload_size, int) and upload_size > 0:
self._server_upload_limit_bytes = upload_size
return upload_size
return None
async def _effective_media_limit_bytes(self) -> int:
"""min(local config, server advertised) — 0 blocks all uploads."""
local_limit = max(int(self.config.max_media_bytes), 0)
server_limit = await self._resolve_server_upload_limit_bytes()
if server_limit is None:
return local_limit
return min(local_limit, server_limit) if local_limit else 0
async def _upload_and_send_attachment(
self, room_id: str, path: Path, limit_bytes: int,
relates_to: dict[str, Any] | None = None,
) -> str | None:
"""Upload one local file to Matrix and send it as a media message. Returns failure marker or None."""
if not self.client:
return _ATTACH_UPLOAD_FAILED.format(path.name or _DEFAULT_ATTACH_NAME)
resolved = path.expanduser().resolve(strict=False)
filename = safe_filename(resolved.name) or _DEFAULT_ATTACH_NAME
fail = _ATTACH_UPLOAD_FAILED.format(filename)
if not resolved.is_file() or not self._is_workspace_path_allowed(resolved):
return fail
try:
size_bytes = resolved.stat().st_size
except OSError:
return fail
if limit_bytes <= 0 or size_bytes > limit_bytes:
return _ATTACH_TOO_LARGE.format(filename)
mime = mimetypes.guess_type(filename, strict=False)[0] or "application/octet-stream"
try:
with resolved.open("rb") as f:
upload_result = await self.client.upload(
f, content_type=mime, filename=filename,
encrypt=self.config.e2ee_enabled and self._is_encrypted_room(room_id),
filesize=size_bytes,
)
except Exception:
return fail
upload_response = upload_result[0] if isinstance(upload_result, tuple) else upload_result
encryption_info = upload_result[1] if isinstance(upload_result, tuple) and isinstance(upload_result[1], dict) else None
if isinstance(upload_response, UploadError):
return fail
mxc_url = getattr(upload_response, "content_uri", None)
if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"):
return fail
content = self._build_outbound_attachment_content(
filename=filename, mime=mime, size_bytes=size_bytes,
mxc_url=mxc_url, encryption_info=encryption_info,
)
if relates_to:
content["m.relates_to"] = relates_to
try:
await self._send_room_content(room_id, content)
except Exception:
return fail
return None
async def send(self, msg: OutboundMessage) -> None:
"""Send outbound content; clear typing for non-progress messages."""
if not self.client:
return
text = msg.content or ""
candidates = self._collect_outbound_media_candidates(msg.media)
relates_to = self._build_thread_relates_to(msg.metadata)
is_progress = bool((msg.metadata or {}).get("_progress"))
try:
failures: list[str] = []
if candidates:
limit_bytes = await self._effective_media_limit_bytes()
for path in candidates:
if fail := await self._upload_and_send_attachment(
msg.chat_id, path, limit_bytes, relates_to):
failures.append(fail)
if failures:
text = f"{text.rstrip()}\n{chr(10).join(failures)}" if text.strip() else "\n".join(failures)
if text or not candidates:
content = _build_matrix_text_content(text)
if relates_to:
content["m.relates_to"] = relates_to
await self._send_room_content(msg.chat_id, content)
finally:
if not is_progress:
await self._stop_typing_keepalive(msg.chat_id, clear_typing=True)
def _register_event_callbacks(self) -> None:
self.client.add_event_callback(self._on_message, RoomMessageText)
self.client.add_event_callback(self._on_media_message, MATRIX_MEDIA_EVENT_FILTER)
self.client.add_event_callback(self._on_room_invite, InviteEvent)
def _register_response_callbacks(self) -> None:
self.client.add_response_callback(self._on_sync_error, SyncError)
self.client.add_response_callback(self._on_join_error, JoinError)
self.client.add_response_callback(self._on_send_error, RoomSendError)
def _log_response_error(self, label: str, response: Any) -> None:
"""Log Matrix response errors — auth errors at ERROR level, rest at WARNING."""
code = getattr(response, "status_code", None)
is_auth = code in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"}
is_fatal = is_auth or getattr(response, "soft_logout", False)
(logger.error if is_fatal else logger.warning)("Matrix {} failed: {}", label, response)
async def _on_sync_error(self, response: SyncError) -> None:
self._log_response_error("sync", response)
async def _on_join_error(self, response: JoinError) -> None:
self._log_response_error("join", response)
async def _on_send_error(self, response: RoomSendError) -> None:
self._log_response_error("send", response)
async def _set_typing(self, room_id: str, typing: bool) -> None:
"""Best-effort typing indicator update."""
if not self.client:
return
try:
response = await self.client.room_typing(room_id=room_id, typing_state=typing,
timeout=TYPING_NOTICE_TIMEOUT_MS)
if isinstance(response, RoomTypingError):
logger.debug("Matrix typing failed for {}: {}", room_id, response)
except Exception:
pass
async def _start_typing_keepalive(self, room_id: str) -> None:
"""Start periodic typing refresh (spec-recommended keepalive)."""
await self._stop_typing_keepalive(room_id, clear_typing=False)
await self._set_typing(room_id, True)
if not self._running:
return
async def loop() -> None:
try:
while self._running:
await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_MS / 1000)
await self._set_typing(room_id, True)
except asyncio.CancelledError:
pass
self._typing_tasks[room_id] = asyncio.create_task(loop())
async def _stop_typing_keepalive(self, room_id: str, *, clear_typing: bool) -> None:
if task := self._typing_tasks.pop(room_id, None):
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
if clear_typing:
await self._set_typing(room_id, False)
async def _sync_loop(self) -> None:
while self._running:
try:
await self.client.sync_forever(timeout=30000, full_state=True)
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(2)
async def _on_room_invite(self, room: MatrixRoom, event: InviteEvent) -> None:
allow_from = self.config.allow_from or []
if not allow_from or event.sender in allow_from:
await self.client.join(room.room_id)
def _is_direct_room(self, room: MatrixRoom) -> bool:
count = getattr(room, "member_count", None)
return isinstance(count, int) and count <= 2
def _is_bot_mentioned(self, event: RoomMessage) -> bool:
"""Check m.mentions payload for bot mention."""
source = getattr(event, "source", None)
if not isinstance(source, dict):
return False
mentions = (source.get("content") or {}).get("m.mentions")
if not isinstance(mentions, dict):
return False
user_ids = mentions.get("user_ids")
if isinstance(user_ids, list) and self.config.user_id in user_ids:
return True
return bool(self.config.allow_room_mentions and mentions.get("room") is True)
def _should_process_message(self, room: MatrixRoom, event: RoomMessage) -> bool:
"""Apply sender and room policy checks."""
if not self.is_allowed(event.sender):
return False
if self._is_direct_room(room):
return True
policy = self.config.group_policy
if policy == "open":
return True
if policy == "allowlist":
return room.room_id in (self.config.group_allow_from or [])
if policy == "mention":
return self._is_bot_mentioned(event)
return False
def _media_dir(self) -> Path:
d = get_data_dir() / "media" / "matrix"
d.mkdir(parents=True, exist_ok=True)
return d
@staticmethod
def _event_source_content(event: RoomMessage) -> dict[str, Any]:
source = getattr(event, "source", None)
if not isinstance(source, dict):
return {}
content = source.get("content")
return content if isinstance(content, dict) else {}
def _event_thread_root_id(self, event: RoomMessage) -> str | None:
relates_to = self._event_source_content(event).get("m.relates_to")
if not isinstance(relates_to, dict) or relates_to.get("rel_type") != "m.thread":
return None
root_id = relates_to.get("event_id")
return root_id if isinstance(root_id, str) and root_id else None
def _thread_metadata(self, event: RoomMessage) -> dict[str, str] | None:
if not (root_id := self._event_thread_root_id(event)):
return None
meta: dict[str, str] = {"thread_root_event_id": root_id}
if isinstance(reply_to := getattr(event, "event_id", None), str) and reply_to:
meta["thread_reply_to_event_id"] = reply_to
return meta
@staticmethod
def _build_thread_relates_to(metadata: dict[str, Any] | None) -> dict[str, Any] | None:
if not metadata:
return None
root_id = metadata.get("thread_root_event_id")
if not isinstance(root_id, str) or not root_id:
return None
reply_to = metadata.get("thread_reply_to_event_id") or metadata.get("event_id")
if not isinstance(reply_to, str) or not reply_to:
return None
return {"rel_type": "m.thread", "event_id": root_id,
"m.in_reply_to": {"event_id": reply_to}, "is_falling_back": True}
def _event_attachment_type(self, event: MatrixMediaEvent) -> str:
msgtype = self._event_source_content(event).get("msgtype")
return _MSGTYPE_MAP.get(msgtype, "file")
@staticmethod
def _is_encrypted_media_event(event: MatrixMediaEvent) -> bool:
return (isinstance(getattr(event, "key", None), dict)
and isinstance(getattr(event, "hashes", None), dict)
and isinstance(getattr(event, "iv", None), str))
def _event_declared_size_bytes(self, event: MatrixMediaEvent) -> int | None:
info = self._event_source_content(event).get("info")
size = info.get("size") if isinstance(info, dict) else None
return size if isinstance(size, int) and size >= 0 else None
def _event_mime(self, event: MatrixMediaEvent) -> str | None:
info = self._event_source_content(event).get("info")
if isinstance(info, dict) and isinstance(m := info.get("mimetype"), str) and m:
return m
m = getattr(event, "mimetype", None)
return m if isinstance(m, str) and m else None
def _event_filename(self, event: MatrixMediaEvent, attachment_type: str) -> str:
body = getattr(event, "body", None)
if isinstance(body, str) and body.strip():
if candidate := safe_filename(Path(body).name):
return candidate
return _DEFAULT_ATTACH_NAME if attachment_type == "file" else attachment_type
def _build_attachment_path(self, event: MatrixMediaEvent, attachment_type: str,
filename: str, mime: str | None) -> Path:
safe_name = safe_filename(Path(filename).name) or _DEFAULT_ATTACH_NAME
suffix = Path(safe_name).suffix
if not suffix and mime:
if guessed := mimetypes.guess_extension(mime, strict=False):
safe_name, suffix = f"{safe_name}{guessed}", guessed
stem = (Path(safe_name).stem or attachment_type)[:72]
suffix = suffix[:16]
event_id = safe_filename(str(getattr(event, "event_id", "") or "evt").lstrip("$"))
event_prefix = (event_id[:24] or "evt").strip("_")
return self._media_dir() / f"{event_prefix}_{stem}{suffix}"
async def _download_media_bytes(self, mxc_url: str) -> bytes | None:
if not self.client:
return None
response = await self.client.download(mxc=mxc_url)
if isinstance(response, DownloadError):
logger.warning("Matrix download failed for {}: {}", mxc_url, response)
return None
body = getattr(response, "body", None)
if isinstance(body, (bytes, bytearray)):
return bytes(body)
if isinstance(response, MemoryDownloadResponse):
return bytes(response.body)
if isinstance(body, (str, Path)):
path = Path(body)
if path.is_file():
try:
return path.read_bytes()
except OSError:
return None
return None
def _decrypt_media_bytes(self, event: MatrixMediaEvent, ciphertext: bytes) -> bytes | None:
key_obj, hashes, iv = getattr(event, "key", None), getattr(event, "hashes", None), getattr(event, "iv", None)
key = key_obj.get("k") if isinstance(key_obj, dict) else None
sha256 = hashes.get("sha256") if isinstance(hashes, dict) else None
if not all(isinstance(v, str) for v in (key, sha256, iv)):
return None
try:
return decrypt_attachment(ciphertext, key, sha256, iv)
except (EncryptionError, ValueError, TypeError):
logger.warning("Matrix decrypt failed for event {}", getattr(event, "event_id", ""))
return None
async def _fetch_media_attachment(
self, room: MatrixRoom, event: MatrixMediaEvent,
) -> tuple[dict[str, Any] | None, str]:
"""Download, decrypt if needed, and persist a Matrix attachment."""
atype = self._event_attachment_type(event)
mime = self._event_mime(event)
filename = self._event_filename(event, atype)
mxc_url = getattr(event, "url", None)
fail = _ATTACH_FAILED.format(filename)
if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"):
return None, fail
limit_bytes = await self._effective_media_limit_bytes()
declared = self._event_declared_size_bytes(event)
if declared is not None and declared > limit_bytes:
return None, _ATTACH_TOO_LARGE.format(filename)
downloaded = await self._download_media_bytes(mxc_url)
if downloaded is None:
return None, fail
encrypted = self._is_encrypted_media_event(event)
data = downloaded
if encrypted:
if (data := self._decrypt_media_bytes(event, downloaded)) is None:
return None, fail
if len(data) > limit_bytes:
return None, _ATTACH_TOO_LARGE.format(filename)
path = self._build_attachment_path(event, atype, filename, mime)
try:
path.write_bytes(data)
except OSError:
return None, fail
attachment = {
"type": atype, "mime": mime, "filename": filename,
"event_id": str(getattr(event, "event_id", "") or ""),
"encrypted": encrypted, "size_bytes": len(data),
"path": str(path), "mxc_url": mxc_url,
}
return attachment, _ATTACH_MARKER.format(path)
def _base_metadata(self, room: MatrixRoom, event: RoomMessage) -> dict[str, Any]:
"""Build common metadata for text and media handlers."""
meta: dict[str, Any] = {"room": getattr(room, "display_name", room.room_id)}
if isinstance(eid := getattr(event, "event_id", None), str) and eid:
meta["event_id"] = eid
if thread := self._thread_metadata(event):
meta.update(thread)
return meta
async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None:
if event.sender == self.config.user_id or not self._should_process_message(room, event):
return
await self._start_typing_keepalive(room.room_id)
try:
await self._handle_message(
sender_id=event.sender, chat_id=room.room_id,
content=event.body, metadata=self._base_metadata(room, event),
)
except Exception:
await self._stop_typing_keepalive(room.room_id, clear_typing=True)
raise
async def _on_media_message(self, room: MatrixRoom, event: MatrixMediaEvent) -> None:
if event.sender == self.config.user_id or not self._should_process_message(room, event):
return
attachment, marker = await self._fetch_media_attachment(room, event)
parts: list[str] = []
if isinstance(body := getattr(event, "body", None), str) and body.strip():
parts.append(body.strip())
parts.append(marker)
await self._start_typing_keepalive(room.room_id)
try:
meta = self._base_metadata(room, event)
if attachment:
meta["attachments"] = [attachment]
await self._handle_message(
sender_id=event.sender, chat_id=room.room_id,
content="\n".join(parts),
media=[attachment["path"]] if attachment else [],
metadata=meta,
)
except Exception:
await self._stop_typing_keepalive(room.room_id, clear_typing=True)
raise

View File

@@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel):
BOT_COMMANDS = [
BotCommand("start", "Start the bot"),
BotCommand("new", "Start a new conversation"),
BotCommand("stop", "Stop the current task"),
BotCommand("help", "Show available commands"),
]
@@ -299,6 +300,7 @@ class TelegramChannel(BaseChannel):
await update.message.reply_text(
"🐈 nanobot commands:\n"
"/new — Start a new conversation\n"
"/stop — Stop the current task\n"
"/help — Show available commands"
)

View File

@@ -1,6 +1,8 @@
"""Configuration schema using Pydantic."""
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, Field, ConfigDict
from pydantic.alias_generators import to_camel
from pydantic_settings import BaseSettings
@@ -61,6 +63,23 @@ class DiscordConfig(Base):
intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT
class MatrixConfig(Base):
"""Matrix (Element) channel configuration."""
enabled: bool = False
homeserver: str = "https://matrix.org"
access_token: str = ""
user_id: str = "" # @bot:matrix.org
device_id: str = ""
e2ee_enabled: bool = True # Enable Matrix E2EE support (encryption + encrypted room handling).
sync_stop_grace_seconds: int = 2 # Max seconds to wait for sync_forever to stop gracefully before cancellation fallback.
max_media_bytes: int = 20 * 1024 * 1024 # Max attachment size accepted for Matrix media handling (inbound + outbound).
allow_from: list[str] = Field(default_factory=list)
group_policy: Literal["open", "mention", "allowlist"] = "open"
group_allow_from: list[str] = Field(default_factory=list)
allow_room_mentions: bool = False
class EmailConfig(Base):
"""Email channel configuration (IMAP inbound + SMTP outbound)."""
@@ -179,6 +198,7 @@ class ChannelsConfig(Base):
email: EmailConfig = Field(default_factory=EmailConfig)
slack: SlackConfig = Field(default_factory=SlackConfig)
qq: QQConfig = Field(default_factory=QQConfig)
matrix: MatrixConfig = Field(default_factory=MatrixConfig)
class AgentDefaults(Base):
@@ -186,6 +206,7 @@ class AgentDefaults(Base):
workspace: str = "~/.nanobot/workspace"
model: str = "anthropic/claude-opus-4-5"
provider: str = "auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection
max_tokens: int = 8192
temperature: float = 0.1
max_tool_iterations: int = 40
@@ -260,6 +281,7 @@ class ExecToolConfig(Base):
"""Shell exec tool configuration."""
timeout: int = 60
path_append: str = ""
class MCPServerConfig(Base):
@@ -300,6 +322,11 @@ class Config(BaseSettings):
"""Match provider config and its registry name. Returns (config, spec_name)."""
from nanobot.providers.registry import PROVIDERS
forced = self.agents.defaults.provider
if forced != "auto":
p = getattr(self.providers, forced, None)
return (p, forced) if p else (None, None)
model_lower = (model or self.agents.defaults.model).lower()
model_normalized = model_lower.replace("-", "_")
model_prefix = model_lower.split("/", 1)[0] if "/" in model_lower else ""

View File

@@ -2,14 +2,6 @@
You are a helpful AI assistant. Be concise, accurate, and friendly.
## Guidelines
- Before calling tools, briefly state your intent — but NEVER predict results before receiving them
- Use precise tense: "I will run X" before the call, "X returned Y" after
- NEVER claim success before a tool result confirms it
- Ask for clarification when the request is ambiguous
- Remember important information in `memory/MEMORY.md`; past events are logged in `memory/HISTORY.md`
## Scheduled Reminders
When user asks for a reminder at a specific time, use `exec` to run:

View File

@@ -3,7 +3,6 @@
from pathlib import Path
from datetime import datetime
def ensure_dir(path: Path) -> Path:
"""Ensure a directory exists, creating it if necessary."""
path.mkdir(parents=True, exist_ok=True)
@@ -77,4 +76,4 @@ def parse_session_key(key: str) -> tuple[str, str]:
parts = key.split(":", 1)
if len(parts) != 2:
raise ValueError(f"Invalid session key: {key}")
return parts[0], parts[1]
return parts[0], parts[1]

View File

@@ -45,6 +45,11 @@ dependencies = [
]
[project.optional-dependencies]
matrix = [
"matrix-nio[e2e]>=0.25.2",
"mistune>=3.0.0,<4.0.0",
"nh3>=0.2.17,<1.0.0",
]
dev = [
"pytest>=9.0.0,<10.0.0",
"pytest-asyncio>=1.3.0,<2.0.0",

View File

@@ -39,8 +39,8 @@ def test_system_prompt_stays_stable_when_clock_changes(tmp_path, monkeypatch) ->
assert prompt1 == prompt2
def test_runtime_context_is_appended_to_current_user_message(tmp_path) -> None:
"""Dynamic runtime details should be added at the tail user message, not system."""
def test_runtime_context_is_separate_untrusted_user_message(tmp_path) -> None:
"""Runtime metadata should be a separate user message before the actual user message."""
workspace = _make_workspace(tmp_path)
builder = ContextBuilder(workspace)
@@ -54,10 +54,13 @@ def test_runtime_context_is_appended_to_current_user_message(tmp_path) -> None:
assert messages[0]["role"] == "system"
assert "## Current Session" not in messages[0]["content"]
assert messages[-2]["role"] == "user"
runtime_content = messages[-2]["content"]
assert isinstance(runtime_content, str)
assert ContextBuilder._RUNTIME_CONTEXT_TAG in runtime_content
assert "Current Time:" in runtime_content
assert "Channel: cli" in runtime_content
assert "Chat ID: direct" in runtime_content
assert messages[-1]["role"] == "user"
user_content = messages[-1]["content"]
assert isinstance(user_content, str)
assert "Return exactly: OK" in user_content
assert "Current Time:" in user_content
assert "Channel: cli" in user_content
assert "Chat ID: direct" in user_content
assert messages[-1]["content"] == "Return exactly: OK"

1302
tests/test_matrix_channel.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
import pytest
from nanobot.agent.tools.message import MessageTool
@pytest.mark.asyncio
async def test_message_tool_returns_error_when_no_target_context() -> None:
tool = MessageTool()
result = await tool.execute(content="test")
assert result == "Error: No target channel/chat specified"

167
tests/test_task_cancel.py Normal file
View File

@@ -0,0 +1,167 @@
"""Tests for /stop task cancellation."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
def _make_loop():
"""Create a minimal AgentLoop with mocked dependencies."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
workspace = MagicMock()
workspace.__truediv__ = MagicMock(return_value=MagicMock())
with patch("nanobot.agent.loop.ContextBuilder"), \
patch("nanobot.agent.loop.SessionManager"), \
patch("nanobot.agent.loop.SubagentManager") as MockSubMgr:
MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0)
loop = AgentLoop(bus=bus, provider=provider, workspace=workspace)
return loop, bus
class TestHandleStop:
@pytest.mark.asyncio
async def test_stop_no_active_task(self):
from nanobot.bus.events import InboundMessage
loop, bus = _make_loop()
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
await loop._handle_stop(msg)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "No active task" in out.content
@pytest.mark.asyncio
async def test_stop_cancels_active_task(self):
from nanobot.bus.events import InboundMessage
loop, bus = _make_loop()
cancelled = asyncio.Event()
async def slow_task():
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
cancelled.set()
raise
task = asyncio.create_task(slow_task())
await asyncio.sleep(0)
loop._active_tasks["test:c1"] = [task]
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
await loop._handle_stop(msg)
assert cancelled.is_set()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "stopped" in out.content.lower()
@pytest.mark.asyncio
async def test_stop_cancels_multiple_tasks(self):
from nanobot.bus.events import InboundMessage
loop, bus = _make_loop()
events = [asyncio.Event(), asyncio.Event()]
async def slow(idx):
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
events[idx].set()
raise
tasks = [asyncio.create_task(slow(i)) for i in range(2)]
await asyncio.sleep(0)
loop._active_tasks["test:c1"] = tasks
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/stop")
await loop._handle_stop(msg)
assert all(e.is_set() for e in events)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "2 task" in out.content
class TestDispatch:
@pytest.mark.asyncio
async def test_dispatch_processes_and_publishes(self):
from nanobot.bus.events import InboundMessage, OutboundMessage
loop, bus = _make_loop()
msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="hello")
loop._process_message = AsyncMock(
return_value=OutboundMessage(channel="test", chat_id="c1", content="hi")
)
await loop._dispatch(msg)
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert out.content == "hi"
@pytest.mark.asyncio
async def test_processing_lock_serializes(self):
from nanobot.bus.events import InboundMessage, OutboundMessage
loop, bus = _make_loop()
order = []
async def mock_process(m, **kwargs):
order.append(f"start-{m.content}")
await asyncio.sleep(0.05)
order.append(f"end-{m.content}")
return OutboundMessage(channel="test", chat_id="c1", content=m.content)
loop._process_message = mock_process
msg1 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="a")
msg2 = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="b")
t1 = asyncio.create_task(loop._dispatch(msg1))
t2 = asyncio.create_task(loop._dispatch(msg2))
await asyncio.gather(t1, t2)
assert order == ["start-a", "end-a", "start-b", "end-b"]
class TestSubagentCancellation:
@pytest.mark.asyncio
async def test_cancel_by_session(self):
from nanobot.agent.subagent import SubagentManager
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus)
cancelled = asyncio.Event()
async def slow():
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
cancelled.set()
raise
task = asyncio.create_task(slow())
await asyncio.sleep(0)
mgr._running_tasks["sub-1"] = task
mgr._session_tasks["test:c1"] = {"sub-1"}
count = await mgr.cancel_by_session("test:c1")
assert count == 1
assert cancelled.is_set()
@pytest.mark.asyncio
async def test_cancel_by_session_no_tasks(self):
from nanobot.agent.subagent import SubagentManager
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
mgr = SubagentManager(provider=provider, workspace=MagicMock(), bus=bus)
assert await mgr.cancel_by_session("nonexistent") == 0