Files
AstrBot/astrbot/core/message/message_event_result.py
T
エイカク 451ad685ae feat: 集成 DeerFlow Agent Runner 并优化流式处理 (#5581)
* feat: integrate DeerFlow agent runner and improve stream handling

* refactor: split DeerFlow stream flow and close stale client on reset

* fix: enforce max_step and correct timeout type check

* fix: harden DeerFlow config parsing and session lifecycle

* fix: preserve third-party runner error semantics and harden image parsing

* perf: bound DeerFlow values history and seen-id cache

* refactor: improve deerflow stream semantics and client lifecycle

* fix: harden third-party runner error semantics and fallback aggregation

* refactor: reduce deerflow image log noise and lazy-init api session

* perf: avoid unnecessary iterable copies in deerflow stream utils

* refactor: centralize runner error key and clarify deerflow client lifecycle

* refactor: simplify third-party runner output flow

* fix: defer streaming runner cleanup and unify error mapping

* fix: handle id-less values messages and redact stream payload logs

* fix: improve deerflow error signaling and third-party runner flow

* fix: support deerflow proxy and refine runner lifecycle

* fix: tighten deerflow image validation and runner lifecycle

* feat: support deerflow image output components

* fix: harden runner stream cleanup and refactor deerflow config

* fix: preserve deerflow done hook and simplify runner lifecycle

* refactor: simplify third-party runner aggregation and lifecycle closing

* fix: preserve first deerflow values payload and simplify runner flow

* refactor: unify runner final resolution and harden deerflow close state

* refactor: share int coercion and make deerflow close best effort

* refactor: extract deerflow mappers and streamline third-party lifecycle

* refactor: simplify third-party flow and harden sse flush parsing

* fix: make deerflow runner close path best effort

* refactor: simplify third-party orchestration and centralize deerflow keys

* refactor: simplify third-party chunk flow and deerflow finalization

* fix: harden deerflow stream parsing and simplify runner lifecycle

* refactor: remove redundant deerflow values text assignment

* fix: improve deerflow timeout diagnostics and image input feedback

* refactor: flatten third-party runner lifecycle and aggregation

* chore: use deerflow official remote svg icon

* chore: remove unused deerflow local logo asset
2026-03-01 12:31:38 +09:00

261 lines
8.0 KiB
Python

import enum
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from typing_extensions import deprecated
from astrbot.core.message.components import (
At,
AtAll,
BaseMessageComponent,
Image,
Json,
Plain,
)
@dataclass
class MessageChain:
"""MessageChain 描述了一整条消息中带有的所有组件。
现代消息平台的一条富文本消息中可能由多个组件构成,如文本、图片、At 等,并且保留了顺序。
Attributes:
`chain` (list): 用于顺序存储各个组件。
`use_t2i_` (bool): 用于标记是否使用文本转图片服务。默认为 None,即跟随用户的设置。当设置为 True 时,将会使用文本转图片服务。
"""
chain: list[BaseMessageComponent] = field(default_factory=list)
use_t2i_: bool | None = None # None 为跟随用户设置
type: str | None = None
"""消息链承载的消息的类型。可选,用于让消息平台区分不同业务场景的消息链。"""
def message(self, message: str):
"""添加一条文本消息到消息链 `chain` 中。
Example:
CommandResult().message("Hello ").message("world!")
# 输出 Hello world!
"""
self.chain.append(Plain(message))
return self
def at(self, name: str, qq: str | int):
"""添加一条 At 消息到消息链 `chain` 中。
Example:
CommandResult().at("张三", "12345678910")
# 输出 @张三
"""
self.chain.append(At(name=name, qq=qq))
return self
def at_all(self):
"""添加一条 AtAll 消息到消息链 `chain` 中。
Example:
CommandResult().at_all()
# 输出 @所有人
"""
self.chain.append(AtAll())
return self
@deprecated("请使用 message 方法代替。")
def error(self, message: str):
"""添加一条错误消息到消息链 `chain` 中
Example:
CommandResult().error("解析失败")
"""
self.chain.append(Plain(message))
return self
def url_image(self, url: str):
"""添加一条图片消息(https 链接)到消息链 `chain` 中。
Note:
如果需要发送本地图片,请使用 `file_image` 方法。
Example:
CommandResult().image("https://example.com/image.jpg")
"""
self.chain.append(Image.fromURL(url))
return self
def file_image(self, path: str):
"""添加一条图片消息(本地文件路径)到消息链 `chain` 中。
Note:
如果需要发送网络图片,请使用 `url_image` 方法。
CommandResult().image("image.jpg")
"""
self.chain.append(Image.fromFileSystem(path))
return self
def base64_image(self, base64_str: str):
"""添加一条图片消息(base64 编码字符串)到消息链 `chain` 中。
Example:
CommandResult().base64_image("iVBORw0KGgoAAAANSUhEUgAAAAUA...")
"""
self.chain.append(Image.fromBase64(base64_str))
return self
def use_t2i(self, use_t2i: bool):
"""设置是否使用文本转图片服务。
Args:
use_t2i (bool): 是否使用文本转图片服务。默认为 None,即跟随用户的设置。当设置为 True 时,将会使用文本转图片服务。
"""
self.use_t2i_ = use_t2i
return self
def get_plain_text(self, with_other_comps_mark: bool = False) -> str:
"""获取纯文本消息。这个方法将获取 chain 中所有 Plain 组件的文本并拼接成一条消息。空格分隔。
Args:
with_other_comps_mark (bool): 是否在纯文本中标记其他组件的位置
"""
if not with_other_comps_mark:
return " ".join(
[comp.text for comp in self.chain if isinstance(comp, Plain)]
)
else:
texts = []
for comp in self.chain:
if isinstance(comp, Plain):
texts.append(comp.text)
elif isinstance(comp, Json):
texts.append(f"{comp.data}")
else:
texts.append(f"[{comp.__class__.__name__}]")
return " ".join(texts)
def squash_plain(self):
"""将消息链中的所有 Plain 消息段聚合到第一个 Plain 消息段中。"""
if not self.chain:
return None
new_chain = []
first_plain = None
plain_texts = []
for comp in self.chain:
if isinstance(comp, Plain):
if first_plain is None:
first_plain = comp
new_chain.append(comp)
plain_texts.append(comp.text)
else:
new_chain.append(comp)
if first_plain is not None:
first_plain.text = "".join(plain_texts)
self.chain = new_chain
return self
class EventResultType(enum.Enum):
"""用于描述事件处理的结果类型。
Attributes:
CONTINUE: 事件将会继续传播
STOP: 事件将会终止传播
"""
CONTINUE = enum.auto()
STOP = enum.auto()
class ResultContentType(enum.Enum):
"""用于描述事件结果的内容的类型。"""
LLM_RESULT = enum.auto()
"""调用 LLM 产生的结果"""
AGENT_RUNNER_ERROR = enum.auto()
"""第三方 Agent Runner 返回的错误结果"""
GENERAL_RESULT = enum.auto()
"""普通的消息结果"""
STREAMING_RESULT = enum.auto()
"""调用 LLM 产生的流式结果"""
STREAMING_FINISH = enum.auto()
"""流式输出完成"""
@dataclass
class MessageEventResult(MessageChain):
"""MessageEventResult 描述了一整条消息中带有的所有组件以及事件处理的结果。
现代消息平台的一条富文本消息中可能由多个组件构成,如文本、图片、At 等,并且保留了顺序。
Attributes:
`chain` (list): 用于顺序存储各个组件。
`use_t2i_` (bool): 用于标记是否使用文本转图片服务。默认为 None,即跟随用户的设置。当设置为 True 时,将会使用文本转图片服务。
`result_type` (EventResultType): 事件处理的结果类型。
"""
result_type: EventResultType | None = field(
default_factory=lambda: EventResultType.CONTINUE,
)
result_content_type: ResultContentType | None = field(
default_factory=lambda: ResultContentType.GENERAL_RESULT,
)
async_stream: AsyncGenerator | None = None
"""异步流"""
def stop_event(self) -> "MessageEventResult":
"""终止事件传播。"""
self.result_type = EventResultType.STOP
return self
def continue_event(self) -> "MessageEventResult":
"""继续事件传播。"""
self.result_type = EventResultType.CONTINUE
return self
def is_stopped(self) -> bool:
"""是否终止事件传播。"""
return self.result_type == EventResultType.STOP
def set_async_stream(self, stream: AsyncGenerator) -> "MessageEventResult":
"""设置异步流。"""
self.async_stream = stream
return self
def set_result_content_type(self, typ: ResultContentType) -> "MessageEventResult":
"""设置事件处理的结果类型。
Args:
result_type (EventResultType): 事件处理的结果类型。
"""
self.result_content_type = typ
return self
def is_llm_result(self) -> bool:
"""是否为 LLM 结果。"""
return self.result_content_type == ResultContentType.LLM_RESULT
def is_model_result(self) -> bool:
"""Whether result comes from model execution (including runner errors)."""
return self.result_content_type in (
ResultContentType.LLM_RESULT,
ResultContentType.AGENT_RUNNER_ERROR,
)
# 为了兼容旧版代码,保留 CommandResult 的别名
CommandResult = MessageEventResult