diff --git a/.gitignore b/.gitignore index d7b930d..374875a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.worktrees/ .assets .env *.pyc @@ -19,4 +20,4 @@ __pycache__/ poetry.lock .pytest_cache/ botpy.log -tests/ + diff --git a/README.md b/README.md index 9066d5a..03f042a 100644 --- a/README.md +++ b/README.md @@ -12,26 +12,48 @@
-🐈 **nanobot** is an **ultra-lightweight** personal AI assistant inspired by [OpenClaw](https://github.com/openclaw/openclaw) +🐈 **nanobot** is an **ultra-lightweight** personal AI assistant inspired by [OpenClaw](https://github.com/openclaw/openclaw). -⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines. +⚡️ Delivers core agent functionality with **99% fewer lines of code** than OpenClaw. -📏 Real-time line count: **3,663 lines** (run `bash core_agent_lines.sh` to verify anytime) +📏 Real-time line count: run `bash core_agent_lines.sh` to verify anytime. ## 📢 News -- **2026-02-13** 🎉 Released v0.1.3.post7 — includes security hardening and multiple improvements. All users are recommended to upgrade to the latest version. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post7) for more details. +- **2026-02-28** 🚀 Released **v0.1.4.post3** — cleaner context, hardened session history, and smarter agent. Please see [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post3) for details. +- **2026-02-27** 🧠 Experimental thinking mode support, DingTalk media messages, Feishu and QQ channel fixes. +- **2026-02-26** 🛡️ Session poisoning fix, WhatsApp dedup, Windows path guard, Mistral compatibility. +- **2026-02-25** 🧹 New Matrix channel, cleaner session context, auto workspace template sync. +- **2026-02-24** 🚀 Released **v0.1.4.post2** — a reliability-focused release with a redesigned heartbeat, prompt cache optimization, and hardened provider & channel stability. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post2) for details. +- **2026-02-23** 🔧 Virtual tool-call heartbeat, prompt cache optimization, Slack mrkdwn fixes. +- **2026-02-22** 🛡️ Slack thread isolation, Discord typing fix, agent reliability improvements. +- **2026-02-21** 🎉 Released **v0.1.4.post1** — new providers, media support across channels, and major stability improvements. See [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.4.post1) for details. +- **2026-02-20** 🐦 Feishu now receives multimodal files from users. More reliable memory under the hood. +- **2026-02-19** ✨ Slack now sends files, Discord splits long messages, and subagents work in CLI mode. + +text
to keep payload minimal. + if formatted.startswith("") and formatted.endswith("
"): + inner = formatted[3:-4] + if "<" not in inner and ">" not in inner: + return None + return formatted + + +def _build_matrix_text_content(text: str) -> dict[str, object]: + """Build Matrix m.text payload with optional HTML formatted_body.""" + content: dict[str, object] = {"msgtype": "m.text", "body": text, "m.mentions": {}} + if html := _render_markdown_html(text): + content["format"] = MATRIX_HTML_FORMAT + content["formatted_body"] = html + return content + + +class _NioLoguruHandler(logging.Handler): + """Route matrix-nio stdlib logs into Loguru.""" + + def emit(self, record: logging.LogRecord) -> None: + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + frame, depth = logging.currentframe(), 2 + while frame and frame.f_code.co_filename == logging.__file__: + frame, depth = frame.f_back, depth + 1 + logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + +def _configure_nio_logging_bridge() -> None: + """Bridge matrix-nio logs to Loguru (idempotent).""" + nio_logger = logging.getLogger("nio") + if not any(isinstance(h, _NioLoguruHandler) for h in nio_logger.handlers): + nio_logger.handlers = [_NioLoguruHandler()] + nio_logger.propagate = False + + +class MatrixChannel(BaseChannel): + """Matrix (Element) channel using long-polling sync.""" + + name = "matrix" + + def __init__(self, config: Any, bus, *, restrict_to_workspace: bool = False, + workspace: Path | None = None): + super().__init__(config, bus) + self.client: AsyncClient | None = None + self._sync_task: asyncio.Task | None = None + self._typing_tasks: dict[str, asyncio.Task] = {} + self._restrict_to_workspace = restrict_to_workspace + self._workspace = workspace.expanduser().resolve() if workspace else None + self._server_upload_limit_bytes: int | None = None + self._server_upload_limit_checked = False + + async def start(self) -> None: + """Start Matrix client and begin sync loop.""" + self._running = True + _configure_nio_logging_bridge() + + store_path = get_data_dir() / "matrix-store" + store_path.mkdir(parents=True, exist_ok=True) + + self.client = AsyncClient( + homeserver=self.config.homeserver, user=self.config.user_id, + store_path=store_path, + config=AsyncClientConfig(store_sync_tokens=True, encryption_enabled=self.config.e2ee_enabled), + ) + self.client.user_id = self.config.user_id + self.client.access_token = self.config.access_token + self.client.device_id = self.config.device_id + + self._register_event_callbacks() + self._register_response_callbacks() + + if not self.config.e2ee_enabled: + logger.warning("Matrix E2EE disabled; encrypted rooms may be undecryptable.") + + if self.config.device_id: + try: + self.client.load_store() + except Exception: + logger.exception("Matrix store load failed; restart may replay recent messages.") + else: + logger.warning("Matrix device_id empty; restart may replay recent messages.") + + self._sync_task = asyncio.create_task(self._sync_loop()) + + async def stop(self) -> None: + """Stop the Matrix channel with graceful sync shutdown.""" + self._running = False + for room_id in list(self._typing_tasks): + await self._stop_typing_keepalive(room_id, clear_typing=False) + if self.client: + self.client.stop_sync_forever() + if self._sync_task: + try: + await asyncio.wait_for(asyncio.shield(self._sync_task), + timeout=self.config.sync_stop_grace_seconds) + except (asyncio.TimeoutError, asyncio.CancelledError): + self._sync_task.cancel() + try: + await self._sync_task + except asyncio.CancelledError: + pass + if self.client: + await self.client.close() + + def _is_workspace_path_allowed(self, path: Path) -> bool: + """Check path is inside workspace (when restriction enabled).""" + if not self._restrict_to_workspace or not self._workspace: + return True + try: + path.resolve(strict=False).relative_to(self._workspace) + return True + except ValueError: + return False + + def _collect_outbound_media_candidates(self, media: list[str]) -> list[Path]: + """Deduplicate and resolve outbound attachment paths.""" + seen: set[str] = set() + candidates: list[Path] = [] + for raw in media: + if not isinstance(raw, str) or not raw.strip(): + continue + path = Path(raw.strip()).expanduser() + try: + key = str(path.resolve(strict=False)) + except OSError: + key = str(path) + if key not in seen: + seen.add(key) + candidates.append(path) + return candidates + + @staticmethod + def _build_outbound_attachment_content( + *, filename: str, mime: str, size_bytes: int, + mxc_url: str, encryption_info: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """Build Matrix content payload for an uploaded file/image/audio/video.""" + prefix = mime.split("/")[0] + msgtype = {"image": "m.image", "audio": "m.audio", "video": "m.video"}.get(prefix, "m.file") + content: dict[str, Any] = { + "msgtype": msgtype, "body": filename, "filename": filename, + "info": {"mimetype": mime, "size": size_bytes}, "m.mentions": {}, + } + if encryption_info: + content["file"] = {**encryption_info, "url": mxc_url} + else: + content["url"] = mxc_url + return content + + def _is_encrypted_room(self, room_id: str) -> bool: + if not self.client: + return False + room = getattr(self.client, "rooms", {}).get(room_id) + return bool(getattr(room, "encrypted", False)) + + async def _send_room_content(self, room_id: str, content: dict[str, Any]) -> None: + """Send m.room.message with E2EE options.""" + if not self.client: + return + kwargs: dict[str, Any] = {"room_id": room_id, "message_type": "m.room.message", "content": content} + if self.config.e2ee_enabled: + kwargs["ignore_unverified_devices"] = True + await self.client.room_send(**kwargs) + + async def _resolve_server_upload_limit_bytes(self) -> int | None: + """Query homeserver upload limit once per channel lifecycle.""" + if self._server_upload_limit_checked: + return self._server_upload_limit_bytes + self._server_upload_limit_checked = True + if not self.client: + return None + try: + response = await self.client.content_repository_config() + except Exception: + return None + upload_size = getattr(response, "upload_size", None) + if isinstance(upload_size, int) and upload_size > 0: + self._server_upload_limit_bytes = upload_size + return upload_size + return None + + async def _effective_media_limit_bytes(self) -> int: + """min(local config, server advertised) — 0 blocks all uploads.""" + local_limit = max(int(self.config.max_media_bytes), 0) + server_limit = await self._resolve_server_upload_limit_bytes() + if server_limit is None: + return local_limit + return min(local_limit, server_limit) if local_limit else 0 + + async def _upload_and_send_attachment( + self, room_id: str, path: Path, limit_bytes: int, + relates_to: dict[str, Any] | None = None, + ) -> str | None: + """Upload one local file to Matrix and send it as a media message. Returns failure marker or None.""" + if not self.client: + return _ATTACH_UPLOAD_FAILED.format(path.name or _DEFAULT_ATTACH_NAME) + + resolved = path.expanduser().resolve(strict=False) + filename = safe_filename(resolved.name) or _DEFAULT_ATTACH_NAME + fail = _ATTACH_UPLOAD_FAILED.format(filename) + + if not resolved.is_file() or not self._is_workspace_path_allowed(resolved): + return fail + try: + size_bytes = resolved.stat().st_size + except OSError: + return fail + if limit_bytes <= 0 or size_bytes > limit_bytes: + return _ATTACH_TOO_LARGE.format(filename) + + mime = mimetypes.guess_type(filename, strict=False)[0] or "application/octet-stream" + try: + with resolved.open("rb") as f: + upload_result = await self.client.upload( + f, content_type=mime, filename=filename, + encrypt=self.config.e2ee_enabled and self._is_encrypted_room(room_id), + filesize=size_bytes, + ) + except Exception: + return fail + + upload_response = upload_result[0] if isinstance(upload_result, tuple) else upload_result + encryption_info = upload_result[1] if isinstance(upload_result, tuple) and isinstance(upload_result[1], dict) else None + if isinstance(upload_response, UploadError): + return fail + mxc_url = getattr(upload_response, "content_uri", None) + if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"): + return fail + + content = self._build_outbound_attachment_content( + filename=filename, mime=mime, size_bytes=size_bytes, + mxc_url=mxc_url, encryption_info=encryption_info, + ) + if relates_to: + content["m.relates_to"] = relates_to + try: + await self._send_room_content(room_id, content) + except Exception: + return fail + return None + + async def send(self, msg: OutboundMessage) -> None: + """Send outbound content; clear typing for non-progress messages.""" + if not self.client: + return + text = msg.content or "" + candidates = self._collect_outbound_media_candidates(msg.media) + relates_to = self._build_thread_relates_to(msg.metadata) + is_progress = bool((msg.metadata or {}).get("_progress")) + try: + failures: list[str] = [] + if candidates: + limit_bytes = await self._effective_media_limit_bytes() + for path in candidates: + if fail := await self._upload_and_send_attachment( + room_id=msg.chat_id, + path=path, + limit_bytes=limit_bytes, + relates_to=relates_to, + ): + failures.append(fail) + if failures: + text = f"{text.rstrip()}\n{chr(10).join(failures)}" if text.strip() else "\n".join(failures) + if text or not candidates: + content = _build_matrix_text_content(text) + if relates_to: + content["m.relates_to"] = relates_to + await self._send_room_content(msg.chat_id, content) + finally: + if not is_progress: + await self._stop_typing_keepalive(msg.chat_id, clear_typing=True) + + def _register_event_callbacks(self) -> None: + self.client.add_event_callback(self._on_message, RoomMessageText) + self.client.add_event_callback(self._on_media_message, MATRIX_MEDIA_EVENT_FILTER) + self.client.add_event_callback(self._on_room_invite, InviteEvent) + + def _register_response_callbacks(self) -> None: + self.client.add_response_callback(self._on_sync_error, SyncError) + self.client.add_response_callback(self._on_join_error, JoinError) + self.client.add_response_callback(self._on_send_error, RoomSendError) + + def _log_response_error(self, label: str, response: Any) -> None: + """Log Matrix response errors — auth errors at ERROR level, rest at WARNING.""" + code = getattr(response, "status_code", None) + is_auth = code in {"M_UNKNOWN_TOKEN", "M_FORBIDDEN", "M_UNAUTHORIZED"} + is_fatal = is_auth or getattr(response, "soft_logout", False) + (logger.error if is_fatal else logger.warning)("Matrix {} failed: {}", label, response) + + async def _on_sync_error(self, response: SyncError) -> None: + self._log_response_error("sync", response) + + async def _on_join_error(self, response: JoinError) -> None: + self._log_response_error("join", response) + + async def _on_send_error(self, response: RoomSendError) -> None: + self._log_response_error("send", response) + + async def _set_typing(self, room_id: str, typing: bool) -> None: + """Best-effort typing indicator update.""" + if not self.client: + return + try: + response = await self.client.room_typing(room_id=room_id, typing_state=typing, + timeout=TYPING_NOTICE_TIMEOUT_MS) + if isinstance(response, RoomTypingError): + logger.debug("Matrix typing failed for {}: {}", room_id, response) + except Exception: + pass + + async def _start_typing_keepalive(self, room_id: str) -> None: + """Start periodic typing refresh (spec-recommended keepalive).""" + await self._stop_typing_keepalive(room_id, clear_typing=False) + await self._set_typing(room_id, True) + if not self._running: + return + + async def loop() -> None: + try: + while self._running: + await asyncio.sleep(TYPING_KEEPALIVE_INTERVAL_MS / 1000) + await self._set_typing(room_id, True) + except asyncio.CancelledError: + pass + + self._typing_tasks[room_id] = asyncio.create_task(loop()) + + async def _stop_typing_keepalive(self, room_id: str, *, clear_typing: bool) -> None: + if task := self._typing_tasks.pop(room_id, None): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + if clear_typing: + await self._set_typing(room_id, False) + + async def _sync_loop(self) -> None: + while self._running: + try: + await self.client.sync_forever(timeout=30000, full_state=True) + except asyncio.CancelledError: + break + except Exception: + await asyncio.sleep(2) + + async def _on_room_invite(self, room: MatrixRoom, event: InviteEvent) -> None: + if self.is_allowed(event.sender): + await self.client.join(room.room_id) + + def _is_direct_room(self, room: MatrixRoom) -> bool: + count = getattr(room, "member_count", None) + return isinstance(count, int) and count <= 2 + + def _is_bot_mentioned(self, event: RoomMessage) -> bool: + """Check m.mentions payload for bot mention.""" + source = getattr(event, "source", None) + if not isinstance(source, dict): + return False + mentions = (source.get("content") or {}).get("m.mentions") + if not isinstance(mentions, dict): + return False + user_ids = mentions.get("user_ids") + if isinstance(user_ids, list) and self.config.user_id in user_ids: + return True + return bool(self.config.allow_room_mentions and mentions.get("room") is True) + + def _should_process_message(self, room: MatrixRoom, event: RoomMessage) -> bool: + """Apply sender and room policy checks.""" + if not self.is_allowed(event.sender): + return False + if self._is_direct_room(room): + return True + policy = self.config.group_policy + if policy == "open": + return True + if policy == "allowlist": + return room.room_id in (self.config.group_allow_from or []) + if policy == "mention": + return self._is_bot_mentioned(event) + return False + + def _media_dir(self) -> Path: + d = get_data_dir() / "media" / "matrix" + d.mkdir(parents=True, exist_ok=True) + return d + + @staticmethod + def _event_source_content(event: RoomMessage) -> dict[str, Any]: + source = getattr(event, "source", None) + if not isinstance(source, dict): + return {} + content = source.get("content") + return content if isinstance(content, dict) else {} + + def _event_thread_root_id(self, event: RoomMessage) -> str | None: + relates_to = self._event_source_content(event).get("m.relates_to") + if not isinstance(relates_to, dict) or relates_to.get("rel_type") != "m.thread": + return None + root_id = relates_to.get("event_id") + return root_id if isinstance(root_id, str) and root_id else None + + def _thread_metadata(self, event: RoomMessage) -> dict[str, str] | None: + if not (root_id := self._event_thread_root_id(event)): + return None + meta: dict[str, str] = {"thread_root_event_id": root_id} + if isinstance(reply_to := getattr(event, "event_id", None), str) and reply_to: + meta["thread_reply_to_event_id"] = reply_to + return meta + + @staticmethod + def _build_thread_relates_to(metadata: dict[str, Any] | None) -> dict[str, Any] | None: + if not metadata: + return None + root_id = metadata.get("thread_root_event_id") + if not isinstance(root_id, str) or not root_id: + return None + reply_to = metadata.get("thread_reply_to_event_id") or metadata.get("event_id") + if not isinstance(reply_to, str) or not reply_to: + return None + return {"rel_type": "m.thread", "event_id": root_id, + "m.in_reply_to": {"event_id": reply_to}, "is_falling_back": True} + + def _event_attachment_type(self, event: MatrixMediaEvent) -> str: + msgtype = self._event_source_content(event).get("msgtype") + return _MSGTYPE_MAP.get(msgtype, "file") + + @staticmethod + def _is_encrypted_media_event(event: MatrixMediaEvent) -> bool: + return (isinstance(getattr(event, "key", None), dict) + and isinstance(getattr(event, "hashes", None), dict) + and isinstance(getattr(event, "iv", None), str)) + + def _event_declared_size_bytes(self, event: MatrixMediaEvent) -> int | None: + info = self._event_source_content(event).get("info") + size = info.get("size") if isinstance(info, dict) else None + return size if isinstance(size, int) and size >= 0 else None + + def _event_mime(self, event: MatrixMediaEvent) -> str | None: + info = self._event_source_content(event).get("info") + if isinstance(info, dict) and isinstance(m := info.get("mimetype"), str) and m: + return m + m = getattr(event, "mimetype", None) + return m if isinstance(m, str) and m else None + + def _event_filename(self, event: MatrixMediaEvent, attachment_type: str) -> str: + body = getattr(event, "body", None) + if isinstance(body, str) and body.strip(): + if candidate := safe_filename(Path(body).name): + return candidate + return _DEFAULT_ATTACH_NAME if attachment_type == "file" else attachment_type + + def _build_attachment_path(self, event: MatrixMediaEvent, attachment_type: str, + filename: str, mime: str | None) -> Path: + safe_name = safe_filename(Path(filename).name) or _DEFAULT_ATTACH_NAME + suffix = Path(safe_name).suffix + if not suffix and mime: + if guessed := mimetypes.guess_extension(mime, strict=False): + safe_name, suffix = f"{safe_name}{guessed}", guessed + stem = (Path(safe_name).stem or attachment_type)[:72] + suffix = suffix[:16] + event_id = safe_filename(str(getattr(event, "event_id", "") or "evt").lstrip("$")) + event_prefix = (event_id[:24] or "evt").strip("_") + return self._media_dir() / f"{event_prefix}_{stem}{suffix}" + + async def _download_media_bytes(self, mxc_url: str) -> bytes | None: + if not self.client: + return None + response = await self.client.download(mxc=mxc_url) + if isinstance(response, DownloadError): + logger.warning("Matrix download failed for {}: {}", mxc_url, response) + return None + body = getattr(response, "body", None) + if isinstance(body, (bytes, bytearray)): + return bytes(body) + if isinstance(response, MemoryDownloadResponse): + return bytes(response.body) + if isinstance(body, (str, Path)): + path = Path(body) + if path.is_file(): + try: + return path.read_bytes() + except OSError: + return None + return None + + def _decrypt_media_bytes(self, event: MatrixMediaEvent, ciphertext: bytes) -> bytes | None: + key_obj, hashes, iv = getattr(event, "key", None), getattr(event, "hashes", None), getattr(event, "iv", None) + key = key_obj.get("k") if isinstance(key_obj, dict) else None + sha256 = hashes.get("sha256") if isinstance(hashes, dict) else None + if not all(isinstance(v, str) for v in (key, sha256, iv)): + return None + try: + return decrypt_attachment(ciphertext, key, sha256, iv) + except (EncryptionError, ValueError, TypeError): + logger.warning("Matrix decrypt failed for event {}", getattr(event, "event_id", "")) + return None + + async def _fetch_media_attachment( + self, room: MatrixRoom, event: MatrixMediaEvent, + ) -> tuple[dict[str, Any] | None, str]: + """Download, decrypt if needed, and persist a Matrix attachment.""" + atype = self._event_attachment_type(event) + mime = self._event_mime(event) + filename = self._event_filename(event, atype) + mxc_url = getattr(event, "url", None) + fail = _ATTACH_FAILED.format(filename) + + if not isinstance(mxc_url, str) or not mxc_url.startswith("mxc://"): + return None, fail + + limit_bytes = await self._effective_media_limit_bytes() + declared = self._event_declared_size_bytes(event) + if declared is not None and declared > limit_bytes: + return None, _ATTACH_TOO_LARGE.format(filename) + + downloaded = await self._download_media_bytes(mxc_url) + if downloaded is None: + return None, fail + + encrypted = self._is_encrypted_media_event(event) + data = downloaded + if encrypted: + if (data := self._decrypt_media_bytes(event, downloaded)) is None: + return None, fail + + if len(data) > limit_bytes: + return None, _ATTACH_TOO_LARGE.format(filename) + + path = self._build_attachment_path(event, atype, filename, mime) + try: + path.write_bytes(data) + except OSError: + return None, fail + + attachment = { + "type": atype, "mime": mime, "filename": filename, + "event_id": str(getattr(event, "event_id", "") or ""), + "encrypted": encrypted, "size_bytes": len(data), + "path": str(path), "mxc_url": mxc_url, + } + return attachment, _ATTACH_MARKER.format(path) + + def _base_metadata(self, room: MatrixRoom, event: RoomMessage) -> dict[str, Any]: + """Build common metadata for text and media handlers.""" + meta: dict[str, Any] = {"room": getattr(room, "display_name", room.room_id)} + if isinstance(eid := getattr(event, "event_id", None), str) and eid: + meta["event_id"] = eid + if thread := self._thread_metadata(event): + meta.update(thread) + return meta + + async def _on_message(self, room: MatrixRoom, event: RoomMessageText) -> None: + if event.sender == self.config.user_id or not self._should_process_message(room, event): + return + await self._start_typing_keepalive(room.room_id) + try: + await self._handle_message( + sender_id=event.sender, chat_id=room.room_id, + content=event.body, metadata=self._base_metadata(room, event), + ) + except Exception: + await self._stop_typing_keepalive(room.room_id, clear_typing=True) + raise + + async def _on_media_message(self, room: MatrixRoom, event: MatrixMediaEvent) -> None: + if event.sender == self.config.user_id or not self._should_process_message(room, event): + return + attachment, marker = await self._fetch_media_attachment(room, event) + parts: list[str] = [] + if isinstance(body := getattr(event, "body", None), str) and body.strip(): + parts.append(body.strip()) + if marker: + parts.append(marker) + + await self._start_typing_keepalive(room.room_id) + try: + meta = self._base_metadata(room, event) + meta["attachments"] = [] + if attachment: + meta["attachments"] = [attachment] + await self._handle_message( + sender_id=event.sender, chat_id=room.room_id, + content="\n".join(parts), + media=[attachment["path"]] if attachment else [], + metadata=meta, + ) + except Exception: + await self._stop_typing_keepalive(room.room_id, clear_typing=True) + raise diff --git a/nanobot/channels/mochat.py b/nanobot/channels/mochat.py index 30c3dbf..e762dfd 100644 --- a/nanobot/channels/mochat.py +++ b/nanobot/channels/mochat.py @@ -322,7 +322,7 @@ class MochatChannel(BaseChannel): await self._api_send("/api/claw/sessions/send", "sessionId", target.id, content, msg.reply_to) except Exception as e: - logger.error(f"Failed to send Mochat message: {e}") + logger.error("Failed to send Mochat message: {}", e) # ---- config / init helpers --------------------------------------------- @@ -380,7 +380,7 @@ class MochatChannel(BaseChannel): @client.event async def connect_error(data: Any) -> None: - logger.error(f"Mochat websocket connect error: {data}") + logger.error("Mochat websocket connect error: {}", data) @client.on("claw.session.events") async def on_session_events(payload: dict[str, Any]) -> None: @@ -407,7 +407,7 @@ class MochatChannel(BaseChannel): ) return True except Exception as e: - logger.error(f"Failed to connect Mochat websocket: {e}") + logger.error("Failed to connect Mochat websocket: {}", e) try: await client.disconnect() except Exception: @@ -444,7 +444,7 @@ class MochatChannel(BaseChannel): "limit": self.config.watch_limit, }) if not ack.get("result"): - logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}") + logger.error("Mochat subscribeSessions failed: {}", ack.get('message', 'unknown error')) return False data = ack.get("data") @@ -466,7 +466,7 @@ class MochatChannel(BaseChannel): return True ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids}) if not ack.get("result"): - logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}") + logger.error("Mochat subscribePanels failed: {}", ack.get('message', 'unknown error')) return False return True @@ -488,7 +488,7 @@ class MochatChannel(BaseChannel): try: await self._refresh_targets(subscribe_new=self._ws_ready) except Exception as e: - logger.warning(f"Mochat refresh failed: {e}") + logger.warning("Mochat refresh failed: {}", e) if self._fallback_mode: await self._ensure_fallback_workers() @@ -502,7 +502,7 @@ class MochatChannel(BaseChannel): try: response = await self._post_json("/api/claw/sessions/list", {}) except Exception as e: - logger.warning(f"Mochat listSessions failed: {e}") + logger.warning("Mochat listSessions failed: {}", e) return sessions = response.get("sessions") @@ -536,7 +536,7 @@ class MochatChannel(BaseChannel): try: response = await self._post_json("/api/claw/groups/get", {}) except Exception as e: - logger.warning(f"Mochat getWorkspaceGroup failed: {e}") + logger.warning("Mochat getWorkspaceGroup failed: {}", e) return raw_panels = response.get("panels") @@ -598,7 +598,7 @@ class MochatChannel(BaseChannel): except asyncio.CancelledError: break except Exception as e: - logger.warning(f"Mochat watch fallback error ({session_id}): {e}") + logger.warning("Mochat watch fallback error ({}): {}", session_id, e) await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0)) async def _panel_poll_worker(self, panel_id: str) -> None: @@ -625,7 +625,7 @@ class MochatChannel(BaseChannel): except asyncio.CancelledError: break except Exception as e: - logger.warning(f"Mochat panel polling error ({panel_id}): {e}") + logger.warning("Mochat panel polling error ({}): {}", panel_id, e) await asyncio.sleep(sleep_s) # ---- inbound event processing ------------------------------------------ @@ -836,7 +836,7 @@ class MochatChannel(BaseChannel): try: data = json.loads(self._cursor_path.read_text("utf-8")) except Exception as e: - logger.warning(f"Failed to read Mochat cursor file: {e}") + logger.warning("Failed to read Mochat cursor file: {}", e) return cursors = data.get("cursors") if isinstance(data, dict) else None if isinstance(cursors, dict): @@ -852,7 +852,7 @@ class MochatChannel(BaseChannel): "cursors": self._session_cursor, }, ensure_ascii=False, indent=2) + "\n", "utf-8") except Exception as e: - logger.warning(f"Failed to save Mochat cursor file: {e}") + logger.warning("Failed to save Mochat cursor file: {}", e) # ---- HTTP helpers ------------------------------------------------------ diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 0e8fe66..4809fd3 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -13,16 +13,17 @@ from nanobot.config.schema import QQConfig try: import botpy - from botpy.message import C2CMessage + from botpy.message import C2CMessage, GroupMessage QQ_AVAILABLE = True except ImportError: QQ_AVAILABLE = False botpy = None C2CMessage = None + GroupMessage = None if TYPE_CHECKING: - from botpy.message import C2CMessage + from botpy.message import C2CMessage, GroupMessage def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": @@ -31,16 +32,20 @@ def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]": class _Bot(botpy.Client): def __init__(self): - super().__init__(intents=intents) + # Disable botpy's file log — nanobot uses loguru; default "botpy.log" fails on read-only fs + super().__init__(intents=intents, ext_handlers=False) async def on_ready(self): - logger.info(f"QQ bot ready: {self.robot.name}") + logger.info("QQ bot ready: {}", self.robot.name) async def on_c2c_message_create(self, message: "C2CMessage"): - await channel._on_message(message) + await channel._on_message(message, is_group=False) + + async def on_group_at_message_create(self, message: "GroupMessage"): + await channel._on_message(message, is_group=True) async def on_direct_message_create(self, message): - await channel._on_message(message) + await channel._on_message(message, is_group=False) return _Bot @@ -55,7 +60,8 @@ class QQChannel(BaseChannel): self.config: QQConfig = config self._client: "botpy.Client | None" = None self._processed_ids: deque = deque(maxlen=1000) - self._bot_task: asyncio.Task | None = None + self._msg_seq: int = 1 # 消息序列号,避免被 QQ API 去重 + self._chat_type_cache: dict[str, str] = {} async def start(self) -> None: """Start the QQ bot.""" @@ -70,9 +76,8 @@ class QQChannel(BaseChannel): self._running = True BotClass = _make_bot_class(self) self._client = BotClass() - - self._bot_task = asyncio.create_task(self._run_bot()) - logger.info("QQ bot started (C2C private message)") + logger.info("QQ bot started (C2C & Group supported)") + await self._run_bot() async def _run_bot(self) -> None: """Run the bot connection with auto-reconnect.""" @@ -80,7 +85,7 @@ class QQChannel(BaseChannel): try: await self._client.start(appid=self.config.app_id, secret=self.config.secret) except Exception as e: - logger.warning(f"QQ bot error: {e}") + logger.warning("QQ bot error: {}", e) if self._running: logger.info("Reconnecting QQ bot in 5 seconds...") await asyncio.sleep(5) @@ -88,11 +93,10 @@ class QQChannel(BaseChannel): async def stop(self) -> None: """Stop the QQ bot.""" self._running = False - if self._bot_task: - self._bot_task.cancel() + if self._client: try: - await self._bot_task - except asyncio.CancelledError: + await self._client.close() + except Exception: pass logger.info("QQ bot stopped") @@ -101,16 +105,31 @@ class QQChannel(BaseChannel): if not self._client: logger.warning("QQ client not initialized") return - try: - await self._client.api.post_c2c_message( - openid=msg.chat_id, - msg_type=0, - content=msg.content, - ) - except Exception as e: - logger.error(f"Error sending QQ message: {e}") - async def _on_message(self, data: "C2CMessage") -> None: + try: + msg_id = msg.metadata.get("message_id") + self._msg_seq += 1 + msg_type = self._chat_type_cache.get(msg.chat_id, "c2c") + if msg_type == "group": + await self._client.api.post_group_message( + group_openid=msg.chat_id, + msg_type=0, + content=msg.content, + msg_id=msg_id, + msg_seq=self._msg_seq, + ) + else: + await self._client.api.post_c2c_message( + openid=msg.chat_id, + msg_type=0, + content=msg.content, + msg_id=msg_id, + msg_seq=self._msg_seq, + ) + except Exception as e: + logger.error("Error sending QQ message: {}", e) + + async def _on_message(self, data: "C2CMessage | GroupMessage", is_group: bool = False) -> None: """Handle incoming message from QQ.""" try: # Dedup by message ID @@ -118,17 +137,24 @@ class QQChannel(BaseChannel): return self._processed_ids.append(data.id) - author = data.author - user_id = str(getattr(author, 'id', None) or getattr(author, 'user_openid', 'unknown')) content = (data.content or "").strip() if not content: return + if is_group: + chat_id = data.group_openid + user_id = data.author.member_openid + self._chat_type_cache[chat_id] = "group" + else: + chat_id = str(getattr(data.author, 'id', None) or getattr(data.author, 'user_openid', 'unknown')) + user_id = chat_id + self._chat_type_cache[chat_id] = "c2c" + await self._handle_message( sender_id=user_id, - chat_id=user_id, + chat_id=chat_id, content=content, metadata={"message_id": data.id}, ) - except Exception as e: - logger.error(f"Error handling QQ message: {e}") + except Exception: + logger.exception("Error handling QQ message") diff --git a/nanobot/channels/slack.py b/nanobot/channels/slack.py index fb86b3a..a4e7324 100644 --- a/nanobot/channels/slack.py +++ b/nanobot/channels/slack.py @@ -5,10 +5,11 @@ import re from typing import Any from loguru import logger -from slack_sdk.socket_mode.websockets import SocketModeClient from slack_sdk.socket_mode.request import SocketModeRequest from slack_sdk.socket_mode.response import SocketModeResponse +from slack_sdk.socket_mode.websockets import SocketModeClient from slack_sdk.web.async_client import AsyncWebClient +from slackify_markdown import slackify_markdown from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus @@ -34,7 +35,7 @@ class SlackChannel(BaseChannel): logger.error("Slack bot/app token not configured") return if self.config.mode != "socket": - logger.error(f"Unsupported Slack mode: {self.config.mode}") + logger.error("Unsupported Slack mode: {}", self.config.mode) return self._running = True @@ -51,9 +52,9 @@ class SlackChannel(BaseChannel): try: auth = await self._web_client.auth_test() self._bot_user_id = auth.get("user_id") - logger.info(f"Slack bot connected as {self._bot_user_id}") + logger.info("Slack bot connected as {}", self._bot_user_id) except Exception as e: - logger.warning(f"Slack auth_test failed: {e}") + logger.warning("Slack auth_test failed: {}", e) logger.info("Starting Slack Socket Mode client...") await self._socket_client.connect() @@ -68,7 +69,7 @@ class SlackChannel(BaseChannel): try: await self._socket_client.close() except Exception as e: - logger.warning(f"Slack socket close failed: {e}") + logger.warning("Slack socket close failed: {}", e) self._socket_client = None async def send(self, msg: OutboundMessage) -> None: @@ -81,14 +82,28 @@ class SlackChannel(BaseChannel): thread_ts = slack_meta.get("thread_ts") channel_type = slack_meta.get("channel_type") # Only reply in thread for channel/group messages; DMs don't use threads - use_thread = thread_ts and channel_type != "im" - await self._web_client.chat_postMessage( - channel=msg.chat_id, - text=msg.content or " display."""
+
+ def dw(s: str) -> int:
+ return sum(2 if unicodedata.east_asian_width(c) in ('W', 'F') else 1 for c in s)
+
+ rows: list[list[str]] = []
+ has_sep = False
+ for line in table_lines:
+ cells = [_strip_md(c) for c in line.strip().strip('|').split('|')]
+ if all(re.match(r'^:?-+:?$', c) for c in cells if c):
+ has_sep = True
+ continue
+ rows.append(cells)
+ if not rows or not has_sep:
+ return '\n'.join(table_lines)
+
+ ncols = max(len(r) for r in rows)
+ for r in rows:
+ r.extend([''] * (ncols - len(r)))
+ widths = [max(dw(r[c]) for r in rows) for c in range(ncols)]
+
+ def dr(cells: list[str]) -> str:
+ return ' '.join(f'{c}{" " * (w - dw(c))}' for c, w in zip(cells, widths))
+
+ out = [dr(rows[0])]
+ out.append(' '.join('─' * w for w in widths))
+ for row in rows[1:]:
+ out.append(dr(row))
+ return '\n'.join(out)
def _markdown_to_telegram_html(text: str) -> str:
@@ -21,79 +68,101 @@ def _markdown_to_telegram_html(text: str) -> str:
"""
if not text:
return ""
-
+
# 1. Extract and protect code blocks (preserve content from other processing)
code_blocks: list[str] = []
def save_code_block(m: re.Match) -> str:
code_blocks.append(m.group(1))
return f"\x00CB{len(code_blocks) - 1}\x00"
-
+
text = re.sub(r'```[\w]*\n?([\s\S]*?)```', save_code_block, text)
-
+
+ # 1.5. Convert markdown tables to box-drawing (reuse code_block placeholders)
+ lines = text.split('\n')
+ rebuilt: list[str] = []
+ li = 0
+ while li < len(lines):
+ if re.match(r'^\s*\|.+\|', lines[li]):
+ tbl: list[str] = []
+ while li < len(lines) and re.match(r'^\s*\|.+\|', lines[li]):
+ tbl.append(lines[li])
+ li += 1
+ box = _render_table_box(tbl)
+ if box != '\n'.join(tbl):
+ code_blocks.append(box)
+ rebuilt.append(f"\x00CB{len(code_blocks) - 1}\x00")
+ else:
+ rebuilt.extend(tbl)
+ else:
+ rebuilt.append(lines[li])
+ li += 1
+ text = '\n'.join(rebuilt)
+
# 2. Extract and protect inline code
inline_codes: list[str] = []
def save_inline_code(m: re.Match) -> str:
inline_codes.append(m.group(1))
return f"\x00IC{len(inline_codes) - 1}\x00"
-
+
text = re.sub(r'`([^`]+)`', save_inline_code, text)
-
+
# 3. Headers # Title -> just the title text
text = re.sub(r'^#{1,6}\s+(.+)$', r'\1', text, flags=re.MULTILINE)
-
+
# 4. Blockquotes > text -> just the text (before HTML escaping)
text = re.sub(r'^>\s*(.*)$', r'\1', text, flags=re.MULTILINE)
-
+
# 5. Escape HTML special characters
text = text.replace("&", "&").replace("<", "<").replace(">", ">")
-
+
# 6. Links [text](url) - must be before bold/italic to handle nested cases
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', text)
-
+
# 7. Bold **text** or __text__
text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
text = re.sub(r'__(.+?)__', r'\1', text)
-
+
# 8. Italic _text_ (avoid matching inside words like some_var_name)
text = re.sub(r'(?\1', text)
-
+
# 9. Strikethrough ~~text~~
text = re.sub(r'~~(.+?)~~', r'\1', text)
-
+
# 10. Bullet lists - item -> • item
text = re.sub(r'^[-*]\s+', '• ', text, flags=re.MULTILINE)
-
+
# 11. Restore inline code with HTML tags
for i, code in enumerate(inline_codes):
# Escape HTML in code content
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
text = text.replace(f"\x00IC{i}\x00", f"{escaped}")
-
+
# 12. Restore code blocks with HTML tags
for i, code in enumerate(code_blocks):
# Escape HTML in code content
escaped = code.replace("&", "&").replace("<", "<").replace(">", ">")
text = text.replace(f"\x00CB{i}\x00", f"{escaped}
")
-
+
return text
class TelegramChannel(BaseChannel):
"""
Telegram channel using long polling.
-
+
Simple and reliable - no webhook/public IP needed.
"""
-
+
name = "telegram"
-
+
# Commands registered with Telegram's command menu
BOT_COMMANDS = [
BotCommand("start", "Start the bot"),
BotCommand("new", "Start a new conversation"),
+ BotCommand("stop", "Stop the current task"),
BotCommand("help", "Show available commands"),
]
-
+
def __init__(
self,
config: TelegramConfig,
@@ -106,163 +175,345 @@ class TelegramChannel(BaseChannel):
self._app: Application | None = None
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
-
+ self._media_group_buffers: dict[str, dict] = {}
+ self._media_group_tasks: dict[str, asyncio.Task] = {}
+ self._message_threads: dict[tuple[str, int], int] = {}
+
+ def is_allowed(self, sender_id: str) -> bool:
+ """Preserve Telegram's legacy id|username allowlist matching."""
+ if super().is_allowed(sender_id):
+ return True
+
+ allow_list = getattr(self.config, "allow_from", [])
+ if not allow_list or "*" in allow_list:
+ return False
+
+ sender_str = str(sender_id)
+ if sender_str.count("|") != 1:
+ return False
+
+ sid, username = sender_str.split("|", 1)
+ if not sid.isdigit() or not username:
+ return False
+
+ return sid in allow_list or username in allow_list
+
async def start(self) -> None:
"""Start the Telegram bot with long polling."""
if not self.config.token:
logger.error("Telegram bot token not configured")
return
-
+
self._running = True
-
+
# Build the application with larger connection pool to avoid pool-timeout on long runs
- req = HTTPXRequest(connection_pool_size=16, pool_timeout=5.0, connect_timeout=30.0, read_timeout=30.0)
+ req = HTTPXRequest(
+ connection_pool_size=16,
+ pool_timeout=5.0,
+ connect_timeout=30.0,
+ read_timeout=30.0,
+ proxy=self.config.proxy if self.config.proxy else None,
+ )
builder = Application.builder().token(self.config.token).request(req).get_updates_request(req)
- if self.config.proxy:
- builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy)
self._app = builder.build()
self._app.add_error_handler(self._on_error)
-
+
# Add command handlers
self._app.add_handler(CommandHandler("start", self._on_start))
self._app.add_handler(CommandHandler("new", self._forward_command))
- self._app.add_handler(CommandHandler("help", self._forward_command))
-
+ self._app.add_handler(CommandHandler("stop", self._forward_command))
+ self._app.add_handler(CommandHandler("help", self._on_help))
+
# Add message handler for text, photos, voice, documents
self._app.add_handler(
MessageHandler(
- (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL)
- & ~filters.COMMAND,
+ (filters.TEXT | filters.PHOTO | filters.VOICE | filters.AUDIO | filters.Document.ALL)
+ & ~filters.COMMAND,
self._on_message
)
)
-
+
logger.info("Starting Telegram bot (polling mode)...")
-
+
# Initialize and start polling
await self._app.initialize()
await self._app.start()
-
+
# Get bot info and register command menu
bot_info = await self._app.bot.get_me()
- logger.info(f"Telegram bot @{bot_info.username} connected")
-
+ logger.info("Telegram bot @{} connected", bot_info.username)
+
try:
await self._app.bot.set_my_commands(self.BOT_COMMANDS)
logger.debug("Telegram bot commands registered")
except Exception as e:
- logger.warning(f"Failed to register bot commands: {e}")
-
+ logger.warning("Failed to register bot commands: {}", e)
+
# Start polling (this runs until stopped)
await self._app.updater.start_polling(
allowed_updates=["message"],
drop_pending_updates=True # Ignore old messages on startup
)
-
+
# Keep running until stopped
while self._running:
await asyncio.sleep(1)
-
+
async def stop(self) -> None:
"""Stop the Telegram bot."""
self._running = False
-
+
# Cancel all typing indicators
for chat_id in list(self._typing_tasks):
self._stop_typing(chat_id)
-
+
+ for task in self._media_group_tasks.values():
+ task.cancel()
+ self._media_group_tasks.clear()
+ self._media_group_buffers.clear()
+
if self._app:
logger.info("Stopping Telegram bot...")
await self._app.updater.stop()
await self._app.stop()
await self._app.shutdown()
self._app = None
-
+
+ @staticmethod
+ def _get_media_type(path: str) -> str:
+ """Guess media type from file extension."""
+ ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
+ if ext in ("jpg", "jpeg", "png", "gif", "webp"):
+ return "photo"
+ if ext == "ogg":
+ return "voice"
+ if ext in ("mp3", "m4a", "wav", "aac"):
+ return "audio"
+ return "document"
+
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Telegram."""
if not self._app:
logger.warning("Telegram bot not running")
return
-
- # Stop typing indicator for this chat
- self._stop_typing(msg.chat_id)
-
+
+ # Only stop typing indicator for final responses
+ if not msg.metadata.get("_progress", False):
+ self._stop_typing(msg.chat_id)
+
try:
- # chat_id should be the Telegram chat ID (integer)
chat_id = int(msg.chat_id)
- # Convert markdown to Telegram HTML
- html_content = _markdown_to_telegram_html(msg.content)
- await self._app.bot.send_message(
- chat_id=chat_id,
- text=html_content,
- parse_mode="HTML"
- )
except ValueError:
- logger.error(f"Invalid chat_id: {msg.chat_id}")
+ logger.error("Invalid chat_id: {}", msg.chat_id)
+ return
+ reply_to_message_id = msg.metadata.get("message_id")
+ message_thread_id = msg.metadata.get("message_thread_id")
+ if message_thread_id is None and reply_to_message_id is not None:
+ message_thread_id = self._message_threads.get((msg.chat_id, reply_to_message_id))
+ thread_kwargs = {}
+ if message_thread_id is not None:
+ thread_kwargs["message_thread_id"] = message_thread_id
+
+ reply_params = None
+ if self.config.reply_to_message:
+ if reply_to_message_id:
+ reply_params = ReplyParameters(
+ message_id=reply_to_message_id,
+ allow_sending_without_reply=True
+ )
+
+ # Send media files
+ for media_path in (msg.media or []):
+ try:
+ media_type = self._get_media_type(media_path)
+ sender = {
+ "photo": self._app.bot.send_photo,
+ "voice": self._app.bot.send_voice,
+ "audio": self._app.bot.send_audio,
+ }.get(media_type, self._app.bot.send_document)
+ param = "photo" if media_type == "photo" else media_type if media_type in ("voice", "audio") else "document"
+ with open(media_path, 'rb') as f:
+ await sender(
+ chat_id=chat_id,
+ **{param: f},
+ reply_parameters=reply_params,
+ **thread_kwargs,
+ )
+ except Exception as e:
+ filename = media_path.rsplit("/", 1)[-1]
+ logger.error("Failed to send media {}: {}", media_path, e)
+ await self._app.bot.send_message(
+ chat_id=chat_id,
+ text=f"[Failed to send: {filename}]",
+ reply_parameters=reply_params,
+ **thread_kwargs,
+ )
+
+ # Send text content
+ if msg.content and msg.content != "[empty message]":
+ is_progress = msg.metadata.get("_progress", False)
+
+ for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN):
+ # Final response: simulate streaming via draft, then persist
+ if not is_progress:
+ await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs)
+ else:
+ await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
+
+ async def _send_text(
+ self,
+ chat_id: int,
+ text: str,
+ reply_params=None,
+ thread_kwargs: dict | None = None,
+ ) -> None:
+ """Send a plain text message with HTML fallback."""
+ try:
+ html = _markdown_to_telegram_html(text)
+ await self._app.bot.send_message(
+ chat_id=chat_id, text=html, parse_mode="HTML",
+ reply_parameters=reply_params,
+ **(thread_kwargs or {}),
+ )
except Exception as e:
- # Fallback to plain text if HTML parsing fails
- logger.warning(f"HTML parse failed, falling back to plain text: {e}")
+ logger.warning("HTML parse failed, falling back to plain text: {}", e)
try:
await self._app.bot.send_message(
- chat_id=int(msg.chat_id),
- text=msg.content
+ chat_id=chat_id,
+ text=text,
+ reply_parameters=reply_params,
+ **(thread_kwargs or {}),
)
except Exception as e2:
- logger.error(f"Error sending Telegram message: {e2}")
-
+ logger.error("Error sending Telegram message: {}", e2)
+
+ async def _send_with_streaming(
+ self,
+ chat_id: int,
+ text: str,
+ reply_params=None,
+ thread_kwargs: dict | None = None,
+ ) -> None:
+ """Simulate streaming via send_message_draft, then persist with send_message."""
+ draft_id = int(time.time() * 1000) % (2**31)
+ try:
+ step = max(len(text) // 8, 40)
+ for i in range(step, len(text), step):
+ await self._app.bot.send_message_draft(
+ chat_id=chat_id, draft_id=draft_id, text=text[:i],
+ )
+ await asyncio.sleep(0.04)
+ await self._app.bot.send_message_draft(
+ chat_id=chat_id, draft_id=draft_id, text=text,
+ )
+ await asyncio.sleep(0.15)
+ except Exception:
+ pass
+ await self._send_text(chat_id, text, reply_params, thread_kwargs)
+
async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /start command."""
if not update.message or not update.effective_user:
return
-
+
user = update.effective_user
await update.message.reply_text(
f"👋 Hi {user.first_name}! I'm nanobot.\n\n"
"Send me a message and I'll respond!\n"
"Type /help to see available commands."
)
-
+
+ async def _on_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ """Handle /help command, bypassing ACL so all users can access it."""
+ if not update.message:
+ return
+ await update.message.reply_text(
+ "🐈 nanobot commands:\n"
+ "/new — Start a new conversation\n"
+ "/stop — Stop the current task\n"
+ "/help — Show available commands"
+ )
+
+ @staticmethod
+ def _sender_id(user) -> str:
+ """Build sender_id with username for allowlist matching."""
+ sid = str(user.id)
+ return f"{sid}|{user.username}" if user.username else sid
+
+ @staticmethod
+ def _derive_topic_session_key(message) -> str | None:
+ """Derive topic-scoped session key for non-private Telegram chats."""
+ message_thread_id = getattr(message, "message_thread_id", None)
+ if message.chat.type == "private" or message_thread_id is None:
+ return None
+ return f"telegram:{message.chat_id}:topic:{message_thread_id}"
+
+ @staticmethod
+ def _build_message_metadata(message, user) -> dict:
+ """Build common Telegram inbound metadata payload."""
+ return {
+ "message_id": message.message_id,
+ "user_id": user.id,
+ "username": user.username,
+ "first_name": user.first_name,
+ "is_group": message.chat.type != "private",
+ "message_thread_id": getattr(message, "message_thread_id", None),
+ "is_forum": bool(getattr(message.chat, "is_forum", False)),
+ }
+
+ def _remember_thread_context(self, message) -> None:
+ """Cache topic thread id by chat/message id for follow-up replies."""
+ message_thread_id = getattr(message, "message_thread_id", None)
+ if message_thread_id is None:
+ return
+ key = (str(message.chat_id), message.message_id)
+ self._message_threads[key] = message_thread_id
+ if len(self._message_threads) > 1000:
+ self._message_threads.pop(next(iter(self._message_threads)))
+
async def _forward_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Forward slash commands to the bus for unified handling in AgentLoop."""
if not update.message or not update.effective_user:
return
+ message = update.message
+ user = update.effective_user
+ self._remember_thread_context(message)
await self._handle_message(
- sender_id=str(update.effective_user.id),
- chat_id=str(update.message.chat_id),
- content=update.message.text,
+ sender_id=self._sender_id(user),
+ chat_id=str(message.chat_id),
+ content=message.text,
+ metadata=self._build_message_metadata(message, user),
+ session_key=self._derive_topic_session_key(message),
)
-
+
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming messages (text, photos, voice, documents)."""
if not update.message or not update.effective_user:
return
-
+
message = update.message
user = update.effective_user
chat_id = message.chat_id
-
- # Use stable numeric ID, but keep username for allowlist compatibility
- sender_id = str(user.id)
- if user.username:
- sender_id = f"{sender_id}|{user.username}"
-
+ sender_id = self._sender_id(user)
+ self._remember_thread_context(message)
+
# Store chat_id for replies
self._chat_ids[sender_id] = chat_id
-
+
# Build content from text and/or media
content_parts = []
media_paths = []
-
+
# Text content
if message.text:
content_parts.append(message.text)
if message.caption:
content_parts.append(message.caption)
-
+
# Handle media files
media_file = None
media_type = None
-
+
if message.photo:
media_file = message.photo[-1] # Largest photo
media_type = "image"
@@ -275,77 +526,112 @@ class TelegramChannel(BaseChannel):
elif message.document:
media_file = message.document
media_type = "file"
-
+
# Download media if present
if media_file and self._app:
try:
file = await self._app.bot.get_file(media_file.file_id)
- ext = self._get_extension(media_type, getattr(media_file, 'mime_type', None))
-
+ ext = self._get_extension(
+ media_type,
+ getattr(media_file, 'mime_type', None),
+ getattr(media_file, 'file_name', None),
+ )
# Save to workspace/media/
from pathlib import Path
media_dir = Path.home() / ".nanobot" / "media"
media_dir.mkdir(parents=True, exist_ok=True)
-
+
file_path = media_dir / f"{media_file.file_id[:16]}{ext}"
await file.download_to_drive(str(file_path))
-
+
media_paths.append(str(file_path))
-
+
# Handle voice transcription
if media_type == "voice" or media_type == "audio":
from nanobot.providers.transcription import GroqTranscriptionProvider
transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key)
transcription = await transcriber.transcribe(file_path)
if transcription:
- logger.info(f"Transcribed {media_type}: {transcription[:50]}...")
+ logger.info("Transcribed {}: {}...", media_type, transcription[:50])
content_parts.append(f"[transcription: {transcription}]")
else:
content_parts.append(f"[{media_type}: {file_path}]")
else:
content_parts.append(f"[{media_type}: {file_path}]")
-
- logger.debug(f"Downloaded {media_type} to {file_path}")
+
+ logger.debug("Downloaded {} to {}", media_type, file_path)
except Exception as e:
- logger.error(f"Failed to download media: {e}")
+ logger.error("Failed to download media: {}", e)
content_parts.append(f"[{media_type}: download failed]")
-
+
content = "\n".join(content_parts) if content_parts else "[empty message]"
-
- logger.debug(f"Telegram message from {sender_id}: {content[:50]}...")
-
+
+ logger.debug("Telegram message from {}: {}...", sender_id, content[:50])
+
str_chat_id = str(chat_id)
-
+ metadata = self._build_message_metadata(message, user)
+ session_key = self._derive_topic_session_key(message)
+
+ # Telegram media groups: buffer briefly, forward as one aggregated turn.
+ if media_group_id := getattr(message, "media_group_id", None):
+ key = f"{str_chat_id}:{media_group_id}"
+ if key not in self._media_group_buffers:
+ self._media_group_buffers[key] = {
+ "sender_id": sender_id, "chat_id": str_chat_id,
+ "contents": [], "media": [],
+ "metadata": metadata,
+ "session_key": session_key,
+ }
+ self._start_typing(str_chat_id)
+ buf = self._media_group_buffers[key]
+ if content and content != "[empty message]":
+ buf["contents"].append(content)
+ buf["media"].extend(media_paths)
+ if key not in self._media_group_tasks:
+ self._media_group_tasks[key] = asyncio.create_task(self._flush_media_group(key))
+ return
+
# Start typing indicator before processing
self._start_typing(str_chat_id)
-
+
# Forward to the message bus
await self._handle_message(
sender_id=sender_id,
chat_id=str_chat_id,
content=content,
media=media_paths,
- metadata={
- "message_id": message.message_id,
- "user_id": user.id,
- "username": user.username,
- "first_name": user.first_name,
- "is_group": message.chat.type != "private"
- }
+ metadata=metadata,
+ session_key=session_key,
)
-
+
+ async def _flush_media_group(self, key: str) -> None:
+ """Wait briefly, then forward buffered media-group as one turn."""
+ try:
+ await asyncio.sleep(0.6)
+ if not (buf := self._media_group_buffers.pop(key, None)):
+ return
+ content = "\n".join(buf["contents"]) or "[empty message]"
+ await self._handle_message(
+ sender_id=buf["sender_id"], chat_id=buf["chat_id"],
+ content=content, media=list(dict.fromkeys(buf["media"])),
+ metadata=buf["metadata"],
+ session_key=buf.get("session_key"),
+ )
+ finally:
+ self._media_group_tasks.pop(key, None)
+
def _start_typing(self, chat_id: str) -> None:
"""Start sending 'typing...' indicator for a chat."""
# Cancel any existing typing task for this chat
self._stop_typing(chat_id)
self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))
-
+
def _stop_typing(self, chat_id: str) -> None:
"""Stop the typing indicator for a chat."""
task = self._typing_tasks.pop(chat_id, None)
if task and not task.done():
task.cancel()
-
+
async def _typing_loop(self, chat_id: str) -> None:
"""Repeatedly send 'typing' action until cancelled."""
try:
@@ -355,14 +641,19 @@ class TelegramChannel(BaseChannel):
except asyncio.CancelledError:
pass
except Exception as e:
- logger.debug(f"Typing indicator stopped for {chat_id}: {e}")
-
+ logger.debug("Typing indicator stopped for {}: {}", chat_id, e)
+
async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Log polling / handler errors instead of silently swallowing them."""
- logger.error(f"Telegram error: {context.error}")
+ logger.error("Telegram error: {}", context.error)
- def _get_extension(self, media_type: str, mime_type: str | None) -> str:
- """Get file extension based on media type."""
+ def _get_extension(
+ self,
+ media_type: str,
+ mime_type: str | None,
+ filename: str | None = None,
+ ) -> str:
+ """Get file extension based on media type or original filename."""
if mime_type:
ext_map = {
"image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif",
@@ -370,6 +661,14 @@ class TelegramChannel(BaseChannel):
}
if mime_type in ext_map:
return ext_map[mime_type]
-
+
type_map = {"image": ".jpg", "voice": ".ogg", "audio": ".mp3", "file": ""}
- return type_map.get(media_type, "")
+ if ext := type_map.get(media_type, ""):
+ return ext
+
+ if filename:
+ from pathlib import Path
+
+ return "".join(Path(filename).suffixes)
+
+ return ""
diff --git a/nanobot/channels/whatsapp.py b/nanobot/channels/whatsapp.py
index 0cf2dd7..1307716 100644
--- a/nanobot/channels/whatsapp.py
+++ b/nanobot/channels/whatsapp.py
@@ -2,7 +2,8 @@
import asyncio
import json
-from typing import Any
+import mimetypes
+from collections import OrderedDict
from loguru import logger
@@ -15,29 +16,30 @@ from nanobot.config.schema import WhatsAppConfig
class WhatsAppChannel(BaseChannel):
"""
WhatsApp channel that connects to a Node.js bridge.
-
+
The bridge uses @whiskeysockets/baileys to handle the WhatsApp Web protocol.
Communication between Python and Node.js is via WebSocket.
"""
-
+
name = "whatsapp"
-
+
def __init__(self, config: WhatsAppConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: WhatsAppConfig = config
self._ws = None
self._connected = False
-
+ self._processed_message_ids: OrderedDict[str, None] = OrderedDict()
+
async def start(self) -> None:
"""Start the WhatsApp channel by connecting to the bridge."""
import websockets
-
+
bridge_url = self.config.bridge_url
-
- logger.info(f"Connecting to WhatsApp bridge at {bridge_url}...")
-
+
+ logger.info("Connecting to WhatsApp bridge at {}...", bridge_url)
+
self._running = True
-
+
while self._running:
try:
async with websockets.connect(bridge_url) as ws:
@@ -47,102 +49,122 @@ class WhatsAppChannel(BaseChannel):
await ws.send(json.dumps({"type": "auth", "token": self.config.bridge_token}))
self._connected = True
logger.info("Connected to WhatsApp bridge")
-
+
# Listen for messages
async for message in ws:
try:
await self._handle_bridge_message(message)
except Exception as e:
- logger.error(f"Error handling bridge message: {e}")
-
+ logger.error("Error handling bridge message: {}", e)
+
except asyncio.CancelledError:
break
except Exception as e:
self._connected = False
self._ws = None
- logger.warning(f"WhatsApp bridge connection error: {e}")
-
+ logger.warning("WhatsApp bridge connection error: {}", e)
+
if self._running:
logger.info("Reconnecting in 5 seconds...")
await asyncio.sleep(5)
-
+
async def stop(self) -> None:
"""Stop the WhatsApp channel."""
self._running = False
self._connected = False
-
+
if self._ws:
await self._ws.close()
self._ws = None
-
+
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through WhatsApp."""
if not self._ws or not self._connected:
logger.warning("WhatsApp bridge not connected")
return
-
+
try:
payload = {
"type": "send",
"to": msg.chat_id,
"text": msg.content
}
- await self._ws.send(json.dumps(payload))
+ await self._ws.send(json.dumps(payload, ensure_ascii=False))
except Exception as e:
- logger.error(f"Error sending WhatsApp message: {e}")
-
+ logger.error("Error sending WhatsApp message: {}", e)
+
async def _handle_bridge_message(self, raw: str) -> None:
"""Handle a message from the bridge."""
try:
data = json.loads(raw)
except json.JSONDecodeError:
- logger.warning(f"Invalid JSON from bridge: {raw[:100]}")
+ logger.warning("Invalid JSON from bridge: {}", raw[:100])
return
-
+
msg_type = data.get("type")
-
+
if msg_type == "message":
# Incoming message from WhatsApp
# Deprecated by whatsapp: old phone number style typically: @s.whatspp.net
pn = data.get("pn", "")
- # New LID sytle typically:
+ # New LID sytle typically:
sender = data.get("sender", "")
content = data.get("content", "")
-
+ message_id = data.get("id", "")
+
+ if message_id:
+ if message_id in self._processed_message_ids:
+ return
+ self._processed_message_ids[message_id] = None
+ while len(self._processed_message_ids) > 1000:
+ self._processed_message_ids.popitem(last=False)
+
# Extract just the phone number or lid as chat_id
user_id = pn if pn else sender
sender_id = user_id.split("@")[0] if "@" in user_id else user_id
- logger.info(f"Sender {sender}")
-
+ logger.info("Sender {}", sender)
+
# Handle voice transcription if it's a voice message
if content == "[Voice Message]":
- logger.info(f"Voice message received from {sender_id}, but direct download from bridge is not yet supported.")
+ logger.info("Voice message received from {}, but direct download from bridge is not yet supported.", sender_id)
content = "[Voice Message: Transcription not available for WhatsApp yet]"
-
+
+ # Extract media paths (images/documents/videos downloaded by the bridge)
+ media_paths = data.get("media") or []
+
+ # Build content tags matching Telegram's pattern: [image: /path] or [file: /path]
+ if media_paths:
+ for p in media_paths:
+ mime, _ = mimetypes.guess_type(p)
+ media_type = "image" if mime and mime.startswith("image/") else "file"
+ media_tag = f"[{media_type}: {p}]"
+ content = f"{content}\n{media_tag}" if content else media_tag
+
await self._handle_message(
sender_id=sender_id,
chat_id=sender, # Use full LID for replies
content=content,
+ media=media_paths,
metadata={
- "message_id": data.get("id"),
+ "message_id": message_id,
"timestamp": data.get("timestamp"),
"is_group": data.get("isGroup", False)
}
)
-
+
elif msg_type == "status":
# Connection status update
status = data.get("status")
- logger.info(f"WhatsApp status: {status}")
-
+ logger.info("WhatsApp status: {}", status)
+
if status == "connected":
self._connected = True
elif status == "disconnected":
self._connected = False
-
+
elif msg_type == "qr":
# QR code for authentication
logger.info("Scan QR code in the bridge terminal to connect WhatsApp")
-
+
elif msg_type == "error":
- logger.error(f"WhatsApp bridge error: {data.get('error')}")
+ logger.error("WhatsApp bridge error: {}", data.get('error'))
diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py
index 6a9c92f..ca5d8d7 100644
--- a/nanobot/cli/commands.py
+++ b/nanobot/cli/commands.py
@@ -2,23 +2,36 @@
import asyncio
import os
-import signal
-from pathlib import Path
import select
+import signal
import sys
+from pathlib import Path
+
+# Force UTF-8 encoding for Windows console
+if sys.platform == "win32":
+ import locale
+ if sys.stdout.encoding != "utf-8":
+ os.environ["PYTHONIOENCODING"] = "utf-8"
+ # Re-open stdout/stderr with UTF-8 encoding
+ try:
+ sys.stdout.reconfigure(encoding="utf-8", errors="replace")
+ sys.stderr.reconfigure(encoding="utf-8", errors="replace")
+ except Exception:
+ pass
import typer
+from prompt_toolkit import PromptSession
+from prompt_toolkit.formatted_text import HTML
+from prompt_toolkit.history import FileHistory
+from prompt_toolkit.patch_stdout import patch_stdout
from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from rich.text import Text
-from prompt_toolkit import PromptSession
-from prompt_toolkit.formatted_text import HTML
-from prompt_toolkit.history import FileHistory
-from prompt_toolkit.patch_stdout import patch_stdout
-
-from nanobot import __version__, __logo__
+from nanobot import __logo__, __version__
+from nanobot.config.schema import Config
+from nanobot.utils.helpers import sync_workspace_templates
app = typer.Typer(
name="nanobot",
@@ -158,9 +171,9 @@ def onboard():
from nanobot.config.loader import get_config_path, load_config, save_config
from nanobot.config.schema import Config
from nanobot.utils.helpers import get_workspace_path
-
+
config_path = get_config_path()
-
+
if config_path.exists():
console.print(f"[yellow]Config already exists at {config_path}[/yellow]")
console.print(" [bold]y[/bold] = overwrite with defaults (existing values will be lost)")
@@ -176,17 +189,16 @@ def onboard():
else:
save_config(Config())
console.print(f"[green]✓[/green] Created config at {config_path}")
-
+
# Create workspace
workspace = get_workspace_path()
-
+
if not workspace.exists():
workspace.mkdir(parents=True, exist_ok=True)
console.print(f"[green]✓[/green] Created workspace at {workspace}")
-
- # Create default bootstrap files
- _create_workspace_templates(workspace)
-
+
+ sync_workspace_templates(workspace)
+
console.print(f"\n{__logo__} nanobot is ready!")
console.print("\nNext steps:")
console.print(" 1. Add your API key to [cyan]~/.nanobot/config.json[/cyan]")
@@ -197,102 +209,57 @@ def onboard():
-def _create_workspace_templates(workspace: Path):
- """Create default workspace template files."""
- templates = {
- "AGENTS.md": """# Agent Instructions
-You are a helpful AI assistant. Be concise, accurate, and friendly.
+def _make_provider(config: Config):
+ """Create the appropriate LLM provider from config."""
+ from nanobot.providers.openai_codex_provider import OpenAICodexProvider
+ from nanobot.providers.azure_openai_provider import AzureOpenAIProvider
-## Guidelines
-
-- Always explain what you're doing before taking actions
-- Ask for clarification when the request is ambiguous
-- Use tools to help accomplish tasks
-- Remember important information in memory/MEMORY.md; past events are logged in memory/HISTORY.md
-""",
- "SOUL.md": """# Soul
-
-I am nanobot, a lightweight AI assistant.
-
-## Personality
-
-- Helpful and friendly
-- Concise and to the point
-- Curious and eager to learn
-
-## Values
-
-- Accuracy over speed
-- User privacy and safety
-- Transparency in actions
-""",
- "USER.md": """# User
-
-Information about the user goes here.
-
-## Preferences
-
-- Communication style: (casual/formal)
-- Timezone: (your timezone)
-- Language: (your preferred language)
-""",
- }
-
- for filename, content in templates.items():
- file_path = workspace / filename
- if not file_path.exists():
- file_path.write_text(content)
- console.print(f" [dim]Created {filename}[/dim]")
-
- # Create memory directory and MEMORY.md
- memory_dir = workspace / "memory"
- memory_dir.mkdir(exist_ok=True)
- memory_file = memory_dir / "MEMORY.md"
- if not memory_file.exists():
- memory_file.write_text("""# Long-term Memory
-
-This file stores important information that should persist across sessions.
-
-## User Information
-
-(Important facts about the user)
-
-## Preferences
-
-(User preferences learned over time)
-
-## Important Notes
-
-(Things to remember)
-""")
- console.print(" [dim]Created memory/MEMORY.md[/dim]")
-
- history_file = memory_dir / "HISTORY.md"
- if not history_file.exists():
- history_file.write_text("")
- console.print(" [dim]Created memory/HISTORY.md[/dim]")
-
- # Create skills directory for custom user skills
- skills_dir = workspace / "skills"
- skills_dir.mkdir(exist_ok=True)
-
-
-def _make_provider(config):
- """Create LiteLLMProvider from config. Exits if no API key found."""
- from nanobot.providers.litellm_provider import LiteLLMProvider
- p = config.get_provider()
model = config.agents.defaults.model
- if not (p and p.api_key) and not model.startswith("bedrock/"):
+ provider_name = config.get_provider_name(model)
+ p = config.get_provider(model)
+
+ # OpenAI Codex (OAuth)
+ if provider_name == "openai_codex" or model.startswith("openai-codex/"):
+ return OpenAICodexProvider(default_model=model)
+
+ # Custom: direct OpenAI-compatible endpoint, bypasses LiteLLM
+ from nanobot.providers.custom_provider import CustomProvider
+ if provider_name == "custom":
+ return CustomProvider(
+ api_key=p.api_key if p else "no-key",
+ api_base=config.get_api_base(model) or "http://localhost:8000/v1",
+ default_model=model,
+ )
+
+ # Azure OpenAI: direct Azure OpenAI endpoint with deployment name
+ if provider_name == "azure_openai":
+ if not p or not p.api_key or not p.api_base:
+ console.print("[red]Error: Azure OpenAI requires api_key and api_base.[/red]")
+ console.print("Set them in ~/.nanobot/config.json under providers.azure_openai section")
+ console.print("Use the model field to specify the deployment name.")
+ raise typer.Exit(1)
+
+ return AzureOpenAIProvider(
+ api_key=p.api_key,
+ api_base=p.api_base,
+ default_model=model,
+ )
+
+ from nanobot.providers.litellm_provider import LiteLLMProvider
+ from nanobot.providers.registry import find_by_name
+ spec = find_by_name(provider_name)
+ if not model.startswith("bedrock/") and not (p and p.api_key) and not (spec and spec.is_oauth):
console.print("[red]Error: No API key configured.[/red]")
console.print("Set one in ~/.nanobot/config.json under providers section")
raise typer.Exit(1)
+
return LiteLLMProvider(
api_key=p.api_key if p else None,
- api_base=config.get_api_base(),
+ api_base=config.get_api_base(model),
default_model=model,
extra_headers=p.extra_headers if p else None,
- provider_name=config.get_provider_name(),
+ provider_name=provider_name,
)
@@ -304,33 +271,40 @@ def _make_provider(config):
@app.command()
def gateway(
port: int = typer.Option(18790, "--port", "-p", help="Gateway port"),
+ workspace: str | None = typer.Option(None, "--workspace", "-w", help="Workspace directory"),
+ config: str | None = typer.Option(None, "--config", "-c", help="Config file path"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
):
"""Start the nanobot gateway."""
- from nanobot.config.loader import load_config, get_data_dir
- from nanobot.bus.queue import MessageBus
from nanobot.agent.loop import AgentLoop
+ from nanobot.bus.queue import MessageBus
from nanobot.channels.manager import ChannelManager
- from nanobot.session.manager import SessionManager
+ from nanobot.config.loader import load_config
from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob
from nanobot.heartbeat.service import HeartbeatService
-
+ from nanobot.session.manager import SessionManager
+
if verbose:
import logging
logging.basicConfig(level=logging.DEBUG)
-
+
+ config_path = Path(config) if config else None
+ config = load_config(config_path)
+ if workspace:
+ config.agents.defaults.workspace = workspace
+
console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
-
- config = load_config()
+ sync_workspace_templates(config.workspace_path)
bus = MessageBus()
provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)
-
+
# Create cron service first (callback set after agent creation)
- cron_store_path = get_data_dir() / "cron" / "jobs.json"
+ # Use workspace path for per-instance cron store
+ cron_store_path = config.workspace_path / "cron" / "jobs.json"
cron = CronService(cron_store_path)
-
+
# Create agent with cron service
agent = AgentLoop(
bus=bus,
@@ -341,59 +315,123 @@ def gateway(
max_tokens=config.agents.defaults.max_tokens,
max_iterations=config.agents.defaults.max_tool_iterations,
memory_window=config.agents.defaults.memory_window,
+ reasoning_effort=config.agents.defaults.reasoning_effort,
brave_api_key=config.tools.web.search.api_key or None,
+ web_proxy=config.tools.web.proxy or None,
exec_config=config.tools.exec,
cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace,
session_manager=session_manager,
mcp_servers=config.tools.mcp_servers,
+ channels_config=config.channels,
)
-
+
# Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent."""
- response = await agent.process_direct(
- job.payload.message,
- session_key=f"cron:{job.id}",
- channel=job.payload.channel or "cli",
- chat_id=job.payload.to or "direct",
+ from nanobot.agent.tools.cron import CronTool
+ from nanobot.agent.tools.message import MessageTool
+ reminder_note = (
+ "[Scheduled Task] Timer finished.\n\n"
+ f"Task '{job.name}' has been triggered.\n"
+ f"Scheduled instruction: {job.payload.message}"
)
- if job.payload.deliver and job.payload.to:
+
+ # Prevent the agent from scheduling new cron jobs during execution
+ cron_tool = agent.tools.get("cron")
+ cron_token = None
+ if isinstance(cron_tool, CronTool):
+ cron_token = cron_tool.set_cron_context(True)
+ try:
+ response = await agent.process_direct(
+ reminder_note,
+ session_key=f"cron:{job.id}",
+ channel=job.payload.channel or "cli",
+ chat_id=job.payload.to or "direct",
+ )
+ finally:
+ if isinstance(cron_tool, CronTool) and cron_token is not None:
+ cron_tool.reset_cron_context(cron_token)
+
+ message_tool = agent.tools.get("message")
+ if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
+ return response
+
+ if job.payload.deliver and job.payload.to and response:
from nanobot.bus.events import OutboundMessage
await bus.publish_outbound(OutboundMessage(
channel=job.payload.channel or "cli",
chat_id=job.payload.to,
- content=response or ""
+ content=response
))
return response
cron.on_job = on_cron_job
-
- # Create heartbeat service
- async def on_heartbeat(prompt: str) -> str:
- """Execute heartbeat through the agent."""
- return await agent.process_direct(prompt, session_key="heartbeat")
-
- heartbeat = HeartbeatService(
- workspace=config.workspace_path,
- on_heartbeat=on_heartbeat,
- interval_s=30 * 60, # 30 minutes
- enabled=True
- )
-
+
# Create channel manager
channels = ChannelManager(config, bus)
-
+
+ def _pick_heartbeat_target() -> tuple[str, str]:
+ """Pick a routable channel/chat target for heartbeat-triggered messages."""
+ enabled = set(channels.enabled_channels)
+ # Prefer the most recently updated non-internal session on an enabled channel.
+ for item in session_manager.list_sessions():
+ key = item.get("key") or ""
+ if ":" not in key:
+ continue
+ channel, chat_id = key.split(":", 1)
+ if channel in {"cli", "system"}:
+ continue
+ if channel in enabled and chat_id:
+ return channel, chat_id
+ # Fallback keeps prior behavior but remains explicit.
+ return "cli", "direct"
+
+ # Create heartbeat service
+ async def on_heartbeat_execute(tasks: str) -> str:
+ """Phase 2: execute heartbeat tasks through the full agent loop."""
+ channel, chat_id = _pick_heartbeat_target()
+
+ async def _silent(*_args, **_kwargs):
+ pass
+
+ return await agent.process_direct(
+ tasks,
+ session_key="heartbeat",
+ channel=channel,
+ chat_id=chat_id,
+ on_progress=_silent,
+ )
+
+ async def on_heartbeat_notify(response: str) -> None:
+ """Deliver a heartbeat response to the user's channel."""
+ from nanobot.bus.events import OutboundMessage
+ channel, chat_id = _pick_heartbeat_target()
+ if channel == "cli":
+ return # No external channel available to deliver to
+ await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response))
+
+ hb_cfg = config.gateway.heartbeat
+ heartbeat = HeartbeatService(
+ workspace=config.workspace_path,
+ provider=provider,
+ model=agent.model,
+ on_execute=on_heartbeat_execute,
+ on_notify=on_heartbeat_notify,
+ interval_s=hb_cfg.interval_s,
+ enabled=hb_cfg.enabled,
+ )
+
if channels.enabled_channels:
console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
else:
console.print("[yellow]Warning: No channels enabled[/yellow]")
-
+
cron_status = cron.status()
if cron_status["jobs"] > 0:
console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs")
-
- console.print(f"[green]✓[/green] Heartbeat: every 30m")
-
+
+ console.print(f"[green]✓[/green] Heartbeat: every {hb_cfg.interval_s}s")
+
async def run():
try:
await cron.start()
@@ -410,7 +448,7 @@ def gateway(
cron.stop()
agent.stop()
await channels.stop_all()
-
+
asyncio.run(run())
@@ -429,21 +467,28 @@ def agent(
logs: bool = typer.Option(False, "--logs/--no-logs", help="Show nanobot runtime logs during chat"),
):
"""Interact with the agent directly."""
- from nanobot.config.loader import load_config
- from nanobot.bus.queue import MessageBus
- from nanobot.agent.loop import AgentLoop
from loguru import logger
-
+
+ from nanobot.agent.loop import AgentLoop
+ from nanobot.bus.queue import MessageBus
+ from nanobot.config.loader import get_data_dir, load_config
+ from nanobot.cron.service import CronService
+
config = load_config()
-
+ sync_workspace_templates(config.workspace_path)
+
bus = MessageBus()
provider = _make_provider(config)
+ # Create cron service for tool usage (no callback needed for CLI unless running)
+ cron_store_path = get_data_dir() / "cron" / "jobs.json"
+ cron = CronService(cron_store_path)
+
if logs:
logger.enable("nanobot")
else:
logger.disable("nanobot")
-
+
agent_loop = AgentLoop(
bus=bus,
provider=provider,
@@ -453,12 +498,16 @@ def agent(
max_tokens=config.agents.defaults.max_tokens,
max_iterations=config.agents.defaults.max_tool_iterations,
memory_window=config.agents.defaults.memory_window,
+ reasoning_effort=config.agents.defaults.reasoning_effort,
brave_api_key=config.tools.web.search.api_key or None,
+ web_proxy=config.tools.web.proxy or None,
exec_config=config.tools.exec,
+ cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace,
mcp_servers=config.tools.mcp_servers,
+ channels_config=config.channels,
)
-
+
# Show spinner when logs are off (no output to miss); skip when logs are on
def _thinking_ctx():
if logs:
@@ -467,28 +516,83 @@ def agent(
# Animated spinner is safe to use with prompt_toolkit input handling
return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
+ async def _cli_progress(content: str, *, tool_hint: bool = False) -> None:
+ ch = agent_loop.channels_config
+ if ch and tool_hint and not ch.send_tool_hints:
+ return
+ if ch and not tool_hint and not ch.send_progress:
+ return
+ console.print(f" [dim]↳ {content}[/dim]")
+
if message:
- # Single message mode
+ # Single message mode — direct call, no bus needed
async def run_once():
with _thinking_ctx():
- response = await agent_loop.process_direct(message, session_id)
+ response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress)
_print_agent_response(response, render_markdown=markdown)
await agent_loop.close_mcp()
-
+
asyncio.run(run_once())
else:
- # Interactive mode
+ # Interactive mode — route through bus like other channels
+ from nanobot.bus.events import InboundMessage
_init_prompt_session()
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
- def _exit_on_sigint(signum, frame):
- _restore_terminal()
- console.print("\nGoodbye!")
- os._exit(0)
+ if ":" in session_id:
+ cli_channel, cli_chat_id = session_id.split(":", 1)
+ else:
+ cli_channel, cli_chat_id = "cli", session_id
+
+ def _handle_signal(signum, frame):
+ sig_name = signal.Signals(signum).name
+ _restore_terminal()
+ console.print(f"\nReceived {sig_name}, goodbye!")
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, _handle_signal)
+ signal.signal(signal.SIGTERM, _handle_signal)
+ # SIGHUP is not available on Windows
+ if hasattr(signal, 'SIGHUP'):
+ signal.signal(signal.SIGHUP, _handle_signal)
+ # Ignore SIGPIPE to prevent silent process termination when writing to closed pipes
+ # SIGPIPE is not available on Windows
+ if hasattr(signal, 'SIGPIPE'):
+ signal.signal(signal.SIGPIPE, signal.SIG_IGN)
- signal.signal(signal.SIGINT, _exit_on_sigint)
-
async def run_interactive():
+ bus_task = asyncio.create_task(agent_loop.run())
+ turn_done = asyncio.Event()
+ turn_done.set()
+ turn_response: list[str] = []
+
+ async def _consume_outbound():
+ while True:
+ try:
+ msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
+ if msg.metadata.get("_progress"):
+ is_tool_hint = msg.metadata.get("_tool_hint", False)
+ ch = agent_loop.channels_config
+ if ch and is_tool_hint and not ch.send_tool_hints:
+ pass
+ elif ch and not is_tool_hint and not ch.send_progress:
+ pass
+ else:
+ console.print(f" [dim]↳ {msg.content}[/dim]")
+ elif not turn_done.is_set():
+ if msg.content:
+ turn_response.append(msg.content)
+ turn_done.set()
+ elif msg.content:
+ console.print()
+ _print_agent_response(msg.content, render_markdown=markdown)
+ except asyncio.TimeoutError:
+ continue
+ except asyncio.CancelledError:
+ break
+
+ outbound_task = asyncio.create_task(_consume_outbound())
+
try:
while True:
try:
@@ -502,10 +606,22 @@ def agent(
_restore_terminal()
console.print("\nGoodbye!")
break
-
+
+ turn_done.clear()
+ turn_response.clear()
+
+ await bus.publish_inbound(InboundMessage(
+ channel=cli_channel,
+ sender_id="user",
+ chat_id=cli_chat_id,
+ content=user_input,
+ ))
+
with _thinking_ctx():
- response = await agent_loop.process_direct(user_input, session_id)
- _print_agent_response(response, render_markdown=markdown)
+ await turn_done.wait()
+
+ if turn_response:
+ _print_agent_response(turn_response[0], render_markdown=markdown)
except KeyboardInterrupt:
_restore_terminal()
console.print("\nGoodbye!")
@@ -515,8 +631,11 @@ def agent(
console.print("\nGoodbye!")
break
finally:
+ agent_loop.stop()
+ outbound_task.cancel()
+ await asyncio.gather(bus_task, outbound_task, return_exceptions=True)
await agent_loop.close_mcp()
-
+
asyncio.run(run_interactive())
@@ -573,7 +692,7 @@ def channels_status():
"✓" if mc.enabled else "✗",
mc_base
)
-
+
# Telegram
tg = config.channels.telegram
tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]"
@@ -592,6 +711,33 @@ def channels_status():
slack_config
)
+ # DingTalk
+ dt = config.channels.dingtalk
+ dt_config = f"client_id: {dt.client_id[:10]}..." if dt.client_id else "[dim]not configured[/dim]"
+ table.add_row(
+ "DingTalk",
+ "✓" if dt.enabled else "✗",
+ dt_config
+ )
+
+ # QQ
+ qq = config.channels.qq
+ qq_config = f"app_id: {qq.app_id[:10]}..." if qq.app_id else "[dim]not configured[/dim]"
+ table.add_row(
+ "QQ",
+ "✓" if qq.enabled else "✗",
+ qq_config
+ )
+
+ # Email
+ em = config.channels.email
+ em_config = em.imap_host if em.imap_host else "[dim]not configured[/dim]"
+ table.add_row(
+ "Email",
+ "✓" if em.enabled else "✗",
+ em_config
+ )
+
console.print(table)
@@ -599,57 +745,57 @@ def _get_bridge_dir() -> Path:
"""Get the bridge directory, setting it up if needed."""
import shutil
import subprocess
-
+
# User's bridge location
user_bridge = Path.home() / ".nanobot" / "bridge"
-
+
# Check if already built
if (user_bridge / "dist" / "index.js").exists():
return user_bridge
-
+
# Check for npm
if not shutil.which("npm"):
console.print("[red]npm not found. Please install Node.js >= 18.[/red]")
raise typer.Exit(1)
-
+
# Find source bridge: first check package data, then source dir
pkg_bridge = Path(__file__).parent.parent / "bridge" # nanobot/bridge (installed)
src_bridge = Path(__file__).parent.parent.parent / "bridge" # repo root/bridge (dev)
-
+
source = None
if (pkg_bridge / "package.json").exists():
source = pkg_bridge
elif (src_bridge / "package.json").exists():
source = src_bridge
-
+
if not source:
console.print("[red]Bridge source not found.[/red]")
console.print("Try reinstalling: pip install --force-reinstall nanobot")
raise typer.Exit(1)
-
+
console.print(f"{__logo__} Setting up bridge...")
-
+
# Copy to user directory
user_bridge.parent.mkdir(parents=True, exist_ok=True)
if user_bridge.exists():
shutil.rmtree(user_bridge)
shutil.copytree(source, user_bridge, ignore=shutil.ignore_patterns("node_modules", "dist"))
-
+
# Install and build
try:
console.print(" Installing dependencies...")
subprocess.run(["npm", "install"], cwd=user_bridge, check=True, capture_output=True)
-
+
console.print(" Building...")
subprocess.run(["npm", "run", "build"], cwd=user_bridge, check=True, capture_output=True)
-
+
console.print("[green]✓[/green] Bridge ready\n")
except subprocess.CalledProcessError as e:
console.print(f"[red]Build failed: {e}[/red]")
if e.stderr:
console.print(f"[dim]{e.stderr.decode()[:500]}[/dim]")
raise typer.Exit(1)
-
+
return user_bridge
@@ -657,18 +803,19 @@ def _get_bridge_dir() -> Path:
def channels_login():
"""Link device via QR code."""
import subprocess
+
from nanobot.config.loader import load_config
-
+
config = load_config()
bridge_dir = _get_bridge_dir()
-
+
console.print(f"{__logo__} Starting bridge...")
console.print("Scan the QR code to connect.\n")
-
+
env = {**os.environ}
if config.channels.whatsapp.bridge_token:
env["BRIDGE_TOKEN"] = config.channels.whatsapp.bridge_token
-
+
try:
subprocess.run(["npm", "start"], cwd=bridge_dir, check=True, env=env)
except subprocess.CalledProcessError as e:
@@ -677,163 +824,6 @@ def channels_login():
console.print("[red]npm not found. Please install Node.js.[/red]")
-# ============================================================================
-# Cron Commands
-# ============================================================================
-
-cron_app = typer.Typer(help="Manage scheduled tasks")
-app.add_typer(cron_app, name="cron")
-
-
-@cron_app.command("list")
-def cron_list(
- all: bool = typer.Option(False, "--all", "-a", help="Include disabled jobs"),
-):
- """List scheduled jobs."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- jobs = service.list_jobs(include_disabled=all)
-
- if not jobs:
- console.print("No scheduled jobs.")
- return
-
- table = Table(title="Scheduled Jobs")
- table.add_column("ID", style="cyan")
- table.add_column("Name")
- table.add_column("Schedule")
- table.add_column("Status")
- table.add_column("Next Run")
-
- import time
- for job in jobs:
- # Format schedule
- if job.schedule.kind == "every":
- sched = f"every {(job.schedule.every_ms or 0) // 1000}s"
- elif job.schedule.kind == "cron":
- sched = job.schedule.expr or ""
- else:
- sched = "one-time"
-
- # Format next run
- next_run = ""
- if job.state.next_run_at_ms:
- next_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(job.state.next_run_at_ms / 1000))
- next_run = next_time
-
- status = "[green]enabled[/green]" if job.enabled else "[dim]disabled[/dim]"
-
- table.add_row(job.id, job.name, sched, status, next_run)
-
- console.print(table)
-
-
-@cron_app.command("add")
-def cron_add(
- name: str = typer.Option(..., "--name", "-n", help="Job name"),
- message: str = typer.Option(..., "--message", "-m", help="Message for agent"),
- every: int = typer.Option(None, "--every", "-e", help="Run every N seconds"),
- cron_expr: str = typer.Option(None, "--cron", "-c", help="Cron expression (e.g. '0 9 * * *')"),
- at: str = typer.Option(None, "--at", help="Run once at time (ISO format)"),
- deliver: bool = typer.Option(False, "--deliver", "-d", help="Deliver response to channel"),
- to: str = typer.Option(None, "--to", help="Recipient for delivery"),
- channel: str = typer.Option(None, "--channel", help="Channel for delivery (e.g. 'telegram', 'whatsapp')"),
-):
- """Add a scheduled job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
- from nanobot.cron.types import CronSchedule
-
- # Determine schedule type
- if every:
- schedule = CronSchedule(kind="every", every_ms=every * 1000)
- elif cron_expr:
- schedule = CronSchedule(kind="cron", expr=cron_expr)
- elif at:
- import datetime
- dt = datetime.datetime.fromisoformat(at)
- schedule = CronSchedule(kind="at", at_ms=int(dt.timestamp() * 1000))
- else:
- console.print("[red]Error: Must specify --every, --cron, or --at[/red]")
- raise typer.Exit(1)
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- job = service.add_job(
- name=name,
- schedule=schedule,
- message=message,
- deliver=deliver,
- to=to,
- channel=channel,
- )
-
- console.print(f"[green]✓[/green] Added job '{job.name}' ({job.id})")
-
-
-@cron_app.command("remove")
-def cron_remove(
- job_id: str = typer.Argument(..., help="Job ID to remove"),
-):
- """Remove a scheduled job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- if service.remove_job(job_id):
- console.print(f"[green]✓[/green] Removed job {job_id}")
- else:
- console.print(f"[red]Job {job_id} not found[/red]")
-
-
-@cron_app.command("enable")
-def cron_enable(
- job_id: str = typer.Argument(..., help="Job ID"),
- disable: bool = typer.Option(False, "--disable", help="Disable instead of enable"),
-):
- """Enable or disable a job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- job = service.enable_job(job_id, enabled=not disable)
- if job:
- status = "disabled" if disable else "enabled"
- console.print(f"[green]✓[/green] Job '{job.name}' {status}")
- else:
- console.print(f"[red]Job {job_id} not found[/red]")
-
-
-@cron_app.command("run")
-def cron_run(
- job_id: str = typer.Argument(..., help="Job ID to run"),
- force: bool = typer.Option(False, "--force", "-f", help="Run even if disabled"),
-):
- """Manually run a job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- async def run():
- return await service.run_job(job_id, force=force)
-
- if asyncio.run(run()):
- console.print(f"[green]✓[/green] Job executed")
- else:
- console.print(f"[red]Failed to run job {job_id}[/red]")
-
-
# ============================================================================
# Status Commands
# ============================================================================
@@ -842,7 +832,7 @@ def cron_run(
@app.command()
def status():
"""Show nanobot status."""
- from nanobot.config.loader import load_config, get_config_path
+ from nanobot.config.loader import get_config_path, load_config
config_path = get_config_path()
config = load_config()
@@ -857,13 +847,15 @@ def status():
from nanobot.providers.registry import PROVIDERS
console.print(f"Model: {config.agents.defaults.model}")
-
+
# Check API keys from registry
for spec in PROVIDERS:
p = getattr(config.providers, spec.name, None)
if p is None:
continue
- if spec.is_local:
+ if spec.is_oauth:
+ console.print(f"{spec.label}: [green]✓ (OAuth)[/green]")
+ elif spec.is_local:
# Local deployments show api_base instead of api_key
if p.api_base:
console.print(f"{spec.label}: [green]✓ {p.api_base}[/green]")
@@ -874,5 +866,88 @@ def status():
console.print(f"{spec.label}: {'[green]✓[/green]' if has_key else '[dim]not set[/dim]'}")
+# ============================================================================
+# OAuth Login
+# ============================================================================
+
+provider_app = typer.Typer(help="Manage providers")
+app.add_typer(provider_app, name="provider")
+
+
+_LOGIN_HANDLERS: dict[str, callable] = {}
+
+
+def _register_login(name: str):
+ def decorator(fn):
+ _LOGIN_HANDLERS[name] = fn
+ return fn
+ return decorator
+
+
+@provider_app.command("login")
+def provider_login(
+ provider: str = typer.Argument(..., help="OAuth provider (e.g. 'openai-codex', 'github-copilot')"),
+):
+ """Authenticate with an OAuth provider."""
+ from nanobot.providers.registry import PROVIDERS
+
+ key = provider.replace("-", "_")
+ spec = next((s for s in PROVIDERS if s.name == key and s.is_oauth), None)
+ if not spec:
+ names = ", ".join(s.name.replace("_", "-") for s in PROVIDERS if s.is_oauth)
+ console.print(f"[red]Unknown OAuth provider: {provider}[/red] Supported: {names}")
+ raise typer.Exit(1)
+
+ handler = _LOGIN_HANDLERS.get(spec.name)
+ if not handler:
+ console.print(f"[red]Login not implemented for {spec.label}[/red]")
+ raise typer.Exit(1)
+
+ console.print(f"{__logo__} OAuth Login - {spec.label}\n")
+ handler()
+
+
+@_register_login("openai_codex")
+def _login_openai_codex() -> None:
+ try:
+ from oauth_cli_kit import get_token, login_oauth_interactive
+ token = None
+ try:
+ token = get_token()
+ except Exception:
+ pass
+ if not (token and token.access):
+ console.print("[cyan]Starting interactive OAuth login...[/cyan]\n")
+ token = login_oauth_interactive(
+ print_fn=lambda s: console.print(s),
+ prompt_fn=lambda s: typer.prompt(s),
+ )
+ if not (token and token.access):
+ console.print("[red]✗ Authentication failed[/red]")
+ raise typer.Exit(1)
+ console.print(f"[green]✓ Authenticated with OpenAI Codex[/green] [dim]{token.account_id}[/dim]")
+ except ImportError:
+ console.print("[red]oauth_cli_kit not installed. Run: pip install oauth-cli-kit[/red]")
+ raise typer.Exit(1)
+
+
+@_register_login("github_copilot")
+def _login_github_copilot() -> None:
+ import asyncio
+
+ console.print("[cyan]Starting GitHub Copilot device flow...[/cyan]\n")
+
+ async def _trigger():
+ from litellm import acompletion
+ await acompletion(model="github_copilot/gpt-4o", messages=[{"role": "user", "content": "hi"}], max_tokens=1)
+
+ try:
+ asyncio.run(_trigger())
+ console.print("[green]✓ Authenticated with GitHub Copilot[/green]")
+ except Exception as e:
+ console.print(f"[red]Authentication error: {e}[/red]")
+ raise typer.Exit(1)
+
+
if __name__ == "__main__":
app()
diff --git a/nanobot/config/__init__.py b/nanobot/config/__init__.py
index 88e8e9b..6c59668 100644
--- a/nanobot/config/__init__.py
+++ b/nanobot/config/__init__.py
@@ -1,6 +1,6 @@
"""Configuration module for nanobot."""
-from nanobot.config.loader import load_config, get_config_path
+from nanobot.config.loader import get_config_path, load_config
from nanobot.config.schema import Config
__all__ = ["Config", "load_config", "get_config_path"]
diff --git a/nanobot/config/loader.py b/nanobot/config/loader.py
index fd7d1e8..c789efd 100644
--- a/nanobot/config/loader.py
+++ b/nanobot/config/loader.py
@@ -2,7 +2,6 @@
import json
from pathlib import Path
-from typing import Any
from nanobot.config.schema import Config
@@ -21,45 +20,43 @@ def get_data_dir() -> Path:
def load_config(config_path: Path | None = None) -> Config:
"""
Load configuration from file or create default.
-
+
Args:
config_path: Optional path to config file. Uses default if not provided.
-
+
Returns:
Loaded configuration object.
"""
path = config_path or get_config_path()
-
+
if path.exists():
try:
- with open(path) as f:
+ with open(path, encoding="utf-8") as f:
data = json.load(f)
data = _migrate_config(data)
- return Config.model_validate(convert_keys(data))
+ return Config.model_validate(data)
except (json.JSONDecodeError, ValueError) as e:
print(f"Warning: Failed to load config from {path}: {e}")
print("Using default configuration.")
-
+
return Config()
def save_config(config: Config, config_path: Path | None = None) -> None:
"""
Save configuration to file.
-
+
Args:
config: Configuration to save.
config_path: Optional path to save to. Uses default if not provided.
"""
path = config_path or get_config_path()
path.parent.mkdir(parents=True, exist_ok=True)
-
- # Convert to camelCase format
- data = config.model_dump()
- data = convert_to_camel(data)
-
- with open(path, "w") as f:
- json.dump(data, f, indent=2)
+
+ data = config.model_dump(by_alias=True)
+
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(data, f, indent=2, ensure_ascii=False)
def _migrate_config(data: dict) -> dict:
@@ -70,37 +67,3 @@ def _migrate_config(data: dict) -> dict:
if "restrictToWorkspace" in exec_cfg and "restrictToWorkspace" not in tools:
tools["restrictToWorkspace"] = exec_cfg.pop("restrictToWorkspace")
return data
-
-
-def convert_keys(data: Any) -> Any:
- """Convert camelCase keys to snake_case for Pydantic."""
- if isinstance(data, dict):
- return {camel_to_snake(k): convert_keys(v) for k, v in data.items()}
- if isinstance(data, list):
- return [convert_keys(item) for item in data]
- return data
-
-
-def convert_to_camel(data: Any) -> Any:
- """Convert snake_case keys to camelCase."""
- if isinstance(data, dict):
- return {snake_to_camel(k): convert_to_camel(v) for k, v in data.items()}
- if isinstance(data, list):
- return [convert_to_camel(item) for item in data]
- return data
-
-
-def camel_to_snake(name: str) -> str:
- """Convert camelCase to snake_case."""
- result = []
- for i, char in enumerate(name):
- if char.isupper() and i > 0:
- result.append("_")
- result.append(char.lower())
- return "".join(result)
-
-
-def snake_to_camel(name: str) -> str:
- """Convert snake_case to camelCase."""
- components = name.split("_")
- return components[0] + "".join(x.title() for x in components[1:])
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index 0934aac..803cb61 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -1,54 +1,98 @@
"""Configuration schema using Pydantic."""
from pathlib import Path
-from pydantic import BaseModel, Field, ConfigDict
+from typing import Literal
+
+from pydantic import BaseModel, ConfigDict, Field
+from pydantic.alias_generators import to_camel
from pydantic_settings import BaseSettings
-class WhatsAppConfig(BaseModel):
+class Base(BaseModel):
+ """Base model that accepts both camelCase and snake_case keys."""
+
+ model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
+
+
+class WhatsAppConfig(Base):
"""WhatsApp channel configuration."""
+
enabled: bool = False
bridge_url: str = "ws://localhost:3001"
bridge_token: str = "" # Shared token for bridge auth (optional, recommended)
allow_from: list[str] = Field(default_factory=list) # Allowed phone numbers
-class TelegramConfig(BaseModel):
+class TelegramConfig(Base):
"""Telegram channel configuration."""
+
enabled: bool = False
token: str = "" # Bot token from @BotFather
allow_from: list[str] = Field(default_factory=list) # Allowed user IDs or usernames
- proxy: str | None = None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080"
+ proxy: str | None = (
+ None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080"
+ )
+ reply_to_message: bool = False # If true, bot replies quote the original message
-class FeishuConfig(BaseModel):
+class FeishuConfig(Base):
"""Feishu/Lark channel configuration using WebSocket long connection."""
+
enabled: bool = False
app_id: str = "" # App ID from Feishu Open Platform
app_secret: str = "" # App Secret from Feishu Open Platform
encrypt_key: str = "" # Encrypt Key for event subscription (optional)
verification_token: str = "" # Verification Token for event subscription (optional)
allow_from: list[str] = Field(default_factory=list) # Allowed user open_ids
+ react_emoji: str = (
+ "THUMBSUP" # Emoji type for message reactions (e.g. THUMBSUP, OK, DONE, SMILE)
+ )
-class DingTalkConfig(BaseModel):
+class DingTalkConfig(Base):
"""DingTalk channel configuration using Stream mode."""
+
enabled: bool = False
client_id: str = "" # AppKey
client_secret: str = "" # AppSecret
allow_from: list[str] = Field(default_factory=list) # Allowed staff_ids
-class DiscordConfig(BaseModel):
+class DiscordConfig(Base):
"""Discord channel configuration."""
+
enabled: bool = False
token: str = "" # Bot token from Discord Developer Portal
allow_from: list[str] = Field(default_factory=list) # Allowed user IDs
gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json"
intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT
+ group_policy: Literal["mention", "open"] = "mention"
-class EmailConfig(BaseModel):
+
+class MatrixConfig(Base):
+ """Matrix (Element) channel configuration."""
+
+ enabled: bool = False
+ homeserver: str = "https://matrix.org"
+ access_token: str = ""
+ user_id: str = "" # @bot:matrix.org
+ device_id: str = ""
+ e2ee_enabled: bool = True # Enable Matrix E2EE support (encryption + encrypted room handling).
+ sync_stop_grace_seconds: int = (
+ 2 # Max seconds to wait for sync_forever to stop gracefully before cancellation fallback.
+ )
+ max_media_bytes: int = (
+ 20 * 1024 * 1024
+ ) # Max attachment size accepted for Matrix media handling (inbound + outbound).
+ allow_from: list[str] = Field(default_factory=list)
+ group_policy: Literal["open", "mention", "allowlist"] = "open"
+ group_allow_from: list[str] = Field(default_factory=list)
+ allow_room_mentions: bool = False
+
+
+class EmailConfig(Base):
"""Email channel configuration (IMAP inbound + SMTP outbound)."""
+
enabled: bool = False
consent_granted: bool = False # Explicit owner permission to access mailbox data
@@ -70,7 +114,9 @@ class EmailConfig(BaseModel):
from_address: str = ""
# Behavior
- auto_reply_enabled: bool = True # If false, inbound email is read but no automatic reply is sent
+ auto_reply_enabled: bool = (
+ True # If false, inbound email is read but no automatic reply is sent
+ )
poll_interval_seconds: int = 30
mark_seen: bool = True
max_body_chars: int = 12000
@@ -78,18 +124,21 @@ class EmailConfig(BaseModel):
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
-class MochatMentionConfig(BaseModel):
+class MochatMentionConfig(Base):
"""Mochat mention behavior configuration."""
+
require_in_groups: bool = False
-class MochatGroupRule(BaseModel):
+class MochatGroupRule(Base):
"""Mochat per-group mention requirement."""
+
require_mention: bool = False
-class MochatConfig(BaseModel):
+class MochatConfig(Base):
"""Mochat channel configuration."""
+
enabled: bool = False
base_url: str = "https://mochat.io"
socket_url: str = ""
@@ -114,36 +163,49 @@ class MochatConfig(BaseModel):
reply_delay_ms: int = 120000
-class SlackDMConfig(BaseModel):
+class SlackDMConfig(Base):
"""Slack DM policy configuration."""
+
enabled: bool = True
policy: str = "open" # "open" or "allowlist"
allow_from: list[str] = Field(default_factory=list) # Allowed Slack user IDs
-class SlackConfig(BaseModel):
+class SlackConfig(Base):
"""Slack channel configuration."""
+
enabled: bool = False
mode: str = "socket" # "socket" supported
webhook_path: str = "/slack/events"
bot_token: str = "" # xoxb-...
app_token: str = "" # xapp-...
user_token_read_only: bool = True
+ reply_in_thread: bool = True
+ react_emoji: str = "eyes"
+ allow_from: list[str] = Field(default_factory=list) # Allowed Slack user IDs (sender-level)
group_policy: str = "mention" # "mention", "open", "allowlist"
group_allow_from: list[str] = Field(default_factory=list) # Allowed channel IDs if allowlist
dm: SlackDMConfig = Field(default_factory=SlackDMConfig)
-class QQConfig(BaseModel):
+class QQConfig(Base):
"""QQ channel configuration using botpy SDK."""
+
enabled: bool = False
app_id: str = "" # 机器人 ID (AppID) from q.qq.com
secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com
- allow_from: list[str] = Field(default_factory=list) # Allowed user openids (empty = public access)
+ allow_from: list[str] = Field(
+ default_factory=list
+ ) # Allowed user openids (empty = public access)
-class ChannelsConfig(BaseModel):
+
+
+class ChannelsConfig(Base):
"""Configuration for chat channels."""
+
+ send_progress: bool = True # stream agent's text progress to the channel
+ send_tool_hints: bool = False # stream tool-call hints (e.g. read_file("…"))
whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig)
telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig)
@@ -153,33 +215,43 @@ class ChannelsConfig(BaseModel):
email: EmailConfig = Field(default_factory=EmailConfig)
slack: SlackConfig = Field(default_factory=SlackConfig)
qq: QQConfig = Field(default_factory=QQConfig)
+ matrix: MatrixConfig = Field(default_factory=MatrixConfig)
-class AgentDefaults(BaseModel):
+class AgentDefaults(Base):
"""Default agent configuration."""
+
workspace: str = "~/.nanobot/workspace"
model: str = "anthropic/claude-opus-4-5"
+ provider: str = (
+ "auto" # Provider name (e.g. "anthropic", "openrouter") or "auto" for auto-detection
+ )
max_tokens: int = 8192
- temperature: float = 0.7
- max_tool_iterations: int = 20
- memory_window: int = 50
+ temperature: float = 0.1
+ max_tool_iterations: int = 40
+ memory_window: int = 100
+ reasoning_effort: str | None = None # low / medium / high — enables LLM thinking mode
-class AgentsConfig(BaseModel):
+class AgentsConfig(Base):
"""Agent configuration."""
+
defaults: AgentDefaults = Field(default_factory=AgentDefaults)
-class ProviderConfig(BaseModel):
+class ProviderConfig(Base):
"""LLM provider configuration."""
+
api_key: str = ""
api_base: str | None = None
extra_headers: dict[str, str] | None = None # Custom headers (e.g. APP-Code for AiHubMix)
-class ProvidersConfig(BaseModel):
+class ProvidersConfig(Base):
"""Configuration for LLM providers."""
+
custom: ProviderConfig = Field(default_factory=ProviderConfig) # Any OpenAI-compatible endpoint
+ azure_openai: ProviderConfig = Field(default_factory=ProviderConfig) # Azure OpenAI (model = deployment name)
anthropic: ProviderConfig = Field(default_factory=ProviderConfig)
openai: ProviderConfig = Field(default_factory=ProviderConfig)
openrouter: ProviderConfig = Field(default_factory=ProviderConfig)
@@ -192,40 +264,65 @@ class ProvidersConfig(BaseModel):
moonshot: ProviderConfig = Field(default_factory=ProviderConfig)
minimax: ProviderConfig = Field(default_factory=ProviderConfig)
aihubmix: ProviderConfig = Field(default_factory=ProviderConfig) # AiHubMix API gateway
+ siliconflow: ProviderConfig = Field(default_factory=ProviderConfig) # SiliconFlow (硅基流动)
+ volcengine: ProviderConfig = Field(default_factory=ProviderConfig) # VolcEngine (火山引擎)
+ openai_codex: ProviderConfig = Field(default_factory=ProviderConfig) # OpenAI Codex (OAuth)
+ github_copilot: ProviderConfig = Field(default_factory=ProviderConfig) # Github Copilot (OAuth)
-class GatewayConfig(BaseModel):
+class HeartbeatConfig(Base):
+ """Heartbeat service configuration."""
+
+ enabled: bool = True
+ interval_s: int = 30 * 60 # 30 minutes
+
+
+class GatewayConfig(Base):
"""Gateway/server configuration."""
+
host: str = "0.0.0.0"
port: int = 18790
+ heartbeat: HeartbeatConfig = Field(default_factory=HeartbeatConfig)
-class WebSearchConfig(BaseModel):
+class WebSearchConfig(Base):
"""Web search tool configuration."""
+
api_key: str = "" # Brave Search API key
max_results: int = 5
-class WebToolsConfig(BaseModel):
+class WebToolsConfig(Base):
"""Web tools configuration."""
+
+ proxy: str | None = (
+ None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080"
+ )
search: WebSearchConfig = Field(default_factory=WebSearchConfig)
-class ExecToolConfig(BaseModel):
+class ExecToolConfig(Base):
"""Shell exec tool configuration."""
+
timeout: int = 60
+ path_append: str = ""
-class MCPServerConfig(BaseModel):
+class MCPServerConfig(Base):
"""MCP server connection configuration (stdio or HTTP)."""
+
+ type: Literal["stdio", "sse", "streamableHttp"] | None = None # auto-detected if omitted
command: str = "" # Stdio: command to run (e.g. "npx")
args: list[str] = Field(default_factory=list) # Stdio: command arguments
env: dict[str, str] = Field(default_factory=dict) # Stdio: extra env vars
- url: str = "" # HTTP: streamable HTTP endpoint URL
+ url: str = "" # HTTP/SSE: endpoint URL
+ headers: dict[str, str] = Field(default_factory=dict) # HTTP/SSE: custom headers
+ tool_timeout: int = 30 # seconds before a tool call is cancelled
-class ToolsConfig(BaseModel):
+class ToolsConfig(Base):
"""Tools configuration."""
+
web: WebToolsConfig = Field(default_factory=WebToolsConfig)
exec: ExecToolConfig = Field(default_factory=ExecToolConfig)
restrict_to_workspace: bool = False # If true, restrict all tool access to workspace directory
@@ -234,30 +331,57 @@ class ToolsConfig(BaseModel):
class Config(BaseSettings):
"""Root configuration for nanobot."""
+
agents: AgentsConfig = Field(default_factory=AgentsConfig)
channels: ChannelsConfig = Field(default_factory=ChannelsConfig)
providers: ProvidersConfig = Field(default_factory=ProvidersConfig)
gateway: GatewayConfig = Field(default_factory=GatewayConfig)
tools: ToolsConfig = Field(default_factory=ToolsConfig)
-
+
@property
def workspace_path(self) -> Path:
"""Get expanded workspace path."""
return Path(self.agents.defaults.workspace).expanduser()
-
- def _match_provider(self, model: str | None = None) -> tuple["ProviderConfig | None", str | None]:
+
+ def _match_provider(
+ self, model: str | None = None
+ ) -> tuple["ProviderConfig | None", str | None]:
"""Match provider config and its registry name. Returns (config, spec_name)."""
from nanobot.providers.registry import PROVIDERS
+
+ forced = self.agents.defaults.provider
+ if forced != "auto":
+ p = getattr(self.providers, forced, None)
+ return (p, forced) if p else (None, None)
+
model_lower = (model or self.agents.defaults.model).lower()
+ model_normalized = model_lower.replace("-", "_")
+ model_prefix = model_lower.split("/", 1)[0] if "/" in model_lower else ""
+ normalized_prefix = model_prefix.replace("-", "_")
+
+ def _kw_matches(kw: str) -> bool:
+ kw = kw.lower()
+ return kw in model_lower or kw.replace("-", "_") in model_normalized
+
+ # Explicit provider prefix wins — prevents `github-copilot/...codex` matching openai_codex.
+ for spec in PROVIDERS:
+ p = getattr(self.providers, spec.name, None)
+ if p and model_prefix and normalized_prefix == spec.name:
+ if spec.is_oauth or p.api_key:
+ return p, spec.name
# Match by keyword (order follows PROVIDERS registry)
for spec in PROVIDERS:
p = getattr(self.providers, spec.name, None)
- if p and any(kw in model_lower for kw in spec.keywords) and p.api_key:
- return p, spec.name
+ if p and any(_kw_matches(kw) for kw in spec.keywords):
+ if spec.is_oauth or p.api_key:
+ return p, spec.name
# Fallback: gateways first, then others (follows registry order)
+ # OAuth providers are NOT valid fallbacks — they require explicit model selection
for spec in PROVIDERS:
+ if spec.is_oauth:
+ continue
p = getattr(self.providers, spec.name, None)
if p and p.api_key:
return p, spec.name
@@ -277,10 +401,11 @@ class Config(BaseSettings):
"""Get API key for the given model. Falls back to first available key."""
p = self.get_provider(model)
return p.api_key if p else None
-
+
def get_api_base(self, model: str | None = None) -> str | None:
"""Get API base URL for the given model. Applies default URLs for known gateways."""
from nanobot.providers.registry import find_by_name
+
p, name = self._match_provider(model)
if p and p.api_base:
return p.api_base
@@ -292,8 +417,5 @@ class Config(BaseSettings):
if spec and spec.is_gateway and spec.default_api_base:
return spec.default_api_base
return None
-
- model_config = ConfigDict(
- env_prefix="NANOBOT_",
- env_nested_delimiter="__"
- )
+
+ model_config = ConfigDict(env_prefix="NANOBOT_", env_nested_delimiter="__")
diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py
index 4da845a..1ed71f0 100644
--- a/nanobot/cron/service.py
+++ b/nanobot/cron/service.py
@@ -21,18 +21,20 @@ def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None:
"""Compute next run time in ms."""
if schedule.kind == "at":
return schedule.at_ms if schedule.at_ms and schedule.at_ms > now_ms else None
-
+
if schedule.kind == "every":
if not schedule.every_ms or schedule.every_ms <= 0:
return None
# Next interval from now
return now_ms + schedule.every_ms
-
+
if schedule.kind == "cron" and schedule.expr:
try:
- from croniter import croniter
from zoneinfo import ZoneInfo
- base_time = time.time()
+
+ from croniter import croniter
+ # Use caller-provided reference time for deterministic scheduling
+ base_time = now_ms / 1000
tz = ZoneInfo(schedule.tz) if schedule.tz else datetime.now().astimezone().tzinfo
base_dt = datetime.fromtimestamp(base_time, tz=tz)
cron = croniter(schedule.expr, base_dt)
@@ -40,32 +42,52 @@ def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None:
return int(next_dt.timestamp() * 1000)
except Exception:
return None
-
+
return None
+def _validate_schedule_for_add(schedule: CronSchedule) -> None:
+ """Validate schedule fields that would otherwise create non-runnable jobs."""
+ if schedule.tz and schedule.kind != "cron":
+ raise ValueError("tz can only be used with cron schedules")
+
+ if schedule.kind == "cron" and schedule.tz:
+ try:
+ from zoneinfo import ZoneInfo
+
+ ZoneInfo(schedule.tz)
+ except Exception:
+ raise ValueError(f"unknown timezone '{schedule.tz}'") from None
+
+
class CronService:
"""Service for managing and executing scheduled jobs."""
-
+
def __init__(
self,
store_path: Path,
on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None
):
self.store_path = store_path
- self.on_job = on_job # Callback to execute job, returns response text
+ self.on_job = on_job
self._store: CronStore | None = None
+ self._last_mtime: float = 0.0
self._timer_task: asyncio.Task | None = None
self._running = False
-
+
def _load_store(self) -> CronStore:
- """Load jobs from disk."""
+ """Load jobs from disk. Reloads automatically if file was modified externally."""
+ if self._store and self.store_path.exists():
+ mtime = self.store_path.stat().st_mtime
+ if mtime != self._last_mtime:
+ logger.info("Cron: jobs.json modified externally, reloading")
+ self._store = None
if self._store:
return self._store
-
+
if self.store_path.exists():
try:
- data = json.loads(self.store_path.read_text())
+ data = json.loads(self.store_path.read_text(encoding="utf-8"))
jobs = []
for j in data.get("jobs", []):
jobs.append(CronJob(
@@ -98,20 +120,20 @@ class CronService:
))
self._store = CronStore(jobs=jobs)
except Exception as e:
- logger.warning(f"Failed to load cron store: {e}")
+ logger.warning("Failed to load cron store: {}", e)
self._store = CronStore()
else:
self._store = CronStore()
-
+
return self._store
-
+
def _save_store(self) -> None:
"""Save jobs to disk."""
if not self._store:
return
-
+
self.store_path.parent.mkdir(parents=True, exist_ok=True)
-
+
data = {
"version": self._store.version,
"jobs": [
@@ -146,8 +168,9 @@ class CronService:
for j in self._store.jobs
]
}
-
- self.store_path.write_text(json.dumps(data, indent=2))
+
+ self.store_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
+ self._last_mtime = self.store_path.stat().st_mtime
async def start(self) -> None:
"""Start the cron service."""
@@ -156,15 +179,15 @@ class CronService:
self._recompute_next_runs()
self._save_store()
self._arm_timer()
- logger.info(f"Cron service started with {len(self._store.jobs if self._store else [])} jobs")
-
+ logger.info("Cron service started with {} jobs", len(self._store.jobs if self._store else []))
+
def stop(self) -> None:
"""Stop the cron service."""
self._running = False
if self._timer_task:
self._timer_task.cancel()
self._timer_task = None
-
+
def _recompute_next_runs(self) -> None:
"""Recompute next run times for all enabled jobs."""
if not self._store:
@@ -173,73 +196,74 @@ class CronService:
for job in self._store.jobs:
if job.enabled:
job.state.next_run_at_ms = _compute_next_run(job.schedule, now)
-
+
def _get_next_wake_ms(self) -> int | None:
"""Get the earliest next run time across all jobs."""
if not self._store:
return None
- times = [j.state.next_run_at_ms for j in self._store.jobs
+ times = [j.state.next_run_at_ms for j in self._store.jobs
if j.enabled and j.state.next_run_at_ms]
return min(times) if times else None
-
+
def _arm_timer(self) -> None:
"""Schedule the next timer tick."""
if self._timer_task:
self._timer_task.cancel()
-
+
next_wake = self._get_next_wake_ms()
if not next_wake or not self._running:
return
-
+
delay_ms = max(0, next_wake - _now_ms())
delay_s = delay_ms / 1000
-
+
async def tick():
await asyncio.sleep(delay_s)
if self._running:
await self._on_timer()
-
+
self._timer_task = asyncio.create_task(tick())
-
+
async def _on_timer(self) -> None:
"""Handle timer tick - run due jobs."""
+ self._load_store()
if not self._store:
return
-
+
now = _now_ms()
due_jobs = [
j for j in self._store.jobs
if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
]
-
+
for job in due_jobs:
await self._execute_job(job)
-
+
self._save_store()
self._arm_timer()
-
+
async def _execute_job(self, job: CronJob) -> None:
"""Execute a single job."""
start_ms = _now_ms()
- logger.info(f"Cron: executing job '{job.name}' ({job.id})")
-
+ logger.info("Cron: executing job '{}' ({})", job.name, job.id)
+
try:
response = None
if self.on_job:
response = await self.on_job(job)
-
+
job.state.last_status = "ok"
job.state.last_error = None
- logger.info(f"Cron: job '{job.name}' completed")
-
+ logger.info("Cron: job '{}' completed", job.name)
+
except Exception as e:
job.state.last_status = "error"
job.state.last_error = str(e)
- logger.error(f"Cron: job '{job.name}' failed: {e}")
-
+ logger.error("Cron: job '{}' failed: {}", job.name, e)
+
job.state.last_run_at_ms = start_ms
job.updated_at_ms = _now_ms()
-
+
# Handle one-shot jobs
if job.schedule.kind == "at":
if job.delete_after_run:
@@ -250,15 +274,15 @@ class CronService:
else:
# Compute next run
job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())
-
+
# ========== Public API ==========
-
+
def list_jobs(self, include_disabled: bool = False) -> list[CronJob]:
"""List all jobs."""
store = self._load_store()
jobs = store.jobs if include_disabled else [j for j in store.jobs if j.enabled]
return sorted(jobs, key=lambda j: j.state.next_run_at_ms or float('inf'))
-
+
def add_job(
self,
name: str,
@@ -271,8 +295,9 @@ class CronService:
) -> CronJob:
"""Add a new job."""
store = self._load_store()
+ _validate_schedule_for_add(schedule)
now = _now_ms()
-
+
job = CronJob(
id=str(uuid.uuid4())[:8],
name=name,
@@ -290,28 +315,28 @@ class CronService:
updated_at_ms=now,
delete_after_run=delete_after_run,
)
-
+
store.jobs.append(job)
self._save_store()
self._arm_timer()
-
- logger.info(f"Cron: added job '{name}' ({job.id})")
+
+ logger.info("Cron: added job '{}' ({})", name, job.id)
return job
-
+
def remove_job(self, job_id: str) -> bool:
"""Remove a job by ID."""
store = self._load_store()
before = len(store.jobs)
store.jobs = [j for j in store.jobs if j.id != job_id]
removed = len(store.jobs) < before
-
+
if removed:
self._save_store()
self._arm_timer()
- logger.info(f"Cron: removed job {job_id}")
-
+ logger.info("Cron: removed job {}", job_id)
+
return removed
-
+
def enable_job(self, job_id: str, enabled: bool = True) -> CronJob | None:
"""Enable or disable a job."""
store = self._load_store()
@@ -327,7 +352,7 @@ class CronService:
self._arm_timer()
return job
return None
-
+
async def run_job(self, job_id: str, force: bool = False) -> bool:
"""Manually run a job."""
store = self._load_store()
@@ -340,7 +365,7 @@ class CronService:
self._arm_timer()
return True
return False
-
+
def status(self) -> dict:
"""Get service status."""
store = self._load_store()
diff --git a/nanobot/heartbeat/service.py b/nanobot/heartbeat/service.py
index 221ed27..e534017 100644
--- a/nanobot/heartbeat/service.py
+++ b/nanobot/heartbeat/service.py
@@ -1,92 +1,130 @@
"""Heartbeat service - periodic agent wake-up to check for tasks."""
+from __future__ import annotations
+
import asyncio
from pathlib import Path
-from typing import Any, Callable, Coroutine
+from typing import TYPE_CHECKING, Any, Callable, Coroutine
from loguru import logger
-# Default interval: 30 minutes
-DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60
+if TYPE_CHECKING:
+ from nanobot.providers.base import LLMProvider
-# The prompt sent to agent during heartbeat
-HEARTBEAT_PROMPT = """Read HEARTBEAT.md in your workspace (if it exists).
-Follow any instructions or tasks listed there.
-If nothing needs attention, reply with just: HEARTBEAT_OK"""
-
-# Token that indicates "nothing to do"
-HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK"
-
-
-def _is_heartbeat_empty(content: str | None) -> bool:
- """Check if HEARTBEAT.md has no actionable content."""
- if not content:
- return True
-
- # Lines to skip: empty, headers, HTML comments, empty checkboxes
- skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"}
-
- for line in content.split("\n"):
- line = line.strip()
- if not line or line.startswith("#") or line.startswith("