From 140ddc70e683bfc9ee89e4fd94c1d11244b93c35 Mon Sep 17 00:00:00 2001 From: Gao Jinzhe <2968474907@qq.com> Date: Wed, 23 Jul 2025 00:37:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BD=BF=E7=94=A8=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E9=94=81=E4=BF=9D=E8=AF=81=E5=88=86=E6=AE=B5=E5=9B=9E=E5=A4=8D?= =?UTF-8?q?=E6=97=B6=E7=9A=84=E6=B6=88=E6=81=AF=E5=8F=91=E9=80=81=E9=A1=BA?= =?UTF-8?q?=E5=BA=8F=20(#2130)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 优化分段消息发送逻辑,为分段消息添加消息队列 * 删除了不必要的代码 * style: code quality * 将消息队列机制重构为会话锁机制 * perf: narrow the lock scope * refactor: replace get_lock with async context manager for session locks * refactor: optimize session lock management with defaultdict --------- Co-authored-by: Soulter <905617992@qq.com> Co-authored-by: Raven95676 --- astrbot/core/pipeline/respond/stage.py | 40 ++++++++++++++------------ astrbot/core/utils/session_lock.py | 29 +++++++++++++++++++ 2 files changed, 50 insertions(+), 19 deletions(-) create mode 100644 astrbot/core/utils/session_lock.py diff --git a/astrbot/core/pipeline/respond/stage.py b/astrbot/core/pipeline/respond/stage.py index 50b436043..54ad1e63b 100644 --- a/astrbot/core/pipeline/respond/stage.py +++ b/astrbot/core/pipeline/respond/stage.py @@ -13,6 +13,7 @@ from astrbot.core.message.message_event_result import BaseMessageComponent from astrbot.core.star.star_handler import star_handlers_registry, EventType from astrbot.core.star.star import star_map from astrbot.core.utils.path_util import path_Mapping +from astrbot.core.utils.session_lock import session_lock_manager @register_stage @@ -177,25 +178,26 @@ class RespondStage(Stage): result.chain.remove(comp) break - for rcomp in record_comps: - i = await self._calc_comp_interval(rcomp) - await asyncio.sleep(i) - try: - await event.send(MessageChain([rcomp])) - except Exception as e: - logger.error(f"发送消息失败: {e} chain: {result.chain}") - break - - # 分段回复 - for comp in non_record_comps: - i = await self._calc_comp_interval(comp) - await asyncio.sleep(i) - try: - await event.send(MessageChain([*decorated_comps, comp])) - decorated_comps = [] # 清空已发送的装饰组件 - except Exception as e: - logger.error(f"发送消息失败: {e} chain: {result.chain}") - break + # leverage lock to guarentee the order of message sending among different events + async with session_lock_manager.acquire_lock(event.unified_msg_origin): + for rcomp in record_comps: + i = await self._calc_comp_interval(rcomp) + await asyncio.sleep(i) + try: + await event.send(MessageChain([rcomp])) + except Exception as e: + logger.error(f"发送消息失败: {e} chain: {result.chain}") + break + # 分段回复 + for comp in non_record_comps: + i = await self._calc_comp_interval(comp) + await asyncio.sleep(i) + try: + await event.send(MessageChain([*decorated_comps, comp])) + decorated_comps = [] # 清空已发送的装饰组件 + except Exception as e: + logger.error(f"发送消息失败: {e} chain: {result.chain}") + break else: for rcomp in record_comps: try: diff --git a/astrbot/core/utils/session_lock.py b/astrbot/core/utils/session_lock.py new file mode 100644 index 000000000..912d91e53 --- /dev/null +++ b/astrbot/core/utils/session_lock.py @@ -0,0 +1,29 @@ +import asyncio +from collections import defaultdict +from contextlib import asynccontextmanager + + +class SessionLockManager: + def __init__(self): + self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + self._lock_count: dict[str, int] = defaultdict(int) + self._access_lock = asyncio.Lock() + + @asynccontextmanager + async def acquire_lock(self, session_id: str): + async with self._access_lock: + lock = self._locks[session_id] + self._lock_count[session_id] += 1 + + try: + async with lock: + yield + finally: + async with self._access_lock: + self._lock_count[session_id] -= 1 + if self._lock_count[session_id] == 0: + self._locks.pop(session_id, None) + self._lock_count.pop(session_id, None) + + +session_lock_manager = SessionLockManager()