Add webhook signature verification for security
Co-authored-by: Soulter <37870767+Soulter@users.noreply.github.com>
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
from typing import Any, cast
|
||||
|
||||
from astrbot import logger
|
||||
@@ -92,6 +94,12 @@ class GitHubWebhookPlatformAdapter(Platform):
|
||||
# 获取请求数据
|
||||
payload = await request.json
|
||||
|
||||
# 验证 webhook 签名(如果配置了 secret)
|
||||
if self.webhook_secret:
|
||||
if not await self._verify_signature(request, payload):
|
||||
logger.warning("GitHub webhook 签名验证失败")
|
||||
return {"error": "Invalid signature"}, 401
|
||||
|
||||
logger.debug(f"收到 GitHub Webhook 事件: {event_type}")
|
||||
|
||||
# 处理不同类型的事件
|
||||
@@ -113,6 +121,47 @@ class GitHubWebhookPlatformAdapter(Platform):
|
||||
logger.error(f"处理 GitHub webhook 回调时发生错误: {e}", exc_info=True)
|
||||
return {"error": str(e)}, 500
|
||||
|
||||
async def _verify_signature(self, request: Any, payload: dict) -> bool:
|
||||
"""验证 GitHub webhook 签名
|
||||
|
||||
Args:
|
||||
request: Quart 请求对象
|
||||
payload: 请求负载数据
|
||||
|
||||
Returns:
|
||||
签名是否有效
|
||||
"""
|
||||
signature_header = request.headers.get("X-Hub-Signature-256", "")
|
||||
if not signature_header:
|
||||
# 如果没有签名头,检查是否有旧版本的签名
|
||||
signature_header = request.headers.get("X-Hub-Signature", "")
|
||||
if not signature_header:
|
||||
return False
|
||||
|
||||
# 获取原始请求体
|
||||
body = await request.get_data()
|
||||
|
||||
# 计算 HMAC
|
||||
if signature_header.startswith("sha256="):
|
||||
expected_signature = hmac.new(
|
||||
self.webhook_secret.encode("utf-8"),
|
||||
body,
|
||||
hashlib.sha256,
|
||||
).hexdigest()
|
||||
received_signature = signature_header.replace("sha256=", "")
|
||||
elif signature_header.startswith("sha1="):
|
||||
expected_signature = hmac.new(
|
||||
self.webhook_secret.encode("utf-8"),
|
||||
body,
|
||||
hashlib.sha1,
|
||||
).hexdigest()
|
||||
received_signature = signature_header.replace("sha1=", "")
|
||||
else:
|
||||
return False
|
||||
|
||||
# 使用 hmac.compare_digest 防止时序攻击
|
||||
return hmac.compare_digest(expected_signature, received_signature)
|
||||
|
||||
async def _handle_issue_event(self, payload: dict):
|
||||
"""处理 issue 事件"""
|
||||
action = payload.get("action", "")
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
"""Test GitHub webhook platform adapter"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
@@ -25,7 +27,7 @@ def platform_config():
|
||||
"id": "test_github_webhook",
|
||||
"unified_webhook_mode": True,
|
||||
"webhook_uuid": "test-uuid-123",
|
||||
"webhook_secret": "test-secret",
|
||||
"webhook_secret": "", # No secret by default for easier testing
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +49,7 @@ class TestGitHubWebhookAdapter:
|
||||
def test_adapter_initialization(self, adapter):
|
||||
"""Test adapter is initialized correctly"""
|
||||
assert adapter.unified_webhook_mode is True
|
||||
assert adapter.webhook_secret == "test-secret"
|
||||
assert adapter.webhook_secret == ""
|
||||
assert adapter.meta().name == "github_webhook"
|
||||
assert adapter.meta().description == "GitHub Webhook 适配器"
|
||||
|
||||
@@ -199,3 +201,79 @@ class TestGitHubWebhookAdapter:
|
||||
|
||||
# Verify no event was queued
|
||||
assert event_queue.empty()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signature_verification(self, platform_settings, event_queue):
|
||||
"""Test webhook signature verification"""
|
||||
# Create adapter with webhook secret
|
||||
config_with_secret = {
|
||||
"type": "github_webhook",
|
||||
"enable": True,
|
||||
"id": "test_github_webhook",
|
||||
"unified_webhook_mode": True,
|
||||
"webhook_uuid": "test-uuid-123",
|
||||
"webhook_secret": "test-secret",
|
||||
}
|
||||
adapter = GitHubWebhookPlatformAdapter(
|
||||
config_with_secret, platform_settings, event_queue
|
||||
)
|
||||
|
||||
# Create a valid signature
|
||||
body = b'{"action": "opened"}'
|
||||
signature = hmac.new(b"test-secret", body, hashlib.sha256).hexdigest()
|
||||
|
||||
# Mock request with valid signature
|
||||
request = MagicMock()
|
||||
request.headers.get = lambda key, default="": {
|
||||
"X-GitHub-Event": "ping",
|
||||
"X-Hub-Signature-256": f"sha256={signature}",
|
||||
}.get(key, default)
|
||||
|
||||
async def mock_get_data():
|
||||
return body
|
||||
|
||||
request.get_data = mock_get_data
|
||||
|
||||
async def mock_json():
|
||||
return {"action": "opened"}
|
||||
|
||||
request.json = mock_json()
|
||||
|
||||
response = await adapter.webhook_callback(request)
|
||||
assert response == {"message": "pong"}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_signature(self, platform_settings, event_queue):
|
||||
"""Test webhook with invalid signature is rejected"""
|
||||
# Create adapter with webhook secret
|
||||
config_with_secret = {
|
||||
"type": "github_webhook",
|
||||
"enable": True,
|
||||
"id": "test_github_webhook",
|
||||
"unified_webhook_mode": True,
|
||||
"webhook_uuid": "test-uuid-123",
|
||||
"webhook_secret": "test-secret",
|
||||
}
|
||||
adapter = GitHubWebhookPlatformAdapter(
|
||||
config_with_secret, platform_settings, event_queue
|
||||
)
|
||||
|
||||
# Mock request with invalid signature
|
||||
request = MagicMock()
|
||||
request.headers.get = lambda key, default="": {
|
||||
"X-GitHub-Event": "ping",
|
||||
"X-Hub-Signature-256": "sha256=invalidsignature",
|
||||
}.get(key, default)
|
||||
|
||||
async def mock_get_data():
|
||||
return b'{"action": "opened"}'
|
||||
|
||||
request.get_data = mock_get_data
|
||||
|
||||
async def mock_json():
|
||||
return {"action": "opened"}
|
||||
|
||||
request.json = mock_json()
|
||||
|
||||
response = await adapter.webhook_callback(request)
|
||||
assert response == ({"error": "Invalid signature"}, 401)
|
||||
|
||||
Reference in New Issue
Block a user