diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index b982dd3..64d92e7 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -189,6 +189,10 @@ def _extract_post_content(content_json: dict) -> tuple[str, list[str]]: texts.append(el.get("text", "")) elif tag == "at": texts.append(f"@{el.get('user_name', 'user')}") + elif tag == "code_block": + lang = el.get("language", "") + code_text = el.get("text", "") + texts.append(f"\n```{lang}\n{code_text}\n```\n") elif tag == "img" and (key := el.get("image_key")): images.append(key) return (" ".join(texts).strip() or None), images @@ -1014,7 +1018,7 @@ class FeishuChannel(BaseChannel): event = data.event message = event.message sender = event.sender - + # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index d22ee04..1cf9489 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -9,6 +9,7 @@ import unicodedata from loguru import logger from telegram import BotCommand, ReplyParameters, Update +from telegram.error import TimedOut from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from telegram.request import HTTPXRequest @@ -154,7 +155,8 @@ def _markdown_to_telegram_html(text: str) -> str: return text - +_SEND_MAX_RETRIES = 3 +_SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry class TelegramChannel(BaseChannel): """ Telegram channel using long polling. @@ -221,15 +223,29 @@ class TelegramChannel(BaseChannel): self._running = True - # Build the application with larger connection pool to avoid pool-timeout on long runs - req = HTTPXRequest( - connection_pool_size=16, - pool_timeout=5.0, + proxy = self.config.proxy or None + + # Separate pools so long-polling (getUpdates) never starves outbound sends. + api_request = HTTPXRequest( + connection_pool_size=self.config.connection_pool_size, + pool_timeout=self.config.pool_timeout, connect_timeout=30.0, read_timeout=30.0, - proxy=self.config.proxy if self.config.proxy else None, + proxy=proxy, + ) + poll_request = HTTPXRequest( + connection_pool_size=4, + pool_timeout=self.config.pool_timeout, + connect_timeout=30.0, + read_timeout=30.0, + proxy=proxy, + ) + builder = ( + Application.builder() + .token(self.config.token) + .request(api_request) + .get_updates_request(poll_request) ) - builder = Application.builder().token(self.config.token).request(req).get_updates_request(req) self._app = builder.build() self._app.add_error_handler(self._on_error) @@ -365,7 +381,8 @@ class TelegramChannel(BaseChannel): ok, error = validate_url_target(media_path) if not ok: raise ValueError(f"unsafe media URL: {error}") - await sender( + await self._call_with_retry( + sender, chat_id=chat_id, **{param: media_path}, reply_parameters=reply_params, @@ -401,6 +418,21 @@ class TelegramChannel(BaseChannel): else: await self._send_text(chat_id, chunk, reply_params, thread_kwargs) + async def _call_with_retry(self, fn, *args, **kwargs): + """Call an async Telegram API function with retry on pool/network timeout.""" + for attempt in range(1, _SEND_MAX_RETRIES + 1): + try: + return await fn(*args, **kwargs) + except TimedOut: + if attempt == _SEND_MAX_RETRIES: + raise + delay = _SEND_RETRY_BASE_DELAY * (2 ** (attempt - 1)) + logger.warning( + "Telegram timeout (attempt {}/{}), retrying in {:.1f}s", + attempt, _SEND_MAX_RETRIES, delay, + ) + await asyncio.sleep(delay) + async def _send_text( self, chat_id: int, @@ -411,7 +443,8 @@ class TelegramChannel(BaseChannel): """Send a plain text message with HTML fallback.""" try: html = _markdown_to_telegram_html(text) - await self._app.bot.send_message( + await self._call_with_retry( + self._app.bot.send_message, chat_id=chat_id, text=html, parse_mode="HTML", reply_parameters=reply_params, **(thread_kwargs or {}), @@ -419,7 +452,8 @@ class TelegramChannel(BaseChannel): except Exception as e: logger.warning("HTML parse failed, falling back to plain text: {}", e) try: - await self._app.bot.send_message( + await self._call_with_retry( + self._app.bot.send_message, chat_id=chat_id, text=text, reply_parameters=reply_params, diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index c1f5175..db6e92e 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -46,6 +46,8 @@ class TelegramConfig(Base): ) reply_to_message: bool = False # If true, bot replies quote the original message group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned or replied to, "open" responds to all + connection_pool_size: int = 32 # Outbound Telegram API HTTP pool size + pool_timeout: float = 5.0 # Shared HTTP pool timeout for bot sends and getUpdates class TelegramInstanceConfig(TelegramConfig): diff --git a/tests/test_telegram_channel.py b/tests/test_telegram_channel.py index 130acd8..7b798b8 100644 --- a/tests/test_telegram_channel.py +++ b/tests/test_telegram_channel.py @@ -16,6 +16,10 @@ class _FakeHTTPXRequest: self.kwargs = kwargs self.__class__.instances.append(self) + @classmethod + def clear(cls) -> None: + cls.instances.clear() + class _FakeUpdater: def __init__(self, on_start_polling) -> None: @@ -142,7 +146,8 @@ def _make_telegram_update( @pytest.mark.asyncio -async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> None: +async def test_start_creates_separate_pools_with_proxy(monkeypatch) -> None: + _FakeHTTPXRequest.clear() config = TelegramConfig( enabled=True, token="123:abc", @@ -162,10 +167,106 @@ async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> No await channel.start() - assert len(_FakeHTTPXRequest.instances) == 1 - assert _FakeHTTPXRequest.instances[0].kwargs["proxy"] == config.proxy - assert builder.request_value is _FakeHTTPXRequest.instances[0] - assert builder.get_updates_request_value is _FakeHTTPXRequest.instances[0] + assert len(_FakeHTTPXRequest.instances) == 2 + api_req, poll_req = _FakeHTTPXRequest.instances + assert api_req.kwargs["proxy"] == config.proxy + assert poll_req.kwargs["proxy"] == config.proxy + assert api_req.kwargs["connection_pool_size"] == 32 + assert poll_req.kwargs["connection_pool_size"] == 4 + assert builder.request_value is api_req + assert builder.get_updates_request_value is poll_req + + +@pytest.mark.asyncio +async def test_start_respects_custom_pool_config(monkeypatch) -> None: + _FakeHTTPXRequest.clear() + config = TelegramConfig( + enabled=True, + token="123:abc", + allow_from=["*"], + connection_pool_size=32, + pool_timeout=10.0, + ) + bus = MessageBus() + channel = TelegramChannel(config, bus) + app = _FakeApp(lambda: setattr(channel, "_running", False)) + builder = _FakeBuilder(app) + + monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest) + monkeypatch.setattr( + "nanobot.channels.telegram.Application", + SimpleNamespace(builder=lambda: builder), + ) + + await channel.start() + + api_req = _FakeHTTPXRequest.instances[0] + poll_req = _FakeHTTPXRequest.instances[1] + assert api_req.kwargs["connection_pool_size"] == 32 + assert api_req.kwargs["pool_timeout"] == 10.0 + assert poll_req.kwargs["pool_timeout"] == 10.0 + + +@pytest.mark.asyncio +async def test_send_text_retries_on_timeout() -> None: + """_send_text retries on TimedOut before succeeding.""" + from telegram.error import TimedOut + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + + call_count = 0 + original_send = channel._app.bot.send_message + + async def flaky_send(**kwargs): + nonlocal call_count + call_count += 1 + if call_count <= 2: + raise TimedOut() + return await original_send(**kwargs) + + channel._app.bot.send_message = flaky_send + + import nanobot.channels.telegram as tg_mod + orig_delay = tg_mod._SEND_RETRY_BASE_DELAY + tg_mod._SEND_RETRY_BASE_DELAY = 0.01 + try: + await channel._send_text(123, "hello", None, {}) + finally: + tg_mod._SEND_RETRY_BASE_DELAY = orig_delay + + assert call_count == 3 + assert len(channel._app.bot.sent_messages) == 1 + + +@pytest.mark.asyncio +async def test_send_text_gives_up_after_max_retries() -> None: + """_send_text raises TimedOut after exhausting all retries.""" + from telegram.error import TimedOut + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + + async def always_timeout(**kwargs): + raise TimedOut() + + channel._app.bot.send_message = always_timeout + + import nanobot.channels.telegram as tg_mod + orig_delay = tg_mod._SEND_RETRY_BASE_DELAY + tg_mod._SEND_RETRY_BASE_DELAY = 0.01 + try: + await channel._send_text(123, "hello", None, {}) + finally: + tg_mod._SEND_RETRY_BASE_DELAY = orig_delay + + assert channel._app.bot.sent_messages == [] def test_derive_topic_session_key_uses_thread_id() -> None: