Compare commits

...

2 Commits

Author SHA1 Message Date
Hua
0274ee5c95 fix(session): avoid blocking large chat cleanup
Some checks failed
Test Suite / test (3.11) (push) Failing after 1m7s
Test Suite / test (3.12) (push) Failing after 1m23s
Test Suite / test (3.13) (push) Failing after 1m9s
2026-03-20 12:47:52 +08:00
Hua
f34462c076 fix(qq): allow file_data uploads without media url 2026-03-20 11:33:47 +08:00
9 changed files with 428 additions and 85 deletions

View File

@@ -35,7 +35,7 @@ Do not commit real API keys, tokens, chat logs, or workspace data. Keep local se
- Agent runtime config should be hot-reloaded from the active `config.json` for safe in-process fields such as `tools.mcpServers`, `tools.web.*`, `tools.exec.*`, `tools.restrictToWorkspace`, `agents.defaults.model`, `agents.defaults.maxToolIterations`, `agents.defaults.contextWindowTokens`, `agents.defaults.maxTokens`, `agents.defaults.temperature`, `agents.defaults.reasoningEffort`, `channels.sendProgress`, and `channels.sendToolHints`. Channel connection settings and provider credentials still require a restart.
- nanobot does not expose local files over HTTP. If a feature needs a public URL for local files, provide your own static file server and point config such as `mediaBaseUrl` at it.
- Generated screenshots, downloads, and other temporary user-delivery artifacts should be written under `workspace/out`, not the workspace root. Treat that as the generic delivery-artifact root for tools, MCP servers, and skills.
- QQ outbound media sends remote `http(s)` image URLs directly. For local QQ images, prefer the documented rich-media `file_data` upload path together with the public `url`, and keep the URL-only flow as a fallback for SDK/runtime compatibility. QQ consumes delivery artifacts produced elsewhere; `mediaBaseUrl` must expose those generated files through your own static file server. - QQ outbound media sends remote `http(s)` image URLs directly. For local QQ images, try `file_data` upload first. If `mediaBaseUrl` is configured, keep the URL-based path available as a fallback for SDK/runtime compatibility; without it, there is no URL fallback.
- `/skill` shells out to `npx clawhub@latest`; it requires Node.js/`npx` at runtime.
- `/skill uninstall` runs in a non-interactive context, so keep passing `--yes` when shelling out to ClawHub.
- Treat empty `/skill search` output as a user-visible "no results" case rather than a silent success. Surface npm/registry failures directly to the user.

View File

@@ -706,11 +706,10 @@ Uses **botpy SDK** with WebSocket — no public IP required. Currently supports
} }
``` ```
`mediaBaseUrl` is optional, but it is required if you want nanobot to send local screenshots or `mediaBaseUrl` is optional. For local QQ images, nanobot will first try direct `file_data` upload
other local image files through QQ. nanobot does not serve local files over HTTP, so from generated delivery artifacts under `workspace/out`. Configuring `mediaBaseUrl` is still
`mediaBaseUrl` must point to your own static file server. Generated delivery artifacts should be recommended, because nanobot can then map those files onto your own static file server and fall
written under `workspace/out`, and `mediaBaseUrl` should expose that directory with matching back to the URL-based rich-media flow when needed.
relative paths.
Multi-bot example: Multi-bot example:
@@ -747,14 +746,11 @@ nanobot gateway
Now send a message to the bot from QQ — it should respond!
Outbound QQ media sends remote `http(s)` images through the QQ rich-media `url` flow directly.
For local image files, nanobot first publishes or maps the file to a public URL, then tries the For local image files, nanobot always tries `file_data` upload first. When `mediaBaseUrl` is
documented `file_data` upload path together with that URL; if the installed QQ SDK/runtime path configured, nanobot also maps the same local file onto that public URL and can fall back to the
does not accept that upload, nanobot falls back to the existing URL-only rich-media flow. existing URL-only rich-media flow if direct upload fails. Without `mediaBaseUrl`, nanobot still
nanobot does not serve local files itself, so `mediaBaseUrl` must point to your own HTTP server attempts direct upload, but there is no URL fallback path. Tools and skills should write
that exposes generated delivery artifacts. Tools and skills should write deliverable files under deliverable files under `workspace/out`; QQ accepts only local image files from that directory.
`workspace/out`; QQ maps local image paths from that directory onto `mediaBaseUrl` using the same
relative path. Files outside `workspace/out` are rejected. Without that publishing config, local
files still fall back to a text notice.
When an agent uses shell/browser tools to create screenshots or other temporary files for delivery,
it should write them under `workspace/out` instead of the workspace root so channel publishing rules

View File

@@ -71,6 +71,7 @@ class AgentLoop:
"registry.npmjs.org", "registry.npmjs.org",
) )
_CLAWHUB_NPM_CACHE_DIR = Path(tempfile.gettempdir()) / "nanobot-npm-cache" _CLAWHUB_NPM_CACHE_DIR = Path(tempfile.gettempdir()) / "nanobot-npm-cache"
_PREFLIGHT_CONSOLIDATION_BUDGET_SECONDS = 1.5
def __init__( def __init__(
self, self,
@@ -137,7 +138,8 @@ class AgentLoop:
self._mcp_connected = False self._mcp_connected = False
self._mcp_connecting = False self._mcp_connecting = False
self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks
self._background_tasks: list[asyncio.Task] = [] self._background_tasks: set[asyncio.Task] = set()
self._token_consolidation_tasks: dict[str, asyncio.Task[None]] = {}
self._processing_lock = asyncio.Lock() self._processing_lock = asyncio.Lock()
self.memory_consolidator = MemoryConsolidator( self.memory_consolidator = MemoryConsolidator(
workspace=workspace, workspace=workspace,
@@ -933,15 +935,55 @@ class AgentLoop:
async def close_mcp(self) -> None: async def close_mcp(self) -> None:
"""Drain pending background archives, then close MCP connections.""" """Drain pending background archives, then close MCP connections."""
if self._background_tasks: if self._background_tasks:
await asyncio.gather(*self._background_tasks, return_exceptions=True) await asyncio.gather(*list(self._background_tasks), return_exceptions=True)
self._background_tasks.clear() self._background_tasks.clear()
self._token_consolidation_tasks.clear()
await self._reset_mcp_connections() await self._reset_mcp_connections()
def _schedule_background(self, coro) -> None: def _track_background_task(self, task: asyncio.Task) -> asyncio.Task:
"""Track a background task until completion."""
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
return task
def _schedule_background(self, coro) -> asyncio.Task:
"""Schedule a coroutine as a tracked background task (drained on shutdown).""" """Schedule a coroutine as a tracked background task (drained on shutdown)."""
task = asyncio.create_task(coro) task = asyncio.create_task(coro)
self._background_tasks.append(task) return self._track_background_task(task)
task.add_done_callback(self._background_tasks.remove)
def _ensure_background_token_consolidation(self, session: Session) -> asyncio.Task[None]:
"""Ensure at most one token-consolidation task runs per session."""
existing = self._token_consolidation_tasks.get(session.key)
if existing and not existing.done():
return existing
task = asyncio.create_task(self.memory_consolidator.maybe_consolidate_by_tokens(session))
self._token_consolidation_tasks[session.key] = task
self._track_background_task(task)
def _cleanup(done: asyncio.Task[None]) -> None:
if self._token_consolidation_tasks.get(session.key) is done:
self._token_consolidation_tasks.pop(session.key, None)
task.add_done_callback(_cleanup)
return task
async def _run_preflight_token_consolidation(self, session: Session) -> None:
"""Give token consolidation a short head start, then continue in background if needed."""
task = self._ensure_background_token_consolidation(session)
try:
await asyncio.wait_for(
asyncio.shield(task),
timeout=self._PREFLIGHT_CONSOLIDATION_BUDGET_SECONDS,
)
except asyncio.TimeoutError:
logger.warning(
"Token consolidation still running for {} after {:.1f}s; continuing in background",
session.key,
self._PREFLIGHT_CONSOLIDATION_BUDGET_SECONDS,
)
except Exception:
logger.exception("Preflight token consolidation failed for {}", session.key)
def stop(self) -> None: def stop(self) -> None:
"""Stop the agent loop.""" """Stop the agent loop."""
@@ -967,7 +1009,7 @@ class AgentLoop:
persona = self._get_session_persona(session) persona = self._get_session_persona(session)
language = self._get_session_language(session) language = self._get_session_language(session)
await self._connect_mcp() await self._connect_mcp()
await self.memory_consolidator.maybe_consolidate_by_tokens(session) await self._run_preflight_token_consolidation(session)
self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
history = session.get_history(max_messages=0) history = session.get_history(max_messages=0)
# Subagent results should be assistant role, other system messages use user role # Subagent results should be assistant role, other system messages use user role
@@ -984,7 +1026,7 @@ class AgentLoop:
final_content, _, all_msgs = await self._run_agent_loop(messages) final_content, _, all_msgs = await self._run_agent_loop(messages)
self._save_turn(session, all_msgs, 1 + len(history)) self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session) self.sessions.save(session)
self._schedule_background(self.memory_consolidator.maybe_consolidate_by_tokens(session)) self._ensure_background_token_consolidation(session)
return OutboundMessage(channel=channel, chat_id=chat_id, return OutboundMessage(channel=channel, chat_id=chat_id,
content=final_content or "Background task completed.") content=final_content or "Background task completed.")
@@ -1022,7 +1064,7 @@ class AgentLoop:
channel=msg.channel, chat_id=msg.chat_id, content="\n".join(help_lines(language)), channel=msg.channel, chat_id=msg.chat_id, content="\n".join(help_lines(language)),
) )
await self._connect_mcp() await self._connect_mcp()
await self.memory_consolidator.maybe_consolidate_by_tokens(session) await self._run_preflight_token_consolidation(session)
self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
if message_tool := self.tools.get("message"): if message_tool := self.tools.get("message"):
@@ -1057,7 +1099,7 @@ class AgentLoop:
self._save_turn(session, all_msgs, 1 + len(history)) self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session) self.sessions.save(session)
self._schedule_background(self.memory_consolidator.maybe_consolidate_by_tokens(session)) self._ensure_background_token_consolidation(session)
if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn: if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:
return None return None

