From 45d36f86fdb3adeacc765c5a381c516c0fafe2d9 Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Sun, 11 May 2025 21:22:14 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96=E9=99=90=E6=B5=81?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A1=AE=E4=BF=9D=E5=9C=A8=E8=BE=BE?= =?UTF-8?q?=E5=88=B0=E9=99=90=E6=B5=81=E9=98=88=E5=80=BC=E6=97=B6=E6=AD=A3?= =?UTF-8?q?=E7=A1=AE=E5=A4=84=E7=90=86=E8=AF=B7=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/pipeline/rate_limit_check/stage.py | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/astrbot/core/pipeline/rate_limit_check/stage.py b/astrbot/core/pipeline/rate_limit_check/stage.py index 7550d84e0..b36a2fbd0 100644 --- a/astrbot/core/pipeline/rate_limit_check/stage.py +++ b/astrbot/core/pipeline/rate_limit_check/stage.py @@ -58,33 +58,30 @@ class RateLimitStage(Stage): now = datetime.now() async with self.locks[session_id]: # 确保同一会话不会并发修改队列 - timestamps = self.event_timestamps[session_id] + # 检查并处理限流,可能需要多次检查直到满足条件 + while True: + timestamps = self.event_timestamps[session_id] + self._remove_expired_timestamps(timestamps, now) - self._remove_expired_timestamps(timestamps, now) + if len(timestamps) < self.rate_limit_count: + timestamps.append(now) + break + else: + next_window_time = timestamps[0] + self.rate_limit_time + stall_duration = (next_window_time - now).total_seconds() + 0.3 - if len(timestamps) >= self.rate_limit_count: - # 达到限流阈值,计算下一个窗口的时间 - next_window_time = timestamps[0] + self.rate_limit_time - stall_duration = (next_window_time - now).total_seconds() - - match self.rl_strategy: - case RateLimitStrategy.STALL.value: - logger.info( - f"会话 {session_id} 被限流。根据限流策略,此会话处理将被暂停 {stall_duration:.2f} 秒。" - ) - await asyncio.sleep(stall_duration) - case RateLimitStrategy.DISCARD.value: - # event.set_result(MessageEventResult().message(f"会话 {session_id} 被限流。根据限流策略,此请求已被丢弃,直到您的限额于 {stall_duration:.2f} 秒后重置。")) - logger.info( - f"会话 {session_id} 被限流。根据限流策略,此请求已被丢弃,直到限额于 {stall_duration:.2f} 秒后重置。" - ) - return event.stop_event() - - self._remove_expired_timestamps( - timestamps, now + timedelta(seconds=stall_duration) - ) - - timestamps.append(now) + match self.rl_strategy: + case RateLimitStrategy.STALL.value: + logger.info( + f"会话 {session_id} 被限流。根据限流策略,此会话处理将被暂停 {stall_duration:.2f} 秒。" + ) + await asyncio.sleep(stall_duration) + now = datetime.now() + case RateLimitStrategy.DISCARD.value: + logger.info( + f"会话 {session_id} 被限流。根据限流策略,此请求已被丢弃,直到限额于 {stall_duration:.2f} 秒后重置。" + ) + return event.stop_event() def _remove_expired_timestamps( self, timestamps: Deque[datetime], now: datetime