- Add plugin discovery via Python entry_points (group: nanobot.channels)
- Move 11 channel Config classes from schema.py into their own channel modules
- ChannelsConfig now only keeps send_progress + send_tool_hints (extra=allow)
- Each built-in channel parses dict->Pydantic in __init__, zero internal changes
- All channels implement default_config() for onboard auto-population
- nanobot onboard injects defaults for all discovered channels (built-in + plugins)
- Add nanobot plugins list CLI command
- Add Channel Plugin Guide (docs/CHANNEL_PLUGIN_GUIDE.md)
- Fully backward compatible: existing config.json and sessions work as-is
- 340 tests pass, zero regressions
600 lines
20 KiB
Python
600 lines
20 KiB
Python
import asyncio
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.channels.telegram import TELEGRAM_REPLY_CONTEXT_MAX_LEN, TelegramChannel
|
|
from nanobot.channels.telegram import TelegramConfig
|
|
|
|
|
|
class _FakeHTTPXRequest:
    """Stand-in for ``HTTPXRequest`` that records how it was constructed.

    Each instance keeps the keyword arguments it received and appends itself
    to the class-level ``instances`` list so tests can inspect what was built.
    """

    # Shared by every instance; this class never clears it.
    instances: list["_FakeHTTPXRequest"] = []

    def __init__(self, **kwargs) -> None:
        self.kwargs = kwargs
        type(self).instances.append(self)
|
|
|
|
|
|
class _FakeUpdater:
    """Minimal updater double whose ``start_polling`` only fires a callback."""

    def __init__(self, on_start_polling) -> None:
        # Zero-argument callable invoked every time polling is "started".
        self._on_start_polling = on_start_polling

    async def start_polling(self, **kwargs) -> None:
        """Ignore all polling options and invoke the stored callback."""
        notify = self._on_start_polling
        notify()
|
|
|
|
|
|
class _FakeBot:
    """In-memory double for the bot object the channel talks to.

    It records outgoing messages and registered commands and counts
    ``get_me`` calls so tests can assert on them.
    """

    def __init__(self) -> None:
        self.sent_messages: list[dict] = []  # kwargs of every send_message() call
        self.get_me_calls = 0  # number of get_me() invocations so far
        # Defined up front so it can be read before set_my_commands() has run
        # (previously the attribute only appeared as a side effect of that call).
        self.commands = None

    async def get_me(self):
        """Count the call and return a fixed identity (id=999, username "nanobot_test")."""
        self.get_me_calls += 1
        return SimpleNamespace(id=999, username="nanobot_test")

    async def set_my_commands(self, commands) -> None:
        """Remember the most recently supplied command list."""
        self.commands = commands

    async def send_message(self, **kwargs) -> None:
        """Record the keyword arguments instead of sending anything."""
        self.sent_messages.append(kwargs)

    async def send_chat_action(self, **kwargs) -> None:
        """Accept the call and do nothing."""

    async def get_file(self, file_id: str):
        """Return a fake file that 'downloads' to a path (for reply-to-media tests)."""

        async def _fake_download(path) -> None:
            # No-op: nothing is written to ``path``.
            pass

        return SimpleNamespace(download_to_drive=_fake_download)
|
|
|
|
|
|
class _FakeApp:
    """Application double exposing a fake bot, a fake updater and handler lists.

    ``on_start_polling`` is forwarded to the ``_FakeUpdater`` and is called
    when its ``start_polling`` coroutine runs.
    """

    def __init__(self, on_start_polling) -> None:
        self.handlers = []
        self.error_handlers = []
        self.updater = _FakeUpdater(on_start_polling)
        self.bot = _FakeBot()

    def add_error_handler(self, handler) -> None:
        """Collect an error handler for later inspection."""
        self.error_handlers.append(handler)

    def add_handler(self, handler) -> None:
        """Collect a regular handler for later inspection."""
        self.handlers.append(handler)

    async def initialize(self) -> None:
        """No-op lifecycle hook."""

    async def start(self) -> None:
        """No-op lifecycle hook."""
|
|
|
|
|
|
class _FakeBuilder:
    """Fluent builder double that captures what the channel configures.

    ``token``, ``request`` and ``get_updates_request`` store their argument and
    return the builder for chaining. ``proxy`` and ``get_updates_proxy`` raise
    ``AssertionError`` if they are ever called. ``build`` returns the app that
    was passed to the constructor.
    """

    def __init__(self, app: _FakeApp) -> None:
        self.app = app
        # Captured values; each stays None until its setter is called.
        self.token_value = None
        self.request_value = None
        self.get_updates_request_value = None

    def token(self, token: str):
        """Capture the bot token and return ``self``."""
        self.token_value = token
        return self

    def request(self, request):
        """Capture the request object and return ``self``."""
        self.request_value = request
        return self

    def get_updates_request(self, request):
        """Capture the get-updates request object and return ``self``."""
        self.get_updates_request_value = request
        return self

    def proxy(self, _proxy):
        """Always fail: this setter must not be used when a request is set."""
        raise AssertionError("builder.proxy should not be called when request is set")

    def get_updates_proxy(self, _proxy):
        """Always fail: this setter must not be used when a request is set."""
        raise AssertionError("builder.get_updates_proxy should not be called when request is set")

    def build(self):
        """Return the pre-built fake application."""
        return self.app
|
|
|
|
|
|
def _make_telegram_update(
|
|
*,
|
|
chat_type: str = "group",
|
|
text: str | None = None,
|
|
caption: str | None = None,
|
|
entities=None,
|
|
caption_entities=None,
|
|
reply_to_message=None,
|
|
):
|
|
user = SimpleNamespace(id=12345, username="alice", first_name="Alice")
|
|
message = SimpleNamespace(
|
|
chat=SimpleNamespace(type=chat_type, is_forum=False),
|
|
chat_id=-100123,
|
|
text=text,
|
|
caption=caption,
|
|
entities=entities or [],
|
|
caption_entities=caption_entities or [],
|
|
reply_to_message=reply_to_message,
|
|
photo=None,
|
|
voice=None,
|
|
audio=None,
|
|
document=None,
|
|
media_group_id=None,
|
|
message_thread_id=None,
|
|
message_id=1,
|
|
)
|
|
return SimpleNamespace(message=message, effective_user=user)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_uses_request_proxy_without_builder_proxy(monkeypatch) -> None:
    """start() passes the proxy via a request object, not the builder's proxy setters.

    Exactly one ``HTTPXRequest`` is created with the configured proxy, and the
    same instance is handed to both ``request`` and ``get_updates_request``.
    ``_FakeBuilder.proxy`` / ``get_updates_proxy`` raise if they are called.
    """
    # ``instances`` is class-level state shared across tests; start from an
    # empty registry so the count below does not depend on test order.
    # monkeypatch restores the original list afterwards.
    monkeypatch.setattr(_FakeHTTPXRequest, "instances", [])

    config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        proxy="http://127.0.0.1:7890",
    )
    bus = MessageBus()
    channel = TelegramChannel(config, bus)
    # The callback flips ``_running`` off as soon as polling starts,
    # presumably so that start() returns instead of looping.
    app = _FakeApp(lambda: setattr(channel, "_running", False))
    builder = _FakeBuilder(app)

    monkeypatch.setattr("nanobot.channels.telegram.HTTPXRequest", _FakeHTTPXRequest)
    monkeypatch.setattr(
        "nanobot.channels.telegram.Application",
        SimpleNamespace(builder=lambda: builder),
    )

    await channel.start()

    assert len(_FakeHTTPXRequest.instances) == 1
    created = _FakeHTTPXRequest.instances[0]
    assert created.kwargs["proxy"] == config.proxy
    assert builder.request_value is created
    assert builder.get_updates_request_value is created
|
|
|
|
|
|
def test_derive_topic_session_key_uses_thread_id() -> None:
    """A supergroup message with a thread id yields a topic-scoped session key."""
    topic_message = SimpleNamespace(
        message_thread_id=42,
        chat_id=-100123,
        chat=SimpleNamespace(type="supergroup"),
    )

    session_key = TelegramChannel._derive_topic_session_key(topic_message)

    assert session_key == "telegram:-100123:topic:42"
