diff --git a/astrbot/core/pipeline/respond/stage.py b/astrbot/core/pipeline/respond/stage.py index 50b436043..54ad1e63b 100644 --- a/astrbot/core/pipeline/respond/stage.py +++ b/astrbot/core/pipeline/respond/stage.py @@ -13,6 +13,7 @@ from astrbot.core.message.message_event_result import BaseMessageComponent from astrbot.core.star.star_handler import star_handlers_registry, EventType from astrbot.core.star.star import star_map from astrbot.core.utils.path_util import path_Mapping +from astrbot.core.utils.session_lock import session_lock_manager @register_stage @@ -177,25 +178,26 @@ class RespondStage(Stage): result.chain.remove(comp) break - for rcomp in record_comps: - i = await self._calc_comp_interval(rcomp) - await asyncio.sleep(i) - try: - await event.send(MessageChain([rcomp])) - except Exception as e: - logger.error(f"发送消息失败: {e} chain: {result.chain}") - break - - # 分段回复 - for comp in non_record_comps: - i = await self._calc_comp_interval(comp) - await asyncio.sleep(i) - try: - await event.send(MessageChain([*decorated_comps, comp])) - decorated_comps = [] # 清空已发送的装饰组件 - except Exception as e: - logger.error(f"发送消息失败: {e} chain: {result.chain}") - break + # leverage lock to guarentee the order of message sending among different events + async with session_lock_manager.acquire_lock(event.unified_msg_origin): + for rcomp in record_comps: + i = await self._calc_comp_interval(rcomp) + await asyncio.sleep(i) + try: + await event.send(MessageChain([rcomp])) + except Exception as e: + logger.error(f"发送消息失败: {e} chain: {result.chain}") + break + # 分段回复 + for comp in non_record_comps: + i = await self._calc_comp_interval(comp) + await asyncio.sleep(i) + try: + await event.send(MessageChain([*decorated_comps, comp])) + decorated_comps = [] # 清空已发送的装饰组件 + except Exception as e: + logger.error(f"发送消息失败: {e} chain: {result.chain}") + break else: for rcomp in record_comps: try: diff --git a/astrbot/core/utils/session_lock.py b/astrbot/core/utils/session_lock.py new file mode 100644 index 000000000..912d91e53 --- /dev/null +++ b/astrbot/core/utils/session_lock.py @@ -0,0 +1,29 @@ +import asyncio +from collections import defaultdict +from contextlib import asynccontextmanager + + +class SessionLockManager: + def __init__(self): + self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + self._lock_count: dict[str, int] = defaultdict(int) + self._access_lock = asyncio.Lock() + + @asynccontextmanager + async def acquire_lock(self, session_id: str): + async with self._access_lock: + lock = self._locks[session_id] + self._lock_count[session_id] += 1 + + try: + async with lock: + yield + finally: + async with self._access_lock: + self._lock_count[session_id] -= 1 + if self._lock_count[session_id] == 0: + self._locks.pop(session_id, None) + self._lock_count.pop(session_id, None) + + +session_lock_manager = SessionLockManager()