From a2fe0ec5a10ae3f7fc609e92825daa5f8d9f8fa4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 12 Dec 2025 14:27:51 +0000 Subject: [PATCH] Add webhook signature verification for security Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com> --- .../github_webhook/github_webhook_adapter.py | 49 +++++++++++ tests/test_github_webhook.py | 82 ++++++++++++++++++- 2 files changed, 129 insertions(+), 2 deletions(-) diff --git a/astrbot/core/platform/sources/github_webhook/github_webhook_adapter.py b/astrbot/core/platform/sources/github_webhook/github_webhook_adapter.py index 24be375aa..732ec37ae 100644 --- a/astrbot/core/platform/sources/github_webhook/github_webhook_adapter.py +++ b/astrbot/core/platform/sources/github_webhook/github_webhook_adapter.py @@ -1,4 +1,6 @@ import asyncio +import hashlib +import hmac from typing import Any, cast from astrbot import logger @@ -92,6 +94,12 @@ class GitHubWebhookPlatformAdapter(Platform): # 获取请求数据 payload = await request.json + # 验证 webhook 签名(如果配置了 secret) + if self.webhook_secret: + if not await self._verify_signature(request, payload): + logger.warning("GitHub webhook 签名验证失败") + return {"error": "Invalid signature"}, 401 + logger.debug(f"收到 GitHub Webhook 事件: {event_type}") # 处理不同类型的事件 @@ -113,6 +121,47 @@ class GitHubWebhookPlatformAdapter(Platform): logger.error(f"处理 GitHub webhook 回调时发生错误: {e}", exc_info=True) return {"error": str(e)}, 500 + async def _verify_signature(self, request: Any, payload: dict) -> bool: + """验证 GitHub webhook 签名 + + Args: + request: Quart 请求对象 + payload: 请求负载数据 + + Returns: + 签名是否有效 + """ + signature_header = request.headers.get("X-Hub-Signature-256", "") + if not signature_header: + # 如果没有签名头,检查是否有旧版本的签名 + signature_header = request.headers.get("X-Hub-Signature", "") + if not signature_header: + return False + + # 获取原始请求体 + body = await request.get_data() + + # 计算 HMAC + if signature_header.startswith("sha256="): + expected_signature = hmac.new( + self.webhook_secret.encode("utf-8"), + body, + hashlib.sha256, + ).hexdigest() + received_signature = signature_header.replace("sha256=", "") + elif signature_header.startswith("sha1="): + expected_signature = hmac.new( + self.webhook_secret.encode("utf-8"), + body, + hashlib.sha1, + ).hexdigest() + received_signature = signature_header.replace("sha1=", "") + else: + return False + + # 使用 hmac.compare_digest 防止时序攻击 + return hmac.compare_digest(expected_signature, received_signature) + async def _handle_issue_event(self, payload: dict): """处理 issue 事件""" action = payload.get("action", "") diff --git a/tests/test_github_webhook.py b/tests/test_github_webhook.py index e04bf75d6..6f5c619c5 100644 --- a/tests/test_github_webhook.py +++ b/tests/test_github_webhook.py @@ -1,6 +1,8 @@ """Test GitHub webhook platform adapter""" import asyncio +import hashlib +import hmac from unittest.mock import MagicMock import pytest @@ -25,7 +27,7 @@ def platform_config(): "id": "test_github_webhook", "unified_webhook_mode": True, "webhook_uuid": "test-uuid-123", - "webhook_secret": "test-secret", + "webhook_secret": "", # No secret by default for easier testing } @@ -47,7 +49,7 @@ class TestGitHubWebhookAdapter: def test_adapter_initialization(self, adapter): """Test adapter is initialized correctly""" assert adapter.unified_webhook_mode is True - assert adapter.webhook_secret == "test-secret" + assert adapter.webhook_secret == "" assert adapter.meta().name == "github_webhook" assert adapter.meta().description == "GitHub Webhook 适配器" @@ -199,3 +201,79 @@ class TestGitHubWebhookAdapter: # Verify no event was queued assert event_queue.empty() + + @pytest.mark.asyncio + async def test_signature_verification(self, platform_settings, event_queue): + """Test webhook signature verification""" + # Create adapter with webhook secret + config_with_secret = { + "type": "github_webhook", + "enable": True, + "id": "test_github_webhook", + "unified_webhook_mode": True, + "webhook_uuid": "test-uuid-123", + "webhook_secret": "test-secret", + } + adapter = GitHubWebhookPlatformAdapter( + config_with_secret, platform_settings, event_queue + ) + + # Create a valid signature + body = b'{"action": "opened"}' + signature = hmac.new(b"test-secret", body, hashlib.sha256).hexdigest() + + # Mock request with valid signature + request = MagicMock() + request.headers.get = lambda key, default="": { + "X-GitHub-Event": "ping", + "X-Hub-Signature-256": f"sha256={signature}", + }.get(key, default) + + async def mock_get_data(): + return body + + request.get_data = mock_get_data + + async def mock_json(): + return {"action": "opened"} + + request.json = mock_json() + + response = await adapter.webhook_callback(request) + assert response == {"message": "pong"} + + @pytest.mark.asyncio + async def test_invalid_signature(self, platform_settings, event_queue): + """Test webhook with invalid signature is rejected""" + # Create adapter with webhook secret + config_with_secret = { + "type": "github_webhook", + "enable": True, + "id": "test_github_webhook", + "unified_webhook_mode": True, + "webhook_uuid": "test-uuid-123", + "webhook_secret": "test-secret", + } + adapter = GitHubWebhookPlatformAdapter( + config_with_secret, platform_settings, event_queue + ) + + # Mock request with invalid signature + request = MagicMock() + request.headers.get = lambda key, default="": { + "X-GitHub-Event": "ping", + "X-Hub-Signature-256": "sha256=invalidsignature", + }.get(key, default) + + async def mock_get_data(): + return b'{"action": "opened"}' + + request.get_data = mock_get_data + + async def mock_json(): + return {"action": "opened"} + + request.json = mock_json() + + response = await adapter.webhook_callback(request) + assert response == ({"error": "Invalid signature"}, 401)