Merge pull request #1175 from Raven95676/telegram

feat: 自动注册指令到Telegram
This commit is contained in:
Soulter
2025-04-10 20:26:54 +08:00
committed by GitHub
@@ -1,26 +1,31 @@
import asyncio
import sys
import uuid
import asyncio
import astrbot.api.message_components as Comp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from telegram import BotCommand, Update
from telegram.constants import ChatType
from telegram.ext import ApplicationBuilder, ContextTypes, ExtBot, filters
from telegram.ext import MessageHandler as TelegramMessageHandler
import astrbot.api.message_components as Comp
from astrbot.api import logger
from astrbot.api.event import MessageChain
from astrbot.api.platform import (
Platform,
AstrBotMessage,
MessageMember,
PlatformMetadata,
MessageType,
Platform,
PlatformMetadata,
register_platform_adapter,
)
from astrbot.api.event import MessageChain
from astrbot.core.platform.astr_message_event import MessageSesion
from astrbot.api.platform import register_platform_adapter
from astrbot.core.star.filter.command import CommandFilter
from astrbot.core.star.filter.command_group import CommandGroupFilter
from astrbot.core.star.star import star_map
from astrbot.core.star.star_handler import star_handlers_registry
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, filters
from telegram.constants import ChatType
from telegram.ext import MessageHandler as TelegramMessageHandler
from .tg_event import TelegramPlatformEvent
from astrbot.api import logger
from telegram.ext import ExtBot
if sys.version_info >= (3, 12):
from typing import override
@@ -67,6 +72,8 @@ class TelegramPlatformAdapter(Platform):
self.client = self.application.bot
logger.debug(f"Telegram base url: {self.client.base_url}")
self.scheduler = AsyncIOScheduler()
@override
async def send_by_session(
self, session: MessageSesion, message_chain: MessageChain
@@ -87,10 +94,87 @@ class TelegramPlatformAdapter(Platform):
async def run(self):
await self.application.initialize()
await self.application.start()
await self.register_commands()
# TODO 使用更优雅的方式重新注册命令
self.scheduler.add_job(
self.register_commands,
"interval",
minutes=5,
id="telegram_command_register",
misfire_grace_time=60,
)
self.scheduler.start()
queue = self.application.updater.start_polling()
logger.info("Telegram Platform Adapter is running.")
await queue
async def register_commands(self):
"""收集所有注册的指令并注册到 Telegram"""
try:
await self.client.delete_my_commands()
commands = self.collect_commands()
if commands:
await self.client.set_my_commands(commands)
for cmd in commands:
logger.debug(f"已注册指令: /{cmd.command} - {cmd.description}")
except Exception as e:
logger.error(f"向 Telegram 注册指令时发生错误: {e!s}")
def collect_commands(self) -> list[BotCommand]:
"""从注册的处理器中收集所有指令"""
command_dict = {}
skip_commands = {"start"}
for handler_md in star_handlers_registry._handlers:
handler_metadata = handler_md[1]
if not star_map[handler_metadata.handler_module_path].activated:
continue
for event_filter in handler_metadata.event_filters:
cmd_info = self._extract_command_info(
event_filter, handler_metadata, skip_commands
)
if cmd_info:
cmd_name, description = cmd_info
command_dict.setdefault(cmd_name, description)
commands_a = sorted(command_dict.keys())
return [BotCommand(cmd, command_dict[cmd]) for cmd in commands_a]
@staticmethod
def _extract_command_info(
event_filter, handler_metadata, skip_commands: set
) -> tuple[str, str] | None:
"""从事件过滤器中提取指令信息"""
cmd_name = None
is_group = False
if isinstance(event_filter, CommandFilter) and event_filter.command_name:
if (
event_filter.parent_command_names
and event_filter.parent_command_names != [""]
):
return None
cmd_name = event_filter.command_name
elif isinstance(event_filter, CommandGroupFilter):
if event_filter.parent_group:
return None
cmd_name = event_filter.group_name
is_group = True
if not cmd_name or cmd_name in skip_commands:
return None
# Build description.
description = handler_metadata.desc or (
f"指令组: {cmd_name} (包含多个子指令)" if is_group else f"指令: {cmd_name}"
)
if len(description) > 30:
description = description[:30] + "..."
return cmd_name, description
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_message(
chat_id=update.effective_chat.id, text=self.config["start_message"]
@@ -162,6 +246,16 @@ class TelegramPlatformAdapter(Platform):
# 处理文本消息
plain_text = update.message.text
# 群聊场景命令特殊处理
if plain_text.startswith("/"):
command_parts = plain_text.split(" ", 1)
if "@" in command_parts[0]:
command, bot_name = command_parts[0].split("@")
if bot_name == self.client.username:
plain_text = command + (
f" {command_parts[1]}" if len(command_parts) > 1 else ""
)
if update.message.entities:
for entity in update.message.entities:
if entity.type == "mention":
@@ -241,7 +335,11 @@ class TelegramPlatformAdapter(Platform):
async def terminate(self):
try:
if self.scheduler.running:
self.scheduler.shutdown()
await self.application.stop()
await self.client.delete_my_commands()
# 保险起见先判断是否存在updater对象
if self.application.updater is not None: