93decaa997
* test: add comprehensive tests for core lifecycle and agent execution - Add core lifecycle unit tests - Add main agent execution tests - Add computer use tests - Enhance event bus tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: 更新用户查询标题生成逻辑,确保处理为纯文本并忽略内部指令 refactor(tests): 移除测试文件中的循环导入注释 refactor(tests): 优化计算机客户端测试,简化不可用引导程序的处理逻辑 * fix(event_bus): 优化事件处理逻辑,简化配置检查并增强错误日志记录,优化了测试内容 * fix(astr_main_agent): 简化 LLM 安全模式系统提示的设置逻辑 * test: enhance persona resolution in mock context for persona management tests --------- Co-authored-by: whatevertogo <whatevertogo@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Soulter <905617992@qq.com>
702 lines
25 KiB
Python
702 lines
25 KiB
Python
"""Tests for EventBus."""
|
|
|
|
import asyncio
|
|
from contextlib import suppress
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from astrbot.core.event_bus import EventBus
|
|
|
|
|
|
@pytest.fixture
|
|
def event_queue():
|
|
"""Create an event queue."""
|
|
return asyncio.Queue()
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_pipeline_scheduler():
|
|
"""Create a mock pipeline scheduler."""
|
|
scheduler = MagicMock()
|
|
scheduler.execute = AsyncMock()
|
|
return scheduler
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_config_manager():
|
|
"""Create a mock config manager."""
|
|
config_mgr = MagicMock()
|
|
config_mgr.get_conf_info = MagicMock(
|
|
return_value={"id": "test-conf-id", "name": "Test Config"}
|
|
)
|
|
return config_mgr
|
|
|
|
|
|
@pytest.fixture
|
|
def event_bus(event_queue, mock_pipeline_scheduler, mock_config_manager):
|
|
"""Create an EventBus instance."""
|
|
return EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping={"test-conf-id": mock_pipeline_scheduler},
|
|
astrbot_config_mgr=mock_config_manager,
|
|
)
|
|
|
|
|
|
class TestEventBusInit:
|
|
"""Tests for EventBus initialization."""
|
|
|
|
def test_init(self, event_queue, mock_pipeline_scheduler, mock_config_manager):
|
|
"""Test EventBus initialization."""
|
|
bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping={"test": mock_pipeline_scheduler},
|
|
astrbot_config_mgr=mock_config_manager,
|
|
)
|
|
|
|
assert bus.event_queue == event_queue
|
|
assert bus.pipeline_scheduler_mapping == {"test": mock_pipeline_scheduler}
|
|
assert bus.astrbot_config_mgr == mock_config_manager
|
|
|
|
|
|
class TestEventBusDispatch:
|
|
"""Tests for EventBus dispatch method."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_processes_event(
|
|
self, event_bus, event_queue, mock_pipeline_scheduler, mock_config_manager
|
|
):
|
|
"""Test that dispatch processes an event from the queue."""
|
|
processed = asyncio.Event()
|
|
|
|
async def execute_and_signal(event): # noqa: ARG001
|
|
processed.set()
|
|
|
|
mock_pipeline_scheduler.execute.side_effect = execute_and_signal
|
|
|
|
# Create a mock event
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "test-platform:group:123"
|
|
mock_event.get_platform_id.return_value = "test-platform"
|
|
mock_event.get_platform_name.return_value = "Test Platform"
|
|
mock_event.get_sender_name.return_value = "TestUser"
|
|
mock_event.get_sender_id.return_value = "user123"
|
|
mock_event.get_message_outline.return_value = "Hello"
|
|
|
|
# Put event in queue
|
|
await event_queue.put(mock_event)
|
|
|
|
# Start dispatch in background and cancel after processing
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(processed.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Verify scheduler was called
|
|
mock_pipeline_scheduler.execute.assert_called_once_with(mock_event)
|
|
mock_config_manager.get_conf_info.assert_called_once_with(
|
|
"test-platform:group:123"
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_handles_missing_scheduler(
|
|
self,
|
|
event_bus,
|
|
event_queue,
|
|
mock_config_manager,
|
|
mock_pipeline_scheduler,
|
|
):
|
|
"""Test that dispatch handles missing scheduler gracefully."""
|
|
logged = asyncio.Event()
|
|
|
|
def error_and_signal(*args, **kwargs): # noqa: ARG001
|
|
logged.set()
|
|
|
|
# Configure to return a config ID that has no scheduler
|
|
mock_config_manager.get_conf_info.return_value = {
|
|
"id": "missing-scheduler",
|
|
"name": "Missing Config",
|
|
}
|
|
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "test-platform:group:123"
|
|
mock_event.get_platform_id.return_value = "test-platform"
|
|
mock_event.get_platform_name.return_value = "Test Platform"
|
|
mock_event.get_sender_name.return_value = None
|
|
mock_event.get_sender_id.return_value = "user123"
|
|
mock_event.get_message_outline.return_value = "Hello"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
with patch("astrbot.core.event_bus.logger") as mock_logger:
|
|
mock_logger.error.side_effect = error_and_signal
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(logged.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
mock_logger.error.assert_called_once()
|
|
assert "missing-scheduler" in mock_logger.error.call_args[0][0]
|
|
|
|
mock_pipeline_scheduler.execute.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_multiple_events(
|
|
self, event_bus, event_queue, mock_pipeline_scheduler, mock_config_manager
|
|
):
|
|
"""Test that dispatch processes multiple events."""
|
|
processed_all = asyncio.Event()
|
|
processed_count = 0
|
|
|
|
async def execute_and_count(event): # noqa: ARG001
|
|
nonlocal processed_count
|
|
processed_count += 1
|
|
if processed_count == 3:
|
|
processed_all.set()
|
|
|
|
mock_pipeline_scheduler.execute.side_effect = execute_and_count
|
|
|
|
events = []
|
|
for i in range(3):
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = f"test-platform:group:{i}"
|
|
mock_event.get_platform_id.return_value = "test-platform"
|
|
mock_event.get_platform_name.return_value = "Test Platform"
|
|
mock_event.get_sender_name.return_value = f"User{i}"
|
|
mock_event.get_sender_id.return_value = f"user{i}"
|
|
mock_event.get_message_outline.return_value = f"Message {i}"
|
|
events.append(mock_event)
|
|
await event_queue.put(mock_event)
|
|
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(processed_all.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
assert mock_pipeline_scheduler.execute.call_count == 3
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_falls_back_to_conf_id_when_name_missing(
|
|
self,
|
|
event_bus,
|
|
event_queue,
|
|
mock_config_manager,
|
|
mock_pipeline_scheduler,
|
|
):
|
|
"""Test that missing conf name does not block dispatch."""
|
|
processed = asyncio.Event()
|
|
mock_config_manager.get_conf_info.return_value = {
|
|
"id": "test-conf-id",
|
|
}
|
|
|
|
async def execute_and_signal(event): # noqa: ARG001
|
|
processed.set()
|
|
|
|
mock_pipeline_scheduler.execute.side_effect = execute_and_signal
|
|
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "test-platform:group:123"
|
|
mock_event.get_platform_id.return_value = "test-platform"
|
|
mock_event.get_platform_name.return_value = "Test Platform"
|
|
mock_event.get_sender_name.return_value = "TestUser"
|
|
mock_event.get_sender_id.return_value = "user123"
|
|
mock_event.get_message_outline.return_value = "Hello"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
with patch.object(event_bus, "_print_event") as mock_print_event:
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(processed.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
mock_print_event.assert_called_once_with(mock_event, "test-conf-id")
|
|
mock_pipeline_scheduler.execute.assert_called_once_with(mock_event)
|
|
|
|
|
|
class TestPrintEvent:
|
|
"""Tests for _print_event method."""
|
|
|
|
def test_print_event_with_sender_name(self, event_bus):
|
|
"""Test printing event with sender name."""
|
|
mock_event = MagicMock()
|
|
mock_event.get_platform_id.return_value = "test-platform"
|
|
mock_event.get_platform_name.return_value = "Test Platform"
|
|
mock_event.get_sender_name.return_value = "TestUser"
|
|
mock_event.get_sender_id.return_value = "user123"
|
|
mock_event.get_message_outline.return_value = "Hello"
|
|
|
|
with patch("astrbot.core.event_bus.logger") as mock_logger:
|
|
event_bus._print_event(mock_event, "TestConfig")
|
|
|
|
mock_logger.info.assert_called_once()
|
|
call_args = mock_logger.info.call_args[0][0]
|
|
assert "TestConfig" in call_args
|
|
assert "TestUser" in call_args
|
|
assert "user123" in call_args
|
|
assert "Hello" in call_args
|
|
|
|
def test_print_event_without_sender_name(self, event_bus):
|
|
"""Test printing event without sender name."""
|
|
mock_event = MagicMock()
|
|
mock_event.get_platform_id.return_value = "test-platform"
|
|
mock_event.get_platform_name.return_value = "Test Platform"
|
|
mock_event.get_sender_name.return_value = None
|
|
mock_event.get_sender_id.return_value = "user123"
|
|
mock_event.get_message_outline.return_value = "Hello"
|
|
|
|
with patch("astrbot.core.event_bus.logger") as mock_logger:
|
|
event_bus._print_event(mock_event, "TestConfig")
|
|
|
|
mock_logger.info.assert_called_once()
|
|
call_args = mock_logger.info.call_args[0][0]
|
|
assert "TestConfig" in call_args
|
|
assert "user123" in call_args
|
|
assert "Hello" in call_args
|
|
# Should not have sender name separator
|
|
assert "/" not in call_args
|
|
|
|
|
|
class TestEventSubscription:
|
|
"""Tests for event subscription functionality."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subscriber_registration(self, event_queue, mock_config_manager):
|
|
"""Test registering a subscriber (scheduler) to the event bus."""
|
|
# Create multiple schedulers as subscribers
|
|
scheduler1 = MagicMock()
|
|
scheduler1.execute = AsyncMock()
|
|
scheduler2 = MagicMock()
|
|
scheduler2.execute = AsyncMock()
|
|
|
|
# Create EventBus with multiple subscribers
|
|
pipeline_mapping = {
|
|
"conf-id-1": scheduler1,
|
|
"conf-id-2": scheduler2,
|
|
}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=mock_config_manager,
|
|
)
|
|
|
|
# Verify both subscribers are registered
|
|
assert "conf-id-1" in event_bus.pipeline_scheduler_mapping
|
|
assert "conf-id-2" in event_bus.pipeline_scheduler_mapping
|
|
assert event_bus.pipeline_scheduler_mapping["conf-id-1"] == scheduler1
|
|
assert event_bus.pipeline_scheduler_mapping["conf-id-2"] == scheduler2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_subscribers_receive_events(
|
|
self, event_queue, mock_config_manager
|
|
):
|
|
"""Test that events are dispatched to the correct subscriber based on config."""
|
|
processed = asyncio.Event()
|
|
call_tracker = {"scheduler1": False, "scheduler2": False}
|
|
mock_config_manager.get_conf_info.return_value = {
|
|
"id": "conf-id-1",
|
|
"name": "Test Config",
|
|
}
|
|
|
|
scheduler1 = MagicMock()
|
|
scheduler1.execute = AsyncMock()
|
|
|
|
async def execute_scheduler1(event): # noqa: ARG001
|
|
call_tracker["scheduler1"] = True
|
|
processed.set()
|
|
|
|
scheduler1.execute.side_effect = execute_scheduler1
|
|
|
|
scheduler2 = MagicMock()
|
|
scheduler2.execute = AsyncMock()
|
|
|
|
async def execute_scheduler2(event): # noqa: ARG001
|
|
call_tracker["scheduler2"] = True
|
|
|
|
scheduler2.execute.side_effect = execute_scheduler2
|
|
|
|
pipeline_mapping = {
|
|
"conf-id-1": scheduler1,
|
|
"conf-id-2": scheduler2,
|
|
}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=mock_config_manager,
|
|
)
|
|
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "platform:group:123"
|
|
mock_event.get_platform_id.return_value = "platform"
|
|
mock_event.get_platform_name.return_value = "Platform"
|
|
mock_event.get_sender_name.return_value = "User"
|
|
mock_event.get_sender_id.return_value = "user1"
|
|
mock_event.get_message_outline.return_value = "Test"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(processed.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Only scheduler1 should have been called (based on mock_config_manager default)
|
|
assert call_tracker["scheduler1"] is True
|
|
assert call_tracker["scheduler2"] is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unsubscribe_by_removing_scheduler(
|
|
self, event_queue, mock_config_manager
|
|
):
|
|
"""Test that removing a scheduler effectively unsubscribes it."""
|
|
scheduler = MagicMock()
|
|
scheduler.execute = AsyncMock()
|
|
|
|
pipeline_mapping = {"conf-id": scheduler}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=mock_config_manager,
|
|
)
|
|
|
|
# Verify scheduler is registered
|
|
assert "conf-id" in event_bus.pipeline_scheduler_mapping
|
|
|
|
# Remove the scheduler (unsubscribe)
|
|
del event_bus.pipeline_scheduler_mapping["conf-id"]
|
|
|
|
# Verify scheduler is no longer registered
|
|
assert "conf-id" not in event_bus.pipeline_scheduler_mapping
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subscriber_exception_handling(
|
|
self, event_queue, mock_config_manager
|
|
):
|
|
"""Test that exceptions in subscriber execution don't crash the event bus."""
|
|
exception_raised = asyncio.Event()
|
|
second_event_processed = asyncio.Event()
|
|
mock_config_manager.get_conf_info.return_value = {
|
|
"id": "conf-id-1",
|
|
"name": "Test Config",
|
|
}
|
|
|
|
scheduler1 = MagicMock()
|
|
scheduler1.execute = AsyncMock()
|
|
|
|
async def execute_with_exception(event): # noqa: ARG001
|
|
exception_raised.set()
|
|
raise RuntimeError("Subscriber error")
|
|
|
|
scheduler1.execute.side_effect = execute_with_exception
|
|
|
|
scheduler2 = MagicMock()
|
|
scheduler2.execute = AsyncMock()
|
|
|
|
async def execute_normal(event): # noqa: ARG001
|
|
second_event_processed.set()
|
|
|
|
scheduler2.execute.side_effect = execute_normal
|
|
|
|
pipeline_mapping = {
|
|
"conf-id-1": scheduler1,
|
|
"conf-id-2": scheduler2,
|
|
}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=mock_config_manager,
|
|
)
|
|
|
|
# First event will cause exception
|
|
mock_event1 = MagicMock()
|
|
mock_event1.unified_msg_origin = "platform:group:1"
|
|
mock_event1.get_platform_id.return_value = "platform"
|
|
mock_event1.get_platform_name.return_value = "Platform"
|
|
mock_event1.get_sender_name.return_value = "User"
|
|
mock_event1.get_sender_id.return_value = "user1"
|
|
mock_event1.get_message_outline.return_value = "Test"
|
|
|
|
await event_queue.put(mock_event1)
|
|
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(exception_raised.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Verify the scheduler was called (exception occurred but didn't crash)
|
|
scheduler1.execute.assert_called_once()
|
|
|
|
|
|
class TestEventFiltering:
|
|
"""Tests for event filtering functionality."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_filter_by_event_origin(self, event_queue):
|
|
"""Test filtering events by their unified_msg_origin."""
|
|
scheduler1 = MagicMock()
|
|
scheduler1.execute = AsyncMock()
|
|
scheduler2 = MagicMock()
|
|
scheduler2.execute = AsyncMock()
|
|
|
|
config_mgr = MagicMock()
|
|
|
|
# Route different origins to different schedulers
|
|
def get_conf_info(origin):
|
|
if origin.startswith("telegram"):
|
|
return {"id": "telegram-conf", "name": "Telegram Config"}
|
|
elif origin.startswith("discord"):
|
|
return {"id": "discord-conf", "name": "Discord Config"}
|
|
return {"id": "default-conf", "name": "Default Config"}
|
|
|
|
config_mgr.get_conf_info = MagicMock(side_effect=get_conf_info)
|
|
|
|
pipeline_mapping = {
|
|
"telegram-conf": scheduler1,
|
|
"discord-conf": scheduler2,
|
|
}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=config_mgr,
|
|
)
|
|
|
|
processed = asyncio.Event()
|
|
scheduler1.execute.side_effect = lambda e: processed.set() # noqa: ARG001
|
|
|
|
# Create Telegram event
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "telegram:private:123"
|
|
mock_event.get_platform_id.return_value = "telegram"
|
|
mock_event.get_platform_name.return_value = "Telegram"
|
|
mock_event.get_sender_name.return_value = "TGUser"
|
|
mock_event.get_sender_id.return_value = "tg123"
|
|
mock_event.get_message_outline.return_value = "TG Message"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(processed.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Only telegram scheduler should be called
|
|
scheduler1.execute.assert_called_once()
|
|
scheduler2.execute.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_filter_by_message_content_type(
|
|
self, event_queue, mock_config_manager
|
|
):
|
|
"""Test filtering based on message content (e.g., group vs private)."""
|
|
processed = asyncio.Event()
|
|
scheduler = MagicMock()
|
|
scheduler.execute = AsyncMock()
|
|
|
|
async def execute_and_signal(event): # noqa: ARG001
|
|
processed.set()
|
|
|
|
scheduler.execute.side_effect = execute_and_signal
|
|
|
|
pipeline_mapping = {"test-conf-id": scheduler}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=mock_config_manager,
|
|
)
|
|
|
|
# Create event with group message origin
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "platform:group:456"
|
|
mock_event.get_platform_id.return_value = "platform"
|
|
mock_event.get_platform_name.return_value = "Platform"
|
|
mock_event.get_sender_name.return_value = "GroupUser"
|
|
mock_event.get_sender_id.return_value = "user456"
|
|
mock_event.get_message_outline.return_value = "Group message"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(processed.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Verify config was queried with correct origin
|
|
mock_config_manager.get_conf_info.assert_called_once_with("platform:group:456")
|
|
scheduler.execute.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_combined_filter_conditions(self, event_queue):
|
|
"""Test filtering with combined conditions (platform + message type)."""
|
|
scheduler_telegram_group = MagicMock()
|
|
scheduler_telegram_group.execute = AsyncMock()
|
|
scheduler_telegram_private = MagicMock()
|
|
scheduler_telegram_private.execute = AsyncMock()
|
|
scheduler_discord = MagicMock()
|
|
scheduler_discord.execute = AsyncMock()
|
|
|
|
config_mgr = MagicMock()
|
|
|
|
def get_conf_info(origin):
|
|
# Combined filtering based on platform and message type
|
|
if origin.startswith("telegram:group"):
|
|
return {"id": "tg-group-conf", "name": "Telegram Group"}
|
|
elif origin.startswith("telegram:private"):
|
|
return {"id": "tg-private-conf", "name": "Telegram Private"}
|
|
elif origin.startswith("discord"):
|
|
return {"id": "discord-conf", "name": "Discord"}
|
|
return {"id": "unknown", "name": "Unknown"}
|
|
|
|
config_mgr.get_conf_info = MagicMock(side_effect=get_conf_info)
|
|
|
|
pipeline_mapping = {
|
|
"tg-group-conf": scheduler_telegram_group,
|
|
"tg-private-conf": scheduler_telegram_private,
|
|
"discord-conf": scheduler_discord,
|
|
}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=config_mgr,
|
|
)
|
|
|
|
processed = asyncio.Event()
|
|
scheduler_telegram_group.execute.side_effect = lambda e: processed.set() # noqa: ARG001
|
|
|
|
# Create Telegram group event
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "telegram:group:789"
|
|
mock_event.get_platform_id.return_value = "telegram"
|
|
mock_event.get_platform_name.return_value = "Telegram"
|
|
mock_event.get_sender_name.return_value = "GroupUser"
|
|
mock_event.get_sender_id.return_value = "user789"
|
|
mock_event.get_message_outline.return_value = "Group msg"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(processed.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Only telegram group scheduler should be called
|
|
scheduler_telegram_group.execute.assert_called_once()
|
|
scheduler_telegram_private.execute.assert_not_called()
|
|
scheduler_discord.execute.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_matching_filter_ignores_event(self, event_queue):
|
|
"""Test that events with no matching filter are ignored."""
|
|
error_logged = asyncio.Event()
|
|
|
|
scheduler = MagicMock()
|
|
scheduler.execute = AsyncMock()
|
|
|
|
config_mgr = MagicMock()
|
|
# Return a config ID that doesn't exist in pipeline_mapping
|
|
config_mgr.get_conf_info.return_value = {
|
|
"id": "nonexistent-conf",
|
|
"name": "Nonexistent",
|
|
}
|
|
|
|
pipeline_mapping = {"existing-conf": scheduler}
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=config_mgr,
|
|
)
|
|
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "unknown:platform:123"
|
|
mock_event.get_platform_id.return_value = "unknown"
|
|
mock_event.get_platform_name.return_value = "Unknown"
|
|
mock_event.get_sender_name.return_value = "User"
|
|
mock_event.get_sender_id.return_value = "user123"
|
|
mock_event.get_message_outline.return_value = "Test"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
with patch("astrbot.core.event_bus.logger") as mock_logger:
|
|
mock_logger.error.side_effect = lambda *args, **kwargs: error_logged.set() # noqa: ARG001
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(error_logged.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Verify error was logged
|
|
mock_logger.error.assert_called_once()
|
|
assert "nonexistent-conf" in mock_logger.error.call_args[0][0]
|
|
|
|
# Scheduler should not have been called
|
|
scheduler.execute.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_pipeline_mapping_filters_all(self, event_queue):
|
|
"""Test that empty pipeline mapping filters out all events."""
|
|
error_logged = asyncio.Event()
|
|
|
|
config_mgr = MagicMock()
|
|
config_mgr.get_conf_info.return_value = {
|
|
"id": "some-conf",
|
|
"name": "Some Config",
|
|
}
|
|
|
|
pipeline_mapping = {} # Empty mapping
|
|
event_bus = EventBus(
|
|
event_queue=event_queue,
|
|
pipeline_scheduler_mapping=pipeline_mapping,
|
|
astrbot_config_mgr=config_mgr,
|
|
)
|
|
|
|
mock_event = MagicMock()
|
|
mock_event.unified_msg_origin = "platform:group:123"
|
|
mock_event.get_platform_id.return_value = "platform"
|
|
mock_event.get_platform_name.return_value = "Platform"
|
|
mock_event.get_sender_name.return_value = "User"
|
|
mock_event.get_sender_id.return_value = "user123"
|
|
mock_event.get_message_outline.return_value = "Test"
|
|
|
|
await event_queue.put(mock_event)
|
|
|
|
with patch("astrbot.core.event_bus.logger") as mock_logger:
|
|
mock_logger.error.side_effect = lambda *args, **kwargs: error_logged.set() # noqa: ARG001
|
|
task = asyncio.create_task(event_bus.dispatch())
|
|
try:
|
|
await asyncio.wait_for(error_logged.wait(), timeout=1.0)
|
|
finally:
|
|
task.cancel()
|
|
with suppress(asyncio.CancelledError):
|
|
await task
|
|
|
|
# Verify error was logged for missing scheduler
|
|
mock_logger.error.assert_called_once()
|