feat: metrics

This commit is contained in:
Soulter
2026-01-17 15:34:46 +08:00
parent 1d426a7458
commit 19e6253d5d
5 changed files with 157 additions and 13 deletions
+35 -1
View File
@@ -1,4 +1,5 @@
import asyncio
import time
import traceback
from collections.abc import AsyncGenerator
@@ -13,6 +14,7 @@ from astrbot.core.message.message_event_result import (
ResultContentType,
)
from astrbot.core.provider.entities import LLMResponse
from astrbot.core.provider.provider import TTSProvider
AgentRunner = ToolLoopAgentRunner[AstrAgentContext]
@@ -136,7 +138,7 @@ async def run_agent(
async def run_live_agent(
agent_runner: AgentRunner,
tts_provider,
tts_provider: TTSProvider | None = None,
max_step: int = 30,
show_tool_use: bool = True,
show_reasoning: bool = False,
@@ -184,14 +186,46 @@ async def run_live_agent(
return
# 处理 TTS
tts_start_time = time.time()
tts_first_frame_time = 0.0
first_chunk_received = False
if support_stream:
# 使用流式 TTS
async for audio_chunk in _process_stream_tts(llm_stream_chunks, tts_provider):
if not first_chunk_received:
tts_first_frame_time = time.time() - tts_start_time
first_chunk_received = True
yield audio_chunk
else:
# 使用完整音频 TTS
async for audio_chunk in _process_full_tts(llm_stream_chunks, tts_provider):
if not first_chunk_received:
tts_first_frame_time = time.time() - tts_start_time
first_chunk_received = True
yield audio_chunk
tts_end_time = time.time()
# 发送 TTS 统计信息
try:
astr_event = agent_runner.run_context.context.event
if astr_event.get_platform_name() == "webchat":
tts_duration = tts_end_time - tts_start_time
await astr_event.send(
MessageChain(
type="tts_stats",
chain=[
Json(
data={
"duration": tts_duration,
"first_frame_time": tts_first_frame_time,
}
)
],
)
)
except Exception as e:
logger.error(f"发送 TTS 统计信息失败: {e}")
async def _process_stream_tts(chunks: list[MessageChain], tts_provider):
@@ -41,6 +41,7 @@ from ...utils import (
FILE_DOWNLOAD_TOOL,
FILE_UPLOAD_TOOL,
KNOWLEDGE_BASE_QUERY_TOOL,
LIVE_MODE_SYSTEM_PROMPT,
LLM_SAFETY_MODE_SYSTEM_PROMPT,
PYTHON_TOOL,
SANDBOX_MODE_PROMPT,
@@ -687,6 +688,7 @@ class InternalAgentSubStage(Stage):
# 检测 Live Mode
action_type = event.get_extra("action_type")
if action_type == "live":
req.system_prompt += f"\n{LIVE_MODE_SYSTEM_PROMPT}\n"
# Live Mode: 使用 run_live_agent
logger.info("[Internal Agent] 检测到 Live Mode,启用 TTS 处理")
@@ -64,6 +64,11 @@ CHATUI_EXTRA_PROMPT = (
"Such as, user asked you to generate codes, you can add: Do you need me to run these codes for you?"
)
# Extra system-prompt text appended to the LLM request when the event's
# action type is "live" (real-time voice conversation). Keeps replies
# conversational rather than templated.
LIVE_MODE_SYSTEM_PROMPT = (
    "You are talking to the user in real-time. "
    # Fixed grammar: the imperative verb is "Behave", not the noun "Behavior".
    "Behave like a real friend, do not use template responses. "
    "Use natural and native language to answer the user's questions. "
)
@dataclass
class KnowledgeBaseQueryTool(FunctionTool[AstrAgentContext]):
+77 -12
View File
@@ -1,4 +1,5 @@
import asyncio
import json
import os
import time
import uuid
@@ -42,19 +43,20 @@ class LiveChatSession:
if self.is_speaking:
self.audio_frames.append(data)
async def end_speaking(self, stamp: str) -> str | None:
"""结束说话,返回组装的 WAV 文件路径"""
async def end_speaking(self, stamp: str) -> tuple[str | None, float]:
"""结束说话,返回组装的 WAV 文件路径和耗时"""
start_time = time.time()
if not self.is_speaking or stamp != self.current_stamp:
logger.warning(
f"[Live Chat] stamp 不匹配或未在说话状态: {stamp} vs {self.current_stamp}"
)
return None
return None, 0.0
self.is_speaking = False
if not self.audio_frames:
logger.warning("[Live Chat] 没有音频帧数据")
return None
return None, 0.0
# 组装 WAV 文件
try:
@@ -74,11 +76,11 @@ class LiveChatSession:
logger.info(
f"[Live Chat] 音频文件已保存: {audio_path}, 大小: {os.path.getsize(audio_path)} bytes"
)
return audio_path
return audio_path, time.time() - start_time
except Exception as e:
logger.error(f"[Live Chat] 组装 WAV 文件失败: {e}", exc_info=True)
return None
return None, 0.0
def cleanup(self):
"""清理临时文件"""
@@ -184,22 +186,30 @@ class LiveChatRoute(Route):
logger.warning("[Live Chat] end_speaking 缺少 stamp")
return
audio_path = await session.end_speaking(stamp)
audio_path, assemble_duration = await session.end_speaking(stamp)
if not audio_path:
await websocket.send_json({"t": "error", "data": "音频组装失败"})
return
# 处理音频:STT -> LLM -> TTS
await self._process_audio(session, audio_path)
await self._process_audio(session, audio_path, assemble_duration)
elif msg_type == "interrupt":
# 用户打断
session.should_interrupt = True
logger.info(f"[Live Chat] 用户打断: {session.username}")
async def _process_audio(self, session: LiveChatSession, audio_path: str):
async def _process_audio(
self, session: LiveChatSession, audio_path: str, assemble_duration: float
):
"""处理音频:STT -> LLM -> 流式 TTS"""
try:
# 发送 WAV 组装耗时
await websocket.send_json(
{"t": "metrics", "data": {"wav_assemble_time": assemble_duration}}
)
wav_assembly_finish_time = time.time()
session.is_processing = True
session.should_interrupt = False
@@ -219,9 +229,6 @@ class LiveChatRoute(Route):
logger.info(f"[Live Chat] STT 结果: {user_text}")
# 发送用户消息
import time
await websocket.send_json(
{
"t": "user_msg",
@@ -281,8 +288,44 @@ class LiveChatRoute(Route):
continue
result_type = result.get("type")
result_chain_type = result.get("chain_type")
data = result.get("data", "")
if result_chain_type == "agent_stats":
try:
stats = json.loads(data)
await websocket.send_json(
{
"t": "metrics",
"data": {
"llm_ttft": stats.get("time_to_first_token", 0),
"llm_total_time": stats.get("end_time", 0)
- stats.get("start_time", 0),
},
}
)
except Exception as e:
logger.error(f"[Live Chat] 解析 AgentStats 失败: {e}")
continue
if result_chain_type == "tts_stats":
try:
stats = json.loads(data)
await websocket.send_json(
{
"t": "metrics",
"data": {
"tts_total_time": stats.get("duration", 0),
"tts_first_frame_time": stats.get(
"first_frame_time", 0
),
},
}
)
except Exception as e:
logger.error(f"[Live Chat] 解析 TTSStats 失败: {e}")
continue
if result_type == "plain":
# 普通文本消息
bot_text += data
@@ -293,6 +336,19 @@ class LiveChatRoute(Route):
audio_playing = True
logger.debug("[Live Chat] 开始播放音频流")
# Calculate latency from wav assembly finish to first audio chunk
speak_to_first_frame_latency = (
time.time() - wav_assembly_finish_time
)
await websocket.send_json(
{
"t": "metrics",
"data": {
"speak_to_first_frame": speak_to_first_frame_latency
},
}
)
# 发送音频数据给前端
await websocket.send_json(
{
@@ -319,6 +375,15 @@ class LiveChatRoute(Route):
# 发送结束标记
await websocket.send_json({"t": "end"})
# 发送总耗时
wav_to_tts_duration = time.time() - wav_assembly_finish_time
await websocket.send_json(
{
"t": "metrics",
"data": {"wav_to_tts_total_time": wav_to_tts_duration},
}
)
break
except Exception as e:
@@ -19,6 +19,16 @@
</div>
</div>
</div>
<div class="metrics-container" v-if="Object.keys(metrics).length > 0">
<span v-if="metrics.wav_assemble_time">WAV Assemble: {{ (metrics.wav_assemble_time * 1000).toFixed(0) }}ms</span>
<span v-if="metrics.llm_ttft">LLM First Token Latency: {{ (metrics.llm_ttft * 1000).toFixed(0) }}ms</span>
<span v-if="metrics.llm_total_time">LLM Total Latency: {{ (metrics.llm_total_time * 1000).toFixed(0) }}ms</span>
<span v-if="metrics.tts_first_frame_time">TTS First Frame Latency: {{ (metrics.tts_first_frame_time * 1000).toFixed(0) }}ms</span>
<span v-if="metrics.tts_total_time">TTS Total Latency: {{ (metrics.tts_total_time * 1000).toFixed(0) }}ms</span>
<span v-if="metrics.speak_to_first_frame">Speak -> First TTS Frame: {{ (metrics.speak_to_first_frame * 1000).toFixed(0) }}ms</span>
<span v-if="metrics.wav_to_tts_total_time">Speak -> End: {{ (metrics.wav_to_tts_total_time * 1000).toFixed(0) }}ms</span>
</div>
</div>
</div>
</template>
@@ -60,6 +70,17 @@ let isPlaying = ref(false);
// 消息历史
const messages = ref<Array<{ type: 'user' | 'bot', text: string }>>([]);
// Latency metrics pushed by the backend in WebSocket messages of type
// 'metrics'. Each message carries a subset of these fields, which are merged
// into the current metrics object. Values are durations in seconds; the
// template multiplies them by 1000 to display milliseconds.
interface LiveMetrics {
// Time spent assembling the recorded audio frames into a WAV file.
wav_assemble_time?: number;
// Time from the end of WAV assembly until the first audio chunk is sent back.
speak_to_first_frame?: number;
// LLM time to first token, as reported in the agent stats.
llm_ttft?: number;
// LLM total time (agent stats end_time minus start_time).
llm_total_time?: number;
// Time from the start of TTS processing until its first audio chunk.
tts_first_frame_time?: number;
// Total duration of TTS processing.
tts_total_time?: number;
// Time from the end of WAV assembly until the response's end marker is sent.
wav_to_tts_total_time?: number;
}
const metrics = ref<LiveMetrics>({});
// 当前语音片段标记
let currentStamp = '';
@@ -136,6 +157,7 @@ async function startLiveMode() {
// 发送开始说话消息
if (ws && ws.readyState === WebSocket.OPEN) {
metrics.value = {}; // Reset metrics
ws.send(JSON.stringify({
t: 'start_speaking',
stamp: currentStamp
@@ -302,6 +324,10 @@ function handleWebSocketMessage(event: MessageEvent) {
isProcessing.value = false;
isListening.value = true;
break;
case 'metrics':
metrics.value = { ...metrics.value, ...message.data };
break;
}
} catch (error) {
console.error('[Live Mode] 处理消息失败:', error);
@@ -515,4 +541,16 @@ onBeforeUnmount(() => {
flex: 1;
word-wrap: break-word;
}
/* Latency metrics overlay: absolutely positioned at the bottom-left of its
   positioned ancestor, listing each metric on its own row in small,
   de-emphasized text above other content (z-index 100). */
.metrics-container {
position: absolute;
bottom: 10px;
left: 10px;
display: flex;
flex-direction: column;
gap: 4px;
font-size: 12px;
color: rgba(var(--v-theme-on-surface), 0.6);
z-index: 100;
}
</style>