fix: separate Telegram connection pools and add timeout retry to prevent pool exhaustion

The root cause of "Pool timeout" errors is that long-polling (getUpdates)
and outbound API calls (send_message, send_photo, etc.) shared the same
HTTPXRequest pool — polling holds connections indefinitely, starving sends
under concurrent load (e.g. cron jobs + user chat).

- Split into two independent pools: API calls (default 32) and polling (4)
- Expose connection_pool_size / pool_timeout in TelegramConfig for tuning
- Add _call_with_retry() with exponential backoff (3 attempts) on TimedOut
- Apply retry to _send_text and remote media URL sends
This commit is contained in:
Xubin Ren
2026-03-19 05:58:29 +00:00
committed by Xubin Ren
parent d9cb729596
commit dd7e3e499f
2 changed files with 154 additions and 14 deletions

View File

@@ -11,6 +11,7 @@ from typing import Any, Literal
from loguru import logger from loguru import logger
from pydantic import Field from pydantic import Field
from telegram import BotCommand, ReplyParameters, Update from telegram import BotCommand, ReplyParameters, Update
from telegram.error import TimedOut
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.request import HTTPXRequest from telegram.request import HTTPXRequest
@@ -151,6 +152,10 @@ def _markdown_to_telegram_html(text: str) -> str:
return text return text
_SEND_MAX_RETRIES = 3
_SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry
class TelegramConfig(Base): class TelegramConfig(Base):
"""Telegram channel configuration.""" """Telegram channel configuration."""
@@ -160,6 +165,8 @@ class TelegramConfig(Base):
proxy: str | None = None proxy: str | None = None
reply_to_message: bool = False reply_to_message: bool = False
group_policy: Literal["open", "mention"] = "mention" group_policy: Literal["open", "mention"] = "mention"
connection_pool_size: int = 32
pool_timeout: float = 5.0
class TelegramChannel(BaseChannel): class TelegramChannel(BaseChannel):
@@ -226,15 +233,29 @@ class TelegramChannel(BaseChannel):
self._running = True self._running = True
# Build the application with larger connection pool to avoid pool-timeout on long runs proxy = self.config.proxy or None
req = HTTPXRequest(
connection_pool_size=16, # Separate pools so long-polling (getUpdates) never starves outbound sends.
pool_timeout=5.0, api_request = HTTPXRequest(
connection_pool_size=self.config.connection_pool_size,
pool_timeout=self.config.pool_timeout,
connect_timeout=30.0, connect_timeout=30.0,
read_timeout=30.0, read_timeout=30.0,
proxy=self.config.proxy if self.config.proxy else None, proxy=proxy,
)
poll_request = HTTPXRequest(
connection_pool_size=4,
pool_timeout=self.config.pool_timeout,
connect_timeout=30.0,
read_timeout=30.0,
proxy=proxy,
)
builder = (
Application.builder()
.token(self.config.token)
.request(api_request)
.get_updates_request(poll_request)
) )
builder = Application.builder().token(self.config.token).request(req).get_updates_request(req)
self._app = builder.build() self._app = builder.build()
self._app.add_error_handler(self._on_error) self._app.add_error_handler(self._on_error)
@@ -365,7 +386,8 @@ class TelegramChannel(BaseChannel):
ok, error = validate_url_target(media_path) ok, error = validate_url_target(media_path)
if not ok: if not ok:
raise ValueError(f"unsafe media URL: {error}") raise ValueError(f"unsafe media URL: {error}")
await sender( await self._call_with_retry(
sender,
chat_id=chat_id, chat_id=chat_id,
**{param: media_path}, **{param: media_path},
reply_parameters=reply_params, reply_parameters=reply_params,
@@ -401,6 +423,21 @@ class TelegramChannel(BaseChannel):
else: else:
await self._send_text(chat_id, chunk, reply_params, thread_kwargs) await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
async def _call_with_retry(self, fn, *args, **kwargs):
"""Call an async Telegram API function with retry on pool/network timeout."""
for attempt in range(1, _SEND_MAX_RETRIES + 1):
try:
return await fn(*args, **kwargs)
except TimedOut:
if attempt == _SEND_MAX_RETRIES:
raise
delay = _SEND_RETRY_BASE_DELAY * (2 ** (attempt - 1))
logger.warning(
"Telegram timeout (attempt {}/{}), retrying in {:.1f}s",
attempt, _SEND_MAX_RETRIES, delay,
)
await asyncio.sleep(delay)
async def _send_text( async def _send_text(
self, self,
chat_id: int, chat_id: int,
@@ -411,7 +448,8 @@ class TelegramChannel(BaseChannel):
"""Send a plain text message with HTML fallback.""" """Send a plain text message with HTML fallback."""
try: try:
html = _markdown_to_telegram_html(text) html = _markdown_to_telegram_html(text)
await self._app.bot.send_message( await self._call_with_retry(
self._app.bot.send_message,
chat_id=chat_id, text=html, parse_mode="HTML", chat_id=chat_id, text=html, parse_mode="HTML",
reply_parameters=reply_params, reply_parameters=reply_params,
**(thread_kwargs or {}), **(thread_kwargs or {}),
@@ -419,7 +457,8 @@ class TelegramChannel(BaseChannel):
except Exception as e: except Exception as e:
logger.warning("HTML parse failed, falling back to plain text: {}", e) logger.warning("HTML parse failed, falling back to plain text: {}", e)
try: try:
await self._app.bot.send_message( await self._call_with_retry(
self._app.bot.send_message,
chat_id=chat_id, chat_id=chat_id,
text=text, text=text,
reply_parameters=reply_params, reply_parameters=reply_params,

View File

@@ -18,6 +18,10 @@ class _FakeHTTPXRequest:
self.kwargs = kwargs self.kwargs = kwargs
self.__class__.instances.append(self) self.__class__.instances.append(self)
@classmethod
def clear(cls) -> None:
cls.instances.clear()
class _FakeUpdater: class _FakeUpdater:
def __init__(self, on_start_polling) -> None: def __init__(self, on_start_polling) -> None:
@@ -144,7 +148,8 @@ def _make_telegram_update(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> None: async def test_start_creates_separate_pools_with_proxy(monkeypatch) -> None:
_FakeHTTPXRequest.clear()
config = TelegramConfig( config = TelegramConfig(
enabled=True, enabled=True,
token="123:abc", token="123:abc",
@@ -164,10 +169,106 @@ async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> No
await channel.start() await channel.start()
assert len(_FakeHTTPXRequest.instances) == 1 assert len(_FakeHTTPXRequest.instances) == 2
assert _FakeHTTPXRequest.instances[0].kwargs["proxy"] == config.proxy api_req, poll_req = _FakeHTTPXRequest.instances
assert builder.request_value is _FakeHTTPXRequest.instances[0] assert api_req.kwargs["proxy"] == config.proxy
assert builder.get_updates_request_value is _FakeHTTPXRequest.instances[0] assert poll_req.kwargs["proxy"] == config.proxy
assert api_req.kwargs["connection_pool_size"] == 32
assert poll_req.kwargs["connection_pool_size"] == 4
assert builder.request_value is api_req
assert builder.get_updates_request_value is poll_req
@pytest.mark.asyncio
async def test_start_respects_custom_pool_config(monkeypatch) -> None:
_FakeHTTPXRequest.clear()
config = TelegramConfig(
enabled=True,
token="123:abc",
allow_from=["*"],
connection_pool_size=32,
pool_timeout=10.0,
)
bus = MessageBus()
channel = TelegramChannel(config, bus)
app = _FakeApp(lambda: setattr(channel, "_running", False))
builder = _FakeBuilder(app)
monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest)
monkeypatch.setattr(
"nanobot.channels.telegram.Application",
SimpleNamespace(builder=lambda: builder),
)
await channel.start()
api_req = _FakeHTTPXRequest.instances[0]
poll_req = _FakeHTTPXRequest.instances[1]
assert api_req.kwargs["connection_pool_size"] == 32
assert api_req.kwargs["pool_timeout"] == 10.0
assert poll_req.kwargs["pool_timeout"] == 10.0
@pytest.mark.asyncio
async def test_send_text_retries_on_timeout() -> None:
"""_send_text retries on TimedOut before succeeding."""
from telegram.error import TimedOut
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
call_count = 0
original_send = channel._app.bot.send_message
async def flaky_send(**kwargs):
nonlocal call_count
call_count += 1
if call_count <= 2:
raise TimedOut()
return await original_send(**kwargs)
channel._app.bot.send_message = flaky_send
import nanobot.channels.telegram as tg_mod
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
try:
await channel._send_text(123, "hello", None, {})
finally:
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
assert call_count == 3
assert len(channel._app.bot.sent_messages) == 1
@pytest.mark.asyncio
async def test_send_text_gives_up_after_max_retries() -> None:
"""_send_text raises TimedOut after exhausting all retries."""
from telegram.error import TimedOut
channel = TelegramChannel(
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
MessageBus(),
)
channel._app = _FakeApp(lambda: None)
async def always_timeout(**kwargs):
raise TimedOut()
channel._app.bot.send_message = always_timeout
import nanobot.channels.telegram as tg_mod
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
try:
await channel._send_text(123, "hello", None, {})
finally:
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
assert channel._app.bot.sent_messages == []
def test_derive_topic_session_key_uses_thread_id() -> None: def test_derive_topic_session_key_uses_thread_id() -> None: