Merge branch 'main' into pr-219
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
"""Channel manager for coordinating chat channels."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
from typing import Any, TYPE_CHECKING
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -10,6 +12,9 @@ from nanobot.bus.queue import MessageBus
|
||||
from nanobot.channels.base import BaseChannel
|
||||
from nanobot.config.schema import Config
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nanobot.session.manager import SessionManager
|
||||
|
||||
|
||||
class ChannelManager:
|
||||
"""
|
||||
@@ -21,9 +26,10 @@ class ChannelManager:
|
||||
- Route outbound messages
|
||||
"""
|
||||
|
||||
def __init__(self, config: Config, bus: MessageBus):
|
||||
def __init__(self, config: Config, bus: MessageBus, session_manager: "SessionManager | None" = None):
|
||||
self.config = config
|
||||
self.bus = bus
|
||||
self.session_manager = session_manager
|
||||
self.channels: dict[str, BaseChannel] = {}
|
||||
self._dispatch_task: asyncio.Task | None = None
|
||||
|
||||
@@ -40,6 +46,7 @@ class ChannelManager:
|
||||
self.config.channels.telegram,
|
||||
self.bus,
|
||||
groq_api_key=self.config.providers.groq.api_key,
|
||||
session_manager=self.session_manager,
|
||||
)
|
||||
logger.info("Telegram channel enabled")
|
||||
except ImportError as e:
|
||||
|
||||
@@ -1,17 +1,23 @@
|
||||
"""Telegram channel implementation using python-telegram-bot."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from loguru import logger
|
||||
from telegram import Update
|
||||
from telegram.ext import Application, MessageHandler, filters, ContextTypes
|
||||
from telegram import BotCommand, Update
|
||||
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
|
||||
|
||||
from nanobot.bus.events import OutboundMessage
|
||||
from nanobot.bus.queue import MessageBus
|
||||
from nanobot.channels.base import BaseChannel
|
||||
from nanobot.config.schema import TelegramConfig
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nanobot.session.manager import SessionManager
|
||||
|
||||
|
||||
def _markdown_to_telegram_html(text: str) -> str:
|
||||
"""
|
||||
@@ -85,12 +91,27 @@ class TelegramChannel(BaseChannel):
|
||||
|
||||
name = "telegram"
|
||||
|
||||
def __init__(self, config: TelegramConfig, bus: MessageBus, groq_api_key: str = ""):
|
||||
# Commands registered with Telegram's command menu
|
||||
BOT_COMMANDS = [
|
||||
BotCommand("start", "Start the bot"),
|
||||
BotCommand("reset", "Reset conversation history"),
|
||||
BotCommand("help", "Show available commands"),
|
||||
]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: TelegramConfig,
|
||||
bus: MessageBus,
|
||||
groq_api_key: str = "",
|
||||
session_manager: SessionManager | None = None,
|
||||
):
|
||||
super().__init__(config, bus)
|
||||
self.config: TelegramConfig = config
|
||||
self.groq_api_key = groq_api_key
|
||||
self.session_manager = session_manager
|
||||
self._app: Application | None = None
|
||||
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
|
||||
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the Telegram bot with long polling."""
|
||||
@@ -106,6 +127,11 @@ class TelegramChannel(BaseChannel):
|
||||
builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy)
|
||||
self._app = builder.build()
|
||||
|
||||
# Add command handlers
|
||||
self._app.add_handler(CommandHandler("start", self._on_start))
|
||||
self._app.add_handler(CommandHandler("reset", self._on_reset))
|
||||
self._app.add_handler(CommandHandler("help", self._on_help))
|
||||
|
||||
# Add message handler for text, photos, voice, documents
|
||||
self._app.add_handler(
|
||||
MessageHandler(
|
||||
@@ -115,20 +141,22 @@ class TelegramChannel(BaseChannel):
|
||||
)
|
||||
)
|
||||
|
||||
# Add /start command handler
|
||||
from telegram.ext import CommandHandler
|
||||
self._app.add_handler(CommandHandler("start", self._on_start))
|
||||
|
||||
logger.info("Starting Telegram bot (polling mode)...")
|
||||
|
||||
# Initialize and start polling
|
||||
await self._app.initialize()
|
||||
await self._app.start()
|
||||
|
||||
# Get bot info
|
||||
# Get bot info and register command menu
|
||||
bot_info = await self._app.bot.get_me()
|
||||
logger.info(f"Telegram bot @{bot_info.username} connected")
|
||||
|
||||
try:
|
||||
await self._app.bot.set_my_commands(self.BOT_COMMANDS)
|
||||
logger.debug("Telegram bot commands registered")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to register bot commands: {e}")
|
||||
|
||||
# Start polling (this runs until stopped)
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=["message"],
|
||||
@@ -143,6 +171,10 @@ class TelegramChannel(BaseChannel):
|
||||
"""Stop the Telegram bot."""
|
||||
self._running = False
|
||||
|
||||
# Cancel all typing indicators
|
||||
for chat_id in list(self._typing_tasks):
|
||||
self._stop_typing(chat_id)
|
||||
|
||||
if self._app:
|
||||
logger.info("Stopping Telegram bot...")
|
||||
await self._app.updater.stop()
|
||||
@@ -156,6 +188,9 @@ class TelegramChannel(BaseChannel):
|
||||
logger.warning("Telegram bot not running")
|
||||
return
|
||||
|
||||
# Stop typing indicator for this chat
|
||||
self._stop_typing(msg.chat_id)
|
||||
|
||||
try:
|
||||
# chat_id should be the Telegram chat ID (integer)
|
||||
chat_id = int(msg.chat_id)
|
||||
@@ -187,9 +222,45 @@ class TelegramChannel(BaseChannel):
|
||||
user = update.effective_user
|
||||
await update.message.reply_text(
|
||||
f"👋 Hi {user.first_name}! I'm nanobot.\n\n"
|
||||
"Send me a message and I'll respond!"
|
||||
"Send me a message and I'll respond!\n"
|
||||
"Type /help to see available commands."
|
||||
)
|
||||
|
||||
async def _on_reset(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /reset command — clear conversation history."""
|
||||
if not update.message or not update.effective_user:
|
||||
return
|
||||
|
||||
chat_id = str(update.message.chat_id)
|
||||
session_key = f"{self.name}:{chat_id}"
|
||||
|
||||
if self.session_manager is None:
|
||||
logger.warning("/reset called but session_manager is not available")
|
||||
await update.message.reply_text("⚠️ Session management is not available.")
|
||||
return
|
||||
|
||||
session = self.session_manager.get_or_create(session_key)
|
||||
msg_count = len(session.messages)
|
||||
session.clear()
|
||||
self.session_manager.save(session)
|
||||
|
||||
logger.info(f"Session reset for {session_key} (cleared {msg_count} messages)")
|
||||
await update.message.reply_text("🔄 Conversation history cleared. Let's start fresh!")
|
||||
|
||||
async def _on_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle /help command — show available commands."""
|
||||
if not update.message:
|
||||
return
|
||||
|
||||
help_text = (
|
||||
"🐈 <b>nanobot commands</b>\n\n"
|
||||
"/start — Start the bot\n"
|
||||
"/reset — Reset conversation history\n"
|
||||
"/help — Show this help message\n\n"
|
||||
"Just send me a text message to chat!"
|
||||
)
|
||||
await update.message.reply_text(help_text, parse_mode="HTML")
|
||||
|
||||
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle incoming messages (text, photos, voice, documents)."""
|
||||
if not update.message or not update.effective_user:
|
||||
@@ -272,10 +343,15 @@ class TelegramChannel(BaseChannel):
|
||||
|
||||
logger.debug(f"Telegram message from {sender_id}: {content[:50]}...")
|
||||
|
||||
str_chat_id = str(chat_id)
|
||||
|
||||
# Start typing indicator before processing
|
||||
self._start_typing(str_chat_id)
|
||||
|
||||
# Forward to the message bus
|
||||
await self._handle_message(
|
||||
sender_id=sender_id,
|
||||
chat_id=str(chat_id),
|
||||
chat_id=str_chat_id,
|
||||
content=content,
|
||||
media=media_paths,
|
||||
metadata={
|
||||
@@ -287,6 +363,29 @@ class TelegramChannel(BaseChannel):
|
||||
}
|
||||
)
|
||||
|
||||
def _start_typing(self, chat_id: str) -> None:
|
||||
"""Start sending 'typing...' indicator for a chat."""
|
||||
# Cancel any existing typing task for this chat
|
||||
self._stop_typing(chat_id)
|
||||
self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))
|
||||
|
||||
def _stop_typing(self, chat_id: str) -> None:
|
||||
"""Stop the typing indicator for a chat."""
|
||||
task = self._typing_tasks.pop(chat_id, None)
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
|
||||
async def _typing_loop(self, chat_id: str) -> None:
|
||||
"""Repeatedly send 'typing' action until cancelled."""
|
||||
try:
|
||||
while self._app:
|
||||
await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
|
||||
await asyncio.sleep(4)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.debug(f"Typing indicator stopped for {chat_id}: {e}")
|
||||
|
||||
def _get_extension(self, media_type: str, mime_type: str | None) -> str:
|
||||
"""Get file extension based on media type."""
|
||||
if mime_type:
|
||||
|
||||
Reference in New Issue
Block a user