From a69195a02ba3f9633011e53114635d5102113569 Mon Sep 17 00:00:00 2001
From: Soulter <37870767+Soulter@users.noreply.github.com>
Date: Sat, 27 Sep 2025 17:57:12 +0800
Subject: [PATCH] fix: webchat streaming queue interrupted after user closing
tab (#2892)
* feat: add toast notification system with snackbar component
* feat: enhance chat functionality with conversation running state and notifications
* fix: update bot message avatar rendering during streaming
* feat: implement conversation tracking context manager for webchat
* fix: update conversation tracking to remove conversation ID on exit
---
.../process_stage/method/llm_request.py | 15 +--
astrbot/dashboard/routes/chat.py | 98 +++++++++++++------
dashboard/src/App.vue | 23 ++++-
dashboard/src/components/chat/Chat.vue | 55 ++++++++---
dashboard/src/components/chat/MessageList.vue | 11 +--
dashboard/src/stores/toast.js | 31 ++++++
dashboard/src/utils/toast.js | 16 +++
7 files changed, 190 insertions(+), 59 deletions(-)
create mode 100644 dashboard/src/stores/toast.js
create mode 100644 dashboard/src/utils/toast.js
diff --git a/astrbot/core/pipeline/process_stage/method/llm_request.py b/astrbot/core/pipeline/process_stage/method/llm_request.py
index 4219ae518..e3ede6c55 100644
--- a/astrbot/core/pipeline/process_stage/method/llm_request.py
+++ b/astrbot/core/pipeline/process_stage/method/llm_request.py
@@ -291,13 +291,6 @@ async def run_agent(
else:
astr_event.set_result(MessageEventResult().message(err_msg))
return
- asyncio.create_task(
- Metric.upload(
- llm_tick=1,
- model_name=agent_runner.provider.get_model(),
- provider_type=agent_runner.provider.meta().type,
- )
- )
class LLMRequestSubStage(Stage):
@@ -524,6 +517,14 @@ class LLMRequestSubStage(Stage):
if event.get_platform_name() == "webchat":
asyncio.create_task(self._handle_webchat(event, req, provider))
+ asyncio.create_task(
+ Metric.upload(
+ llm_tick=1,
+ model_name=agent_runner.provider.get_model(),
+ provider_type=agent_runner.provider.meta().type,
+ )
+ )
+
async def _handle_webchat(
self, event: AstrMessageEvent, req: ProviderRequest, prov: Provider
):
diff --git a/astrbot/dashboard/routes/chat.py b/astrbot/dashboard/routes/chat.py
index 3a52476c2..71fd3472b 100644
--- a/astrbot/dashboard/routes/chat.py
+++ b/astrbot/dashboard/routes/chat.py
@@ -1,17 +1,27 @@
import uuid
import json
import os
+import asyncio
+from contextlib import asynccontextmanager
from .route import Route, Response, RouteContext
from astrbot.core.platform.sources.webchat.webchat_queue_mgr import webchat_queue_mgr
from quart import request, Response as QuartResponse, g, make_response
from astrbot.core.db import BaseDatabase
-import asyncio
from astrbot.core import logger
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
from astrbot.core.platform.astr_message_event import MessageSession
+@asynccontextmanager
+async def track_conversation(convs: dict, conv_id: str):
+ convs[conv_id] = True
+ try:
+ yield
+ finally:
+ convs.pop(conv_id, None)
+
+
class ChatRoute(Route):
def __init__(
self,
@@ -40,6 +50,8 @@ class ChatRoute(Route):
self.conv_mgr = core_lifecycle.conversation_manager
self.platform_history_mgr = core_lifecycle.platform_message_history_manager
+ self.running_convs: dict[str, bool] = {}
+
async def get_file(self):
filename = request.args.get("filename")
if not filename:
@@ -139,42 +151,63 @@ class ChatRoute(Route):
)
async def stream():
+ client_disconnected = False
+
try:
- while True:
- try:
- result = await asyncio.wait_for(back_queue.get(), timeout=10)
- except asyncio.TimeoutError:
- continue
+ async with track_conversation(self.running_convs, webchat_conv_id):
+ while True:
+ try:
+ result = await asyncio.wait_for(back_queue.get(), timeout=1)
+ except asyncio.TimeoutError:
+ continue
+ except asyncio.CancelledError:
+ logger.debug(f"[WebChat] 用户 {username} 断开聊天长连接。")
+ client_disconnected = True
+ except Exception as e:
+ logger.error(f"WebChat stream error: {e}")
- if not result:
- continue
+ if not result:
+ continue
- result_text = result["data"]
- type = result.get("type")
- streaming = result.get("streaming", False)
- yield f"data: {json.dumps(result, ensure_ascii=False)}\n\n"
- await asyncio.sleep(0.05)
+ result_text = result["data"]
+ type = result.get("type")
+ streaming = result.get("streaming", False)
- if type == "end":
- break
- elif (
- (streaming and type == "complete")
- or not streaming
- or type == "break"
- ):
- # append bot message
- new_his = {"type": "bot", "message": result_text}
- await self.platform_history_mgr.insert(
- platform_id="webchat",
- user_id=webchat_conv_id,
- content=new_his,
- sender_id="bot",
- sender_name="bot",
- )
+ try:
+ if not client_disconnected:
+ yield f"data: {json.dumps(result, ensure_ascii=False)}\n\n"
+ except Exception as e:
+ if not client_disconnected:
+ logger.debug(
+ f"[WebChat] 用户 {username} 断开聊天长连接。 {e}"
+ )
+ client_disconnected = True
- except BaseException as _:
- logger.debug(f"用户 {username} 断开聊天长连接。")
- return
+ try:
+ if not client_disconnected:
+ await asyncio.sleep(0.05)
+ except asyncio.CancelledError:
+ logger.debug(f"[WebChat] 用户 {username} 断开聊天长连接。")
+ client_disconnected = True
+
+ if type == "end":
+ break
+ elif (
+ (streaming and type == "complete")
+ or not streaming
+ or type == "break"
+ ):
+ # append bot message
+ new_his = {"type": "bot", "message": result_text}
+ await self.platform_history_mgr.insert(
+ platform_id="webchat",
+ user_id=webchat_conv_id,
+ content=new_his,
+ sender_id="bot",
+ sender_name="bot",
+ )
+ except BaseException as e:
+ logger.exception(f"WebChat stream unexpected error: {e}", exc_info=True)
# Put message to conversation-specific queue
chat_queue = webchat_queue_mgr.get_or_create_queue(webchat_conv_id)
@@ -291,6 +324,7 @@ class ChatRoute(Route):
.ok(
data={
"history": history_res,
+ "is_running": self.running_convs.get(webchat_conv_id, False),
}
)
.__dict__
diff --git a/dashboard/src/App.vue b/dashboard/src/App.vue
index bf88393fd..8f2b8e7b3 100644
--- a/dashboard/src/App.vue
+++ b/dashboard/src/App.vue
@@ -1,7 +1,28 @@
- AstrBot
+ AstrBot