diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py index d1fd0e187..97b2b2fb4 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py @@ -51,6 +51,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): VIDEO_FILE_TYPE = 2 VOICE_FILE_TYPE = 3 FILE_FILE_TYPE = 4 + STREAM_MARKDOWN_NEWLINE_ERROR = "流式消息md分片需要\\n结束" def __init__( self, @@ -69,35 +70,71 @@ class QQOfficialMessageEvent(AstrMessageEvent): await self._post_send() async def send_streaming(self, generator, use_fallback: bool = False): - """流式输出仅支持消息列表私聊""" + """流式输出仅支持消息列表私聊(C2C),其他消息源退化为普通发送""" + # 先标记事件层“已执行发送操作”,避免异常路径遗漏 + await super().send_streaming(generator, use_fallback) + # QQ C2C 流式协议:开始/中间分片使用 state=1,结束分片使用 state=10 stream_payload = {"state": 1, "id": None, "index": 0, "reset": False} - last_edit_time = 0 # 上次编辑消息的时间 - throttle_interval = 1 # 编辑消息的间隔时间 (秒) + last_edit_time = 0 # 上次发送分片的时间 + throttle_interval = 1 # 分片间最短间隔 (秒) ret = None + source = ( + self.message_obj.raw_message + ) # 提前获取,避免 generator 为空时 NameError try: async for chain in generator: source = self.message_obj.raw_message + + if not isinstance(source, botpy.message.C2CMessage): + # 非 C2C 场景:直接累积,最后统一发 + if not self.send_buffer: + self.send_buffer = chain + else: + self.send_buffer.chain.extend(chain.chain) + continue + + # ---- C2C 流式场景 ---- + + # tool_call break 信号:工具开始执行,先把已有 buffer 以 state=10 结束当前流式段 + if chain.type == "break": + if self.send_buffer: + stream_payload["state"] = 10 + ret = await self._post_send(stream=stream_payload) + ret_id = self._extract_response_message_id(ret) + if ret_id is not None: + stream_payload["id"] = ret_id + # 重置 stream_payload,为下一段流式做准备 + stream_payload = { + "state": 1, + "id": None, + "index": 0, + "reset": False, + } + last_edit_time = 0 + continue + + # 累积内容 if not self.send_buffer: self.send_buffer = chain else: self.send_buffer.chain.extend(chain.chain) - if isinstance(source, botpy.message.C2CMessage): - # 真流式传输 - current_time = asyncio.get_running_loop().time() - time_since_last_edit = current_time - last_edit_time - - if time_since_last_edit >= throttle_interval: - ret = cast( - message.Message, - await self._post_send(stream=stream_payload), - ) - stream_payload["index"] += 1 - stream_payload["id"] = ret["id"] - last_edit_time = asyncio.get_running_loop().time() + # 节流:按时间间隔发送中间分片 + current_time = asyncio.get_running_loop().time() + if current_time - last_edit_time >= throttle_interval: + ret = cast( + message.Message, + await self._post_send(stream=stream_payload), + ) + stream_payload["index"] += 1 + ret_id = self._extract_response_message_id(ret) + if ret_id is not None: + stream_payload["id"] = ret_id + last_edit_time = asyncio.get_running_loop().time() + self.send_buffer = None # 清空已发送的分片,避免下次重复发送旧内容 if isinstance(source, botpy.message.C2CMessage): - # 结束流式对话,并且传输 buffer 中剩余的消息 + # 结束流式对话,发送 buffer 中剩余内容 stream_payload["state"] = 10 ret = await self._post_send(stream=stream_payload) else: @@ -105,9 +142,22 @@ class QQOfficialMessageEvent(AstrMessageEvent): except Exception as e: logger.error(f"发送流式消息时出错: {e}", exc_info=True) + # 避免累计内容在异常后被整包重复发送:仅清理缓存,不做非流式整包兜底 + # 如需兜底,应该只发送未发送 delta(后续可继续优化) self.send_buffer = None - return await super().send_streaming(generator, use_fallback) + return None + + @staticmethod + def _extract_response_message_id(ret) -> str | None: + """兼容 qq-botpy 返回 Message 对象或 dict 两种形态。""" + if ret is None: + return None + if isinstance(ret, dict): + ret_id = ret.get("id") + return str(ret_id) if ret_id is not None else None + ret_id = getattr(ret, "id", None) + return str(ret_id) if ret_id is not None else None async def _post_send(self, stream: dict | None = None): if not self.send_buffer: @@ -135,6 +185,11 @@ class QQOfficialMessageEvent(AstrMessageEvent): file_name, ) = await QQOfficialMessageEvent._parse_to_qqofficial(self.send_buffer) + # C2C 流式仅用于文本分片,富媒体时降级为普通发送,避免平台侧流式校验报错。 + if stream and (image_base64 or record_file_path): + logger.debug("[QQOfficial] 检测到富媒体,降级为非流式发送。") + stream = None + if ( not plain_text and not image_base64 @@ -145,6 +200,17 @@ class QQOfficialMessageEvent(AstrMessageEvent): ): return None + # QQ C2C 流式 API 说明: + # - 开始/中间分片(state=1):增量追加内容,不需要 \n(加了会导致强制换行) + # - 最终分片(state=10):结束流,content 必须以 \n 结尾(QQ API 要求) + if ( + stream + and stream.get("state") == 10 + and plain_text + and not plain_text.endswith("\n") + ): + plain_text = plain_text + "\n" + payload: dict = { # "content": plain_text, "markdown": MarkdownPayload(content=plain_text) if plain_text else None, @@ -214,6 +280,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): ), payload=payload, plain_text=plain_text, + stream=stream, ) case botpy.message.C2CMessage(): @@ -270,6 +337,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): ), payload=payload, plain_text=plain_text, + stream=stream, ) else: ret = await self._send_with_markdown_fallback( @@ -279,6 +347,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): ), payload=payload, plain_text=plain_text, + stream=stream, ) logger.debug(f"Message sent to C2C: {ret}") @@ -294,6 +363,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): ), payload=payload, plain_text=plain_text, + stream=stream, ) case botpy.message.DirectMessage(): @@ -308,6 +378,7 @@ class QQOfficialMessageEvent(AstrMessageEvent): ), payload=payload, plain_text=plain_text, + stream=stream, ) case _: @@ -324,10 +395,31 @@ class QQOfficialMessageEvent(AstrMessageEvent): send_func, payload: dict, plain_text: str, + stream: dict | None = None, ): try: return await send_func(payload) except botpy.errors.ServerError as err: + # QQ 流式 markdown 分片校验:内容必须以换行结尾。 + # 某些边界场景服务端仍可能判定失败,这里做一次修正重试。 + if stream and self.STREAM_MARKDOWN_NEWLINE_ERROR in str(err): + retry_payload = payload.copy() + + markdown_payload = retry_payload.get("markdown") + if isinstance(markdown_payload, dict): + md_content = cast(str, markdown_payload.get("content", "") or "") + if md_content and not md_content.endswith("\n"): + retry_payload["markdown"] = {"content": md_content + "\n"} + + content = cast(str | None, retry_payload.get("content")) + if content and not content.endswith("\n"): + retry_payload["content"] = content + "\n" + + logger.warning( + "[QQOfficial] 流式 markdown 分片换行校验失败,已修正后重试一次。" + ) + return await send_func(retry_payload) + if ( self.MARKDOWN_NOT_ALLOWED_ERROR not in str(err) or not payload.get("markdown") @@ -339,10 +431,14 @@ class QQOfficialMessageEvent(AstrMessageEvent): "[QQOfficial] markdown 发送被拒绝,回退到 content 模式重试。" ) fallback_payload = payload.copy() - fallback_payload["markdown"] = None + fallback_payload.pop("markdown", None) fallback_payload["content"] = plain_text if fallback_payload.get("msg_type") == 2: fallback_payload["msg_type"] = 0 + if stream: + fallback_content = cast(str, fallback_payload.get("content") or "") + if fallback_content and not fallback_content.endswith("\n"): + fallback_payload["content"] = fallback_content + "\n" return await send_func(fallback_payload) async def upload_group_and_c2c_image( @@ -460,13 +556,21 @@ class QQOfficialMessageEvent(AstrMessageEvent): ) -> message.Message: payload = locals() payload.pop("self", None) + # QQ API does not accept stream.id=None; remove it when not yet assigned + if "stream" in payload and payload["stream"] is not None: + stream_data = dict(payload["stream"]) + if stream_data.get("id") is None: + stream_data.pop("id", None) + payload["stream"] = stream_data route = Route("POST", "/v2/users/{openid}/messages", openid=openid) result = await self.bot.api._http.request(route, json=payload) + if result is None: + logger.warning("[QQOfficial] post_c2c_message: API 返回 None,跳过本次发送") + return None if not isinstance(result, dict): - raise RuntimeError( - f"Failed to post c2c message, response is not dict: {result}" - ) + logger.error(f"[QQOfficial] post_c2c_message: 响应不是 dict: {result}") + return None return message.Message(**result) diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py index 88d4a2128..436be70db 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py @@ -212,6 +212,7 @@ class QQOfficialPlatformAdapter(Platform): if media: payload["media"] = media payload["msg_type"] = 7 + payload.pop("msg_id", None) if file_source: media = await QQOfficialMessageEvent.upload_group_and_c2c_media( send_helper, # type: ignore @@ -223,6 +224,7 @@ class QQOfficialPlatformAdapter(Platform): if media: payload["media"] = media payload["msg_type"] = 7 + payload.pop("msg_id", None) ret = await self.client.api.post_group_message( group_openid=session.session_id, **payload, @@ -266,6 +268,9 @@ class QQOfficialPlatformAdapter(Platform): if media: payload["media"] = media payload["msg_type"] = 7 + # QQ API rejects msg_id for media (video/file) messages sent + # via the proactive tool-call path; remove it to avoid 越权 error. + payload.pop("msg_id", None) if file_source: media = await QQOfficialMessageEvent.upload_group_and_c2c_media( send_helper, # type: ignore @@ -277,6 +282,7 @@ class QQOfficialPlatformAdapter(Platform): if media: payload["media"] = media payload["msg_type"] = 7 + payload.pop("msg_id", None) ret = await QQOfficialMessageEvent.post_c2c_message( send_helper, # type: ignore