feat: supports webhook mode for Lark platform (#4016)

* feat: add Lark platform support with unified webhook configuration

* fix: update token verification logic in LarkWebhookServer

* feat: implement event deduplication and cleanup for Lark webhook events
This commit is contained in:
Soulter
2025-12-12 22:12:13 +08:00
committed by GitHub
parent 3b81fb4985
commit 8a0b7717cc
15 changed files with 378 additions and 28 deletions
+27
View File
@@ -13,6 +13,7 @@ WEBHOOK_SUPPORTED_PLATFORMS = [
"wecom",
"wecom_ai_bot",
"slack",
"lark",
]
# 默认配置
@@ -277,6 +278,10 @@ CONFIG_METADATA_2 = {
"app_id": "",
"app_secret": "",
"domain": "https://open.feishu.cn",
"lark_connection_mode": "socket", # webhook, socket
"webhook_uuid": "",
"lark_encrypt_key": "",
"lark_verification_token": "",
},
"钉钉(DingTalk)": {
"id": "dingtalk",
@@ -370,6 +375,28 @@ CONFIG_METADATA_2 = {
# "type": "string",
# "options": ["fullscreen", "embedded"],
# },
"lark_connection_mode": {
"description": "订阅方式",
"type": "string",
"options": ["socket", "webhook"],
"labels": ["长连接模式", "推送至服务器模式"],
},
"lark_encrypt_key": {
"description": "Encrypt Key",
"type": "string",
"hint": "用于解密飞书回调数据的加密密钥",
"condition": {
"lark_connection_mode": "webhook",
},
},
"lark_verification_token": {
"description": "Verification Token",
"type": "string",
"hint": "用于验证飞书回调请求的令牌",
"condition": {
"lark_connection_mode": "webhook",
},
},
"is_sandbox": {
"description": "沙箱模式",
"type": "bool",
+4
View File
@@ -5,6 +5,7 @@ from asyncio import Queue
from astrbot.core import logger
from astrbot.core.config.astrbot_config import AstrBotConfig
from astrbot.core.star.star_handler import EventType, star_handlers_registry, star_map
from astrbot.core.utils.webhook_utils import ensure_platform_webhook_config
from .platform import Platform, PlatformStatus
from .register import platform_cls_map
@@ -18,6 +19,7 @@ class PlatformManager:
self._inst_map: dict[str, dict] = {}
self.astrbot_config = config
self.platforms_config = config["platform"]
self.settings = config["platform_settings"]
"""NOTE: 这里是 default 的配置文件,以保证最大的兼容性;
@@ -29,6 +31,8 @@ class PlatformManager:
"""初始化所有平台适配器"""
for platform in self.platforms_config:
try:
if ensure_platform_webhook_config(platform):
self.astrbot_config.save_config()
await self.load_platform(platform)
except Exception as e:
logger.error(f"初始化 {platform} 平台适配器失败: {e}")
+8
View File
@@ -80,6 +80,13 @@ class Platform(abc.ABC):
if self._status == PlatformStatus.ERROR:
self._status = PlatformStatus.RUNNING
def unified_webhook(self) -> bool:
"""是否正在使用统一 Webhook 模式"""
return bool(
self.config.get("unified_webhook_mode", False)
and self.config.get("webhook_uuid")
)
def get_stats(self) -> dict:
"""获取平台统计信息"""
meta = self.meta()
@@ -97,6 +104,7 @@ class Platform(abc.ABC):
}
if self.last_error
else None,
"unified_webhook": self.unified_webhook(),
}
@abc.abstractmethod
@@ -421,7 +421,7 @@ class AiocqhttpAdapter(Platform):
async def shutdown_trigger_placeholder(self):
await self.shutdown_event.wait()
logger.info("aiocqhttp 适配器已被优雅地关闭")
logger.info("aiocqhttp 适配器已被关闭")
def meta(self) -> PlatformMetadata:
return self.metadata
@@ -245,7 +245,7 @@ class DingtalkPlatformAdapter(Platform):
task.result()
except Exception as e:
if "Graceful shutdown" in str(e):
logger.info("钉钉适配器已被优雅地关闭")
logger.info("钉钉适配器已被关闭")
return
logger.error(f"钉钉机器人启动失败: {e}")
@@ -4,7 +4,7 @@ import json
import re
import time
import uuid
from typing import cast
from typing import Any, cast
import lark_oapi as lark
from lark_oapi.api.im.v1 import (
@@ -12,6 +12,7 @@ from lark_oapi.api.im.v1 import (
CreateMessageRequestBody,
GetMessageResourceRequest,
)
from lark_oapi.api.im.v1.processor import P2ImMessageReceiveV1Processor
import astrbot.api.message_components as Comp
from astrbot import logger
@@ -24,9 +25,11 @@ from astrbot.api.platform import (
PlatformMetadata,
)
from astrbot.core.platform.astr_message_event import MessageSesion
from astrbot.core.utils.webhook_utils import log_webhook_info
from ...register import register_platform_adapter
from .lark_event import LarkMessageEvent
from .server import LarkWebhookServer
@register_platform_adapter(
@@ -48,9 +51,13 @@ class LarkPlatformAdapter(Platform):
self.domain = platform_config.get("domain", lark.FEISHU_DOMAIN)
self.bot_name = platform_config.get("lark_bot_name", "astrbot")
# socket or webhook
self.connection_mode = platform_config.get("lark_connection_mode", "socket")
if not self.bot_name:
logger.warning("未设置飞书机器人名称,@ 机器人可能得不到回复。")
# 初始化 WebSocket 长连接相关配置
async def on_msg_event_recv(event: lark.im.v1.P2ImMessageReceiveV1):
await self.convert_msg(event)
@@ -63,6 +70,8 @@ class LarkPlatformAdapter(Platform):
.build()
)
self.do_v2_msg_event = do_v2_msg_event
self.client = lark.ws.Client(
app_id=self.appid,
app_secret=self.appsecret,
@@ -75,6 +84,39 @@ class LarkPlatformAdapter(Platform):
lark.Client.builder().app_id(self.appid).app_secret(self.appsecret).build()
)
self.webhook_server = None
if self.connection_mode == "webhook":
self.webhook_server = LarkWebhookServer(platform_config, event_queue)
self.webhook_server.set_callback(self.handle_webhook_event)
self.event_id_timestamps: dict[str, float] = {}
def _clean_expired_events(self):
"""清理超过 30 分钟的事件记录"""
current_time = time.time()
expired_keys = [
event_id
for event_id, timestamp in self.event_id_timestamps.items()
if current_time - timestamp > 1800
]
for event_id in expired_keys:
del self.event_id_timestamps[event_id]
def _is_duplicate_event(self, event_id: str) -> bool:
"""检查事件是否重复
Args:
event_id: 事件ID
Returns:
True 表示重复事件,False 表示新事件
"""
self._clean_expired_events()
if event_id in self.event_id_timestamps:
return True
self.event_id_timestamps[event_id] = time.time()
return False
async def send_by_session(
self,
session: MessageSesion,
@@ -295,13 +337,61 @@ class LarkPlatformAdapter(Platform):
self._event_queue.put_nowait(event)
async def handle_webhook_event(self, event_data: dict):
"""处理 Webhook 事件
Args:
event_data: Webhook 事件数据
"""
try:
header = event_data.get("header", {})
event_id = header.get("event_id", "")
if event_id and self._is_duplicate_event(event_id):
logger.debug(f"[Lark Webhook] 跳过重复事件: {event_id}")
return
event_type = header.get("event_type", "")
if event_type == "im.message.receive_v1":
processor = P2ImMessageReceiveV1Processor(self.do_v2_msg_event)
data = (processor.type())(event_data)
processor.do(data)
else:
logger.debug(f"[Lark Webhook] 未处理的事件类型: {event_type}")
except Exception as e:
logger.error(f"[Lark Webhook] 处理事件失败: {e}", exc_info=True)
async def run(self):
# self.client.start()
await self.client._connect()
if self.connection_mode == "webhook":
# Webhook 模式
if self.webhook_server is None:
logger.error("[Lark] Webhook 模式已启用,但 webhook_server 未初始化")
return
webhook_uuid = self.config.get("webhook_uuid")
if webhook_uuid:
log_webhook_info(f"{self.meta().id}(飞书 Webhook)", webhook_uuid)
else:
logger.warning("[Lark] Webhook 模式已启用,但未配置 webhook_uuid")
else:
# 长连接模式
await self.client._connect()
async def webhook_callback(self, request: Any) -> Any:
"""统一 Webhook 回调入口"""
if not self.webhook_server:
return {"error": "Webhook server not initialized"}, 500
return await self.webhook_server.handle_callback(request)
async def terminate(self):
await self.client._disconnect()
logger.info("飞书(Lark) 适配器已被优雅地关闭")
if self.connection_mode == "socket":
await self.client._disconnect()
logger.info("飞书(Lark) 适配器已关闭")
def get_client(self) -> lark.ws.Client:
return self.client
def unified_webhook(self) -> bool:
return bool(
self.config.get("lark_connection_mode", "") == "webhook"
and self.config.get("webhook_uuid")
)
@@ -0,0 +1,206 @@
"""飞书(Lark) Webhook 服务器实现
实现飞书事件订阅的 Webhook 模式,支持:
1. 请求 URL 验证 (challenge 验证)
2. 事件加密/解密 (AES-256-CBC)
3. 签名校验 (SHA256)
4. 事件接收和处理
"""
import asyncio
import base64
import hashlib
import json
from collections.abc import Awaitable, Callable
from Crypto.Cipher import AES
from astrbot.api import logger
class AESCipher:
"""AES 加密/解密工具类"""
def __init__(self, key: str):
self.bs = AES.block_size
self.key = hashlib.sha256(self.str_to_bytes(key)).digest()
@staticmethod
def str_to_bytes(data):
u_type = type(b"".decode("utf8"))
if isinstance(data, u_type):
return data.encode("utf8")
return data
@staticmethod
def _unpad(s):
return s[: -ord(s[len(s) - 1 :])]
def decrypt(self, enc):
iv = enc[: AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size :]))
def decrypt_string(self, enc):
enc = base64.b64decode(enc)
return self.decrypt(enc).decode("utf8")
class LarkWebhookServer:
"""飞书 Webhook 服务器
仅支持统一 Webhook 模式
"""
def __init__(self, config: dict, event_queue: asyncio.Queue):
"""初始化 Webhook 服务器
Args:
config: 飞书配置
event_queue: 事件队列
"""
self.app_id = config["app_id"]
self.app_secret = config["app_secret"]
self.encrypt_key = config.get("lark_encrypt_key", "")
self.verification_token = config.get("lark_verification_token", "")
self.event_queue = event_queue
self.callback: Callable[[dict], Awaitable[None]] | None = None
# 初始化加密工具
self.cipher = None
if self.encrypt_key:
self.cipher = AESCipher(self.encrypt_key)
def verify_signature(
self,
timestamp: str,
nonce: str,
encrypt_key: str,
body: bytes,
signature: str,
) -> bool:
"""验证签名
Args:
timestamp: 请求时间戳
nonce: 随机数
encrypt_key: 加密密钥
body: 请求体
signature: 签名
Returns:
签名是否有效
"""
# 拼接字符串: timestamp + nonce + encrypt_key + body
bytes_b1 = (timestamp + nonce + encrypt_key).encode("utf-8")
bytes_b = bytes_b1 + body
h = hashlib.sha256(bytes_b)
calculated_signature = h.hexdigest()
return calculated_signature == signature
def decrypt_event(self, encrypted_data: str) -> dict:
"""解密事件数据
Args:
encrypted_data: 加密的事件数据
Returns:
解密后的事件字典
"""
if not self.cipher:
raise ValueError("未配置 encrypt_key,无法解密事件")
decrypted_str = self.cipher.decrypt_string(encrypted_data)
return json.loads(decrypted_str)
async def handle_challenge(self, event_data: dict) -> dict:
"""处理 challenge 验证请求
Args:
event_data: 事件数据
Returns:
包含 challenge 的响应
"""
challenge = event_data.get("challenge", "")
logger.info(f"[Lark Webhook] 收到 challenge 验证请求: {challenge}")
return {"challenge": challenge}
async def handle_callback(self, request) -> tuple[dict, int] | dict:
"""处理 webhook 回调,可被统一 webhook 入口复用
Args:
request: Quart 请求对象
Returns:
响应数据
"""
# 获取原始请求体
body = await request.get_data()
try:
event_data = await request.json
except Exception as e:
logger.error(f"[Lark Webhook] 解析请求体失败: {e}")
return {"error": "Invalid JSON"}, 400
if not event_data:
logger.error("[Lark Webhook] 请求体为空")
return {"error": "Empty request body"}, 400
# 如果配置了 encrypt_key,进行签名验证
if self.encrypt_key:
timestamp = request.headers.get("X-Lark-Request-Timestamp", "")
nonce = request.headers.get("X-Lark-Request-Nonce", "")
signature = request.headers.get("X-Lark-Signature", "")
if timestamp and nonce and signature:
if not self.verify_signature(
timestamp, nonce, self.encrypt_key, body, signature
):
logger.error("[Lark Webhook] 签名验证失败")
return {"error": "Invalid signature"}, 401
# 检查是否是加密事件
if "encrypt" in event_data:
try:
event_data = self.decrypt_event(event_data["encrypt"])
logger.debug(f"[Lark Webhook] 解密后的事件: {event_data}")
except Exception as e:
logger.error(f"[Lark Webhook] 解密事件失败: {e}")
return {"error": "Decryption failed"}, 400
# 验证 token
if self.verification_token:
header = event_data.get("header", {})
if header:
token = header.get("token", "")
else:
token = event_data.get("token", "")
if token != self.verification_token:
logger.error("[Lark Webhook] Verification Token 不匹配。")
return {"error": "Invalid verification token"}, 401
# 处理 URL 验证 (challenge)
if event_data.get("type") == "url_verification":
return await self.handle_challenge(event_data)
# 调用回调函数处理事件
if self.callback:
try:
await self.callback(event_data)
except Exception as e:
logger.error(f"[Lark Webhook] 处理事件回调失败: {e}", exc_info=True)
return {"error": "Event processing failed"}, 500
return {}
def set_callback(self, callback: Callable[[dict], Awaitable[None]]):
"""设置事件回调函数
Args:
callback: 处理事件的异步函数
"""
self.callback = callback
@@ -409,7 +409,7 @@ class SlackAdapter(Platform):
await self.socket_client.stop()
if self.webhook_client:
await self.webhook_client.stop()
logger.info("Slack 适配器已被优雅地关闭")
logger.info("Slack 适配器已被关闭")
def meta(self) -> PlatformMetadata:
return self.metadata
@@ -427,3 +427,10 @@ class SlackAdapter(Platform):
def get_client(self):
return self.web_client
def unified_webhook(self) -> bool:
return bool(
self.config.get("unified_webhook_mode", False)
and self.config.get("slack_connection_mode", "") == "webhook"
and self.config.get("webhook_uuid")
)
@@ -424,6 +424,6 @@ class TelegramPlatformAdapter(Platform):
if self.application.updater is not None:
await self.application.updater.stop()
logger.info("Telegram 适配器已被优雅地关闭")
logger.info("Telegram 适配器已被关闭")
except Exception as e:
logger.error(f"Telegram 适配器关闭时出错: {e}")
@@ -422,4 +422,4 @@ class WecomPlatformAdapter(Platform):
await self.server.server.shutdown()
except Exception as _:
pass
logger.info("企业微信 适配器已被优雅地关闭")
logger.info("企业微信 适配器已被关闭")
@@ -349,4 +349,4 @@ class WeixinOfficialAccountPlatformAdapter(Platform):
await self.server.server.shutdown()
except Exception as _:
pass
logger.info("微信公众平台 适配器已被优雅地关闭")
logger.info("微信公众平台 适配器已被关闭")
+19
View File
@@ -1,4 +1,7 @@
import uuid
from astrbot.core import astrbot_config, logger
from astrbot.core.config.default import WEBHOOK_SUPPORTED_PLATFORMS
def _get_callback_api_base() -> str:
@@ -45,3 +48,19 @@ def log_webhook_info(platform_name: str, webhook_uuid: str):
"====================\n"
)
logger.info(display_log)
def ensure_platform_webhook_config(platform_cfg: dict) -> bool:
"""为支持统一 webhook 的平台自动生成 webhook_uuid
Args:
platform_cfg (dict): 平台配置字典
Returns:
bool: 如果生成了 webhook_uuid 则返回 True否则返回 False
"""
pt = platform_cfg.get("type", "")
if pt in WEBHOOK_SUPPORTED_PLATFORMS and not platform_cfg.get("webhook_uuid"):
platform_cfg["webhook_uuid"] = uuid.uuid4().hex[:16]
return True
return False
+4 -15
View File
@@ -2,7 +2,6 @@ import asyncio
import inspect
import os
import traceback
import uuid
from typing import Any
from quart import request
@@ -15,7 +14,6 @@ from astrbot.core.config.default import (
CONFIG_METADATA_3_SYSTEM,
DEFAULT_CONFIG,
DEFAULT_VALUE_MAP,
WEBHOOK_SUPPORTED_PLATFORMS,
)
from astrbot.core.config.i18n_utils import ConfigMetadataI18n
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
@@ -23,6 +21,7 @@ from astrbot.core.platform.register import platform_cls_map, platform_registry
from astrbot.core.provider import Provider
from astrbot.core.provider.register import provider_registry
from astrbot.core.star.star import star_registry
from astrbot.core.utils.webhook_utils import ensure_platform_webhook_config
from .route import Response, Route, RouteContext
@@ -559,13 +558,8 @@ class ConfigRoute(Route):
async def post_new_platform(self):
new_platform_config = await request.json
# 如果是支持统一 webhook 模式的平台,且启用了统一 webhook 模式,自动生成 webhook_uuid
platform_type = new_platform_config.get("type", "")
if platform_type in WEBHOOK_SUPPORTED_PLATFORMS:
if new_platform_config.get("unified_webhook_mode", False):
# 如果没有 webhook_uuid 或为空,自动生成
if not new_platform_config.get("webhook_uuid"):
new_platform_config["webhook_uuid"] = uuid.uuid4().hex[:16]
# 如果是支持统一 webhook 模式的平台,生成 webhook_uuid
ensure_platform_webhook_config(new_platform_config)
self.config["platform"].append(new_platform_config)
try:
@@ -597,12 +591,7 @@ class ConfigRoute(Route):
return Response().error("参数错误").__dict__
# 如果是支持统一 webhook 模式的平台,且启用了统一 webhook 模式,确保有 webhook_uuid
platform_type = new_config.get("type", "")
if platform_type in WEBHOOK_SUPPORTED_PLATFORMS:
if new_config.get("unified_webhook_mode", False):
# 如果没有 webhook_uuid 或为空,自动生成
if not new_config.get("webhook_uuid"):
new_config["webhook_uuid"] = uuid.uuid4().hex
ensure_platform_webhook_config(new_config)
for i, platform in enumerate(self.config["platform"]):
if platform["id"] == platform_id:
+1 -1
View File
@@ -82,7 +82,7 @@ class PlatformRoute(Route):
"""
for platform in self.platform_manager.platform_insts:
if platform.config.get("webhook_uuid") == webhook_uuid:
if platform.config.get("unified_webhook_mode", False):
if platform.unified_webhook():
return platform
return None
+1 -1
View File
@@ -57,7 +57,7 @@
{{ getPlatformStat(item.id)?.error_count }} {{ tm('runtimeStatus.errors') }}
</v-chip>
</div>
<div v-if="item.unified_webhook_mode && item.webhook_uuid" class="webhook-info">
<div v-if="getPlatformStat(item.id)?.unified_webhook && item.webhook_uuid" class="webhook-info">
<v-chip
size="small"
color="primary"