feat: add send_chat_action for Telegram platform adapter (#5037)

* feat: add send_chat_action for Telegram platform adapter

Add typing/upload indicator when sending messages via Telegram.
- Added _send_chat_action helper method for sending chat actions
- Send appropriate action (typing, upload_photo, upload_document, upload_voice)
  before sending different message types
- Support streaming mode with typing indicator
- Support supergroup with message_thread_id

* refactor(telegram): extract chat action helpers and add throttling

- Add ACTION_BY_TYPE mapping for message type to action priority
- Add _get_chat_action_for_chain() to determine action from message chain
- Add _send_media_with_action() for upload → send → restore typing pattern
- Add _ensure_typing() helper for typing status
- Add chat action throttling (0.5s) in streaming mode to avoid rate limits
- Update type annotation to ChatAction | str for better static checking

* feat(telegram): implement send_typing method for Telegram platform

---------

Co-authored-by: Soulter <905617992@qq.com>
This commit is contained in:
evpeople
2026-02-12 14:46:06 +08:00
committed by GitHub
parent e3cb9eb8af
commit 30a0098b2a
3 changed files with 125 additions and 5 deletions
@@ -149,6 +149,7 @@ class InternalAgentSubStage(Stage):
logger.debug("ready to request llm provider")
await event.send_typing()
await call_event_hook(event, EventType.OnWaitingLLMRequestEvent)
async with session_lock_manager.acquire_lock(event.unified_msg_origin):
@@ -244,6 +244,12 @@ class AstrMessageEvent(abc.ABC):
)
self._has_send_oper = True
async def send_typing(self) -> None:
"""发送输入中状态。
默认实现为空,由具体平台按需重写。
"""
async def _pre_send(self) -> None:
"""调度器会在执行 send() 前调用该方法 deprecated in v3.5.18"""
@@ -5,6 +5,7 @@ from typing import Any, cast
import telegramify_markdown
from telegram import ReactionTypeCustomEmoji, ReactionTypeEmoji
from telegram.constants import ChatAction
from telegram.ext import ExtBot
from astrbot import logger
@@ -31,6 +32,14 @@ class TelegramPlatformEvent(AstrMessageEvent):
"word": re.compile(r"\s"),
}
# 消息类型到 chat action 的映射,用于优先级判断
ACTION_BY_TYPE: dict[type, str] = {
Record: ChatAction.UPLOAD_VOICE,
File: ChatAction.UPLOAD_DOCUMENT,
Image: ChatAction.UPLOAD_PHOTO,
Plain: ChatAction.TYPING,
}
def __init__(
self,
message_str: str,
@@ -67,6 +76,71 @@ class TelegramPlatformEvent(AstrMessageEvent):
return chunks
@classmethod
async def _send_chat_action(
cls,
client: ExtBot,
chat_id: str,
action: ChatAction | str,
message_thread_id: str | None = None,
) -> None:
"""发送聊天状态动作"""
try:
payload: dict[str, Any] = {"chat_id": chat_id, "action": action}
if message_thread_id:
payload["message_thread_id"] = message_thread_id
await client.send_chat_action(**payload)
except Exception as e:
logger.warning(f"[Telegram] 发送 chat action 失败: {e}")
@classmethod
def _get_chat_action_for_chain(cls, chain: list[Any]) -> ChatAction | str:
"""根据消息链中的组件类型确定合适的 chat action(按优先级)"""
for seg_type, action in cls.ACTION_BY_TYPE.items():
if any(isinstance(seg, seg_type) for seg in chain):
return action
return ChatAction.TYPING
@classmethod
async def _send_media_with_action(
cls,
client: ExtBot,
upload_action: ChatAction | str,
send_coro,
*,
user_name: str,
message_thread_id: str | None = None,
**payload: Any,
) -> None:
"""发送媒体时显示 upload action,发送完成后恢复 typing"""
await cls._send_chat_action(client, user_name, upload_action, message_thread_id)
await send_coro(**payload)
await cls._send_chat_action(
client, user_name, ChatAction.TYPING, message_thread_id
)
async def _ensure_typing(
self,
user_name: str,
message_thread_id: str | None = None,
) -> None:
"""确保显示 typing 状态"""
await self._send_chat_action(
self.client, user_name, ChatAction.TYPING, message_thread_id
)
async def send_typing(self) -> None:
message_thread_id = None
if self.get_message_type() == MessageType.GROUP_MESSAGE:
user_name = self.message_obj.group_id
else:
user_name = self.get_sender_id()
if "#" in user_name:
user_name, message_thread_id = user_name.split("#")
await self._ensure_typing(user_name, message_thread_id)
@classmethod
async def send_with_client(
cls,
@@ -91,6 +165,11 @@ class TelegramPlatformEvent(AstrMessageEvent):
if "#" in user_name:
# it's a supergroup chat with message_thread_id
user_name, message_thread_id = user_name.split("#")
# 根据消息链确定合适的 chat action 并发送
action = cls._get_chat_action_for_chain(message.chain)
await cls._send_chat_action(client, user_name, action, message_thread_id)
for i in message.chain:
payload = {
"chat_id": user_name,
@@ -195,6 +274,12 @@ class TelegramPlatformEvent(AstrMessageEvent):
message_id = None
last_edit_time = 0 # 上次编辑消息的时间
throttle_interval = 0.6 # 编辑消息的间隔时间 (秒)
last_chat_action_time = 0 # 上次发送 chat action 的时间
chat_action_interval = 0.5 # chat action 的节流间隔 (秒)
# 发送初始 typing 状态
await self._ensure_typing(user_name, message_thread_id)
last_chat_action_time = asyncio.get_event_loop().time()
async for chain in generator:
if isinstance(chain, MessageChain):
@@ -219,15 +304,25 @@ class TelegramPlatformEvent(AstrMessageEvent):
delta += i.text
elif isinstance(i, Image):
image_path = await i.convert_to_file_path()
await self.client.send_photo(
photo=image_path, **cast(Any, payload)
await self._send_media_with_action(
self.client,
ChatAction.UPLOAD_PHOTO,
self.client.send_photo,
user_name=user_name,
message_thread_id=message_thread_id,
photo=image_path,
**cast(Any, payload),
)
continue
elif isinstance(i, File):
path = await i.get_file()
name = i.name or os.path.basename(path)
await self.client.send_document(
await self._send_media_with_action(
self.client,
ChatAction.UPLOAD_DOCUMENT,
self.client.send_document,
user_name=user_name,
message_thread_id=message_thread_id,
document=path,
filename=name,
**cast(Any, payload),
@@ -235,7 +330,15 @@ class TelegramPlatformEvent(AstrMessageEvent):
continue
elif isinstance(i, Record):
path = await i.convert_to_file_path()
await self.client.send_voice(voice=path, **cast(Any, payload))
await self._send_media_with_action(
self.client,
ChatAction.UPLOAD_VOICE,
self.client.send_voice,
user_name=user_name,
message_thread_id=message_thread_id,
voice=path,
**cast(Any, payload),
)
continue
else:
logger.warning(f"不支持的消息类型: {type(i)}")
@@ -248,6 +351,11 @@ class TelegramPlatformEvent(AstrMessageEvent):
# 如果距离上次编辑的时间 >= 设定的间隔,等待一段时间
if time_since_last_edit >= throttle_interval:
# 发送 typing 状态(带节流)
current_time = asyncio.get_event_loop().time()
if current_time - last_chat_action_time >= chat_action_interval:
await self._ensure_typing(user_name, message_thread_id)
last_chat_action_time = current_time
# 编辑消息
try:
await self.client.edit_message_text(
@@ -263,6 +371,11 @@ class TelegramPlatformEvent(AstrMessageEvent):
) # 更新上次编辑的时间
else:
# delta 长度一般不会大于 4096,因此这里直接发送
# 发送 typing 状态(带节流)
current_time = asyncio.get_event_loop().time()
if current_time - last_chat_action_time >= chat_action_interval:
await self._ensure_typing(user_name, message_thread_id)
last_chat_action_time = current_time
try:
msg = await self.client.send_message(
text=delta, **cast(Any, payload)