refactor(channels): extract split_message utility to reduce code duplication

Extract the _split_message function from discord.py and telegram.py
into a shared utility function in utils/helpers.py.

Changes:
- Add split_message() to nanobot/utils/helpers.py with configurable max_len
- Update Discord channel to use shared utility (2000 char limit)
- Update Telegram channel to use shared utility (4000 char limit)
- Remove duplicate implementations from both channels

Benefits:
- Reduces code duplication
- Centralizes message splitting logic for easier maintenance
- Makes the function reusable for future channels

The function splits content into chunks within max_len, preferring
to break at newlines or spaces rather than mid-word.
This commit is contained in:
suger-m
2026-03-05 11:14:04 +08:00
parent fb74281434
commit 323e5f22cc
3 changed files with 38 additions and 44 deletions

View File

@@ -13,34 +13,13 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import DiscordConfig from nanobot.config.schema import DiscordConfig
from nanobot.utils.helpers import split_message
DISCORD_API_BASE = "https://discord.com/api/v10" DISCORD_API_BASE = "https://discord.com/api/v10"
MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB
MAX_MESSAGE_LEN = 2000 # Discord message character limit MAX_MESSAGE_LEN = 2000 # Discord message character limit
def _split_message(content: str, max_len: int = MAX_MESSAGE_LEN) -> list[str]:
"""Split content into chunks within max_len, preferring line breaks."""
if not content:
return []
if len(content) <= max_len:
return [content]
chunks: list[str] = []
while content:
if len(content) <= max_len:
chunks.append(content)
break
cut = content[:max_len]
pos = cut.rfind('\n')
if pos <= 0:
pos = cut.rfind(' ')
if pos <= 0:
pos = max_len
chunks.append(content[:pos])
content = content[pos:].lstrip()
return chunks
class DiscordChannel(BaseChannel): class DiscordChannel(BaseChannel):
"""Discord channel using Gateway websocket.""" """Discord channel using Gateway websocket."""
@@ -104,7 +83,7 @@ class DiscordChannel(BaseChannel):
headers = {"Authorization": f"Bot {self.config.token}"} headers = {"Authorization": f"Bot {self.config.token}"}
try: try:
chunks = _split_message(msg.content or "") chunks = split_message(msg.content or "", MAX_MESSAGE_LEN)
if not chunks: if not chunks:
return return

View File

@@ -14,6 +14,9 @@ from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import TelegramConfig from nanobot.config.schema import TelegramConfig
from nanobot.utils.helpers import split_message
TELEGRAM_MAX_MESSAGE_LEN = 4000 # Telegram message character limit
def _markdown_to_telegram_html(text: str) -> str: def _markdown_to_telegram_html(text: str) -> str:
@@ -79,26 +82,6 @@ def _markdown_to_telegram_html(text: str) -> str:
return text return text
def _split_message(content: str, max_len: int = 4000) -> list[str]:
"""Split content into chunks within max_len, preferring line breaks."""
if len(content) <= max_len:
return [content]
chunks: list[str] = []
while content:
if len(content) <= max_len:
chunks.append(content)
break
cut = content[:max_len]
pos = cut.rfind('\n')
if pos == -1:
pos = cut.rfind(' ')
if pos == -1:
pos = max_len
chunks.append(content[:pos])
content = content[pos:].lstrip()
return chunks
class TelegramChannel(BaseChannel): class TelegramChannel(BaseChannel):
""" """
Telegram channel using long polling. Telegram channel using long polling.
@@ -269,7 +252,7 @@ class TelegramChannel(BaseChannel):
# Send text content # Send text content
if msg.content and msg.content != "[empty message]": if msg.content and msg.content != "[empty message]":
for chunk in _split_message(msg.content): for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN):
try: try:
html = _markdown_to_telegram_html(chunk) html = _markdown_to_telegram_html(chunk)
await self._app.bot.send_message( await self._app.bot.send_message(

View File

@@ -34,6 +34,38 @@ def safe_filename(name: str) -> str:
return _UNSAFE_CHARS.sub("_", name).strip() return _UNSAFE_CHARS.sub("_", name).strip()
def split_message(content: str, max_len: int = 2000) -> list[str]:
"""
Split content into chunks within max_len, preferring line breaks.
Args:
content: The text content to split.
max_len: Maximum length per chunk (default 2000 for Discord compatibility).
Returns:
List of message chunks, each within max_len.
"""
if not content:
return []
if len(content) <= max_len:
return [content]
chunks: list[str] = []
while content:
if len(content) <= max_len:
chunks.append(content)
break
cut = content[:max_len]
# Try to break at newline first, then space, then hard break
pos = cut.rfind('\n')
if pos <= 0:
pos = cut.rfind(' ')
if pos <= 0:
pos = max_len
chunks.append(content[:pos])
content = content[pos:].lstrip()
return chunks
def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]: def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]:
"""Sync bundled templates to workspace. Only creates missing files.""" """Sync bundled templates to workspace. Only creates missing files."""
from importlib.resources import files as pkg_files from importlib.resources import files as pkg_files