Merge remote-tracking branch 'origin/main'
# Conflicts: # nanobot/channels/telegram.py
This commit is contained in:
@@ -189,6 +189,10 @@ def _extract_post_content(content_json: dict) -> tuple[str, list[str]]:
|
|||||||
texts.append(el.get("text", ""))
|
texts.append(el.get("text", ""))
|
||||||
elif tag == "at":
|
elif tag == "at":
|
||||||
texts.append(f"@{el.get('user_name', 'user')}")
|
texts.append(f"@{el.get('user_name', 'user')}")
|
||||||
|
elif tag == "code_block":
|
||||||
|
lang = el.get("language", "")
|
||||||
|
code_text = el.get("text", "")
|
||||||
|
texts.append(f"\n```{lang}\n{code_text}\n```\n")
|
||||||
elif tag == "img" and (key := el.get("image_key")):
|
elif tag == "img" and (key := el.get("image_key")):
|
||||||
images.append(key)
|
images.append(key)
|
||||||
return (" ".join(texts).strip() or None), images
|
return (" ".join(texts).strip() or None), images
|
||||||
@@ -1014,7 +1018,7 @@ class FeishuChannel(BaseChannel):
|
|||||||
event = data.event
|
event = data.event
|
||||||
message = event.message
|
message = event.message
|
||||||
sender = event.sender
|
sender = event.sender
|
||||||
|
|
||||||
# Deduplication check
|
# Deduplication check
|
||||||
message_id = message.message_id
|
message_id = message.message_id
|
||||||
if message_id in self._processed_message_ids:
|
if message_id in self._processed_message_ids:
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import unicodedata
|
|||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from telegram import BotCommand, ReplyParameters, Update
|
from telegram import BotCommand, ReplyParameters, Update
|
||||||
|
from telegram.error import TimedOut
|
||||||
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
|
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
|
||||||
from telegram.request import HTTPXRequest
|
from telegram.request import HTTPXRequest
|
||||||
|
|
||||||
@@ -154,7 +155,8 @@ def _markdown_to_telegram_html(text: str) -> str:
|
|||||||
|
|
||||||
return text
|
return text
|
||||||
|
|
||||||
|
_SEND_MAX_RETRIES = 3
|
||||||
|
_SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry
|
||||||
class TelegramChannel(BaseChannel):
|
class TelegramChannel(BaseChannel):
|
||||||
"""
|
"""
|
||||||
Telegram channel using long polling.
|
Telegram channel using long polling.
|
||||||
@@ -221,15 +223,29 @@ class TelegramChannel(BaseChannel):
|
|||||||
|
|
||||||
self._running = True
|
self._running = True
|
||||||
|
|
||||||
# Build the application with larger connection pool to avoid pool-timeout on long runs
|
proxy = self.config.proxy or None
|
||||||
req = HTTPXRequest(
|
|
||||||
connection_pool_size=16,
|
# Separate pools so long-polling (getUpdates) never starves outbound sends.
|
||||||
pool_timeout=5.0,
|
api_request = HTTPXRequest(
|
||||||
|
connection_pool_size=self.config.connection_pool_size,
|
||||||
|
pool_timeout=self.config.pool_timeout,
|
||||||
connect_timeout=30.0,
|
connect_timeout=30.0,
|
||||||
read_timeout=30.0,
|
read_timeout=30.0,
|
||||||
proxy=self.config.proxy if self.config.proxy else None,
|
proxy=proxy,
|
||||||
|
)
|
||||||
|
poll_request = HTTPXRequest(
|
||||||
|
connection_pool_size=4,
|
||||||
|
pool_timeout=self.config.pool_timeout,
|
||||||
|
connect_timeout=30.0,
|
||||||
|
read_timeout=30.0,
|
||||||
|
proxy=proxy,
|
||||||
|
)
|
||||||
|
builder = (
|
||||||
|
Application.builder()
|
||||||
|
.token(self.config.token)
|
||||||
|
.request(api_request)
|
||||||
|
.get_updates_request(poll_request)
|
||||||
)
|
)
|
||||||
builder = Application.builder().token(self.config.token).request(req).get_updates_request(req)
|
|
||||||
self._app = builder.build()
|
self._app = builder.build()
|
||||||
self._app.add_error_handler(self._on_error)
|
self._app.add_error_handler(self._on_error)
|
||||||
|
|
||||||
@@ -365,7 +381,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
ok, error = validate_url_target(media_path)
|
ok, error = validate_url_target(media_path)
|
||||||
if not ok:
|
if not ok:
|
||||||
raise ValueError(f"unsafe media URL: {error}")
|
raise ValueError(f"unsafe media URL: {error}")
|
||||||
await sender(
|
await self._call_with_retry(
|
||||||
|
sender,
|
||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
**{param: media_path},
|
**{param: media_path},
|
||||||
reply_parameters=reply_params,
|
reply_parameters=reply_params,
|
||||||
@@ -401,6 +418,21 @@ class TelegramChannel(BaseChannel):
|
|||||||
else:
|
else:
|
||||||
await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
|
await self._send_text(chat_id, chunk, reply_params, thread_kwargs)
|
||||||
|
|
||||||
|
async def _call_with_retry(self, fn, *args, **kwargs):
|
||||||
|
"""Call an async Telegram API function with retry on pool/network timeout."""
|
||||||
|
for attempt in range(1, _SEND_MAX_RETRIES + 1):
|
||||||
|
try:
|
||||||
|
return await fn(*args, **kwargs)
|
||||||
|
except TimedOut:
|
||||||
|
if attempt == _SEND_MAX_RETRIES:
|
||||||
|
raise
|
||||||
|
delay = _SEND_RETRY_BASE_DELAY * (2 ** (attempt - 1))
|
||||||
|
logger.warning(
|
||||||
|
"Telegram timeout (attempt {}/{}), retrying in {:.1f}s",
|
||||||
|
attempt, _SEND_MAX_RETRIES, delay,
|
||||||
|
)
|
||||||
|
await asyncio.sleep(delay)
|
||||||
|
|
||||||
async def _send_text(
|
async def _send_text(
|
||||||
self,
|
self,
|
||||||
chat_id: int,
|
chat_id: int,
|
||||||
@@ -411,7 +443,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
"""Send a plain text message with HTML fallback."""
|
"""Send a plain text message with HTML fallback."""
|
||||||
try:
|
try:
|
||||||
html = _markdown_to_telegram_html(text)
|
html = _markdown_to_telegram_html(text)
|
||||||
await self._app.bot.send_message(
|
await self._call_with_retry(
|
||||||
|
self._app.bot.send_message,
|
||||||
chat_id=chat_id, text=html, parse_mode="HTML",
|
chat_id=chat_id, text=html, parse_mode="HTML",
|
||||||
reply_parameters=reply_params,
|
reply_parameters=reply_params,
|
||||||
**(thread_kwargs or {}),
|
**(thread_kwargs or {}),
|
||||||
@@ -419,7 +452,8 @@ class TelegramChannel(BaseChannel):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("HTML parse failed, falling back to plain text: {}", e)
|
logger.warning("HTML parse failed, falling back to plain text: {}", e)
|
||||||
try:
|
try:
|
||||||
await self._app.bot.send_message(
|
await self._call_with_retry(
|
||||||
|
self._app.bot.send_message,
|
||||||
chat_id=chat_id,
|
chat_id=chat_id,
|
||||||
text=text,
|
text=text,
|
||||||
reply_parameters=reply_params,
|
reply_parameters=reply_params,
|
||||||
|
|||||||
@@ -46,6 +46,8 @@ class TelegramConfig(Base):
|
|||||||
)
|
)
|
||||||
reply_to_message: bool = False # If true, bot replies quote the original message
|
reply_to_message: bool = False # If true, bot replies quote the original message
|
||||||
group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned or replied to, "open" responds to all
|
group_policy: Literal["open", "mention"] = "mention" # "mention" responds when @mentioned or replied to, "open" responds to all
|
||||||
|
connection_pool_size: int = 32 # Outbound Telegram API HTTP pool size
|
||||||
|
pool_timeout: float = 5.0 # Shared HTTP pool timeout for bot sends and getUpdates
|
||||||
|
|
||||||
|
|
||||||
class TelegramInstanceConfig(TelegramConfig):
|
class TelegramInstanceConfig(TelegramConfig):
|
||||||
|
|||||||
@@ -16,6 +16,10 @@ class _FakeHTTPXRequest:
|
|||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
self.__class__.instances.append(self)
|
self.__class__.instances.append(self)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def clear(cls) -> None:
|
||||||
|
cls.instances.clear()
|
||||||
|
|
||||||
|
|
||||||
class _FakeUpdater:
|
class _FakeUpdater:
|
||||||
def __init__(self, on_start_polling) -> None:
|
def __init__(self, on_start_polling) -> None:
|
||||||
@@ -142,7 +146,8 @@ def _make_telegram_update(
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> None:
|
async def test_start_creates_separate_pools_with_proxy(monkeypatch) -> None:
|
||||||
|
_FakeHTTPXRequest.clear()
|
||||||
config = TelegramConfig(
|
config = TelegramConfig(
|
||||||
enabled=True,
|
enabled=True,
|
||||||
token="123:abc",
|
token="123:abc",
|
||||||
@@ -162,10 +167,106 @@ async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> No
|
|||||||
|
|
||||||
await channel.start()
|
await channel.start()
|
||||||
|
|
||||||
assert len(_FakeHTTPXRequest.instances) == 1
|
assert len(_FakeHTTPXRequest.instances) == 2
|
||||||
assert _FakeHTTPXRequest.instances[0].kwargs["proxy"] == config.proxy
|
api_req, poll_req = _FakeHTTPXRequest.instances
|
||||||
assert builder.request_value is _FakeHTTPXRequest.instances[0]
|
assert api_req.kwargs["proxy"] == config.proxy
|
||||||
assert builder.get_updates_request_value is _FakeHTTPXRequest.instances[0]
|
assert poll_req.kwargs["proxy"] == config.proxy
|
||||||
|
assert api_req.kwargs["connection_pool_size"] == 32
|
||||||
|
assert poll_req.kwargs["connection_pool_size"] == 4
|
||||||
|
assert builder.request_value is api_req
|
||||||
|
assert builder.get_updates_request_value is poll_req
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_start_respects_custom_pool_config(monkeypatch) -> None:
|
||||||
|
_FakeHTTPXRequest.clear()
|
||||||
|
config = TelegramConfig(
|
||||||
|
enabled=True,
|
||||||
|
token="123:abc",
|
||||||
|
allow_from=["*"],
|
||||||
|
connection_pool_size=32,
|
||||||
|
pool_timeout=10.0,
|
||||||
|
)
|
||||||
|
bus = MessageBus()
|
||||||
|
channel = TelegramChannel(config, bus)
|
||||||
|
app = _FakeApp(lambda: setattr(channel, "_running", False))
|
||||||
|
builder = _FakeBuilder(app)
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.channels.telegram.Application",
|
||||||
|
SimpleNamespace(builder=lambda: builder),
|
||||||
|
)
|
||||||
|
|
||||||
|
await channel.start()
|
||||||
|
|
||||||
|
api_req = _FakeHTTPXRequest.instances[0]
|
||||||
|
poll_req = _FakeHTTPXRequest.instances[1]
|
||||||
|
assert api_req.kwargs["connection_pool_size"] == 32
|
||||||
|
assert api_req.kwargs["pool_timeout"] == 10.0
|
||||||
|
assert poll_req.kwargs["pool_timeout"] == 10.0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_text_retries_on_timeout() -> None:
|
||||||
|
"""_send_text retries on TimedOut before succeeding."""
|
||||||
|
from telegram.error import TimedOut
|
||||||
|
|
||||||
|
channel = TelegramChannel(
|
||||||
|
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||||||
|
MessageBus(),
|
||||||
|
)
|
||||||
|
channel._app = _FakeApp(lambda: None)
|
||||||
|
|
||||||
|
call_count = 0
|
||||||
|
original_send = channel._app.bot.send_message
|
||||||
|
|
||||||
|
async def flaky_send(**kwargs):
|
||||||
|
nonlocal call_count
|
||||||
|
call_count += 1
|
||||||
|
if call_count <= 2:
|
||||||
|
raise TimedOut()
|
||||||
|
return await original_send(**kwargs)
|
||||||
|
|
||||||
|
channel._app.bot.send_message = flaky_send
|
||||||
|
|
||||||
|
import nanobot.channels.telegram as tg_mod
|
||||||
|
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||||||
|
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||||||
|
try:
|
||||||
|
await channel._send_text(123, "hello", None, {})
|
||||||
|
finally:
|
||||||
|
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||||||
|
|
||||||
|
assert call_count == 3
|
||||||
|
assert len(channel._app.bot.sent_messages) == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_text_gives_up_after_max_retries() -> None:
|
||||||
|
"""_send_text raises TimedOut after exhausting all retries."""
|
||||||
|
from telegram.error import TimedOut
|
||||||
|
|
||||||
|
channel = TelegramChannel(
|
||||||
|
TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
|
||||||
|
MessageBus(),
|
||||||
|
)
|
||||||
|
channel._app = _FakeApp(lambda: None)
|
||||||
|
|
||||||
|
async def always_timeout(**kwargs):
|
||||||
|
raise TimedOut()
|
||||||
|
|
||||||
|
channel._app.bot.send_message = always_timeout
|
||||||
|
|
||||||
|
import nanobot.channels.telegram as tg_mod
|
||||||
|
orig_delay = tg_mod._SEND_RETRY_BASE_DELAY
|
||||||
|
tg_mod._SEND_RETRY_BASE_DELAY = 0.01
|
||||||
|
try:
|
||||||
|
await channel._send_text(123, "hello", None, {})
|
||||||
|
finally:
|
||||||
|
tg_mod._SEND_RETRY_BASE_DELAY = orig_delay
|
||||||
|
|
||||||
|
assert channel._app.bot.sent_messages == []
|
||||||
|
|
||||||
|
|
||||||
def test_derive_topic_session_key_uses_thread_id() -> None:
|
def test_derive_topic_session_key_uses_thread_id() -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user