View File

@@ -36,11 +36,12 @@ if TYPE_CHECKING:
def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]":
"""Create a botpy Client subclass bound to the given channel.""" """Create a botpy Client subclass bound to the given channel."""
intents = botpy.Intents(public_messages=True, direct_message=True) intents = botpy.Intents(public_messages=True, direct_message=True)
http_timeout_seconds = 20
class _Bot(botpy.Client): class _Bot(botpy.Client):
def __init__(self): def __init__(self):
# Disable botpy's file log — nanobot uses loguru; default "botpy.log" fails on read-only fs # Disable botpy's file log — nanobot uses loguru; default "botpy.log" fails on read-only fs
super().__init__(intents=intents, ext_handlers=False) super().__init__(intents=intents, timeout=http_timeout_seconds, ext_handlers=False)
async def on_ready(self): async def on_ready(self):
logger.info("QQ bot ready: {}", self.robot.name) logger.info("QQ bot ready: {}", self.robot.name)
@@ -96,16 +97,17 @@ class QQChannel(BaseChannel):
"""Return the active workspace root used by QQ publishing.""" """Return the active workspace root used by QQ publishing."""
return (self._workspace or Path.cwd()).resolve(strict=False) return (self._workspace or Path.cwd()).resolve(strict=False)
async def _publish_local_media(self, media_path: str) -> tuple[str | None, str | None]: async def _publish_local_media(
"""Map a local delivery artifact to its served URL.""" self,
_, media_url, error = resolve_delivery_media( media_path: str,
) -> tuple[Path | None, str | None, str | None]:
"""Resolve a local delivery artifact and optionally map it to its served URL."""
local_path, media_url, error = resolve_delivery_media(
media_path, media_path,
self._workspace_root(), self._workspace_root(),
self.config.media_base_url, self.config.media_base_url,
) )
if error: return local_path, media_url, error
return None, error
return media_url, None
def _next_msg_seq(self) -> int: def _next_msg_seq(self) -> int:
"""Return the next QQ message sequence number.""" """Return the next QQ message sequence number."""
@@ -174,21 +176,22 @@ class QQChannel(BaseChannel):
self, self,
chat_id: str, chat_id: str,
msg_type: str, msg_type: str,
media_url: str, media_url: str | None,
local_path: Path, local_path: Path,
content: str | None, content: str | None,
msg_id: str | None, msg_id: str | None,
) -> None: ) -> None:
"""Upload a local QQ image using the documented file_data field, then send it.""" """Upload a local QQ image using file_data and, when available, a public URL."""
if not self._client or Route is None: if not self._client or Route is None:
raise RuntimeError("QQ client not initialized") raise RuntimeError("QQ client not initialized")
payload = { payload = {
"file_type": 1, "file_type": 1,
"url": media_url,
"file_data": self._encode_file_data(local_path), "file_data": self._encode_file_data(local_path),
"srv_send_msg": False, "srv_send_msg": False,
} }
if media_url:
payload["url"] = media_url
if msg_type == "group": if msg_type == "group":
route = Route("POST", "/v2/groups/{group_openid}/files", group_openid=chat_id) route = Route("POST", "/v2/groups/{group_openid}/files", group_openid=chat_id)
media = await self._client.api._http.request(route, json=payload) media = await self._client.api._http.request(route, json=payload)
@@ -265,9 +268,10 @@ class QQChannel(BaseChannel):
resolved_media = media_path resolved_media = media_path
local_media_path: Path | None = None local_media_path: Path | None = None
if not self._is_remote_media(media_path): if not self._is_remote_media(media_path):
local_media_path = Path(media_path).expanduser() local_media_path, resolved_media, publish_error = await self._publish_local_media(
resolved_media, publish_error = await self._publish_local_media(media_path) media_path
if not resolved_media: )
if local_media_path is None:
logger.warning( logger.warning(
"QQ outbound local media could not be published: {} ({})", "QQ outbound local media could not be published: {} ({})",
media_path, media_path,
@@ -278,11 +282,12 @@ class QQChannel(BaseChannel):
) )
continue continue
ok, error = validate_url_target(resolved_media) if resolved_media:
if not ok: ok, error = validate_url_target(resolved_media)
logger.warning("QQ outbound media blocked by URL validation: {}", error) if not ok:
fallback_lines.append(self._failed_media_notice(media_path, error)) logger.warning("QQ outbound media blocked by URL validation: {}", error)
continue fallback_lines.append(self._failed_media_notice(media_path, error))
continue
try: try:
if local_media_path is not None: if local_media_path is not None:
@@ -296,18 +301,32 @@ class QQChannel(BaseChannel):
msg_id, msg_id,
) )
except Exception as local_upload_error: except Exception as local_upload_error:
logger.warning( if resolved_media:
"QQ local file_data upload failed for {}: {}, falling back to URL-only upload", logger.warning(
local_media_path, "QQ local file_data upload failed for {}: {}, falling back to URL-only upload",
local_upload_error, local_media_path,
) local_upload_error,
await self._post_remote_media_message( )
msg.chat_id, await self._post_remote_media_message(
msg_type, msg.chat_id,
resolved_media, msg_type,
msg.content if msg.content and not content_sent else None, resolved_media,
msg_id, msg.content if msg.content and not content_sent else None,
) msg_id,
)
else:
logger.warning(
"QQ local file_data upload failed for {} without mediaBaseUrl fallback: {}",
local_media_path,
local_upload_error,
)
fallback_lines.append(
self._failed_media_notice(
media_path,
"QQ local file_data upload failed",
)
)
continue
else: else:
await self._post_remote_media_message( await self._post_remote_media_message(
msg.chat_id, msg.chat_id,

View File

@@ -31,6 +31,9 @@ class Session:
updated_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now)
metadata: dict[str, Any] = field(default_factory=dict) metadata: dict[str, Any] = field(default_factory=dict)
last_consolidated: int = 0 # Number of messages already consolidated to files last_consolidated: int = 0 # Number of messages already consolidated to files
_persisted_message_count: int = field(default=0, init=False, repr=False)
_persisted_metadata_state: str = field(default="", init=False, repr=False)
_requires_full_save: bool = field(default=False, init=False, repr=False)
def add_message(self, role: str, content: str, **kwargs: Any) -> None: def add_message(self, role: str, content: str, **kwargs: Any) -> None:
"""Add a message to the session.""" """Add a message to the session."""
@@ -97,6 +100,7 @@ class Session:
self.messages = [] self.messages = []
self.last_consolidated = 0 self.last_consolidated = 0
self.updated_at = datetime.now() self.updated_at = datetime.now()
self._requires_full_save = True
class SessionManager: class SessionManager:
@@ -178,33 +182,87 @@ class SessionManager:
else: else:
messages.append(data) messages.append(data)
return Session( session = Session(
key=key, key=key,
messages=messages, messages=messages,
created_at=created_at or datetime.now(), created_at=created_at or datetime.now(),
updated_at=datetime.fromtimestamp(path.stat().st_mtime),
metadata=metadata, metadata=metadata,
last_consolidated=last_consolidated last_consolidated=last_consolidated
) )
self._mark_persisted(session)
return session
except Exception as e: except Exception as e:
logger.warning("Failed to load session {}: {}", key, e) logger.warning("Failed to load session {}: {}", key, e)
return None return None
@staticmethod
def _metadata_state(session: Session) -> str:
"""Serialize metadata fields that require a checkpoint line."""
return json.dumps(
{
"key": session.key,
"created_at": session.created_at.isoformat(),
"metadata": session.metadata,
"last_consolidated": session.last_consolidated,
},
ensure_ascii=False,
sort_keys=True,
)
@staticmethod
def _metadata_line(session: Session) -> dict[str, Any]:
"""Build a metadata checkpoint record."""
return {
"_type": "metadata",
"key": session.key,
"created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(),
"metadata": session.metadata,
"last_consolidated": session.last_consolidated
}
@staticmethod
def _write_jsonl_line(handle: Any, payload: dict[str, Any]) -> None:
handle.write(json.dumps(payload, ensure_ascii=False) + "\n")
def _mark_persisted(self, session: Session) -> None:
session._persisted_message_count = len(session.messages)
session._persisted_metadata_state = self._metadata_state(session)
session._requires_full_save = False
def _rewrite_session_file(self, path: Path, session: Session) -> None:
with open(path, "w", encoding="utf-8") as f:
self._write_jsonl_line(f, self._metadata_line(session))
for msg in session.messages:
self._write_jsonl_line(f, msg)
self._mark_persisted(session)
def save(self, session: Session) -> None: def save(self, session: Session) -> None:
"""Save a session to disk.""" """Save a session to disk."""
path = self._get_session_path(session.key) path = self._get_session_path(session.key)
metadata_state = self._metadata_state(session)
needs_full_rewrite = (
session._requires_full_save
or not path.exists()
or session._persisted_message_count > len(session.messages)
)
with open(path, "w", encoding="utf-8") as f: if needs_full_rewrite:
metadata_line = { session.updated_at = datetime.now()
"_type": "metadata", self._rewrite_session_file(path, session)
"key": session.key, else:
"created_at": session.created_at.isoformat(), new_messages = session.messages[session._persisted_message_count:]
"updated_at": session.updated_at.isoformat(), metadata_changed = metadata_state != session._persisted_metadata_state
"metadata": session.metadata,
"last_consolidated": session.last_consolidated if new_messages or metadata_changed:
} session.updated_at = datetime.now()
f.write(json.dumps(metadata_line, ensure_ascii=False) + "\n") with open(path, "a", encoding="utf-8") as f:
for msg in session.messages: for msg in new_messages:
f.write(json.dumps(msg, ensure_ascii=False) + "\n") self._write_jsonl_line(f, msg)
if metadata_changed:
self._write_jsonl_line(f, self._metadata_line(session))
self._mark_persisted(session)
self._cache[session.key] = session self._cache[session.key] = session
@@ -223,19 +281,24 @@ class SessionManager:
for path in self.sessions_dir.glob("*.jsonl"): for path in self.sessions_dir.glob("*.jsonl"):
try: try:
# Read just the metadata line created_at = None
key = path.stem.replace("_", ":", 1)
with open(path, encoding="utf-8") as f: with open(path, encoding="utf-8") as f:
first_line = f.readline().strip() first_line = f.readline().strip()
if first_line: if first_line:
data = json.loads(first_line) data = json.loads(first_line)
if data.get("_type") == "metadata": if data.get("_type") == "metadata":
key = data.get("key") or path.stem.replace("_", ":", 1) key = data.get("key") or key
sessions.append({ created_at = data.get("created_at")
"key": key,
"created_at": data.get("created_at"), # Incremental saves append messages without rewriting the first metadata line,
"updated_at": data.get("updated_at"), # so use file mtime as the session's latest activity timestamp.
"path": str(path) sessions.append({
}) "key": key,
"created_at": created_at,
"updated_at": datetime.fromtimestamp(path.stat().st_mtime).isoformat(),
"path": str(path)
})
except Exception: except Exception:
continue continue

View File

@@ -28,11 +28,9 @@ def is_image_file(path: Path) -> bool:
def resolve_delivery_media( def resolve_delivery_media(
media_path: str | Path, media_path: str | Path,
workspace: Path, workspace: Path,
media_base_url: str, media_base_url: str = "",
) -> tuple[Path | None, str | None, str | None]: ) -> tuple[Path | None, str | None, str | None]:
"""Resolve a local delivery artifact to a public URL under media_base_url.""" """Resolve a local delivery artifact and optionally map it to a public URL."""
if not media_base_url:
return None, None, "local media publishing is not configured"
source = Path(media_path).expanduser() source = Path(media_path).expanduser()
try: try:
@@ -55,6 +53,9 @@ def resolve_delivery_media(
if not is_image_file(resolved): if not is_image_file(resolved):
return None, None, "local delivery media must be an image" return None, None, "local delivery media must be an image"
if not media_base_url:
return resolved, None, None
media_url = urljoin( media_url = urljoin(
f"{media_base_url.rstrip('/')}/", f"{media_base_url.rstrip('/')}/",
quote(relative_path.as_posix(), safe="/"), quote(relative_path.as_posix(), safe="/"),

View File

@@ -1,9 +1,10 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from nanobot.agent.loop import AgentLoop
import nanobot.agent.memory as memory_module import nanobot.agent.memory as memory_module
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse from nanobot.providers.base import LLMResponse
@@ -188,3 +189,36 @@ async def test_preflight_consolidation_before_llm_call(tmp_path, monkeypatch) ->
assert "consolidate" in order
assert "llm" in order
assert order.index("consolidate") < order.index("llm")
@pytest.mark.asyncio
async def test_slow_preflight_consolidation_continues_in_background(tmp_path, monkeypatch) -> None:
order: list[str] = []
loop = _make_loop(tmp_path, estimated_tokens=0, context_window_tokens=200)
monkeypatch.setattr(loop, "_PREFLIGHT_CONSOLIDATION_BUDGET_SECONDS", 0.01)
release = asyncio.Event()
async def slow_consolidation(_session):
order.append("consolidate-start")
await release.wait()
order.append("consolidate-end")
async def track_llm(*args, **kwargs):
order.append("llm")
return LLMResponse(content="ok", tool_calls=[])
loop.memory_consolidator.maybe_consolidate_by_tokens = slow_consolidation # type: ignore[method-assign]
loop.provider.chat_with_retry = track_llm
await loop.process_direct("hello", session_key="cli:test")
assert "consolidate-start" in order
assert "llm" in order
assert "consolidate-end" not in order
release.set()
await loop.close_mcp()
assert "consolidate-end" in order

View File

@@ -5,7 +5,7 @@ import pytest
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.qq import QQChannel from nanobot.channels.qq import QQChannel, _make_bot_class
from nanobot.config.schema import QQConfig from nanobot.config.schema import QQConfig
@@ -54,6 +54,23 @@ class _FakeClient:
self.api = _FakeApi() self.api = _FakeApi()
def test_make_bot_class_uses_longer_http_timeout(monkeypatch) -> None:
if not hasattr(__import__("nanobot.channels.qq", fromlist=["botpy"]).botpy, "Client"):
pytest.skip("botpy not installed")
captured: dict[str, object] = {}
def fake_init(self, *args, **kwargs) -> None: # noqa: ARG001
captured["kwargs"] = kwargs
monkeypatch.setattr("nanobot.channels.qq.botpy.Client.__init__", fake_init)
bot_cls = _make_bot_class(SimpleNamespace(_on_message=None))
bot_cls()
assert captured["kwargs"]["timeout"] == 20
assert captured["kwargs"]["ext_handlers"] is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_on_group_message_routes_to_group_chat_id() -> None: async def test_on_group_message_routes_to_group_chat_id() -> None:
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["user1"]), MessageBus()) channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["user1"]), MessageBus())
@@ -164,8 +181,21 @@ async def test_send_group_remote_media_url_uses_file_api_then_media_message(monk
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_local_media_falls_back_to_text_notice_when_publishing_not_configured() -> None: async def test_send_local_media_without_media_base_url_uses_file_data_only(
channel = QQChannel(QQConfig(app_id="app", secret="secret", allow_from=["*"]), MessageBus()) tmp_path,
) -> None:
workspace = tmp_path / "workspace"
workspace.mkdir()
out_dir = workspace / "out"
out_dir.mkdir()
source = out_dir / "demo.png"
source.write_bytes(b"\x89PNG\r\n\x1a\nfake-png")
channel = QQChannel(
QQConfig(app_id="app", secret="secret", allow_from=["*"]),
MessageBus(),
workspace=workspace,
)
channel._client = _FakeClient() channel._client = _FakeClient()
await channel.send( await channel.send(
@@ -173,18 +203,31 @@ async def test_send_local_media_falls_back_to_text_notice_when_publishing_not_co
channel="qq", channel="qq",
chat_id="user123", chat_id="user123",
content="hello", content="hello",
media=["/tmp/demo.png"], media=[str(source)],
metadata={"message_id": "msg1"}, metadata={"message_id": "msg1"},
) )
) )
assert channel._client.api.c2c_file_calls == [] assert channel._client.api.c2c_file_calls == []
assert channel._client.api.group_file_calls == [] assert channel._client.api.group_file_calls == []
assert channel._client.api.raw_file_upload_calls == [
{
"method": "POST",
"path": "/v2/users/{openid}/files",
"params": {"openid": "user123"},
"json": {
"file_type": 1,
"file_data": b64encode(b"\x89PNG\r\n\x1a\nfake-png").decode("ascii"),
"srv_send_msg": False,
},
}
]
assert channel._client.api.c2c_calls == [ assert channel._client.api.c2c_calls == [
{ {
"openid": "user123", "openid": "user123",
"msg_type": 0, "msg_type": 7,
"content": "hello\n[Failed to send: demo.png - local media publishing is not configured]", "content": "hello",
"media": {"file_info": "c2c-file-info", "file_uuid": "c2c-file", "ttl": 60},
"msg_id": "msg1", "msg_id": "msg1",
"msg_seq": 2, "msg_seq": 2,
} }
@@ -420,6 +463,47 @@ async def test_send_local_media_falls_back_to_url_only_upload_when_file_data_upl
] ]
@pytest.mark.asyncio
async def test_send_local_media_without_media_base_url_falls_back_to_text_notice_when_file_data_upload_fails(
tmp_path,
) -> None:
workspace = tmp_path / "workspace"
workspace.mkdir()
out_dir = workspace / "out"
out_dir.mkdir()
source = out_dir / "demo.png"
source.write_bytes(b"\x89PNG\r\n\x1a\nfake-png")
channel = QQChannel(
QQConfig(app_id="app", secret="secret", allow_from=["*"]),
MessageBus(),
workspace=workspace,
)
channel._client = _FakeClient()
channel._client.api.raise_on_raw_file_upload = True
await channel.send(
OutboundMessage(
channel="qq",
chat_id="user123",
content="hello",
media=[str(source)],
metadata={"message_id": "msg1"},
)
)
assert channel._client.api.c2c_file_calls == []
assert channel._client.api.c2c_calls == [
{
"openid": "user123",
"msg_type": 0,
"content": "hello\n[Failed to send: demo.png - QQ local file_data upload failed]",
"msg_id": "msg1",
"msg_seq": 2,
}
]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_local_media_symlink_to_outside_out_dir_is_rejected( async def test_send_local_media_symlink_to_outside_out_dir_is_rejected(
monkeypatch, monkeypatch,

View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import json
import os
import time
from datetime import datetime
from pathlib import Path
from nanobot.session.manager import SessionManager
def _read_jsonl(path: Path) -> list[dict]:
return [
json.loads(line)
for line in path.read_text(encoding="utf-8").splitlines()
if line.strip()
]
def test_save_appends_only_new_messages(tmp_path: Path) -> None:
manager = SessionManager(tmp_path)
session = manager.get_or_create("qq:test")
session.add_message("user", "hello")
session.add_message("assistant", "hi")
manager.save(session)
path = manager._get_session_path(session.key)
original_text = path.read_text(encoding="utf-8")
session.add_message("user", "next")
manager.save(session)
lines = _read_jsonl(path)
assert path.read_text(encoding="utf-8").startswith(original_text)
assert sum(1 for line in lines if line.get("_type") == "metadata") == 1
assert [line["content"] for line in lines if line.get("role")] == ["hello", "hi", "next"]
def test_save_appends_metadata_checkpoint_without_rewriting_history(tmp_path: Path) -> None:
manager = SessionManager(tmp_path)
session = manager.get_or_create("qq:test")
session.add_message("user", "hello")
session.add_message("assistant", "hi")
manager.save(session)
path = manager._get_session_path(session.key)
original_text = path.read_text(encoding="utf-8")
session.last_consolidated = 2
manager.save(session)
lines = _read_jsonl(path)
assert path.read_text(encoding="utf-8").startswith(original_text)
assert sum(1 for line in lines if line.get("_type") == "metadata") == 2
assert lines[-1]["_type"] == "metadata"
assert lines[-1]["last_consolidated"] == 2
manager.invalidate(session.key)
reloaded = manager.get_or_create("qq:test")
assert reloaded.last_consolidated == 2
assert [message["content"] for message in reloaded.messages] == ["hello", "hi"]
def test_clear_rewrites_session_file(tmp_path: Path) -> None:
manager = SessionManager(tmp_path)
session = manager.get_or_create("qq:test")
session.add_message("user", "hello")
session.add_message("assistant", "hi")
manager.save(session)
path = manager._get_session_path(session.key)
session.clear()
manager.save(session)
lines = _read_jsonl(path)
assert len(lines) == 1
assert lines[0]["_type"] == "metadata"
assert lines[0]["last_consolidated"] == 0
manager.invalidate(session.key)
reloaded = manager.get_or_create("qq:test")
assert reloaded.messages == []
assert reloaded.last_consolidated == 0
def test_list_sessions_uses_file_mtime_for_append_only_updates(tmp_path: Path) -> None:
manager = SessionManager(tmp_path)
session = manager.get_or_create("qq:test")
session.add_message("user", "hello")
manager.save(session)
path = manager._get_session_path(session.key)
stale_time = time.time() - 3600
os.utime(path, (stale_time, stale_time))
before = datetime.fromisoformat(manager.list_sessions()[0]["updated_at"])
assert before.timestamp() < time.time() - 3000
session.add_message("assistant", "hi")
manager.save(session)
after = datetime.fromisoformat(manager.list_sessions()[0]["updated_at"])
assert after > before