diff --git a/astrbot/core/message/message_event_result.py b/astrbot/core/message/message_event_result.py index 4cc7fb842..83c03b7fc 100644 --- a/astrbot/core/message/message_event_result.py +++ b/astrbot/core/message/message_event_result.py @@ -152,4 +152,5 @@ class MessageEventResult(MessageChain): return self.result_content_type == ResultContentType.LLM_RESULT +# 为了兼容旧版代码,保留 CommandResult 的别名 CommandResult = MessageEventResult diff --git a/astrbot/core/pipeline/scheduler.py b/astrbot/core/pipeline/scheduler.py index c5339ac4b..d29c7ec80 100644 --- a/astrbot/core/pipeline/scheduler.py +++ b/astrbot/core/pipeline/scheduler.py @@ -23,7 +23,12 @@ class PipelineScheduler: await stage.initialize(self.ctx) async def _process_stages(self, event: AstrMessageEvent, from_stage=0): - """依次执行各个阶段""" + """依次执行各个阶段 + + Args: + event (AstrMessageEvent): 事件对象 + from_stage (int): 从第几个阶段开始执行, 默认从0开始 + """ for i in range(from_stage, len(registered_stages)): stage = registered_stages[i] # 获取当前要执行的阶段 # logger.debug(f"执行阶段 {stage.__class__ .__name__}") @@ -60,7 +65,11 @@ class PipelineScheduler: break async def execute(self, event: AstrMessageEvent): - """执行 pipeline""" + """执行 pipeline + + Args: + event (AstrMessageEvent): 事件对象 + """ await self._process_stages(event) # 如果没有发送操作, 则发送一个空消息, 以便于后续的处理 diff --git a/astrbot/core/pipeline/stage.py b/astrbot/core/pipeline/stage.py index ea87b29ee..c7d4ff792 100644 --- a/astrbot/core/pipeline/stage.py +++ b/astrbot/core/pipeline/stage.py @@ -22,14 +22,24 @@ class Stage(abc.ABC): @abc.abstractmethod async def initialize(self, ctx: PipelineContext) -> None: - """初始化阶段""" + """初始化阶段 + + Args: + ctx (PipelineContext): 消息管道上下文对象, 包括配置和插件管理器 + """ raise NotImplementedError @abc.abstractmethod async def process( self, event: AstrMessageEvent ) -> Union[None, AsyncGenerator[None, None]]: - """处理事件""" + """处理事件 + + Args: + event (AstrMessageEvent): 事件对象,包含事件的相关信息 + Returns: + Union[None, AsyncGenerator[None, None]]: 处理结果,可能是 None 或者异步生成器, 如果为 None 则表示不需要继续处理, 如果为异步生成器则表示需要继续处理(进入下一个阶段) + """ raise NotImplementedError async def _call_handler( @@ -40,9 +50,23 @@ class Stage(abc.ABC): *args, **kwargs, ) -> AsyncGenerator[None, None]: - """调用 Handler。""" - # 判断 handler 是否是类方法(通过装饰器注册的没有 __self__ 属性) - ready_to_call = None + """执行事件处理函数并处理其返回结果 + + 该方法负责调用处理函数并处理不同类型的返回值。它支持两种类型的处理函数: + 1. 异步生成器: 实现洋葱模型,每次yield都会将控制权交回上层 + 2. 协程: 执行一次并处理返回值 + + Args: + ctx (PipelineContext): 消息管道上下文对象 + event (AstrMessageEvent): 待处理的事件对象 + handler (Awaitable): 事件处理函数 + *args: 传递给handler的位置参数 + **kwargs: 传递给handler的关键字参数 + + Returns: + AsyncGenerator[None, None]: 异步生成器,用于在管道中传递控制流 + """ + ready_to_call = None # 一个协程或者异步生成器(async def) trace_ = None @@ -51,29 +75,36 @@ class Stage(abc.ABC): except TypeError as _: # 向下兼容 trace_ = traceback.format_exc() + # 以前的handler会额外传入一个参数, 但是context对象实际上在插件实例中有一份 ready_to_call = handler(event, ctx.plugin_manager.context, *args, **kwargs) if isinstance(ready_to_call, AsyncGenerator): - _has_yielded = False + # 如果是一个异步生成器, 进入洋葱模型 + _has_yielded = False # 是否返回过值 try: async for ret in ready_to_call: - # 如果处理函数是生成器,返回值只能是 MessageEventResult 或者 None(无返回值) + # 这里逐步执行异步生成器, 对于每个yield返回的ret, 执行下面的代码 + # 返回值只能是 MessageEventResult 或者 None(无返回值) _has_yielded = True if isinstance(ret, (MessageEventResult, CommandResult)): + # 如果返回值是 MessageEventResult, 设置结果并继续 event.set_result(ret) - yield + yield # 传递控制权给上一层的process函数 else: - yield ret + # 如果返回值是 None, 则不设置结果并继续 + # 继续执行后续阶段 + yield ret # 传递控制权给上一层的process函数 if not _has_yielded: + # 如果这个异步生成器没有执行到yield分支 yield except Exception as e: logger.error(f"Previous Error: {trace_}") raise e elif inspect.iscoroutine(ready_to_call): - # 如果只是一个 coroutine + # 如果只是一个协程, 直接执行 ret = await ready_to_call if isinstance(ret, (MessageEventResult, CommandResult)): event.set_result(ret) - yield + yield # 传递控制权给上一层的process函数 else: - yield ret + yield ret # 传递控制权给上一层的process函数