93decaa997
* test: add comprehensive tests for core lifecycle and agent execution - Add core lifecycle unit tests - Add main agent execution tests - Add computer use tests - Enhance event bus tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: 更新用户查询标题生成逻辑,确保处理为纯文本并忽略内部指令 refactor(tests): 移除测试文件中的循环导入注释 refactor(tests): 优化计算机客户端测试,简化不可用引导程序的处理逻辑 * fix(event_bus): 优化事件处理逻辑,简化配置检查并增强错误日志记录,优化了测试内容 * fix(astr_main_agent): 简化 LLM 安全模式系统提示的设置逻辑 * test: enhance persona resolution in mock context for persona management tests --------- Co-authored-by: whatevertogo <whatevertogo@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Soulter <905617992@qq.com>
69 lines
2.8 KiB
Python
69 lines
2.8 KiB
Python
"""事件总线, 用于处理事件的分发和处理
|
|
事件总线是一个异步队列, 用于接收各种消息事件, 并将其发送到Scheduler调度器进行处理
|
|
其中包含了一个无限循环的调度函数, 用于从事件队列中获取新的事件, 并创建一个新的异步任务来执行管道调度器的处理逻辑
|
|
|
|
class:
|
|
EventBus: 事件总线, 用于处理事件的分发和处理
|
|
|
|
工作流程:
|
|
1. 维护一个异步队列, 来接受各种消息事件
|
|
2. 无限循环的调度函数, 从事件队列中获取新的事件, 打印日志并创建一个新的异步任务来执行管道调度器的处理逻辑
|
|
"""
|
|
|
|
import asyncio
|
|
from asyncio import Queue
|
|
|
|
from astrbot.core import logger
|
|
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
|
|
from astrbot.core.pipeline.scheduler import PipelineScheduler
|
|
|
|
from .platform import AstrMessageEvent
|
|
|
|
|
|
class EventBus:
|
|
"""用于处理事件的分发和处理"""
|
|
|
|
def __init__(
|
|
self,
|
|
event_queue: Queue,
|
|
pipeline_scheduler_mapping: dict[str, PipelineScheduler],
|
|
astrbot_config_mgr: AstrBotConfigManager,
|
|
) -> None:
|
|
self.event_queue = event_queue # 事件队列
|
|
# abconf uuid -> scheduler
|
|
self.pipeline_scheduler_mapping = pipeline_scheduler_mapping
|
|
self.astrbot_config_mgr = astrbot_config_mgr
|
|
|
|
async def dispatch(self) -> None:
|
|
while True:
|
|
event: AstrMessageEvent = await self.event_queue.get()
|
|
conf_info = self.astrbot_config_mgr.get_conf_info(event.unified_msg_origin)
|
|
conf_id = conf_info["id"]
|
|
conf_name = conf_info.get("name") or conf_id
|
|
self._print_event(event, conf_name)
|
|
scheduler = self.pipeline_scheduler_mapping.get(conf_id)
|
|
if not scheduler:
|
|
logger.error(
|
|
f"PipelineScheduler not found for id: {conf_id}, event ignored."
|
|
)
|
|
continue
|
|
asyncio.create_task(scheduler.execute(event))
|
|
|
|
def _print_event(self, event: AstrMessageEvent, conf_name: str) -> None:
|
|
"""用于记录事件信息
|
|
|
|
Args:
|
|
event (AstrMessageEvent): 事件对象
|
|
|
|
"""
|
|
# 如果有发送者名称: [平台名] 发送者名称/发送者ID: 消息概要
|
|
if event.get_sender_name():
|
|
logger.info(
|
|
f"[{conf_name}] [{event.get_platform_id()}({event.get_platform_name()})] {event.get_sender_name()}/{event.get_sender_id()}: {event.get_message_outline()}",
|
|
)
|
|
# 没有发送者名称: [平台名] 发送者ID: 消息概要
|
|
else:
|
|
logger.info(
|
|
f"[{conf_name}] [{event.get_platform_id()}({event.get_platform_name()})] {event.get_sender_id()}: {event.get_message_outline()}",
|
|
)
|