fix: webchat streaming queue interrupted after user closing tab (#2892)

* feat: add toast notification system with snackbar component

* feat: enhance chat functionality with conversation running state and notifications

* fix: update bot message avatar rendering during streaming

* feat: implement conversation tracking context manager for webchat

* fix: update conversation tracking to remove conversation ID on exit
This commit is contained in:
Soulter
2025-09-27 17:57:12 +08:00
committed by GitHub
parent 19d7438499
commit a69195a02b
7 changed files with 190 additions and 59 deletions
@@ -291,13 +291,6 @@ async def run_agent(
else:
astr_event.set_result(MessageEventResult().message(err_msg))
return
asyncio.create_task(
Metric.upload(
llm_tick=1,
model_name=agent_runner.provider.get_model(),
provider_type=agent_runner.provider.meta().type,
)
)
class LLMRequestSubStage(Stage):
@@ -524,6 +517,14 @@ class LLMRequestSubStage(Stage):
if event.get_platform_name() == "webchat":
asyncio.create_task(self._handle_webchat(event, req, provider))
asyncio.create_task(
Metric.upload(
llm_tick=1,
model_name=agent_runner.provider.get_model(),
provider_type=agent_runner.provider.meta().type,
)
)
async def _handle_webchat(
self, event: AstrMessageEvent, req: ProviderRequest, prov: Provider
):
+66 -32
View File
@@ -1,17 +1,27 @@
import uuid
import json
import os
import asyncio
from contextlib import asynccontextmanager
from .route import Route, Response, RouteContext
from astrbot.core.platform.sources.webchat.webchat_queue_mgr import webchat_queue_mgr
from quart import request, Response as QuartResponse, g, make_response
from astrbot.core.db import BaseDatabase
import asyncio
from astrbot.core import logger
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
from astrbot.core.platform.astr_message_event import MessageSession
@asynccontextmanager
async def track_conversation(convs: dict, conv_id: str):
convs[conv_id] = True
try:
yield
finally:
convs.pop(conv_id, None)
class ChatRoute(Route):
def __init__(
self,
@@ -40,6 +50,8 @@ class ChatRoute(Route):
self.conv_mgr = core_lifecycle.conversation_manager
self.platform_history_mgr = core_lifecycle.platform_message_history_manager
self.running_convs: dict[str, bool] = {}
async def get_file(self):
filename = request.args.get("filename")
if not filename:
@@ -139,42 +151,63 @@ class ChatRoute(Route):
)
async def stream():
client_disconnected = False
try:
while True:
try:
result = await asyncio.wait_for(back_queue.get(), timeout=10)
except asyncio.TimeoutError:
continue
async with track_conversation(self.running_convs, webchat_conv_id):
while True:
try:
result = await asyncio.wait_for(back_queue.get(), timeout=1)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
logger.debug(f"[WebChat] 用户 {username} 断开聊天长连接。")
client_disconnected = True
except Exception as e:
logger.error(f"WebChat stream error: {e}")
if not result:
continue
if not result:
continue
result_text = result["data"]
type = result.get("type")
streaming = result.get("streaming", False)
yield f"data: {json.dumps(result, ensure_ascii=False)}\n\n"
await asyncio.sleep(0.05)
result_text = result["data"]
type = result.get("type")
streaming = result.get("streaming", False)
if type == "end":
break
elif (
(streaming and type == "complete")
or not streaming
or type == "break"
):
# append bot message
new_his = {"type": "bot", "message": result_text}
await self.platform_history_mgr.insert(
platform_id="webchat",
user_id=webchat_conv_id,
content=new_his,
sender_id="bot",
sender_name="bot",
)
try:
if not client_disconnected:
yield f"data: {json.dumps(result, ensure_ascii=False)}\n\n"
except Exception as e:
if not client_disconnected:
logger.debug(
f"[WebChat] 用户 {username} 断开聊天长连接。 {e}"
)
client_disconnected = True
except BaseException as _:
logger.debug(f"用户 {username} 断开聊天长连接。")
return
try:
if not client_disconnected:
await asyncio.sleep(0.05)
except asyncio.CancelledError:
logger.debug(f"[WebChat] 用户 {username} 断开聊天长连接。")
client_disconnected = True
if type == "end":
break
elif (
(streaming and type == "complete")
or not streaming
or type == "break"
):
# append bot message
new_his = {"type": "bot", "message": result_text}
await self.platform_history_mgr.insert(
platform_id="webchat",
user_id=webchat_conv_id,
content=new_his,
sender_id="bot",
sender_name="bot",
)
except BaseException as e:
logger.exception(f"WebChat stream unexpected error: {e}", exc_info=True)
# Put message to conversation-specific queue
chat_queue = webchat_queue_mgr.get_or_create_queue(webchat_conv_id)
@@ -291,6 +324,7 @@ class ChatRoute(Route):
.ok(
data={
"history": history_res,
"is_running": self.running_convs.get(webchat_conv_id, False),
}
)
.__dict__
+22 -1
View File
@@ -1,7 +1,28 @@
<template>
<RouterView></RouterView>
<!-- 全局唯一 snackbar -->
<v-snackbar v-if="toastStore.current" v-model="snackbarShow" :color="toastStore.current.color"
:timeout="toastStore.current.timeout" :multi-line="toastStore.current.multiLine"
:location="toastStore.current.location" close-on-back>
{{ toastStore.current.message }}
<template #actions v-if="toastStore.current.closable">
<v-btn variant="text" @click="snackbarShow = false">关闭</v-btn>
</template>
</v-snackbar>
</template>
<script setup lang="ts">
<script setup>
import { RouterView } from 'vue-router';
import { computed } from 'vue'
import { useToastStore } from '@/stores/toast'
const toastStore = useToastStore()
const snackbarShow = computed({
get: () => !!toastStore.current,
set: (val) => {
if (!val) toastStore.shift()
}
})
</script>
+42 -13
View File
@@ -9,7 +9,8 @@
<div style="display: flex; align-items: center; justify-content: center; padding: 16px; padding-bottom: 0px;"
v-if="chatboxMode">
<img width="50" src="@/assets/images/astrbot_logo_mini.webp" alt="AstrBot Logo">
<span v-if="!sidebarCollapsed" style="font-weight: 1000; font-size: 26px; margin-left: 8px;">AstrBot</span>
<span v-if="!sidebarCollapsed"
style="font-weight: 1000; font-size: 26px; margin-left: 8px;">AstrBot</span>
</div>
@@ -46,7 +47,7 @@
|| tm('conversation.newConversation') }}</v-list-item-title>
<v-list-item-subtitle v-if="!sidebarCollapsed" class="timestamp">{{
formatDate(item.updated_at)
}}</v-list-item-subtitle>
}}</v-list-item-subtitle>
<template v-if="!sidebarCollapsed" v-slot:append>
<div class="conversation-actions">
@@ -118,8 +119,9 @@
</div>
<v-divider v-if="currCid && getCurrentConversation" class="conversation-divider"></v-divider>
<MessageList v-if="messages && messages.length > 0" :messages="messages" :isDark="isDark" :isStreaming="isStreaming"
@openImagePreview="openImagePreview" ref="messageList" />
<MessageList v-if="messages && messages.length > 0" :messages="messages" :isDark="isDark"
:isStreaming="isStreaming || isConvRunning" @openImagePreview="openImagePreview"
ref="messageList" />
<div class="welcome-container fade-in" v-else>
<div class="welcome-title">
<span>Hello, I'm</span>
@@ -145,9 +147,10 @@
<!-- 输入区域 -->
<div class="input-area fade-in">
<div
style="width: 85%; max-width: 900px; margin: 0 auto; border: 1px solid #e0e0e0; border-radius: 24px; padding: 4px;">
<textarea id="input-field" v-model="prompt" @keydown="handleInputKeyDown" :disabled="isStreaming"
@click:clear="clearMessage" placeholder="Ask AstrBot..."
style="width: 85%; max-width: 900px; margin: 0 auto; border: 1px solid #e0e0e0; border-radius: 24px;">
<textarea id="input-field" v-model="prompt" @keydown="handleInputKeyDown"
:disabled="isStreaming || isConvRunning" @click:clear="clearMessage"
placeholder="Ask AstrBot..."
style="width: 100%; resize: none; outline: none; border: 1px solid var(--v-theme-border); border-radius: 12px; padding: 8px 16px; min-height: 40px; font-family: inherit; font-size: 16px; background-color: var(--v-theme-surface);"></textarea>
<div
style="display: flex; justify-content: space-between; align-items: center; padding: 0px 8px;">
@@ -155,18 +158,21 @@
<!-- 选择提供商和模型 -->
<ProviderModelSelector ref="providerModelSelector" />
</div>
<div style="display: flex; justify-content: flex-end; margin-top: 8px;">
<div
style="display: flex; justify-content: flex-end; margin-top: 8px; align-items: center;">
<input type="file" ref="imageInput" @change="handleFileSelect" accept="image/*"
style="display: none" multiple />
<v-progress-circular v-if="isStreaming || isConvRunning" indeterminate size="16"
class="mr-1" width="1.5" />
<v-btn @click="triggerImageInput" icon="mdi-plus" variant="text" color="deep-purple"
class="add-btn" size="small" />
<v-btn @click="sendMessage" icon="mdi-send" variant="text" color="deep-purple"
:disabled="!prompt && stagedImagesName.length === 0 && !stagedAudioUrl"
class="send-btn" size="small" />
<v-btn @click="isRecording ? stopRecording() : startRecording()"
:icon="isRecording ? 'mdi-stop-circle' : 'mdi-microphone'" variant="text"
:color="isRecording ? 'error' : 'deep-purple'" class="record-btn"
size="small" />
<v-btn @click="sendMessage" icon="mdi-send" variant="text" color="deep-purple"
:disabled="!prompt && stagedImagesName.length === 0 && !stagedAudioUrl"
class="send-btn" size="small" />
</div>
</div>
@@ -235,6 +241,7 @@ import LanguageSwitcher from '@/components/shared/LanguageSwitcher.vue';
import ProviderModelSelector from '@/components/chat/ProviderModelSelector.vue';
import MessageList from '@/components/chat/MessageList.vue';
import 'highlight.js/styles/github.css';
import { useToast } from '@/utils/toast';
export default {
name: 'ChatPage',
@@ -301,7 +308,10 @@ export default {
imagePreviewDialog: false,
previewImageUrl: '',
isStreaming: false
isStreaming: false,
isConvRunning: false, // Track if the current conversation is running
isToastedRunningInfo: false, // To avoid multiple toasts
}
},
@@ -379,7 +389,7 @@ export default {
} else {
this.sidebarCollapsed = true; // 默认折叠状态
}
// 设置输入框标签
this.inputFieldLabel = this.tm('input.chatPrompt');
this.getConversations();
@@ -662,6 +672,25 @@ export default {
// Update the selected conversation in the sidebar
this.selectedConversations = [cid[0]];
let history = response.data.data.history;
this.isConvRunning = response.data.data.is_running || false;
if (this.isConvRunning) {
if (!this.isToastedRunningInfo) {
useToast().info("该对话正在运行中。", { timeout: 5000 });
this.isToastedRunningInfo = true;
}
// 如果对话还在运行,3秒后重新获取消息
setTimeout(() => {
this.getConversationMessages([this.currCid]);
}, 3000);
}
// 滚动到底部
this.$nextTick(() => {
this.$refs.messageList.scrollToBottom();
});
for (let i = 0; i < history.length; i++) {
let content = history[i].content;
if (content.message.startsWith('[IMAGE]')) {
@@ -29,12 +29,11 @@
<!-- Bot Messages -->
<div v-else class="bot-message">
<div v-if="isStreaming && index === messages.length - 1" style="width: 36px; height: 36px;">
<v-progress-circular indeterminate size="28" width="2"
style="margin-top: 12px;"></v-progress-circular>
</div>
<v-avatar v-else class="bot-avatar" size="36">
<span class="text-h2"></span>
<v-avatar class="bot-avatar" size="36">
<v-progress-circular :index="index" v-if="isStreaming && index === messages.length - 1" indeterminate size="28"
width="2"></v-progress-circular>
<span v-else-if="messages[index - 1]?.content.type !== 'bot'" class="text-h2"></span>
</v-avatar>
<div class="bot-message-content">
<div class="message-bubble bot-bubble">
+31
View File
@@ -0,0 +1,31 @@
import { defineStore } from 'pinia'
import { ref, computed } from 'vue'
export const useToastStore = defineStore('toast', () => {
const queue = ref([])
const current = computed(() => queue.value[0])
function add({
message,
color = 'info', // Vuetify 颜色
timeout = 3000,
closable = true,
multiLine = false,
location = 'top center'
}) {
queue.value.push({
message,
color,
timeout,
closable,
multiLine,
location
})
}
function shift() {
queue.value.shift()
}
return { current, add, shift }
})
+16
View File
@@ -0,0 +1,16 @@
import { useToastStore } from '@/stores/toast'
export function useToast() {
const store = useToastStore()
const toast = (message, color = 'info', opts = {}) =>
store.add({ message, color, ...opts })
return {
toast,
success: (msg, opts) => toast(msg, 'success', opts),
error: (msg, opts) => toast(msg, 'error', opts),
info: (msg, opts) => toast(msg, 'primary', opts),
warning: (msg, opts) => toast(msg, 'warning', opts)
}
}