From eb8a1387135f06c606073c0f38e562b456c3f01f Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Tue, 1 Jul 2025 21:00:43 +0800 Subject: [PATCH 1/3] feat: enhance conversation actions with delete functionality and improved styling --- dashboard/src/views/ChatPage.vue | 99 +++++++++++--------------------- 1 file changed, 32 insertions(+), 67 deletions(-) diff --git a/dashboard/src/views/ChatPage.vue b/dashboard/src/views/ChatPage.vue index 2954153ae..0dae041b2 100644 --- a/dashboard/src/views/ChatPage.vue +++ b/dashboard/src/views/ChatPage.vue @@ -31,7 +31,7 @@ elevation="0">
- +
@@ -49,8 +49,12 @@ }} --> @@ -65,22 +69,6 @@ -
- -
-
- -
- - mdi-delete - {{ tm('actions.deleteChat') }} - -
-
-
@@ -112,7 +100,7 @@ @@ -1631,6 +1638,7 @@ export default { border-radius: 6px; overflow-x: auto; margin: 12px 0; + position: relative; } .markdown-content code { @@ -1642,6 +1650,144 @@ export default { color: var(--v-theme-code); } +/* 代码块中的code标签样式 */ +.markdown-content pre code { + background-color: transparent; + padding: 0; + border-radius: 0; + font-family: 'Fira Code', 'Consolas', 'Monaco', 'Courier New', monospace; + font-size: 0.85em; + color: inherit; + display: block; + overflow-x: auto; + line-height: 1.5; +} + +/* 自定义代码高亮样式 */ +.markdown-content pre { + border: 1px solid var(--v-theme-border); + box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1); +} + +/* 确保highlight.js的样式正确应用 */ +.markdown-content pre code.hljs { + background: transparent !important; + color: inherit; +} + +/* 亮色主题下的代码高亮 */ +.v-theme--light .markdown-content pre { + background-color: #f6f8fa; +} + +/* 暗色主题下的代码块样式 */ +.v-theme--dark .markdown-content pre { + background-color: #0d1117 !important; + border-color: rgba(255, 255, 255, 0.1); +} + +.v-theme--dark .markdown-content pre code { + color: #e6edf3 !important; +} + +/* 暗色主题下的highlight.js样式覆盖 */ +.v-theme--dark .hljs { + background: #0d1117 !important; + color: #e6edf3 !important; +} + +.v-theme--dark .hljs-keyword, +.v-theme--dark .hljs-selector-tag, +.v-theme--dark .hljs-built_in, +.v-theme--dark .hljs-name, +.v-theme--dark .hljs-tag { + color: #ff7b72 !important; +} + +.v-theme--dark .hljs-string, +.v-theme--dark .hljs-title, +.v-theme--dark .hljs-section, +.v-theme--dark .hljs-attribute, +.v-theme--dark .hljs-literal, +.v-theme--dark .hljs-template-tag, +.v-theme--dark .hljs-template-variable, +.v-theme--dark .hljs-type, +.v-theme--dark .hljs-addition { + color: #a5d6ff !important; +} + +.v-theme--dark .hljs-comment, +.v-theme--dark .hljs-quote, +.v-theme--dark .hljs-deletion, +.v-theme--dark .hljs-meta { + color: #8b949e !important; +} + +.v-theme--dark .hljs-number, +.v-theme--dark .hljs-regexp, +.v-theme--dark .hljs-symbol, +.v-theme--dark .hljs-variable, +.v-theme--dark .hljs-template-variable, +.v-theme--dark .hljs-link, +.v-theme--dark .hljs-selector-attr, +.v-theme--dark .hljs-selector-pseudo { + color: #79c0ff !important; +} + +.v-theme--dark .hljs-function, +.v-theme--dark .hljs-class, +.v-theme--dark .hljs-title.class_ { + color: #d2a8ff !important; +} + +/* 复制按钮样式 */ +.copy-code-btn { + position: absolute; + top: 8px; + right: 8px; + background: rgba(255, 255, 255, 0.9); + border: 1px solid rgba(0, 0, 0, 0.1); + border-radius: 4px; + padding: 6px; + cursor: pointer; + opacity: 0; + transition: all 0.2s ease; + display: flex; + align-items: center; + justify-content: center; + color: #666; + font-size: 12px; + z-index: 10; + backdrop-filter: blur(4px); +} + +.copy-code-btn:hover { + background: rgba(255, 255, 255, 1); + color: #333; + transform: scale(1.05); + box-shadow: 0 2px 8px rgba(0, 0, 0, 0.15); +} + +.copy-code-btn:active { + transform: scale(0.95); +} + +.markdown-content pre:hover .copy-code-btn { + opacity: 1; +} + +.v-theme--dark .copy-code-btn { + background: rgba(45, 45, 45, 0.9); + border-color: rgba(255, 255, 255, 0.15); + color: #ccc; +} + +.v-theme--dark .copy-code-btn:hover { + background: rgba(45, 45, 45, 1); + color: #fff; + box-shadow: 0 2px 8px rgba(0, 0, 0, 0.3); +} + .markdown-content img { max-width: 100%; border-radius: 8px; From 6a503b82c30d831b09eadd9213ec777b02988fc9 Mon Sep 17 00:00:00 2001 From: Soulter <905617992@qq.com> Date: Tue, 1 Jul 2025 22:34:17 +0800 Subject: [PATCH 3/3] refactor: web chat queue management and streamline chat route handling --- astrbot/core/__init__.py | 2 - .../process_stage/method/llm_request.py | 8 - .../sources/webchat/webchat_adapter.py | 48 +- .../platform/sources/webchat/webchat_event.py | 15 +- .../sources/webchat/webchat_queue_mgr.py | 33 ++ astrbot/dashboard/routes/chat.py | 68 +-- astrbot/dashboard/routes/multi_user_chat.py | 0 dashboard/src/views/ChatPage.vue | 411 ++++++------------ 8 files changed, 245 insertions(+), 340 deletions(-) create mode 100644 astrbot/core/platform/sources/webchat/webchat_queue_mgr.py create mode 100644 astrbot/dashboard/routes/multi_user_chat.py diff --git a/astrbot/core/__init__.py b/astrbot/core/__init__.py index 104a9edb6..16f108ece 100644 --- a/astrbot/core/__init__.py +++ b/astrbot/core/__init__.py @@ -28,5 +28,3 @@ pip_installer = PipInstaller( astrbot_config.get("pip_install_arg", ""), astrbot_config.get("pypi_index_url", None), ) -web_chat_queue = asyncio.Queue(maxsize=32) -web_chat_back_queue = asyncio.Queue(maxsize=32) diff --git a/astrbot/core/pipeline/process_stage/method/llm_request.py b/astrbot/core/pipeline/process_stage/method/llm_request.py index 2ebe4bd42..961463c7a 100644 --- a/astrbot/core/pipeline/process_stage/method/llm_request.py +++ b/astrbot/core/pipeline/process_stage/method/llm_request.py @@ -23,7 +23,6 @@ from astrbot.core.provider.entities import ( LLMResponse, ) from astrbot.core.star.star_handler import EventType -from astrbot.core import web_chat_back_queue from ..agent_runner.tool_loop_agent import ToolLoopAgent @@ -283,13 +282,6 @@ class LLMRequestSubStage(Stage): cid=cid, title=title, ) - web_chat_back_queue.put_nowait( - { - "type": "update_title", - "cid": cid, - "data": title, - } - ) async def _save_to_history( self, diff --git a/astrbot/core/platform/sources/webchat/webchat_adapter.py b/astrbot/core/platform/sources/webchat/webchat_adapter.py index fa384ed99..41d3e9418 100644 --- a/astrbot/core/platform/sources/webchat/webchat_adapter.py +++ b/astrbot/core/platform/sources/webchat/webchat_adapter.py @@ -2,7 +2,7 @@ import time import asyncio import uuid import os -from typing import Awaitable, Any +from typing import Awaitable, Any, Callable from astrbot.core.platform import ( Platform, AstrBotMessage, @@ -13,7 +13,7 @@ from astrbot.core.platform import ( from astrbot.core.message.message_event_result import MessageChain from astrbot.core.message.components import Plain, Image, Record # noqa: F403 from astrbot import logger -from astrbot.core import web_chat_queue +from .webchat_queue_mgr import webchat_queue_mgr, WebChatQueueMgr from .webchat_event import WebChatMessageEvent from astrbot.core.platform.astr_message_event import MessageSesion from ...register import register_platform_adapter @@ -21,14 +21,46 @@ from astrbot.core.utils.astrbot_path import get_astrbot_data_path class QueueListener: - def __init__(self, queue: asyncio.Queue, callback: callable) -> None: - self.queue = queue + def __init__(self, webchat_queue_mgr: WebChatQueueMgr, callback: Callable) -> None: + self.webchat_queue_mgr = webchat_queue_mgr self.callback = callback + self.running_tasks = set() + + async def listen_to_queue(self, conversation_id: str): + """Listen to a specific conversation queue""" + queue = self.webchat_queue_mgr.get_or_create_queue(conversation_id) + while True: + try: + data = await queue.get() + await self.callback(data) + except Exception as e: + logger.error( + f"Error processing message from conversation {conversation_id}: {e}" + ) + break async def run(self): + """Monitor for new conversation queues and start listeners""" + monitored_conversations = set() + while True: - data = await self.queue.get() - await self.callback(data) + # Check for new conversations + current_conversations = set(self.webchat_queue_mgr.queues.keys()) + new_conversations = current_conversations - monitored_conversations + + # Start listeners for new conversations + for conversation_id in new_conversations: + task = asyncio.create_task(self.listen_to_queue(conversation_id)) + self.running_tasks.add(task) + task.add_done_callback(self.running_tasks.discard) + monitored_conversations.add(conversation_id) + logger.debug(f"Started listener for conversation: {conversation_id}") + + # Clean up monitored conversations that no longer exist + removed_conversations = monitored_conversations - current_conversations + monitored_conversations -= removed_conversations + + await asyncio.sleep(1) # Check for new conversations every second @register_platform_adapter("webchat", "webchat") @@ -45,7 +77,7 @@ class WebChatAdapter(Platform): os.makedirs(self.imgs_dir, exist_ok=True) self.metadata = PlatformMetadata( - name="webchat", description="webchat", id=self.config.get("id") + name="webchat", description="webchat", id=self.config.get("id", "") ) async def send_by_session( @@ -105,7 +137,7 @@ class WebChatAdapter(Platform): abm = await self.convert_message(data) await self.handle_msg(abm) - bot = QueueListener(web_chat_queue, callback) + bot = QueueListener(webchat_queue_mgr, callback) return bot.run() def meta(self) -> PlatformMetadata: diff --git a/astrbot/core/platform/sources/webchat/webchat_event.py b/astrbot/core/platform/sources/webchat/webchat_event.py index 111027a5c..c4e5d63c0 100644 --- a/astrbot/core/platform/sources/webchat/webchat_event.py +++ b/astrbot/core/platform/sources/webchat/webchat_event.py @@ -5,8 +5,8 @@ from astrbot.api import logger from astrbot.api.event import AstrMessageEvent, MessageChain from astrbot.api.message_components import Plain, Image, Record from astrbot.core.utils.io import download_image_by_url -from astrbot.core import web_chat_back_queue from astrbot.core.utils.astrbot_path import get_astrbot_data_path +from .webchat_queue_mgr import webchat_queue_mgr imgs_dir = os.path.join(get_astrbot_data_path(), "webchat", "imgs") @@ -18,13 +18,14 @@ class WebChatMessageEvent(AstrMessageEvent): @staticmethod async def _send(message: MessageChain, session_id: str, streaming: bool = False): + cid = session_id.split("!")[-1] + web_chat_back_queue = webchat_queue_mgr.get_or_create_back_queue(cid) if not message: await web_chat_back_queue.put( {"type": "end", "data": "", "streaming": False} ) return "" - cid = session_id.split("!")[-1] data = "" for comp in message.chain: if isinstance(comp, Plain): @@ -98,18 +99,22 @@ class WebChatMessageEvent(AstrMessageEvent): async def send(self, message: MessageChain): await WebChatMessageEvent._send(message, session_id=self.session_id) + cid = self.session_id.split("!")[-1] + web_chat_back_queue = webchat_queue_mgr.get_or_create_back_queue(cid) await web_chat_back_queue.put( { "type": "end", "data": "", "streaming": False, - "cid": self.session_id.split("!")[-1], + "cid": cid, } ) await super().send(message) async def send_streaming(self, generator, use_fallback: bool = False): final_data = "" + cid = self.session_id.split("!")[-1] + web_chat_back_queue = webchat_queue_mgr.get_or_create_back_queue(cid) async for chain in generator: if chain.type == "break" and final_data: # 分割符 @@ -118,7 +123,7 @@ class WebChatMessageEvent(AstrMessageEvent): "type": "end", "data": final_data, "streaming": True, - "cid": self.session_id.split("!")[-1], + "cid": cid, } ) final_data = "" @@ -132,7 +137,7 @@ class WebChatMessageEvent(AstrMessageEvent): "type": "end", "data": final_data, "streaming": True, - "cid": self.session_id.split("!")[-1], + "cid": cid, } ) await super().send_streaming(generator, use_fallback) diff --git a/astrbot/core/platform/sources/webchat/webchat_queue_mgr.py b/astrbot/core/platform/sources/webchat/webchat_queue_mgr.py new file mode 100644 index 000000000..96e172212 --- /dev/null +++ b/astrbot/core/platform/sources/webchat/webchat_queue_mgr.py @@ -0,0 +1,33 @@ +import asyncio + +class WebChatQueueMgr: + def __init__(self) -> None: + self.queues = {} + """Conversation ID to asyncio.Queue mapping""" + self.back_queues = {} + """Conversation ID to asyncio.Queue mapping for responses""" + + def get_or_create_queue(self, conversation_id: str) -> asyncio.Queue: + """Get or create a queue for the given conversation ID""" + if conversation_id not in self.queues: + self.queues[conversation_id] = asyncio.Queue() + return self.queues[conversation_id] + + def get_or_create_back_queue(self, conversation_id: str) -> asyncio.Queue: + """Get or create a back queue for the given conversation ID""" + if conversation_id not in self.back_queues: + self.back_queues[conversation_id] = asyncio.Queue() + return self.back_queues[conversation_id] + + def remove_queues(self, conversation_id: str): + """Remove queues for the given conversation ID""" + if conversation_id in self.queues: + del self.queues[conversation_id] + if conversation_id in self.back_queues: + del self.back_queues[conversation_id] + + def has_queue(self, conversation_id: str) -> bool: + """Check if a queue exists for the given conversation ID""" + return conversation_id in self.queues + +webchat_queue_mgr = WebChatQueueMgr() diff --git a/astrbot/dashboard/routes/chat.py b/astrbot/dashboard/routes/chat.py index 270c92b44..a273bccdc 100644 --- a/astrbot/dashboard/routes/chat.py +++ b/astrbot/dashboard/routes/chat.py @@ -2,7 +2,7 @@ import uuid import json import os from .route import Route, Response, RouteContext -from astrbot.core import web_chat_queue, web_chat_back_queue +from astrbot.core.platform.sources.webchat.webchat_queue_mgr import webchat_queue_mgr from quart import request, Response as QuartResponse, g, make_response from astrbot.core.db import BaseDatabase import asyncio @@ -21,7 +21,6 @@ class ChatRoute(Route): super().__init__(context) self.routes = { "/chat/send": ("POST", self.chat), - "/chat/listen": ("GET", self.listener), "/chat/new_conversation": ("GET", self.new_conversation), "/chat/conversations": ("GET", self.get_conversations), "/chat/get_conversation": ("GET", self.get_conversation), @@ -40,9 +39,6 @@ class ChatRoute(Route): self.supported_imgs = ["jpg", "jpeg", "png", "gif", "webp"] - self.curr_user_cid = {} - self.curr_chat_sse = {} - async def status(self): has_llm_enabled = ( self.core_lifecycle.provider_manager.curr_provider_inst is not None @@ -133,21 +129,10 @@ class ChatRoute(Route): if not conversation_id: return Response().error("conversation_id is empty").__dict__ - self.curr_user_cid[username] = conversation_id + # Get conversation-specific queues + back_queue = webchat_queue_mgr.get_or_create_back_queue(conversation_id) - await web_chat_queue.put( - ( - username, - conversation_id, - { - "message": message, - "image_url": image_url, # list - "audio_url": audio_url, - }, - ) - ) - - # 持久化 + # append user message conversation = self.db.get_conversation_by_user_id(username, conversation_id) try: history = json.loads(conversation.history) @@ -164,30 +149,12 @@ class ChatRoute(Route): username, conversation_id, history=json.dumps(history) ) - return Response().ok().__dict__ - - async def listener(self): - """一直保持长连接""" - - username = g.get("username", "guest") - - if username in self.curr_chat_sse: - return Response().error("Already connected").__dict__ - - self.curr_chat_sse[username] = None - - heartbeat = json.dumps({"type": "heartbeat", "data": "ping"}) - async def stream(): try: - yield f"data: {heartbeat}\n\n" # 心跳包 while True: try: - result = await asyncio.wait_for( - web_chat_back_queue.get(), timeout=10 - ) # 设置超时时间为5秒 + result = await asyncio.wait_for(back_queue.get(), timeout=10) except asyncio.TimeoutError: - yield f"data: {heartbeat}\n\n" # 心跳包 continue if not result: @@ -197,9 +164,6 @@ class ChatRoute(Route): type = result.get("type") cid = result.get("cid") streaming = result.get("streaming", False) - if cid != self.curr_user_cid.get(username): - # 丢弃 - continue yield f"data: {json.dumps(result, ensure_ascii=False)}\n\n" await asyncio.sleep(0.05) @@ -210,6 +174,7 @@ class ChatRoute(Route): continue if result_text: + # append bot message conversation = self.db.get_conversation_by_user_id( username, cid ) @@ -222,11 +187,25 @@ class ChatRoute(Route): self.db.update_conversation( username, cid, history=json.dumps(history) ) + break except BaseException as _: logger.debug(f"用户 {username} 断开聊天长连接。") - self.curr_chat_sse.pop(username) return + # Put message to conversation-specific queue + chat_queue = webchat_queue_mgr.get_or_create_queue(conversation_id) + await chat_queue.put( + ( + username, + conversation_id, + { + "message": message, + "image_url": image_url, # list + "audio_url": audio_url, + }, + ) + ) + response = await make_response( stream(), { @@ -236,7 +215,6 @@ class ChatRoute(Route): "Connection": "keep-alive", }, ) - response.timeout = None return response async def delete_conversation(self): @@ -245,6 +223,8 @@ class ChatRoute(Route): if not conversation_id: return Response().error("Missing key: conversation_id").__dict__ + # Clean up queues when deleting conversation + webchat_queue_mgr.remove_queues(conversation_id) self.db.delete_conversation(username, conversation_id) return Response().ok().__dict__ @@ -279,6 +259,4 @@ class ChatRoute(Route): conversation = self.db.get_conversation_by_user_id(username, conversation_id) - self.curr_user_cid[username] = conversation_id - return Response().ok(data=conversation).__dict__ diff --git a/astrbot/dashboard/routes/multi_user_chat.py b/astrbot/dashboard/routes/multi_user_chat.py new file mode 100644 index 000000000..e69de29bb diff --git a/dashboard/src/views/ChatPage.vue b/dashboard/src/views/ChatPage.vue index 89262b0d8..8c68f083e 100644 --- a/dashboard/src/views/ChatPage.vue +++ b/dashboard/src/views/ChatPage.vue @@ -187,8 +187,7 @@ style="width: 85%; max-width: 900px; margin: 0 auto; border: 1px solid #e0e0e0; border-radius: 24px; padding: 4px;"> + style="width: 100%; resize: none; outline: none; border: 1px solid var(--v-theme-border); border-radius: 12px; padding: 12px 16px; min-height: 40px; font-family: inherit; font-size: 16px; background-color: var(--v-theme-surface);">
or /chatbox/ pattern @@ -394,7 +390,6 @@ export default { // Theme is now handled globally by the customizer store. // 设置输入框标签 this.inputFieldLabel = this.tm('input.chatPrompt'); - this.startListeningEvent(); this.checkStatus(); this.getConversations(); let inputField = document.getElementById('input-field'); @@ -420,8 +415,6 @@ export default { }, beforeUnmount() { - this.disconnectSSE(); - // 移除keyup事件监听 document.removeEventListener('keyup', this.handleInputKeyUp); @@ -529,246 +522,10 @@ export default { } }, - // 断开SSE连接 - disconnectSSE() { - if (this.eventSourceReader) { - try { - this.eventSourceReader.cancel(); - console.log('SSE Reader cancelled'); - } catch (error) { - console.warn('Error cancelling SSE reader:', error); - } - this.eventSourceReader = null; - } - if (this.eventSource) { - try { - this.eventSource.cancel(); - console.log('SSE连接已断开'); - } catch (error) { - console.warn('Error cancelling SSE:', error); - } - this.eventSource = null; - } - }, - - // 重新连接SSE - async reconnectSSE() { - if (this.sseReconnecting) { - console.log('SSE reconnection already in progress'); - return; - } - - this.sseReconnecting = true; - console.log('Reconnecting SSE...'); - this.disconnectSSE(); - - // 等待更长时间确保后端连接完全清理 - await new Promise(resolve => setTimeout(resolve, 1000)); - - this.startListeningEvent(); - }, - - async startListeningEvent() { - // 确保之前的连接已断开 - this.disconnectSSE(); - - // 如果正在重连过程中,等待一下 - if (this.sseReconnecting) { - await new Promise(resolve => setTimeout(resolve, 500)); - } - - let retryCount = 0; - const maxRetries = 3; - - while (retryCount < maxRetries) { - try { - console.log(`尝试建立SSE连接 (${retryCount + 1}/${maxRetries})`); - - const response = await fetch('/api/chat/listen', { - method: 'GET', - headers: { - 'Content-Type': 'application/json', - 'Authorization': 'Bearer ' + localStorage.getItem('token') - } - }); - - if (!response.ok) { - throw new Error(`SSE连接失败: ${response.statusText}`); - } - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - this.eventSource = reader; - this.eventSourceReader = reader; - this.sseReconnecting = false; - - let in_streaming = false; - let message_obj = null; - console.log('SSE连接已建立'); - // 显示连接成功状态 - if (retryCount > 0) { - this.showConnectionStatus(this.tm('connection.status.reconnected'), 'success'); - } - - while (true) { - try { - const { done, value } = await reader.read(); - if (done) { - console.log('SSE连接正常关闭'); - break; - } - - const chunk = decoder.decode(value, { stream: true }); - - // 可能有多行 - let lines = chunk.split('\n\n'); - - console.log('SSE数据:', lines); - - for (let i = 0; i < lines.length; i++) { - let line = lines[i].trim(); - - if (!line) { - continue; - } - - console.log(line); // 处理后端错误响应格式 - if (line.startsWith('{"status":"error"')) { - try { - const errorObj = JSON.parse(line); - if (errorObj.message === 'Already connected') { - throw new Error('CONNECTION_CONFLICT'); - } - console.error('后端错误:', errorObj.message); - continue; - } catch (parseError) { - if (parseError.message === 'CONNECTION_CONFLICT') { - throw parseError; - } - console.warn('解析错误响应失败:', line); - continue; - } - } - - // data: {"type": "plain", "data": "helloworld"} - let chunk_json; - try { - chunk_json = JSON.parse(line.replace('data: ', '')); - } catch (parseError) { - console.warn('JSON解析失败:', line, parseError); - continue; - } - - // 检查解析后的数据是否有效 - if (!chunk_json || typeof chunk_json !== 'object') { - console.warn('无效的数据对象:', chunk_json); - continue; - } - - // 检查是否有type字段 - if (!chunk_json.hasOwnProperty('type')) { - console.warn('数据缺少type字段:', chunk_json); - continue; - } - - if (chunk_json.type === 'heartbeat') { - continue; // 心跳包 - } - if (chunk_json.type === 'error') { - console.error('Error received:', chunk_json.data); - continue; - } - - if (chunk_json.type === 'image') { - let img = chunk_json.data.replace('[IMAGE]', ''); - const imageUrl = await this.getMediaFile(img); - let bot_resp = { - type: 'bot', - message: `` - } - this.messages.push(bot_resp); - } else if (chunk_json.type === 'record') { - let audio = chunk_json.data.replace('[RECORD]', ''); - const audioUrl = await this.getMediaFile(audio); - let bot_resp = { - type: 'bot', - message: `` - } - this.messages.push(bot_resp); - } else if (chunk_json.type === 'plain') { - if (!in_streaming) { - message_obj = { - type: 'bot', - message: this.ref(chunk_json.data), - } - this.messages.push(message_obj); - in_streaming = true; - } else { - message_obj.message.value += chunk_json.data; - } - } else if (chunk_json.type === 'end') { - in_streaming = false; - // 在消息流结束后初始化代码复制按钮 - this.initCodeCopyButtons(); - continue; - } else if (chunk_json.type === 'update_title') { - // 更新对话标题 - const conversation = this.conversations.find(c => c.cid === chunk_json.cid); - if (conversation) { - conversation.title = chunk_json.data; - } - } else { - console.warn('未知数据类型:', chunk_json.type); - } - this.scrollToBottom(); - } - } catch (readError) { - if (readError.name === 'AbortError') { - console.log('SSE连接被取消'); - break; - } - if (readError.message === 'CONNECTION_CONFLICT') { - throw readError; - } - console.error('SSE读取错误:', readError); - break; - } - } - - // 如果成功连接并正常结束,跳出重试循环 - break; - - } catch (error) { - console.error(`SSE连接错误 (尝试 ${retryCount + 1}):`, error); - - retryCount++; - if (error.message === 'CONNECTION_CONFLICT' && retryCount < maxRetries) { - console.log(`连接冲突,等待 ${2000 * retryCount}ms 后重试...`); - this.showConnectionStatus(`${this.tm('connection.status.reconnecting')} (${retryCount}/${maxRetries})`, 'warning'); - await new Promise(resolve => setTimeout(resolve, 2000 * retryCount)); - continue; - } - - if (retryCount >= maxRetries) { - console.error('SSE连接重试次数已达上限'); - this.showConnectionStatus(this.tm('connection.status.failed'), 'error'); - this.sseReconnecting = false; - break; - } - - // 等待一段时间后重试 - await new Promise(resolve => setTimeout(resolve, 1000 * retryCount)); - } finally { - this.eventSource = null; - this.eventSourceReader = null; - } - } - - this.sseReconnecting = false; + showConnectionStatus(message, type) { + // You can implement a toast notification here or update UI status + console.log(`Connection status: ${message} (${type})`); }, removeAudio() { @@ -920,7 +677,6 @@ export default { } } this.messages = message; - // 初始化代码复制按钮 this.initCodeCopyButtons(); }).catch(err => { console.error(err); @@ -1032,33 +788,144 @@ export default { this.messages.push(userMessage); this.scrollToBottom(); - this.loadingChat = true; + this.loadingChat = true - fetch('/api/chat/send', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': 'Bearer ' + localStorage.getItem('token') - }, - body: JSON.stringify({ - message: this.prompt.trim(), // 确保发送的消息已去除前后空格 - conversation_id: this.currCid, - image_url: this.stagedImagesName, - audio_url: this.stagedAudioUrl ? [this.stagedAudioUrl] : [] - }) - }) - .then(response => { - this.prompt = ''; - this.stagedImagesName = []; - this.stagedImagesUrl = []; - this.stagedAudioUrl = ""; - this.loadingChat = false; - }) - .catch(err => { - console.error(err); - this.loadingChat = false; + try { + const response = await fetch('/api/chat/send', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer ' + localStorage.getItem('token') + }, + body: JSON.stringify({ + message: this.prompt.trim(), // 确保发送的消息已去除前后空格 + conversation_id: this.currCid, + image_url: this.stagedImagesName, + audio_url: this.stagedAudioUrl ? [this.stagedAudioUrl] : [] + }) }); + + this.prompt = ''; // 清空输入框; + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let in_streaming = false; + let message_obj = null; + + while (true) { + try { + const { done, value } = await reader.read(); + if (done) { + console.log('SSE stream completed'); + break; + } + + const chunk = decoder.decode(value, { stream: true }); + const lines = chunk.split('\n\n'); + + for (let i = 0; i < lines.length; i++) { + let line = lines[i].trim(); + + if (!line) { + continue; + } + + // Parse SSE data + let chunk_json; + try { + chunk_json = JSON.parse(line.replace('data: ', '')); + } catch (parseError) { + console.warn('JSON解析失败:', line, parseError); + continue; + } + + // 检查解析后的数据是否有效 + if (!chunk_json || typeof chunk_json !== 'object' || !chunk_json.hasOwnProperty('type')) { + console.warn('无效的数据对象:', chunk_json); + continue; + } + + if (chunk_json.type === 'heartbeat') { + continue; // 心跳包 + } + if (chunk_json.type === 'error') { + console.error('Error received:', chunk_json.data); + continue; + } + + if (chunk_json.type === 'image') { + let img = chunk_json.data.replace('[IMAGE]', ''); + const imageUrl = await this.getMediaFile(img); + let bot_resp = { + type: 'bot', + message: `` + } + this.messages.push(bot_resp); + } else if (chunk_json.type === 'record') { + let audio = chunk_json.data.replace('[RECORD]', ''); + const audioUrl = await this.getMediaFile(audio); + let bot_resp = { + type: 'bot', + message: `` + } + this.messages.push(bot_resp); + } else if (chunk_json.type === 'plain') { + if (!in_streaming) { + message_obj = { + type: 'bot', + message: this.ref(chunk_json.data), + } + this.messages.push(message_obj); + in_streaming = true; + } else { + message_obj.message.value += chunk_json.data; + } + } else if (chunk_json.type === 'end') { + in_streaming = false; + // 在消息流结束后初始化代码复制按钮 + this.initCodeCopyButtons(); + continue; + } else if (chunk_json.type === 'update_title') { + // 更新对话标题 + const conversation = this.conversations.find(c => c.cid === chunk_json.cid); + if (conversation) { + conversation.title = chunk_json.data; + } + } else { + console.warn('未知数据类型:', chunk_json.type); + } + this.scrollToBottom(); + } + } catch (readError) { + console.error('SSE读取错误:', readError); + break; + } + } + + // Clear input after successful send + this.prompt = ''; + this.stagedImagesName = []; + this.stagedImagesUrl = []; + this.stagedAudioUrl = ""; + this.loadingChat = false; + + // get the latest conversations + this.getConversations(); + + } catch (err) { + console.error('发送消息失败:', err); + this.loadingChat = false; + this.showConnectionStatus(this.tm('connection.status.failed'), 'error'); + } }, + scrollToBottom() { this.$nextTick(() => { const container = this.$refs.messageContainer;