fix: ensure message stream order (#4487)
This commit is contained in:
@@ -93,7 +93,8 @@ class WebChatAdapter(Platform):
|
||||
session: MessageSesion,
|
||||
message_chain: MessageChain,
|
||||
):
|
||||
await WebChatMessageEvent._send(message_chain, session.session_id)
|
||||
message_id = f"active_{str(uuid.uuid4())}"
|
||||
await WebChatMessageEvent._send(message_id, message_chain, session.session_id)
|
||||
await super().send_by_session(session, message_chain)
|
||||
|
||||
async def _get_message_history(
|
||||
@@ -196,7 +197,7 @@ class WebChatAdapter(Platform):
|
||||
|
||||
abm.session_id = f"webchat!{username}!{cid}"
|
||||
|
||||
abm.message_id = str(uuid.uuid4())
|
||||
abm.message_id = payload.get("message_id")
|
||||
|
||||
# 处理消息段列表
|
||||
message_parts = payload.get("message", [])
|
||||
|
||||
@@ -21,7 +21,10 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
|
||||
@staticmethod
|
||||
async def _send(
|
||||
message: MessageChain | None, session_id: str, streaming: bool = False
|
||||
message_id: str,
|
||||
message: MessageChain | None,
|
||||
session_id: str,
|
||||
streaming: bool = False,
|
||||
) -> str | None:
|
||||
cid = session_id.split("!")[-1]
|
||||
web_chat_back_queue = webchat_queue_mgr.get_or_create_back_queue(cid)
|
||||
@@ -31,6 +34,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
"type": "end",
|
||||
"data": "",
|
||||
"streaming": False,
|
||||
"message_id": message_id,
|
||||
}, # end means this request is finished
|
||||
)
|
||||
return
|
||||
@@ -45,6 +49,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
"data": data,
|
||||
"streaming": streaming,
|
||||
"chain_type": message.type,
|
||||
"message_id": message_id,
|
||||
},
|
||||
)
|
||||
elif isinstance(comp, Json):
|
||||
@@ -54,6 +59,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
"data": json.dumps(comp.data, ensure_ascii=False),
|
||||
"streaming": streaming,
|
||||
"chain_type": message.type,
|
||||
"message_id": message_id,
|
||||
},
|
||||
)
|
||||
elif isinstance(comp, Image):
|
||||
@@ -69,6 +75,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
"type": "image",
|
||||
"data": data,
|
||||
"streaming": streaming,
|
||||
"message_id": message_id,
|
||||
},
|
||||
)
|
||||
elif isinstance(comp, Record):
|
||||
@@ -84,6 +91,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
"type": "record",
|
||||
"data": data,
|
||||
"streaming": streaming,
|
||||
"message_id": message_id,
|
||||
},
|
||||
)
|
||||
elif isinstance(comp, File):
|
||||
@@ -100,6 +108,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
"type": "file",
|
||||
"data": data,
|
||||
"streaming": streaming,
|
||||
"message_id": message_id,
|
||||
},
|
||||
)
|
||||
else:
|
||||
@@ -108,7 +117,8 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
return data
|
||||
|
||||
async def send(self, message: MessageChain | None):
|
||||
await WebChatMessageEvent._send(message, session_id=self.session_id)
|
||||
message_id = self.message_obj.message_id
|
||||
await WebChatMessageEvent._send(message_id, message, session_id=self.session_id)
|
||||
await super().send(MessageChain([]))
|
||||
|
||||
async def send_streaming(self, generator, use_fallback: bool = False):
|
||||
@@ -116,6 +126,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
reasoning_content = ""
|
||||
cid = self.session_id.split("!")[-1]
|
||||
web_chat_back_queue = webchat_queue_mgr.get_or_create_back_queue(cid)
|
||||
message_id = self.message_obj.message_id
|
||||
async for chain in generator:
|
||||
# if chain.type == "break" and final_data:
|
||||
# # 分割符
|
||||
@@ -130,7 +141,8 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
# continue
|
||||
|
||||
r = await WebChatMessageEvent._send(
|
||||
chain,
|
||||
message_id=message_id,
|
||||
message=chain,
|
||||
session_id=self.session_id,
|
||||
streaming=True,
|
||||
)
|
||||
@@ -147,6 +159,7 @@ class WebChatMessageEvent(AstrMessageEvent):
|
||||
"data": final_data,
|
||||
"reasoning": reasoning_content,
|
||||
"streaming": True,
|
||||
"message_id": message_id,
|
||||
},
|
||||
)
|
||||
await super().send_streaming(generator, use_fallback)
|
||||
|
||||
@@ -296,6 +296,8 @@ class ChatRoute(Route):
|
||||
# 构建用户消息段(包含 path 用于传递给 adapter)
|
||||
message_parts = await self._build_user_message_parts(message)
|
||||
|
||||
message_id = str(uuid.uuid4())
|
||||
|
||||
async def stream():
|
||||
client_disconnected = False
|
||||
accumulated_parts = []
|
||||
@@ -319,6 +321,13 @@ class ChatRoute(Route):
|
||||
if not result:
|
||||
continue
|
||||
|
||||
if (
|
||||
"message_id" in result
|
||||
and result["message_id"] != message_id
|
||||
):
|
||||
logger.warning("webchat stream message_id mismatch")
|
||||
continue
|
||||
|
||||
result_text = result["data"]
|
||||
msg_type = result.get("type")
|
||||
streaming = result.get("streaming", False)
|
||||
@@ -456,6 +465,7 @@ class ChatRoute(Route):
|
||||
"selected_provider": selected_provider,
|
||||
"selected_model": selected_model,
|
||||
"enable_streaming": enable_streaming,
|
||||
"message_id": message_id,
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user