refactor: 重构 wechatpadpro 授权码生成并增强安全性
- 将 generate_auth_key 方法中的授权码提取逻辑重构为新的辅助方法 _extract_auth_key ,以提高代码的可读性和可测试性。
- 在访问 data.get('authKeys') 之前添加 isinstance(data, dict) 检查,以防止潜在的 AttributeError 。
- 移除了 auth_key 的明文日志记录,以避免敏感信息泄露。
- 在生成新密钥之前,将 self.auth_key 初始化为 None ,以避免在失败时保留旧值。
This commit is contained in:
@@ -210,6 +210,16 @@ class WeChatPadProAdapter(Platform):
|
||||
logger.error(traceback.format_exc())
|
||||
return False
|
||||
|
||||
def _extract_auth_key(self, data):
|
||||
"""Helper method to extract auth_key from response data."""
|
||||
if isinstance(data, dict):
|
||||
auth_keys = data.get("authKeys") # 新接口
|
||||
if isinstance(auth_keys, list) and auth_keys:
|
||||
return auth_keys[0]
|
||||
elif isinstance(data, list) and data: # 旧接口
|
||||
return data[0]
|
||||
return None
|
||||
|
||||
async def generate_auth_key(self):
|
||||
"""
|
||||
生成授权码。
|
||||
@@ -218,30 +228,26 @@ class WeChatPadProAdapter(Platform):
|
||||
params = {"key": self.admin_key}
|
||||
payload = {"Count": 1, "Days": 365} # 生成一个有效期365天的授权码
|
||||
|
||||
self.auth_key = None # Reset auth_key before generating a new one
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
try:
|
||||
async with session.post(url, params=params, json=payload) as response:
|
||||
if response.status != 200:
|
||||
logger.error(f"生成授权码失败: {response.status}, {await response.text()}")
|
||||
return
|
||||
|
||||
response_data = await response.json()
|
||||
# 修正成功判断条件和授权码提取路径
|
||||
if response.status == 200 and response_data.get("Code") == 200:
|
||||
# 授权码在 Data 字段的列表中
|
||||
data = response_data.get("Data")
|
||||
if data:
|
||||
# 新返回格式
|
||||
if isinstance(data.get("authKeys"), list) and data["authKeys"]:
|
||||
self.auth_key = data["authKeys"][0]
|
||||
# 兼容旧版接口
|
||||
elif isinstance(data, list) and data:
|
||||
self.auth_key = data[0]
|
||||
|
||||
if self.auth_key:
|
||||
logger.info(f"成功获取授权码 {self.auth_key[:8]}...")
|
||||
else:
|
||||
logger.error(f"生成授权码成功但未找到授权码: {response_data}")
|
||||
if response_data.get("Code") == 200:
|
||||
if data := response_data.get("Data"):
|
||||
self.auth_key = self._extract_auth_key(data)
|
||||
|
||||
if self.auth_key:
|
||||
logger.info("成功获取授权码")
|
||||
else:
|
||||
logger.error(f"生成授权码成功但未找到授权码: {response_data}")
|
||||
else:
|
||||
logger.error(f"生成授权码失败: {response.status}, {response_data}")
|
||||
logger.error(f"生成授权码失败: {response_data}")
|
||||
except aiohttp.ClientConnectorError as e:
|
||||
logger.error(f"连接到 WeChatPadPro 服务失败: {e}")
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user