From 30a0098b2a443278ef14834635394497b998ece3 Mon Sep 17 00:00:00 2001 From: evpeople <54983536+evpeople@users.noreply.github.com> Date: Thu, 12 Feb 2026 14:46:06 +0800 Subject: [PATCH] feat: add send_chat_action for Telegram platform adapter (#5037) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add send_chat_action for Telegram platform adapter Add typing/upload indicator when sending messages via Telegram. - Added _send_chat_action helper method for sending chat actions - Send appropriate action (typing, upload_photo, upload_document, upload_voice) before sending different message types - Support streaming mode with typing indicator - Support supergroup with message_thread_id * refactor(telegram): extract chat action helpers and add throttling - Add ACTION_BY_TYPE mapping for message type to action priority - Add _get_chat_action_for_chain() to determine action from message chain - Add _send_media_with_action() for upload → send → restore typing pattern - Add _ensure_typing() helper for typing status - Add chat action throttling (0.5s) in streaming mode to avoid rate limits - Update type annotation to ChatAction | str for better static checking * feat(telegram): implement send_typing method for Telegram platform --------- Co-authored-by: Soulter <905617992@qq.com> --- .../method/agent_sub_stages/internal.py | 1 + astrbot/core/platform/astr_message_event.py | 6 + .../platform/sources/telegram/tg_event.py | 123 +++++++++++++++++- 3 files changed, 125 insertions(+), 5 deletions(-) diff --git a/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py b/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py index 0abee033d..db61ce0ec 100644 --- a/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py +++ b/astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py @@ -149,6 +149,7 @@ class InternalAgentSubStage(Stage): logger.debug("ready to request llm provider") + await event.send_typing() await call_event_hook(event, EventType.OnWaitingLLMRequestEvent) async with session_lock_manager.acquire_lock(event.unified_msg_origin): diff --git a/astrbot/core/platform/astr_message_event.py b/astrbot/core/platform/astr_message_event.py index 83b9813e0..4cd531c53 100644 --- a/astrbot/core/platform/astr_message_event.py +++ b/astrbot/core/platform/astr_message_event.py @@ -244,6 +244,12 @@ class AstrMessageEvent(abc.ABC): ) self._has_send_oper = True + async def send_typing(self) -> None: + """发送输入中状态。 + + 默认实现为空,由具体平台按需重写。 + """ + async def _pre_send(self) -> None: """调度器会在执行 send() 前调用该方法 deprecated in v3.5.18""" diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index 1df289d83..ffa58e1a8 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -5,6 +5,7 @@ from typing import Any, cast import telegramify_markdown from telegram import ReactionTypeCustomEmoji, ReactionTypeEmoji +from telegram.constants import ChatAction from telegram.ext import ExtBot from astrbot import logger @@ -31,6 +32,14 @@ class TelegramPlatformEvent(AstrMessageEvent): "word": re.compile(r"\s"), } + # 消息类型到 chat action 的映射,用于优先级判断 + ACTION_BY_TYPE: dict[type, str] = { + Record: ChatAction.UPLOAD_VOICE, + File: ChatAction.UPLOAD_DOCUMENT, + Image: ChatAction.UPLOAD_PHOTO, + Plain: ChatAction.TYPING, + } + def __init__( self, message_str: str, @@ -67,6 +76,71 @@ class TelegramPlatformEvent(AstrMessageEvent): return chunks + @classmethod + async def _send_chat_action( + cls, + client: ExtBot, + chat_id: str, + action: ChatAction | str, + message_thread_id: str | None = None, + ) -> None: + """发送聊天状态动作""" + try: + payload: dict[str, Any] = {"chat_id": chat_id, "action": action} + if message_thread_id: + payload["message_thread_id"] = message_thread_id + await client.send_chat_action(**payload) + except Exception as e: + logger.warning(f"[Telegram] 发送 chat action 失败: {e}") + + @classmethod + def _get_chat_action_for_chain(cls, chain: list[Any]) -> ChatAction | str: + """根据消息链中的组件类型确定合适的 chat action(按优先级)""" + for seg_type, action in cls.ACTION_BY_TYPE.items(): + if any(isinstance(seg, seg_type) for seg in chain): + return action + return ChatAction.TYPING + + @classmethod + async def _send_media_with_action( + cls, + client: ExtBot, + upload_action: ChatAction | str, + send_coro, + *, + user_name: str, + message_thread_id: str | None = None, + **payload: Any, + ) -> None: + """发送媒体时显示 upload action,发送完成后恢复 typing""" + await cls._send_chat_action(client, user_name, upload_action, message_thread_id) + await send_coro(**payload) + await cls._send_chat_action( + client, user_name, ChatAction.TYPING, message_thread_id + ) + + async def _ensure_typing( + self, + user_name: str, + message_thread_id: str | None = None, + ) -> None: + """确保显示 typing 状态""" + await self._send_chat_action( + self.client, user_name, ChatAction.TYPING, message_thread_id + ) + + async def send_typing(self) -> None: + message_thread_id = None + if self.get_message_type() == MessageType.GROUP_MESSAGE: + user_name = self.message_obj.group_id + else: + user_name = self.get_sender_id() + + if "#" in user_name: + user_name, message_thread_id = user_name.split("#") + + await self._ensure_typing(user_name, message_thread_id) + @classmethod async def send_with_client( cls, @@ -91,6 +165,11 @@ class TelegramPlatformEvent(AstrMessageEvent): if "#" in user_name: # it's a supergroup chat with message_thread_id user_name, message_thread_id = user_name.split("#") + + # 根据消息链确定合适的 chat action 并发送 + action = cls._get_chat_action_for_chain(message.chain) + await cls._send_chat_action(client, user_name, action, message_thread_id) + for i in message.chain: payload = { "chat_id": user_name, @@ -195,6 +274,12 @@ class TelegramPlatformEvent(AstrMessageEvent): message_id = None last_edit_time = 0 # 上次编辑消息的时间 throttle_interval = 0.6 # 编辑消息的间隔时间 (秒) + last_chat_action_time = 0 # 上次发送 chat action 的时间 + chat_action_interval = 0.5 # chat action 的节流间隔 (秒) + + # 发送初始 typing 状态 + await self._ensure_typing(user_name, message_thread_id) + last_chat_action_time = asyncio.get_event_loop().time() async for chain in generator: if isinstance(chain, MessageChain): @@ -219,15 +304,25 @@ class TelegramPlatformEvent(AstrMessageEvent): delta += i.text elif isinstance(i, Image): image_path = await i.convert_to_file_path() - await self.client.send_photo( - photo=image_path, **cast(Any, payload) + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_PHOTO, + self.client.send_photo, + user_name=user_name, + message_thread_id=message_thread_id, + photo=image_path, + **cast(Any, payload), ) continue elif isinstance(i, File): path = await i.get_file() name = i.name or os.path.basename(path) - - await self.client.send_document( + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_DOCUMENT, + self.client.send_document, + user_name=user_name, + message_thread_id=message_thread_id, document=path, filename=name, **cast(Any, payload), @@ -235,7 +330,15 @@ class TelegramPlatformEvent(AstrMessageEvent): continue elif isinstance(i, Record): path = await i.convert_to_file_path() - await self.client.send_voice(voice=path, **cast(Any, payload)) + await self._send_media_with_action( + self.client, + ChatAction.UPLOAD_VOICE, + self.client.send_voice, + user_name=user_name, + message_thread_id=message_thread_id, + voice=path, + **cast(Any, payload), + ) continue else: logger.warning(f"不支持的消息类型: {type(i)}") @@ -248,6 +351,11 @@ class TelegramPlatformEvent(AstrMessageEvent): # 如果距离上次编辑的时间 >= 设定的间隔,等待一段时间 if time_since_last_edit >= throttle_interval: + # 发送 typing 状态(带节流) + current_time = asyncio.get_event_loop().time() + if current_time - last_chat_action_time >= chat_action_interval: + await self._ensure_typing(user_name, message_thread_id) + last_chat_action_time = current_time # 编辑消息 try: await self.client.edit_message_text( @@ -263,6 +371,11 @@ class TelegramPlatformEvent(AstrMessageEvent): ) # 更新上次编辑的时间 else: # delta 长度一般不会大于 4096,因此这里直接发送 + # 发送 typing 状态(带节流) + current_time = asyncio.get_event_loop().time() + if current_time - last_chat_action_time >= chat_action_interval: + await self._ensure_typing(user_name, message_thread_id) + last_chat_action_time = current_time try: msg = await self.client.send_message( text=delta, **cast(Any, payload)