Compare commits

...

5 Commits

Author SHA1 Message Date
copilot-swe-agent[bot] a2fe0ec5a1 Add webhook signature verification for security
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-12-12 14:27:51 +00:00
copilot-swe-agent[bot] 6957ec713d Clean up unused imports in tests
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-12-12 14:24:18 +00:00
copilot-swe-agent[bot] d97c8b5b2b Add tests for GitHub webhook platform adapter
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-12-12 14:23:22 +00:00
copilot-swe-agent[bot] d07a1ad5c9 Add GitHub webhook platform adapter with event handlers
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
2025-12-12 14:20:33 +00:00
copilot-swe-agent[bot] d8e6dfbd6b Initial plan 2025-12-12 14:14:49 +00:00
4 changed files with 620 additions and 0 deletions
+4
View File
@@ -112,6 +112,10 @@ class PlatformManager:
from .sources.satori.satori_adapter import (
SatoriPlatformAdapter, # noqa: F401
)
case "github_webhook":
from .sources.github_webhook.github_webhook_adapter import (
GitHubWebhookPlatformAdapter, # noqa: F401
)
except (ImportError, ModuleNotFoundError) as e:
logger.error(
f"加载平台适配器 {platform_config['type']} 失败,原因:{e}。请检查依赖库是否安装。提示:可以在 管理面板->平台日志->安装Pip库 中安装依赖库。",
@@ -0,0 +1,315 @@
import asyncio
import hashlib
import hmac
from typing import Any, cast
from astrbot import logger
from astrbot.api.event import MessageChain
from astrbot.api.message_components import Plain
from astrbot.api.platform import (
AstrBotMessage,
MessageMember,
MessageType,
Platform,
PlatformMetadata,
)
from astrbot.core.platform.astr_message_event import MessageSesion
from astrbot.core.platform.platform import PlatformStatus
from astrbot.core.utils.webhook_utils import log_webhook_info
from ...register import register_platform_adapter
from .github_webhook_event import GitHubWebhookMessageEvent
@register_platform_adapter(
"github_webhook",
"GitHub Webhook 适配器",
support_streaming_message=False,
)
class GitHubWebhookPlatformAdapter(Platform):
"""GitHub Webhook 平台适配器
支持的事件:
- issues (created)
- issue_comment (created)
- pull_request (opened)
"""
def __init__(
self,
platform_config: dict,
platform_settings: dict,
event_queue: asyncio.Queue,
) -> None:
super().__init__(platform_config, event_queue)
self.unified_webhook_mode = platform_config.get("unified_webhook_mode", True)
self.webhook_secret = platform_config.get("webhook_secret", "")
self.shutdown_event = asyncio.Event()
async def send_by_session(
self,
session: MessageSesion,
message_chain: MessageChain,
):
"""GitHub Webhook 是单向接收,不支持主动发送消息"""
logger.warning("GitHub Webhook 适配器不支持 send_by_session")
def meta(self) -> PlatformMetadata:
return PlatformMetadata(
name="github_webhook",
description="GitHub Webhook 适配器",
id=cast(str, self.config.get("id")),
)
async def run(self):
"""运行适配器"""
self.status = PlatformStatus.RUNNING
# 如果启用统一 webhook 模式
webhook_uuid = self.config.get("webhook_uuid")
if self.unified_webhook_mode and webhook_uuid:
log_webhook_info(f"{self.meta().id}(GitHub Webhook)", webhook_uuid)
# 保持运行状态,等待 shutdown
await self.shutdown_event.wait()
else:
logger.warning("GitHub Webhook 适配器需要启用统一 webhook 模式")
await self.shutdown_event.wait()
async def webhook_callback(self, request: Any) -> Any:
"""统一 Webhook 回调入口
处理 GitHub webhook 事件
Args:
request: Quart 请求对象
Returns:
响应数据
"""
try:
# 获取事件类型
event_type = request.headers.get("X-GitHub-Event", "")
# 获取请求数据
payload = await request.json
# 验证 webhook 签名(如果配置了 secret
if self.webhook_secret:
if not await self._verify_signature(request, payload):
logger.warning("GitHub webhook 签名验证失败")
return {"error": "Invalid signature"}, 401
logger.debug(f"收到 GitHub Webhook 事件: {event_type}")
# 处理不同类型的事件
if event_type == "issues":
await self._handle_issue_event(payload)
elif event_type == "issue_comment":
await self._handle_issue_comment_event(payload)
elif event_type == "pull_request":
await self._handle_pull_request_event(payload)
elif event_type == "ping":
# GitHub webhook 验证事件
return {"message": "pong"}
else:
logger.debug(f"忽略不支持的 GitHub 事件类型: {event_type}")
return {"status": "ok"}
except Exception as e:
logger.error(f"处理 GitHub webhook 回调时发生错误: {e}", exc_info=True)
return {"error": str(e)}, 500
async def _verify_signature(self, request: Any, payload: dict) -> bool:
"""验证 GitHub webhook 签名
Args:
request: Quart 请求对象
payload: 请求负载数据
Returns:
签名是否有效
"""
signature_header = request.headers.get("X-Hub-Signature-256", "")
if not signature_header:
# 如果没有签名头,检查是否有旧版本的签名
signature_header = request.headers.get("X-Hub-Signature", "")
if not signature_header:
return False
# 获取原始请求体
body = await request.get_data()
# 计算 HMAC
if signature_header.startswith("sha256="):
expected_signature = hmac.new(
self.webhook_secret.encode("utf-8"),
body,
hashlib.sha256,
).hexdigest()
received_signature = signature_header.replace("sha256=", "")
elif signature_header.startswith("sha1="):
expected_signature = hmac.new(
self.webhook_secret.encode("utf-8"),
body,
hashlib.sha1,
).hexdigest()
received_signature = signature_header.replace("sha1=", "")
else:
return False
# 使用 hmac.compare_digest 防止时序攻击
return hmac.compare_digest(expected_signature, received_signature)
async def _handle_issue_event(self, payload: dict):
"""处理 issue 事件"""
action = payload.get("action", "")
# 只处理创建事件
if action != "created" and action != "opened":
return
issue = payload.get("issue", {})
repo = payload.get("repository", {})
sender = payload.get("sender", {})
# 构造消息文本
message_text = (
f"📝 新 Issue 创建\n"
f"仓库: {repo.get('full_name', 'unknown')}\n"
f"标题: {issue.get('title', 'No title')}\n"
f"作者: {sender.get('login', 'unknown')}\n"
f"链接: {issue.get('html_url', '')}\n"
f"内容:\n{issue.get('body', 'No description')[:200]}"
)
# 创建 AstrBotMessage
abm = self._create_message(
message_text,
sender.get("login", "unknown"),
sender.get("login", "unknown"),
repo.get("full_name", "unknown"),
)
# 提交事件
self.commit_event(
GitHubWebhookMessageEvent(
message_text,
abm,
self.meta(),
repo.get("full_name", "unknown"),
"issues",
payload,
)
)
async def _handle_issue_comment_event(self, payload: dict):
"""处理 issue 评论事件"""
action = payload.get("action", "")
# 只处理创建事件
if action != "created":
return
issue = payload.get("issue", {})
comment = payload.get("comment", {})
repo = payload.get("repository", {})
sender = payload.get("sender", {})
# 构造消息文本
message_text = (
f"💬 新 Issue 评论\n"
f"仓库: {repo.get('full_name', 'unknown')}\n"
f"Issue: {issue.get('title', 'No title')}\n"
f"评论者: {sender.get('login', 'unknown')}\n"
f"链接: {comment.get('html_url', '')}\n"
f"内容:\n{comment.get('body', 'No comment')[:200]}"
)
# 创建 AstrBotMessage
abm = self._create_message(
message_text,
sender.get("login", "unknown"),
sender.get("login", "unknown"),
repo.get("full_name", "unknown"),
)
# 提交事件
self.commit_event(
GitHubWebhookMessageEvent(
message_text,
abm,
self.meta(),
repo.get("full_name", "unknown"),
"issue_comment",
payload,
)
)
async def _handle_pull_request_event(self, payload: dict):
"""处理 pull request 事件"""
action = payload.get("action", "")
# 只处理打开事件
if action != "opened":
return
pr = payload.get("pull_request", {})
repo = payload.get("repository", {})
sender = payload.get("sender", {})
# 构造消息文本
message_text = (
f"🔀 新 Pull Request\n"
f"仓库: {repo.get('full_name', 'unknown')}\n"
f"标题: {pr.get('title', 'No title')}\n"
f"作者: {sender.get('login', 'unknown')}\n"
f"链接: {pr.get('html_url', '')}\n"
f"内容:\n{pr.get('body', 'No description')[:200]}"
)
# 创建 AstrBotMessage
abm = self._create_message(
message_text,
sender.get("login", "unknown"),
sender.get("login", "unknown"),
repo.get("full_name", "unknown"),
)
# 提交事件
self.commit_event(
GitHubWebhookMessageEvent(
message_text,
abm,
self.meta(),
repo.get("full_name", "unknown"),
"pull_request",
payload,
)
)
def _create_message(
self,
message_text: str,
user_id: str,
nickname: str,
session_id: str,
) -> AstrBotMessage:
"""创建 AstrBotMessage 对象"""
abm = AstrBotMessage()
abm.type = MessageType.GROUP_MESSAGE
abm.self_id = self.client_self_id
abm.session_id = session_id
abm.message_id = ""
abm.sender = MessageMember(user_id=user_id, nickname=nickname)
abm.message = [Plain(message_text)]
abm.message_str = message_text
abm.raw_message = message_text
return abm
async def terminate(self):
"""终止适配器运行"""
self.shutdown_event.set()
logger.info("GitHub Webhook 适配器已经被优雅地关闭")
@@ -0,0 +1,22 @@
from astrbot.api.platform import AstrBotMessage, PlatformMetadata
from ...astr_message_event import AstrMessageEvent
class GitHubWebhookMessageEvent(AstrMessageEvent):
"""GitHub Webhook 消息事件"""
def __init__(
self,
message_str: str,
message_obj: AstrBotMessage,
platform_meta: PlatformMetadata,
session_id: str,
event_type: str,
event_data: dict,
):
super().__init__(message_str, message_obj, platform_meta, session_id)
self.event_type = event_type
"""GitHub 事件类型: issues, issue_comment, pull_request"""
self.event_data = event_data
"""原始事件数据"""
+279
View File
@@ -0,0 +1,279 @@
"""Test GitHub webhook platform adapter"""
import asyncio
import hashlib
import hmac
from unittest.mock import MagicMock
import pytest
from astrbot.core.platform.sources.github_webhook.github_webhook_adapter import (
GitHubWebhookPlatformAdapter,
)
@pytest.fixture
def event_queue():
"""Create a test event queue"""
return asyncio.Queue()
@pytest.fixture
def platform_config():
"""Create test platform configuration"""
return {
"type": "github_webhook",
"enable": True,
"id": "test_github_webhook",
"unified_webhook_mode": True,
"webhook_uuid": "test-uuid-123",
"webhook_secret": "", # No secret by default for easier testing
}
@pytest.fixture
def platform_settings():
"""Create test platform settings"""
return {"unique_session": False}
@pytest.fixture
def adapter(platform_config, platform_settings, event_queue):
"""Create test adapter instance"""
return GitHubWebhookPlatformAdapter(platform_config, platform_settings, event_queue)
class TestGitHubWebhookAdapter:
"""Test cases for GitHub webhook adapter"""
def test_adapter_initialization(self, adapter):
"""Test adapter is initialized correctly"""
assert adapter.unified_webhook_mode is True
assert adapter.webhook_secret == ""
assert adapter.meta().name == "github_webhook"
assert adapter.meta().description == "GitHub Webhook 适配器"
@pytest.mark.asyncio
async def test_ping_event(self, adapter):
"""Test GitHub ping event"""
# Mock request
request = MagicMock()
request.headers.get.return_value = "ping"
async def mock_json():
return {}
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == {"message": "pong"}
@pytest.mark.asyncio
async def test_issue_created_event(self, adapter, event_queue):
"""Test GitHub issue created event"""
# Mock request with issue created payload
request = MagicMock()
request.headers.get.return_value = "issues"
payload = {
"action": "opened",
"issue": {
"title": "Test Issue",
"body": "This is a test issue",
"html_url": "https://github.com/test/repo/issues/1",
},
"repository": {"full_name": "test/repo"},
"sender": {"login": "testuser"},
}
async def mock_json():
return payload
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == {"status": "ok"}
# Verify event was queued
assert not event_queue.empty()
event = event_queue.get_nowait()
assert event.event_type == "issues"
assert "新 Issue 创建" in event.message_str
assert "Test Issue" in event.message_str
@pytest.mark.asyncio
async def test_issue_comment_event(self, adapter, event_queue):
"""Test GitHub issue comment event"""
request = MagicMock()
request.headers.get.return_value = "issue_comment"
payload = {
"action": "created",
"issue": {"title": "Test Issue"},
"comment": {
"body": "Test comment",
"html_url": "https://github.com/test/repo/issues/1#comment",
},
"repository": {"full_name": "test/repo"},
"sender": {"login": "commenter"},
}
async def mock_json():
return payload
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == {"status": "ok"}
# Verify event was queued
assert not event_queue.empty()
event = event_queue.get_nowait()
assert event.event_type == "issue_comment"
assert "新 Issue 评论" in event.message_str
assert "Test comment" in event.message_str
@pytest.mark.asyncio
async def test_pull_request_event(self, adapter, event_queue):
"""Test GitHub pull request opened event"""
request = MagicMock()
request.headers.get.return_value = "pull_request"
payload = {
"action": "opened",
"pull_request": {
"title": "Test PR",
"body": "This is a test PR",
"html_url": "https://github.com/test/repo/pull/1",
},
"repository": {"full_name": "test/repo"},
"sender": {"login": "prauthor"},
}
async def mock_json():
return payload
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == {"status": "ok"}
# Verify event was queued
assert not event_queue.empty()
event = event_queue.get_nowait()
assert event.event_type == "pull_request"
assert "新 Pull Request" in event.message_str
assert "Test PR" in event.message_str
@pytest.mark.asyncio
async def test_unsupported_event(self, adapter, event_queue):
"""Test unsupported GitHub event type"""
request = MagicMock()
request.headers.get.return_value = "push"
async def mock_json():
return {"action": "created"}
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == {"status": "ok"}
# Verify no event was queued for unsupported events
assert event_queue.empty()
@pytest.mark.asyncio
async def test_issue_closed_ignored(self, adapter, event_queue):
"""Test that issue closed action is ignored"""
request = MagicMock()
request.headers.get.return_value = "issues"
payload = {
"action": "closed", # Should be ignored
"issue": {"title": "Test Issue"},
"repository": {"full_name": "test/repo"},
"sender": {"login": "testuser"},
}
async def mock_json():
return payload
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == {"status": "ok"}
# Verify no event was queued
assert event_queue.empty()
@pytest.mark.asyncio
async def test_signature_verification(self, platform_settings, event_queue):
"""Test webhook signature verification"""
# Create adapter with webhook secret
config_with_secret = {
"type": "github_webhook",
"enable": True,
"id": "test_github_webhook",
"unified_webhook_mode": True,
"webhook_uuid": "test-uuid-123",
"webhook_secret": "test-secret",
}
adapter = GitHubWebhookPlatformAdapter(
config_with_secret, platform_settings, event_queue
)
# Create a valid signature
body = b'{"action": "opened"}'
signature = hmac.new(b"test-secret", body, hashlib.sha256).hexdigest()
# Mock request with valid signature
request = MagicMock()
request.headers.get = lambda key, default="": {
"X-GitHub-Event": "ping",
"X-Hub-Signature-256": f"sha256={signature}",
}.get(key, default)
async def mock_get_data():
return body
request.get_data = mock_get_data
async def mock_json():
return {"action": "opened"}
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == {"message": "pong"}
@pytest.mark.asyncio
async def test_invalid_signature(self, platform_settings, event_queue):
"""Test webhook with invalid signature is rejected"""
# Create adapter with webhook secret
config_with_secret = {
"type": "github_webhook",
"enable": True,
"id": "test_github_webhook",
"unified_webhook_mode": True,
"webhook_uuid": "test-uuid-123",
"webhook_secret": "test-secret",
}
adapter = GitHubWebhookPlatformAdapter(
config_with_secret, platform_settings, event_queue
)
# Mock request with invalid signature
request = MagicMock()
request.headers.get = lambda key, default="": {
"X-GitHub-Event": "ping",
"X-Hub-Signature-256": "sha256=invalidsignature",
}.get(key, default)
async def mock_get_data():
return b'{"action": "opened"}'
request.get_data = mock_get_data
async def mock_json():
return {"action": "opened"}
request.json = mock_json()
response = await adapter.webhook_callback(request)
assert response == ({"error": "Invalid signature"}, 401)