Merge remote-tracking branch 'origin/main'
Some checks failed
Test Suite / test (3.11) (push) Failing after 1m8s
Test Suite / test (3.12) (push) Failing after 1m8s
Test Suite / test (3.13) (push) Failing after 1m7s

# Conflicts:
#	README.md
#	nanobot/agent/context.py
#	nanobot/agent/loop.py
#	nanobot/channels/telegram.py
This commit is contained in:
Hua
2026-03-19 00:42:43 +08:00
13 changed files with 548 additions and 73 deletions

View File

@@ -70,6 +70,8 @@
</details>
> 🐈 nanobot is for educational, research, and technical exchange purposes only. It is unrelated to crypto and does not involve any official token or coin.
## Key Features of nanobot:
🪶 **Ultra-Lightweight**: A super lightweight implementation of OpenClaw — 99% smaller, significantly faster.
@@ -177,7 +179,11 @@ nanobot channels login
> [!TIP]
> Set your API key in `~/.nanobot/config.json`.
> Get API keys: [OpenRouter](https://openrouter.ai/keys) (Global) · [Brave Search](https://brave.com/search/api/) or a self-hosted SearXNG instance (optional, for web search)
> Get API keys: [OpenRouter](https://openrouter.ai/keys) (Global)
>
> For other LLM providers, please see the [Providers](#providers) section.
>
> For web search capability setup (Brave Search or SearXNG), please see [Web Search](#web-search).
**1. Initialize**
@@ -258,7 +264,9 @@ That's it! You have a working AI assistant in 2 minutes.
## 💬 Chat Apps
Connect nanobot to your favorite chat platform.
Connect nanobot to your favorite chat platform. Want to build your own? See the [Channel Plugin Guide](./docs/CHANNEL_PLUGIN_GUIDE.md).
> Channel plugin support is available in the `main` branch; not yet published to PyPI.
| Channel | What you need |
|---------|---------------|
@@ -924,10 +932,12 @@ Config file: `~/.nanobot/config.json`
> [!TIP]
> - **Groq** provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed.
> - **MiniMax Coding Plan**: Exclusive discount links for the nanobot community: [Overseas](https://platform.minimax.io/subscribe/coding-plan?code=9txpdXw04g&source=link) · [Mainland China](https://platform.minimaxi.com/subscribe/token-plan?code=GILTJpMTqZ&source=link)
> - **MiniMax (Mainland China)**: If your API key is from MiniMax's mainland China platform (minimaxi.com), set `"apiBase": "https://api.minimaxi.com/v1"` in your minimax provider config.
> - **VolcEngine / BytePlus Coding Plan**: Use dedicated providers `volcengineCodingPlan` or `byteplusCodingPlan` instead of the pay-per-use `volcengine` / `byteplus` providers.
> - **Zhipu Coding Plan**: If you're on Zhipu's coding plan, set `"apiBase": "https://open.bigmodel.cn/api/coding/paas/v4"` in your zhipu provider config.
> - **MiniMax (Mainland China)**: If your API key is from MiniMax's mainland China platform (minimaxi.com), set `"apiBase": "https://api.minimaxi.com/v1"` in your minimax provider config.
> - **Alibaba Cloud Coding Plan**: If you're on the Alibaba Cloud Coding Plan (BaiLian), set `"apiBase": "https://coding.dashscope.aliyuncs.com/v1"` in your dashscope provider config.
> - **Alibaba Cloud BaiLian**: If you're using Alibaba Cloud BaiLian's OpenAI-compatible endpoint, set `"apiBase": "https://dashscope.aliyuncs.com/compatible-mode/v1"` in your dashscope provider config.
| Provider | Purpose | Get API Key |
|----------|---------|-------------|
@@ -940,8 +950,8 @@ Config file: `~/.nanobot/config.json`
| `openai` | LLM (GPT direct) | [platform.openai.com](https://platform.openai.com) |
| `deepseek` | LLM (DeepSeek direct) | [platform.deepseek.com](https://platform.deepseek.com) |
| `groq` | LLM + **Voice transcription** (Whisper) | [console.groq.com](https://console.groq.com) |
| `gemini` | LLM (Gemini direct) | [aistudio.google.com](https://aistudio.google.com) |
| `minimax` | LLM (MiniMax direct) | [platform.minimaxi.com](https://platform.minimaxi.com) |
| `gemini` | LLM (Gemini direct) | [aistudio.google.com](https://aistudio.google.com) |
| `aihubmix` | LLM (API gateway, access to all models) | [aihubmix.com](https://aihubmix.com) |
| `siliconflow` | LLM (SiliconFlow/硅基流动) | [siliconflow.cn](https://siliconflow.cn) |
| `dashscope` | LLM (Qwen) | [dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) |

View File

@@ -171,6 +171,7 @@ Reply directly with text for conversations. Only use the 'message' tool to send
chat_id: str | None = None,
persona: str | None = None,
language: str | None = None,
current_role: str = "user",
) -> list[dict[str, Any]]:
"""Build the complete message list for an LLM call."""
runtime_ctx = self._build_runtime_context(channel, chat_id)
@@ -186,7 +187,7 @@ Reply directly with text for conversations. Only use the 'message' tool to send
return [
{"role": "system", "content": self.build_system_prompt(skill_names, persona=persona, language=language)},
*history,
{"role": "user", "content": merged},
{"role": current_role, "content": merged},
]
def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]:
@@ -205,7 +206,11 @@ Reply directly with text for conversations. Only use the 'message' tool to send
if not mime or not mime.startswith("image/"):
continue
b64 = base64.b64encode(raw).decode()
images.append({"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}})
images.append({
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{b64}"},
"_meta": {"path": str(p)},
})
if not images:
return text

View File

@@ -765,6 +765,8 @@ class AgentLoop:
await self.memory_consolidator.maybe_consolidate_by_tokens(session)
self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
history = session.get_history(max_messages=0)
# Subagent results should be assistant role, other system messages use user role
current_role = "assistant" if msg.sender_id == "subagent" else "user"
messages = self.context.build_messages(
history=history,
current_message=msg.content,
@@ -772,6 +774,7 @@ class AgentLoop:
chat_id=chat_id,
persona=persona,
language=language,
current_role=current_role,
)
final_content, _, all_msgs = await self._run_agent_loop(messages)
self._save_turn(session, all_msgs, 1 + len(history))
@@ -883,7 +886,9 @@ class AgentLoop:
continue # Strip runtime context from multimodal messages
if (c.get("type") == "image_url"
and c.get("image_url", {}).get("url", "").startswith("data:image/")):
filtered.append({"type": "text", "text": "[image]"})
path = (c.get("_meta") or {}).get("path", "")
placeholder = f"[image: {path}]" if path else "[image]"
filtered.append({"type": "text", "text": placeholder})
else:
filtered.append(c)
if not filtered:

View File

@@ -1,11 +1,12 @@
"""Cron tool for scheduling reminders and tasks."""
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any
from nanobot.agent.tools.base import Tool
from nanobot.cron.service import CronService
from nanobot.cron.types import CronSchedule
from nanobot.cron.types import CronJobState, CronSchedule
class CronTool(Tool):
@@ -143,11 +144,51 @@ class CronTool(Tool):
)
return f"Created job '{job.name}' (id: {job.id})"
@staticmethod
def _format_timing(schedule: CronSchedule) -> str:
"""Format schedule as a human-readable timing string."""
if schedule.kind == "cron":
tz = f" ({schedule.tz})" if schedule.tz else ""
return f"cron: {schedule.expr}{tz}"
if schedule.kind == "every" and schedule.every_ms:
ms = schedule.every_ms
if ms % 3_600_000 == 0:
return f"every {ms // 3_600_000}h"
if ms % 60_000 == 0:
return f"every {ms // 60_000}m"
if ms % 1000 == 0:
return f"every {ms // 1000}s"
return f"every {ms}ms"
if schedule.kind == "at" and schedule.at_ms:
dt = datetime.fromtimestamp(schedule.at_ms / 1000, tz=timezone.utc)
return f"at {dt.isoformat()}"
return schedule.kind
@staticmethod
def _format_state(state: CronJobState) -> list[str]:
"""Format job run state as display lines."""
lines: list[str] = []
if state.last_run_at_ms:
last_dt = datetime.fromtimestamp(state.last_run_at_ms / 1000, tz=timezone.utc)
info = f" Last run: {last_dt.isoformat()}{state.last_status or 'unknown'}"
if state.last_error:
info += f" ({state.last_error})"
lines.append(info)
if state.next_run_at_ms:
next_dt = datetime.fromtimestamp(state.next_run_at_ms / 1000, tz=timezone.utc)
lines.append(f" Next run: {next_dt.isoformat()}")
return lines
def _list_jobs(self) -> str:
jobs = self._cron.list_jobs()
if not jobs:
return "No scheduled jobs."
lines = [f"- {j.name} (id: {j.id}, {j.schedule.kind})" for j in jobs]
lines = []
for j in jobs:
timing = self._format_timing(j.schedule)
parts = [f"- {j.name} (id: {j.id}, {timing})"]
parts.extend(self._format_state(j.state))
lines.append("\n".join(parts))
return "Scheduled jobs:\n" + "\n".join(lines)
def _remove_job(self, job_id: str | None) -> str:

View File

@@ -23,6 +23,7 @@ from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.paths import get_media_dir
from nanobot.config.schema import TelegramConfig, TelegramInstanceConfig
from nanobot.security.network import validate_url_target
from nanobot.utils.helpers import split_message
TELEGRAM_MAX_MESSAGE_LEN = 4000 # Telegram message character limit
@@ -312,6 +313,10 @@ class TelegramChannel(BaseChannel):
return "audio"
return "document"
@staticmethod
def _is_remote_media_url(path: str) -> bool:
return path.startswith(("http://", "https://"))
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Telegram."""
if not self._app:
@@ -353,7 +358,21 @@ class TelegramChannel(BaseChannel):
"audio": self._app.bot.send_audio,
}.get(media_type, self._app.bot.send_document)
param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document"
with open(media_path, 'rb') as f:
# Telegram Bot API accepts HTTP(S) URLs directly for media params.
if self._is_remote_media_url(media_path):
ok, error = validate_url_target(media_path)
if not ok:
raise ValueError(f"unsafe media URL: {error}")
await sender(
chat_id=chat_id,
**{param: media_path},
reply_parameters=reply_params,
**thread_kwargs,
)
continue
with open(media_path, "rb") as f:
await sender(
chat_id=chat_id,
**{param: f},

View File

@@ -1,8 +1,30 @@
"""LLM provider abstraction module."""
from __future__ import annotations
from importlib import import_module
from typing import TYPE_CHECKING
from nanobot.providers.base import LLMProvider, LLMResponse
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.providers.openai_codex_provider import OpenAICodexProvider
from nanobot.providers.azure_openai_provider import AzureOpenAIProvider
__all__ = ["LLMProvider", "LLMResponse", "LiteLLMProvider", "OpenAICodexProvider", "AzureOpenAIProvider"]
_LAZY_IMPORTS = {
"LiteLLMProvider": ".litellm_provider",
"OpenAICodexProvider": ".openai_codex_provider",
"AzureOpenAIProvider": ".azure_openai_provider",
}
if TYPE_CHECKING:
from nanobot.providers.azure_openai_provider import AzureOpenAIProvider
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.providers.openai_codex_provider import OpenAICodexProvider
def __getattr__(name: str):
"""Lazily expose provider implementations without importing all backends up front."""
module_name = _LAZY_IMPORTS.get(name)
if module_name is None:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
module = import_module(module_name, __name__)
return getattr(module, name)

View File

@@ -89,14 +89,6 @@ class LLMProvider(ABC):
"server error",
"temporarily unavailable",
)
_IMAGE_UNSUPPORTED_MARKERS = (
"image_url is only supported",
"does not support image",
"images are not supported",
"image input is not supported",
"image_url is not supported",
"unsupported image input",
)
_SENTINEL = object()
@@ -107,11 +99,7 @@ class LLMProvider(ABC):
@staticmethod
def _sanitize_empty_content(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Replace empty text content that causes provider 400 errors.
Empty content can appear when MCP tools return nothing. Most providers
reject empty-string content or empty text blocks in list content.
"""
"""Sanitize message content: fix empty blocks, strip internal _meta fields."""
result: list[dict[str, Any]] = []
for msg in messages:
content = msg.get("content")
@@ -123,18 +111,25 @@ class LLMProvider(ABC):
continue
if isinstance(content, list):
filtered = [
item for item in content
if not (
new_items: list[Any] = []
changed = False
for item in content:
if (
isinstance(item, dict)
and item.get("type") in ("text", "input_text", "output_text")
and not item.get("text")
)
]
if len(filtered) != len(content):
):
changed = True
continue
if isinstance(item, dict) and "_meta" in item:
new_items.append({k: v for k, v in item.items() if k != "_meta"})
changed = True
else:
new_items.append(item)
if changed:
clean = dict(msg)
if filtered:
clean["content"] = filtered
if new_items:
clean["content"] = new_items
elif msg.get("role") == "assistant" and msg.get("tool_calls"):
clean["content"] = None
else:
@@ -197,11 +192,6 @@ class LLMProvider(ABC):
err = (content or "").lower()
return any(marker in err for marker in cls._TRANSIENT_ERROR_MARKERS)
@classmethod
def _is_image_unsupported_error(cls, content: str | None) -> bool:
err = (content or "").lower()
return any(marker in err for marker in cls._IMAGE_UNSUPPORTED_MARKERS)
@staticmethod
def _strip_image_content(messages: list[dict[str, Any]]) -> list[dict[str, Any]] | None:
"""Replace image_url blocks with text placeholder. Returns None if no images found."""
@@ -213,7 +203,9 @@ class LLMProvider(ABC):
new_content = []
for b in content:
if isinstance(b, dict) and b.get("type") == "image_url":
new_content.append({"type": "text", "text": "[image omitted]"})
path = (b.get("_meta") or {}).get("path", "")
placeholder = f"[image: {path}]" if path else "[image omitted]"
new_content.append({"type": "text", "text": placeholder})
found = True
else:
new_content.append(b)
@@ -267,11 +259,10 @@ class LLMProvider(ABC):
return response
if not self._is_transient_error(response.content):
if self._is_image_unsupported_error(response.content):
stripped = self._strip_image_content(messages)
if stripped is not None:
logger.warning("Model does not support image input, retrying without images")
return await self._safe_chat(**{**kw, "messages": stripped})
stripped = self._strip_image_content(messages)
if stripped is not None:
logger.warning("Non-transient LLM error with image content, retrying without images")
return await self._safe_chat(**{**kw, "messages": stripped})
return response
logger.warning(

Binary file not shown.

Before

Width:  |  Height:  |  Size: 610 KiB

After

Width:  |  Height:  |  Size: 187 KiB

View File

@@ -0,0 +1,250 @@
"""Tests for CronTool._list_jobs() output formatting."""
from nanobot.agent.tools.cron import CronTool
from nanobot.cron.service import CronService
from nanobot.cron.types import CronJobState, CronSchedule
def _make_tool(tmp_path) -> CronTool:
service = CronService(tmp_path / "cron" / "jobs.json")
return CronTool(service)
# -- _format_timing tests --
def test_format_timing_cron_with_tz() -> None:
s = CronSchedule(kind="cron", expr="0 9 * * 1-5", tz="America/Denver")
assert CronTool._format_timing(s) == "cron: 0 9 * * 1-5 (America/Denver)"
def test_format_timing_cron_without_tz() -> None:
s = CronSchedule(kind="cron", expr="*/5 * * * *")
assert CronTool._format_timing(s) == "cron: */5 * * * *"
def test_format_timing_every_hours() -> None:
s = CronSchedule(kind="every", every_ms=7_200_000)
assert CronTool._format_timing(s) == "every 2h"
def test_format_timing_every_minutes() -> None:
s = CronSchedule(kind="every", every_ms=1_800_000)
assert CronTool._format_timing(s) == "every 30m"
def test_format_timing_every_seconds() -> None:
s = CronSchedule(kind="every", every_ms=30_000)
assert CronTool._format_timing(s) == "every 30s"
def test_format_timing_every_non_minute_seconds() -> None:
s = CronSchedule(kind="every", every_ms=90_000)
assert CronTool._format_timing(s) == "every 90s"
def test_format_timing_every_milliseconds() -> None:
s = CronSchedule(kind="every", every_ms=200)
assert CronTool._format_timing(s) == "every 200ms"
def test_format_timing_at() -> None:
s = CronSchedule(kind="at", at_ms=1773684000000)
result = CronTool._format_timing(s)
assert result.startswith("at 2026-")
def test_format_timing_fallback() -> None:
s = CronSchedule(kind="every") # no every_ms
assert CronTool._format_timing(s) == "every"
# -- _format_state tests --
def test_format_state_empty() -> None:
state = CronJobState()
assert CronTool._format_state(state) == []
def test_format_state_last_run_ok() -> None:
state = CronJobState(last_run_at_ms=1773673200000, last_status="ok")
lines = CronTool._format_state(state)
assert len(lines) == 1
assert "Last run:" in lines[0]
assert "ok" in lines[0]
def test_format_state_last_run_with_error() -> None:
state = CronJobState(last_run_at_ms=1773673200000, last_status="error", last_error="timeout")
lines = CronTool._format_state(state)
assert len(lines) == 1
assert "error" in lines[0]
assert "timeout" in lines[0]
def test_format_state_next_run_only() -> None:
state = CronJobState(next_run_at_ms=1773684000000)
lines = CronTool._format_state(state)
assert len(lines) == 1
assert "Next run:" in lines[0]
def test_format_state_both() -> None:
state = CronJobState(
last_run_at_ms=1773673200000, last_status="ok", next_run_at_ms=1773684000000
)
lines = CronTool._format_state(state)
assert len(lines) == 2
assert "Last run:" in lines[0]
assert "Next run:" in lines[1]
def test_format_state_unknown_status() -> None:
state = CronJobState(last_run_at_ms=1773673200000, last_status=None)
lines = CronTool._format_state(state)
assert "unknown" in lines[0]
# -- _list_jobs integration tests --
def test_list_empty(tmp_path) -> None:
tool = _make_tool(tmp_path)
assert tool._list_jobs() == "No scheduled jobs."
def test_list_cron_job_shows_expression_and_timezone(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="Morning scan",
schedule=CronSchedule(kind="cron", expr="0 9 * * 1-5", tz="America/Denver"),
message="scan",
)
result = tool._list_jobs()
assert "cron: 0 9 * * 1-5 (America/Denver)" in result
def test_list_every_job_shows_human_interval(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="Frequent check",
schedule=CronSchedule(kind="every", every_ms=1_800_000),
message="check",
)
result = tool._list_jobs()
assert "every 30m" in result
def test_list_every_job_hours(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="Hourly check",
schedule=CronSchedule(kind="every", every_ms=7_200_000),
message="check",
)
result = tool._list_jobs()
assert "every 2h" in result
def test_list_every_job_seconds(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="Fast check",
schedule=CronSchedule(kind="every", every_ms=30_000),
message="check",
)
result = tool._list_jobs()
assert "every 30s" in result
def test_list_every_job_non_minute_seconds(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="Ninety-second check",
schedule=CronSchedule(kind="every", every_ms=90_000),
message="check",
)
result = tool._list_jobs()
assert "every 90s" in result
def test_list_every_job_milliseconds(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="Sub-second check",
schedule=CronSchedule(kind="every", every_ms=200),
message="check",
)
result = tool._list_jobs()
assert "every 200ms" in result
def test_list_at_job_shows_iso_timestamp(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="One-shot",
schedule=CronSchedule(kind="at", at_ms=1773684000000),
message="fire",
)
result = tool._list_jobs()
assert "at 2026-" in result
def test_list_shows_last_run_state(tmp_path) -> None:
tool = _make_tool(tmp_path)
job = tool._cron.add_job(
name="Stateful job",
schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="UTC"),
message="test",
)
# Simulate a completed run by updating state in the store
job.state.last_run_at_ms = 1773673200000
job.state.last_status = "ok"
tool._cron._save_store()
result = tool._list_jobs()
assert "Last run:" in result
assert "ok" in result
def test_list_shows_error_message(tmp_path) -> None:
tool = _make_tool(tmp_path)
job = tool._cron.add_job(
name="Failed job",
schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="UTC"),
message="test",
)
job.state.last_run_at_ms = 1773673200000
job.state.last_status = "error"
job.state.last_error = "timeout"
tool._cron._save_store()
result = tool._list_jobs()
assert "error" in result
assert "timeout" in result
def test_list_shows_next_run(tmp_path) -> None:
tool = _make_tool(tmp_path)
tool._cron.add_job(
name="Upcoming job",
schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="UTC"),
message="test",
)
result = tool._list_jobs()
assert "Next run:" in result
def test_list_excludes_disabled_jobs(tmp_path) -> None:
tool = _make_tool(tmp_path)
job = tool._cron.add_job(
name="Paused job",
schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="UTC"),
message="test",
)
tool._cron.enable_job(job.id, enabled=False)
result = tool._list_jobs()
assert "Paused job" not in result
assert result == "No scheduled jobs."

View File

@@ -22,11 +22,30 @@ def test_save_turn_skips_multimodal_user_when_only_runtime_context() -> None:
assert session.messages == []
def test_save_turn_keeps_image_placeholder_after_runtime_strip() -> None:
def test_save_turn_keeps_image_placeholder_with_path_after_runtime_strip() -> None:
loop = _mk_loop()
session = Session(key="test:image")
runtime = ContextBuilder._RUNTIME_CONTEXT_TAG + "\nCurrent Time: now (UTC)"
loop._save_turn(
session,
[{
"role": "user",
"content": [
{"type": "text", "text": runtime},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}, "_meta": {"path": "/media/feishu/photo.jpg"}},
],
}],
skip=0,
)
assert session.messages[0]["content"] == [{"type": "text", "text": "[image: /media/feishu/photo.jpg]"}]
def test_save_turn_keeps_image_placeholder_without_meta() -> None:
loop = _mk_loop()
session = Session(key="test:image-no-meta")
runtime = ContextBuilder._RUNTIME_CONTEXT_TAG + "\nCurrent Time: now (UTC)"
loop._save_turn(
session,
[{

View File

@@ -126,10 +126,17 @@ async def test_chat_with_retry_explicit_override_beats_defaults() -> None:
# ---------------------------------------------------------------------------
# Image-unsupported fallback tests
# Image fallback tests
# ---------------------------------------------------------------------------
_IMAGE_MSG = [
{"role": "user", "content": [
{"type": "text", "text": "describe this"},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}, "_meta": {"path": "/media/test.png"}},
]},
]
_IMAGE_MSG_NO_META = [
{"role": "user", "content": [
{"type": "text", "text": "describe this"},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}},
@@ -138,13 +145,10 @@ _IMAGE_MSG = [
@pytest.mark.asyncio
async def test_image_unsupported_error_retries_without_images() -> None:
"""If the model rejects image_url, retry once with images stripped."""
async def test_non_transient_error_with_images_retries_without_images() -> None:
"""Any non-transient error retries once with images stripped when images are present."""
provider = ScriptedProvider([
LLMResponse(
content="Invalid content type. image_url is only supported by certain models",
finish_reason="error",
),
LLMResponse(content="API调用参数有误,请检查文档", finish_reason="error"),
LLMResponse(content="ok, no image"),
])
@@ -157,17 +161,14 @@ async def test_image_unsupported_error_retries_without_images() -> None:
content = msg.get("content")
if isinstance(content, list):
assert all(b.get("type") != "image_url" for b in content)
assert any("[image omitted]" in (b.get("text") or "") for b in content)
assert any("[image: /media/test.png]" in (b.get("text") or "") for b in content)
@pytest.mark.asyncio
async def test_image_unsupported_error_no_retry_without_image_content() -> None:
"""If messages don't contain image_url blocks, don't retry on image error."""
async def test_non_transient_error_without_images_no_retry() -> None:
"""Non-transient errors without image content are returned immediately."""
provider = ScriptedProvider([
LLMResponse(
content="image_url is only supported by certain models",
finish_reason="error",
),
LLMResponse(content="401 unauthorized", finish_reason="error"),
])
response = await provider.chat_with_retry(
@@ -179,31 +180,34 @@ async def test_image_unsupported_error_no_retry_without_image_content() -> None:
@pytest.mark.asyncio
async def test_image_unsupported_fallback_returns_error_on_second_failure() -> None:
async def test_image_fallback_returns_error_on_second_failure() -> None:
"""If the image-stripped retry also fails, return that error."""
provider = ScriptedProvider([
LLMResponse(
content="does not support image input",
finish_reason="error",
),
LLMResponse(content="some other error", finish_reason="error"),
LLMResponse(content="some model error", finish_reason="error"),
LLMResponse(content="still failing", finish_reason="error"),
])
response = await provider.chat_with_retry(messages=_IMAGE_MSG)
assert provider.calls == 2
assert response.content == "some other error"
assert response.content == "still failing"
assert response.finish_reason == "error"
@pytest.mark.asyncio
async def test_non_image_error_does_not_trigger_image_fallback() -> None:
"""Regular non-transient errors must not trigger image stripping."""
async def test_image_fallback_without_meta_uses_default_placeholder() -> None:
"""When _meta is absent, fallback placeholder is '[image omitted]'."""
provider = ScriptedProvider([
LLMResponse(content="401 unauthorized", finish_reason="error"),
LLMResponse(content="error", finish_reason="error"),
LLMResponse(content="ok"),
])
response = await provider.chat_with_retry(messages=_IMAGE_MSG)
response = await provider.chat_with_retry(messages=_IMAGE_MSG_NO_META)
assert provider.calls == 1
assert response.content == "401 unauthorized"
assert response.content == "ok"
assert provider.calls == 2
msgs_on_retry = provider.last_kwargs["messages"]
for msg in msgs_on_retry:
content = msg.get("content")
if isinstance(content, list):
assert any("[image omitted]" in (b.get("text") or "") for b in content)

View File

@@ -0,0 +1,37 @@
"""Tests for lazy provider exports from nanobot.providers."""
from __future__ import annotations
import importlib
import sys
def test_importing_providers_package_is_lazy(monkeypatch) -> None:
monkeypatch.delitem(sys.modules, "nanobot.providers", raising=False)
monkeypatch.delitem(sys.modules, "nanobot.providers.litellm_provider", raising=False)
monkeypatch.delitem(sys.modules, "nanobot.providers.openai_codex_provider", raising=False)
monkeypatch.delitem(sys.modules, "nanobot.providers.azure_openai_provider", raising=False)
providers = importlib.import_module("nanobot.providers")
assert "nanobot.providers.litellm_provider" not in sys.modules
assert "nanobot.providers.openai_codex_provider" not in sys.modules
assert "nanobot.providers.azure_openai_provider" not in sys.modules
assert providers.__all__ == [
"LLMProvider",
"LLMResponse",
"LiteLLMProvider",
"OpenAICodexProvider",
"AzureOpenAIProvider",
]
def test_explicit_provider_import_still_works(monkeypatch) -> None:
monkeypatch.delitem(sys.modules, "nanobot.providers", raising=False)
monkeypatch.delitem(sys.modules, "nanobot.providers.litellm_provider", raising=False)
namespace: dict[str, object] = {}
exec("from nanobot.providers import LiteLLMProvider", namespace)
assert namespace["LiteLLMProvider"].__name__ == "LiteLLMProvider"
assert "nanobot.providers.litellm_provider" in sys.modules

View File

@@ -30,6 +30,7 @@ class _FakeUpdater:
class _FakeBot:
def __init__(self) -> None:
self.sent_messages: list[dict] = []
self.sent_media: list[dict] = []
self.get_me_calls = 0
async def get_me(self):
@@ -42,6 +43,18 @@ class _FakeBot:
async def send_message(self, **kwargs) -> None:
self.sent_messages.append(kwargs)
async def send_photo(self, **kwargs) -> None:
self.sent_media.append({"kind": "photo", **kwargs})
async def send_voice(self, **kwargs) -> None:
self.sent_media.append({"kind": "voice", **kwargs})
async def send_audio(self, **kwargs) -> None:
self.sent_media.append({"kind": "audio", **kwargs})
async def send_document(self, **kwargs) -> None:
self.sent_media.append({"kind": "document", **kwargs})
async def send_chat_action(self, **kwargs) -> None:
pass
@@ -231,6 +244,65 @@ async def test_send_reply_infers_topic_from_message_id_cache() -> None:
assert channel._app.bot.sent_messages[0]["reply_parameters"].message_id == 10
@pytest.mark.asyncio
async def test_send_remote_media_url_after_security_validation(monkeypatch) -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
monkeypatch.setattr("nanobot.channels.telegram.validate_url_target", lambda url: (True, ""))
await channel.send(
OutboundMessage(
channel="telegram",
chat_id="123",
content="",
media=["https://example.com/cat.jpg"],
)
)
assert channel._app.bot.sent_media == [
{
"kind": "photo",
"chat_id": 123,
"photo": "https://example.com/cat.jpg",
"reply_parameters": None,
}
]
@pytest.mark.asyncio
async def test_send_blocks_unsafe_remote_media_url(monkeypatch) -> None:
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
monkeypatch.setattr(
"nanobot.channels.telegram.validate_url_target",
lambda url: (False, "Blocked: example.com resolves to private/internal address 127.0.0.1"),
)
await channel.send(
OutboundMessage(
channel="telegram",
chat_id="123",
content="",
media=["http://example.com/internal.jpg"],
)
)
assert channel._app.bot.sent_media == []
assert channel._app.bot.sent_messages == [
{
"chat_id": 123,
"text": "[Failed to send: internal.jpg]",
"reply_parameters": None,
}
]
@pytest.mark.asyncio
async def test_group_policy_mention_ignores_unmentioned_group_message() -> None:
channel = TelegramChannel(