diff --git a/astrbot/core/utils/dify_api_client.py b/astrbot/core/utils/dify_api_client.py index fbc8a7e59..30ac08de1 100644 --- a/astrbot/core/utils/dify_api_client.py +++ b/astrbot/core/utils/dify_api_client.py @@ -35,19 +35,30 @@ class DifyAPIClient: text = await resp.text() raise Exception(f"chat_messages 请求失败:{resp.status}. {text}") + buffer = "" while True: - data = await resp.content.read(8192) # 防止数据过大导致高水位报错 - if not data: + # 保持原有的8192字节限制,防止数据过大导致高水位报错 + chunk = await resp.content.read(8192) + if not chunk: break - if not data.strip(): - continue - elif data.startswith(b"data:"): - try: - json_ = json.loads(data[5:]) - yield json_ - except BaseException: - pass + buffer += chunk.decode('utf-8') + blocks = buffer.split('\n\n') + + # 处理完整的数据块 + for block in blocks[:-1]: + if block.strip() and block.startswith('data:'): + try: + json_str = block[5:] # 移除 "data:" 前缀 + json_obj = json.loads(json_str) + yield json_obj + except json.JSONDecodeError as e: + logger.error(f"JSON解析错误: {str(e)}") + logger.error(f"原始数据块: {json_str}") + + # 保留最后一个可能不完整的块 + buffer = blocks[-1] if blocks else "" + async def workflow_run( self, inputs: Dict, @@ -66,20 +77,32 @@ class DifyAPIClient: ) as resp: if resp.status != 200: text = await resp.text() - raise Exception(f"chat_messages 请求失败:{resp.status}. {text}") + raise Exception(f"workflow_run 请求失败:{resp.status}. {text}") + + buffer = "" while True: - data = await resp.content.read(8192) # 防止数据过大导致高水位报错 - if not data: + # 保持原有的8192字节限制,防止数据过大导致高水位报错 + chunk = await resp.content.read(8192) + if not chunk: break - if not data.strip(): - continue - elif data.startswith(b"data:"): - try: - json_ = json.loads(data[5:]) - yield json_ - except BaseException: - pass + buffer += chunk.decode('utf-8') + blocks = buffer.split('\n\n') + + # 处理完整的数据块 + for block in blocks[:-1]: + if block.strip() and block.startswith('data:'): + try: + json_str = block[5:] # 移除 "data:" 前缀 + json_obj = json.loads(json_str) + yield json_obj + except json.JSONDecodeError as e: + logger.error(f"JSON解析错误: {str(e)}") + logger.error(f"原始数据块: {json_str}") + + # 保留最后一个可能不完整的块 + buffer = blocks[-1] if blocks else "" + async def file_upload( self, file_path: str,