|
|
|
|
|
|
def test_get_extension_falls_back_to_original_filename() -> None:
    """With ``None`` as second argument, the extension comes from the filename.

    Multi-part suffixes such as ``.tar.gz`` are expected to be kept whole.
    """
    telegram = TelegramChannel(TelegramConfig(), MessageBus())
    expected_by_filename = {
        "report.pdf": ".pdf",
        "archive.tar.gz": ".tar.gz",
    }

    for filename, expected in expected_by_filename.items():
        assert telegram._get_extension("file", None, filename) == expected
|
|
|
|
|
|
def test_telegram_group_policy_defaults_to_mention() -> None:
    """A default-constructed TelegramConfig uses the "mention" group policy."""
    default_config = TelegramConfig()

    assert default_config.group_policy == "mention"
|
|
|
|
|
|
def test_is_allowed_accepts_legacy_telegram_id_username_formats() -> None:
    """Bare ids, bare usernames and "id|username" allow-list entries all match.

    Senders are given in "id|username" form; each one below matches a
    different style of allow-list entry.
    """
    allow_list = ["12345", "alice", "67890|bob"]
    telegram = TelegramChannel(TelegramConfig(allow_from=allow_list), MessageBus())

    accepted_senders = (
        "12345|carol",  # id matches "12345"
        "99999|alice",  # username matches "alice"
        "67890|bob",  # exact "id|username" entry
    )
    for sender in accepted_senders:
        assert telegram.is_allowed(sender) is True
|
|
|
|
|
|
def test_is_allowed_rejects_invalid_legacy_telegram_sender_shapes() -> None:
    """Malformed sender strings are rejected even if they contain an allowed name."""
    telegram = TelegramChannel(TelegramConfig(allow_from=["alice"]), MessageBus())

    rejected_senders = (
        "attacker|alice|extra",  # more than two "|"-separated parts
        "not-a-number|alice",  # id part is not numeric
    )
    for sender in rejected_senders:
        assert telegram.is_allowed(sender) is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_progress_keeps_message_in_topic() -> None:
    """A progress message carrying ``message_thread_id`` is sent with that thread id."""
    telegram = TelegramChannel(
        TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
        MessageBus(),
    )
    telegram._app = _FakeApp(lambda: None)

    progress_update = OutboundMessage(
        channel="telegram",
        chat_id="123",
        content="hello",
        metadata={"_progress": True, "message_thread_id": 42},
    )
    await telegram.send(progress_update)

    first_sent = telegram._app.bot.sent_messages[0]
    assert first_sent["message_thread_id"] == 42
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_send_reply_infers_topic_from_message_id_cache() -> None:
    """The thread id is recovered from ``_message_threads`` when only a message id is given.

    The metadata carries just ``message_id``; the expected thread id (42) is
    the value seeded in the cache under ("123", 10). The outgoing message must
    also reply to message 10.
    """
    reply_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        reply_to_message=True,
    )
    telegram = TelegramChannel(reply_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)
    # Seed the (chat_id, message_id) -> thread id cache.
    telegram._message_threads[("123", 10)] = 42

    outbound = OutboundMessage(
        channel="telegram",
        chat_id="123",
        content="hello",
        metadata={"message_id": 10},
    )
    await telegram.send(outbound)

    sent_kwargs = telegram._app.bot.sent_messages[0]
    assert sent_kwargs["message_thread_id"] == 42
    assert sent_kwargs["reply_parameters"].message_id == 10
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_group_policy_mention_ignores_unmentioned_group_message() -> None:
    """Under the "mention" policy a plain group message is not forwarded.

    ``get_me`` is still expected to be called exactly once.
    """
    mention_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        group_policy="mention",
    )
    telegram = TelegramChannel(mention_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    plain_update = _make_telegram_update(text="hello everyone")
    await telegram._on_message(plain_update, None)

    assert forwarded == []
    assert telegram._app.bot.get_me_calls == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_text_mention_and_caches_bot_identity() -> None:
    """Messages that @-mention the bot are forwarded, and ``get_me`` runs only once.

    Two mentioning messages are processed; both must be handled while the
    fake bot sees a single ``get_me`` call.
    """
    mention_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        group_policy="mention",
    )
    telegram = TelegramChannel(mention_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    # Entity covering the 13 characters of "@nanobot_test" at the start.
    bot_mention = SimpleNamespace(type="mention", offset=0, length=13)
    for body in ("@nanobot_test hi", "@nanobot_test again"):
        await telegram._on_message(
            _make_telegram_update(text=body, entities=[bot_mention]),
            None,
        )

    assert len(forwarded) == 2
    assert telegram._app.bot.get_me_calls == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_caption_mention() -> None:
    """A mention inside a caption satisfies the "mention" policy.

    The forwarded content must equal the caption text.
    """
    mention_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        group_policy="mention",
    )
    telegram = TelegramChannel(mention_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    # Entity covering the 13 characters of "@nanobot_test" at the start.
    bot_mention = SimpleNamespace(type="mention", offset=0, length=13)
    captioned_update = _make_telegram_update(
        caption="@nanobot_test photo",
        caption_entities=[bot_mention],
    )
    await telegram._on_message(captioned_update, None)

    assert len(forwarded) == 1
    assert forwarded[0]["content"] == "@nanobot_test photo"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_group_policy_mention_accepts_reply_to_bot() -> None:
    """Replying to a message from the bot counts as addressing it.

    The replied-to message's sender id (999) matches the id returned by the
    fake bot's ``get_me``.
    """
    mention_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        group_policy="mention",
    )
    telegram = TelegramChannel(mention_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    bot_message = SimpleNamespace(from_user=SimpleNamespace(id=999))
    reply_update = _make_telegram_update(text="reply", reply_to_message=bot_message)
    await telegram._on_message(reply_update, None)

    assert len(forwarded) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_group_policy_open_accepts_plain_group_message() -> None:
    """Under the "open" policy a plain group message is forwarded.

    ``get_me`` must not be called at all.
    """
    open_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        group_policy="open",
    )
    telegram = TelegramChannel(open_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    plain_update = _make_telegram_update(text="hello group")
    await telegram._on_message(plain_update, None)

    assert len(forwarded) == 1
    assert telegram._app.bot.get_me_calls == 0
|
|
|
|
|
|
def test_extract_reply_context_no_reply() -> None:
    """A message that is not a reply produces no reply context (None)."""
    standalone = SimpleNamespace(reply_to_message=None)

    context = TelegramChannel._extract_reply_context(standalone)

    assert context is None
|
|
|
|
|
|
def test_extract_reply_context_with_text() -> None:
    """The replied-to text is wrapped as "[Reply to: <text>]"."""
    quoted = SimpleNamespace(text="Hello world", caption=None)
    incoming = SimpleNamespace(reply_to_message=quoted)

    context = TelegramChannel._extract_reply_context(incoming)

    assert context == "[Reply to: Hello world]"
|
|
|
|
|
|
def test_extract_reply_context_with_caption_only() -> None:
    """If the replied-to message has a caption but no text, the caption is quoted."""
    quoted = SimpleNamespace(text=None, caption="Photo caption")
    incoming = SimpleNamespace(reply_to_message=quoted)

    context = TelegramChannel._extract_reply_context(incoming)

    assert context == "[Reply to: Photo caption]"
|
|
|
|
|
|
def test_extract_reply_context_truncation() -> None:
    """Quoted text longer than TELEGRAM_REPLY_CONTEXT_MAX_LEN is cut and gets "..."."""
    prefix, ellipsis, closing = "[Reply to: ", "...", "]"
    oversized = "x" * (TELEGRAM_REPLY_CONTEXT_MAX_LEN + 100)
    incoming = SimpleNamespace(
        reply_to_message=SimpleNamespace(text=oversized, caption=None),
    )

    context = TelegramChannel._extract_reply_context(incoming)

    assert context is not None
    assert context.startswith(prefix)
    assert context.endswith(ellipsis + closing)
    # Wrapper + exactly MAX_LEN kept characters + the ellipsis.
    expected_length = (
        len(prefix) + len(closing) + TELEGRAM_REPLY_CONTEXT_MAX_LEN + len(ellipsis)
    )
    assert len(context) == expected_length
|
|
|
|
|
|
def test_extract_reply_context_no_text_returns_none() -> None:
    """A replied-to message with neither text nor caption yields None (media handled separately)."""
    quoted = SimpleNamespace(text=None, caption=None)
    incoming = SimpleNamespace(reply_to_message=quoted)

    context = TelegramChannel._extract_reply_context(incoming)

    assert context is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_on_message_includes_reply_context() -> None:
    """Content forwarded for a reply starts with the quoted text and keeps the user's text."""
    open_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        group_policy="open",
    )
    telegram = TelegramChannel(open_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    quoted = SimpleNamespace(text="Hello", message_id=2, from_user=SimpleNamespace(id=1))
    await telegram._on_message(
        _make_telegram_update(text="translate this", reply_to_message=quoted),
        None,
    )

    assert len(forwarded) == 1
    content = forwarded[0]["content"]
    assert content.startswith("[Reply to: Hello]")
    assert "translate this" in content
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_download_message_media_returns_path_when_download_succeeds(
    monkeypatch, tmp_path
) -> None:
    """_download_message_media returns one path and one content part for a photo.

    ``get_media_dir`` is redirected into ``tmp_path`` and ``bot.get_file`` is
    mocked so that both the lookup and the download succeed.
    """
    media_root = tmp_path / "media"
    telegram_dir = media_root / "telegram"
    telegram_dir.mkdir(parents=True)

    def _fake_get_media_dir(channel=None):
        # Channel-specific directory when a channel is named, root otherwise.
        return telegram_dir if channel else media_root

    monkeypatch.setattr("nanobot.channels.telegram.get_media_dir", _fake_get_media_dir)

    telegram = TelegramChannel(
        TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]),
        MessageBus(),
    )
    telegram._app = _FakeApp(lambda: None)
    downloadable = SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
    telegram._app.bot.get_file = AsyncMock(return_value=downloadable)

    photo_message = SimpleNamespace(
        photo=[SimpleNamespace(file_id="fid123", mime_type="image/jpeg")],
        voice=None,
        audio=None,
        document=None,
        video=None,
        video_note=None,
        animation=None,
    )
    paths, parts = await telegram._download_message_media(photo_message)

    assert len(paths) == 1
    assert len(parts) == 1
    # The saved path embeds the file id; the content part is an image tag.
    assert "fid123" in paths[0]
    assert "[image:" in parts[0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_on_message_attaches_reply_to_media_when_available(monkeypatch, tmp_path) -> None:
    """Media from the replied-to message is downloaded and attached to the turn.

    The replied-to message has a photo but no text or caption, so the reply
    tag is expected to be built from the image content part.
    """
    media_root = tmp_path / "media"
    telegram_dir = media_root / "telegram"
    telegram_dir.mkdir(parents=True)

    def _fake_get_media_dir(channel=None):
        # Channel-specific directory when a channel is named, root otherwise.
        return telegram_dir if channel else media_root

    monkeypatch.setattr("nanobot.channels.telegram.get_media_dir", _fake_get_media_dir)

    telegram = TelegramChannel(
        TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
        MessageBus(),
    )
    fake_app = _FakeApp(lambda: None)
    downloadable = SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
    fake_app.bot.get_file = AsyncMock(return_value=downloadable)
    telegram._app = fake_app

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    quoted_photo = SimpleNamespace(
        text=None,
        caption=None,
        photo=[SimpleNamespace(file_id="reply_photo_fid", mime_type="image/jpeg")],
        document=None,
        voice=None,
        audio=None,
        video=None,
        video_note=None,
        animation=None,
    )
    await telegram._on_message(
        _make_telegram_update(text="what is the image?", reply_to_message=quoted_photo),
        None,
    )

    assert len(forwarded) == 1
    turn = forwarded[0]
    assert turn["content"].startswith("[Reply to: [image:")
    assert "what is the image?" in turn["content"]
    assert len(turn["media"]) == 1
    assert "reply_photo_fid" in turn["media"][0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_on_message_reply_to_media_fallback_when_download_fails() -> None:
    """When reply has media but download fails, no media attached and no reply tag."""
    channel = TelegramChannel(
        TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
        MessageBus(),
    )
    channel._app = _FakeApp(lambda: None)
    # Break get_file so that any attempt to call it raises, simulating a
    # failed download of the replied-to media.
    channel._app.bot.get_file = None

    handled = []

    async def capture_handle(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        handled.append(kwargs)

    channel._handle_message = capture_handle
    channel._start_typing = lambda _chat_id: None

    reply_with_photo = SimpleNamespace(
        text=None,
        caption=None,
        photo=[SimpleNamespace(file_id="x", mime_type="image/jpeg")],
        document=None,
        voice=None,
        audio=None,
        video=None,
        video_note=None,
        animation=None,
    )
    update = _make_telegram_update(text="what is this?", reply_to_message=reply_with_photo)
    await channel._on_message(update, None)

    assert len(handled) == 1
    assert "what is this?" in handled[0]["content"]
    assert handled[0]["media"] == []
    # The docstring promises "no reply tag"; previously this was never checked.
    assert "[Reply to:" not in handled[0]["content"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_on_message_reply_to_caption_and_media(monkeypatch, tmp_path) -> None:
    """Replying to a captioned photo forwards both the caption context and the media."""
    media_root = tmp_path / "media"
    telegram_dir = media_root / "telegram"
    telegram_dir.mkdir(parents=True)

    def _fake_get_media_dir(channel=None):
        # Channel-specific directory when a channel is named, root otherwise.
        return telegram_dir if channel else media_root

    monkeypatch.setattr("nanobot.channels.telegram.get_media_dir", _fake_get_media_dir)

    telegram = TelegramChannel(
        TelegramConfig(enabled=True, token="123:abc", allow_from=["*"], group_policy="open"),
        MessageBus(),
    )
    fake_app = _FakeApp(lambda: None)
    downloadable = SimpleNamespace(download_to_drive=AsyncMock(return_value=None))
    fake_app.bot.get_file = AsyncMock(return_value=downloadable)
    telegram._app = fake_app

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record
    telegram._start_typing = lambda _chat_id: None

    quoted_cat = SimpleNamespace(
        text=None,
        caption="A cute cat",
        photo=[SimpleNamespace(file_id="cat_fid", mime_type="image/jpeg")],
        document=None,
        voice=None,
        audio=None,
        video=None,
        video_note=None,
        animation=None,
    )
    await telegram._on_message(
        _make_telegram_update(text="what breed is this?", reply_to_message=quoted_cat),
        None,
    )

    assert len(forwarded) == 1
    turn = forwarded[0]
    assert "[Reply to: A cute cat]" in turn["content"]
    assert "what breed is this?" in turn["content"]
    assert len(turn["media"]) == 1
    assert "cat_fid" in turn["media"][0]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_forward_command_does_not_inject_reply_context() -> None:
    """A slash command sent as a reply is forwarded verbatim, without reply context."""
    open_config = TelegramConfig(
        enabled=True,
        token="123:abc",
        allow_from=["*"],
        group_policy="open",
    )
    telegram = TelegramChannel(open_config, MessageBus())
    telegram._app = _FakeApp(lambda: None)

    forwarded: list[dict] = []

    async def _record(**kwargs) -> None:
        # Replacement for _handle_message that just collects its kwargs.
        forwarded.append(kwargs)

    telegram._handle_message = _record

    quoted = SimpleNamespace(
        text="some old message",
        message_id=2,
        from_user=SimpleNamespace(id=1),
    )
    command_update = _make_telegram_update(text="/new", reply_to_message=quoted)
    await telegram._forward_command(command_update, None)

    assert len(forwarded) == 1
    assert forwarded[0]["content"] == "/new"
|