fix: fix log loss on SSE reconnect using Last-Event-ID (#4205)

* feat: implement last-event-id handing in log route

* perf: better log handling

* chore: ruff format

* perf: log

* Update ConsoleDisplayer.vue

* Update package.json

* Update ConsoleDisplayer.vue

* Update common.js

* chore: ruff format

* fix: ensure last_event_id is required for log replay

---------

Co-authored-by: Soulter <905617992@qq.com>
This commit is contained in:
NoctuUFO
2025-12-26 18:01:58 +08:00
committed by GitHub
parent aa38fe776a
commit 9a5cc977c2
4 changed files with 217 additions and 89 deletions
+44 -10
View File
@@ -1,15 +1,26 @@
import asyncio
import json
import time
from collections.abc import AsyncGenerator
from typing import cast
from quart import Response as QuartResponse
from quart import make_response
from quart import make_response, request
from astrbot.core import LogBroker, logger
from .route import Response, Route, RouteContext
def _format_log_sse(log: dict, ts: float) -> str:
"""辅助函数:格式化 SSE 消息"""
payload = {
"type": "log",
**log,
}
return f"id: {ts}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
class LogRoute(Route):
def __init__(self, context: RouteContext, log_broker: LogBroker) -> None:
super().__init__(context)
@@ -21,21 +32,44 @@ class LogRoute(Route):
methods=["GET"],
)
async def log(self):
async def _replay_cached_logs(
self, last_event_id: str
) -> AsyncGenerator[str, None]:
"""辅助生成器:重放缓存的日志"""
try:
last_ts = float(last_event_id)
cached_logs = list(self.log_broker.log_cache)
for log_item in cached_logs:
log_ts = float(log_item.get("time", 0))
if log_ts > last_ts:
yield _format_log_sse(log_item, log_ts)
except ValueError:
pass
except Exception as e:
logger.error(f"Log SSE 补发历史错误: {e}")
async def log(self) -> QuartResponse:
last_event_id = request.headers.get("Last-Event-ID")
async def stream():
queue = None
try:
if last_event_id:
async for event in self._replay_cached_logs(last_event_id):
yield event
queue = self.log_broker.register()
while True:
message = await queue.get()
payload = {
"type": "log",
**message, # see astrbot/core/log.py
}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
current_ts = message.get("time", time.time())
yield _format_log_sse(message, current_ts)
except asyncio.CancelledError:
pass
except BaseException as e:
except Exception as e:
logger.error(f"Log SSE 连接错误: {e}")
finally:
if queue:
@@ -53,7 +87,7 @@ class LogRoute(Route):
},
),
)
response.timeout = None
response.timeout = None # type: ignore
return response
async def log_history(self):
@@ -69,6 +103,6 @@ class LogRoute(Route):
)
.__dict__
)
except BaseException as e:
except Exception as e:
logger.error(f"获取日志历史失败: {e}")
return Response().error(f"获取日志历史失败: {e}").__dict__
+1
View File
@@ -22,6 +22,7 @@
"axios-mock-adapter": "^1.22.0",
"chance": "1.1.11",
"date-fns": "2.30.0",
"event-source-polyfill": "^1.0.31",
"highlight.js": "^11.11.1",
"js-md5": "^0.8.3",
"katex": "^0.16.27",
@@ -1,12 +1,11 @@
<script setup>
import { useCommonStore } from '@/stores/common';
import { storeToRefs } from 'pinia';
import axios from 'axios';
import { EventSourcePolyfill } from 'event-source-polyfill';
</script>
<template>
<div>
<!-- 添加筛选级别控件 -->
<div class="filter-controls mb-2" v-if="showLevelBtns">
<v-chip-group v-model="selectedLevels" column multiple>
<v-chip v-for="level in logLevels" :key="level" :color="getLevelColor(level)" filter variant="flat" size="small"
@@ -26,20 +25,19 @@ export default {
name: 'ConsoleDisplayer',
data() {
return {
autoScroll: true, // 默认开启自动滚动
autoScroll: true,
logColorAnsiMap: {
'\u001b[1;34m': 'color: #0000FF; font-weight: bold;', // bold_blue
'\u001b[1;36m': 'color: #00FFFF; font-weight: bold;', // bold_cyan
'\u001b[1;33m': 'color: #FFFF00; font-weight: bold;', // bold_yellow
'\u001b[31m': 'color: #FF0000;', // red
'\u001b[1;31m': 'color: #FF0000; font-weight: bold;', // bold_red
'\u001b[0m': 'color: inherit; font-weight: normal;', // reset
'\u001b[32m': 'color: #00FF00;', // green
'\u001b[1;34m': 'color: #0000FF; font-weight: bold;',
'\u001b[1;36m': 'color: #00FFFF; font-weight: bold;',
'\u001b[1;33m': 'color: #FFFF00; font-weight: bold;',
'\u001b[31m': 'color: #FF0000;',
'\u001b[1;31m': 'color: #FF0000; font-weight: bold;',
'\u001b[0m': 'color: inherit; font-weight: normal;',
'\u001b[32m': 'color: #00FF00;',
'default': 'color: #FFFFFF;'
},
historyNum_: -1,
logLevels: ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
selectedLevels: [0, 1, 2, 3, 4], // 默认选中所有级别
selectedLevels: [0, 1, 2, 3, 4],
levelColors: {
'DEBUG': 'grey',
'INFO': 'blue-lighten-3',
@@ -47,17 +45,19 @@ export default {
'ERROR': 'red',
'CRITICAL': 'purple'
},
lastProcessedTime: 0, // 记录最后处理的日志时间戳
localLogCache: [], // 本地日志缓存
localLogCache: [],
eventSource: null,
retryTimer: null,
retryAttempts: 0,
maxRetryAttempts: 10,
baseRetryDelay: 1000,
lastEventId: null,
}
},
computed: {
commonStore() {
return useCommonStore();
},
logCache() {
return this.commonStore.log_cache;
}
},
props: {
historyNum: {
@@ -70,41 +70,6 @@ export default {
}
},
watch: {
logCache: {
handler(newVal) {
// 基于 timestamp 处理新增的日志
if (newVal && newVal.length > 0) {
// 确保 DOM 已经准备好
this.$nextTick(() => {
// 合并到本地缓存并按时间排序
const newLogs = newVal.filter(log => log.time > this.lastProcessedTime);
if (newLogs.length > 0) {
this.localLogCache.push(...newLogs);
// 按时间戳排序
this.localLogCache.sort((a, b) => a.time - b.time);
// 只保留最新的 log_cache_max_len 条
if (this.localLogCache.length > this.commonStore.log_cache_max_len) {
this.localLogCache.splice(0, this.localLogCache.length - this.commonStore.log_cache_max_len);
}
// 显示新日志
newLogs.forEach(logItem => {
if (this.isLevelSelected(logItem.level)) {
this.printLog(logItem.data);
}
});
// 更新最后处理时间
this.lastProcessedTime = Math.max(...newLogs.map(log => log.time));
}
});
}
},
deep: true,
immediate: false
},
selectedLevels: {
handler() {
this.refreshDisplay();
@@ -113,30 +78,142 @@ export default {
}
},
async mounted() {
// 请求历史日志
await this.fetchLogHistory();
// 等待 DOM 准备好后,显示历史日志
this.$nextTick(() => {
if (this.localLogCache.length > 0) {
this.localLogCache.forEach(logItem => {
if (this.isLevelSelected(logItem.level)) {
this.printLog(logItem.data);
}
});
// 更新最后处理时间
this.lastProcessedTime = Math.max(...this.localLogCache.map(log => log.time));
}
});
this.connectSSE();
},
beforeUnmount() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.retryTimer) {
clearTimeout(this.retryTimer);
this.retryTimer = null;
}
this.retryAttempts = 0;
},
methods: {
connectSSE() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
console.log(`正在连接日志流... (尝试次数: ${this.retryAttempts})`);
const token = localStorage.getItem('token');
this.eventSource = new EventSourcePolyfill('/api/live-log', {
headers: {
'Authorization': token ? `Bearer ${token}` : ''
},
heartbeatTimeout: 300000,
withCredentials: true
});
this.eventSource.onopen = () => {
console.log('日志流连接成功!');
this.retryAttempts = 0;
if (!this.lastEventId) {
this.fetchLogHistory();
}
};
this.eventSource.onmessage = (event) => {
try {
if (event.lastEventId) {
this.lastEventId = event.lastEventId;
}
const payload = JSON.parse(event.data);
this.processNewLogs([payload]);
} catch (e) {
console.error('解析日志失败:', e);
}
};
this.eventSource.onerror = (err) => {
if (err.status === 401) {
console.error('鉴权失败 (401),可能是 Token 过期了。');
} else {
console.warn('日志流连接错误:', err);
}
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
if (this.retryAttempts >= this.maxRetryAttempts) {
console.error('❌ 已达到最大重试次数,停止重连。请刷新页面重试。');
return;
}
const delay = Math.min(
this.baseRetryDelay * Math.pow(2, this.retryAttempts),
30000
);
console.log(`${delay}ms 后尝试第 ${this.retryAttempts + 1} 次重连...`);
if (this.retryTimer) {
clearTimeout(this.retryTimer);
this.retryTimer = null;
}
this.retryTimer = setTimeout(async () => {
this.retryAttempts++;
if (!this.lastEventId) {
await this.fetchLogHistory();
}
this.connectSSE();
}, delay);
};
},
processNewLogs(newLogs) {
if (!newLogs || newLogs.length === 0) return;
let hasUpdate = false;
newLogs.forEach(log => {
const exists = this.localLogCache.some(existing =>
existing.time === log.time &&
existing.data === log.data &&
existing.level === log.level
);
if (!exists) {
this.localLogCache.push(log);
hasUpdate = true;
if (this.isLevelSelected(log.level)) {
this.printLog(log.data);
}
}
});
if (hasUpdate) {
this.localLogCache.sort((a, b) => a.time - b.time);
const maxSize = this.commonStore.log_cache_max_len || 200;
if (this.localLogCache.length > maxSize) {
this.localLogCache.splice(0, this.localLogCache.length - maxSize);
}
}
},
async fetchLogHistory() {
try {
const res = await axios.get('/api/log-history');
if (res.data.data.logs && res.data.data.logs.length > 0) {
this.localLogCache = [...res.data.data.logs];
// 按时间戳排序
this.localLogCache.sort((a, b) => a.time - b.time);
this.processNewLogs(res.data.data.logs);
}
} catch (err) {
console.error('Failed to fetch log history:', err);
@@ -162,7 +239,6 @@ export default {
if (termElement) {
termElement.innerHTML = '';
// 重新显示所有符合筛选条件的日志
if (this.localLogCache && this.localLogCache.length > 0) {
this.localLogCache.forEach(logItem => {
if (this.isLevelSelected(logItem.level)) {
@@ -173,16 +249,13 @@ export default {
}
},
toggleAutoScroll() {
this.autoScroll = !this.autoScroll;
},
printLog(log) {
// append 一个 span 标签到 termblock 的方式
let ele = document.getElementById('term')
if (!ele) {
console.warn('term element not found, skipping log print');
return;
}
@@ -196,11 +269,11 @@ export default {
}
}
span.style = style + 'display: block; font-size: 12px; font-family: Consolas, monospace; white-space: pre-wrap;'
span.style = style + 'display: block; font-size: 12px; font-family: Consolas, monospace; white-space: pre-wrap; margin-bottom: 2px;'
span.classList.add('fade-in')
span.innerText = `${log}`;
ele.appendChild(span)
if (this.autoScroll ) {
if (this.autoScroll) {
ele.scrollTop = ele.scrollHeight
}
}
@@ -230,4 +303,4 @@ export default {
opacity: 1;
}
}
</style>
</style>
+23 -3
View File
@@ -21,10 +21,14 @@ export const useCommonStore = defineStore({
}
const controller = new AbortController();
const { signal } = controller;
// 注意:这里如果之前改过 Polyfill 的话,可能需要保持原样
// 如果是用 fetch 的话,这里是支持 Authorization Header 的
const headers = {
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer ' + localStorage.getItem('token')
};
fetch('/api/live-log', {
method: 'GET',
headers,
@@ -72,10 +76,20 @@ export const useCommonStore = defineStore({
try {
const logObject = JSON.parse(logLine);
// give a uuid if not exists
// 修复:兼容 HTTP 环境的 UUID 生成
if (!logObject.uuid) {
logObject.uuid = crypto.randomUUID();
if (typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function') {
logObject.uuid = crypto.randomUUID();
} else {
// 手动生成 UUID v4
logObject.uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
}
this.log_cache.push(logObject);
// Limit log cache size
if (this.log_cache.length > this.log_cache_max_len) {
@@ -93,7 +107,13 @@ export const useCommonStore = defineStore({
}).catch(error => {
console.error('SSE error:', error);
// Attempt to reconnect after a delay
this.log_cache.push('SSE Connection failed, retrying in 5 seconds...');
this.log_cache.push({
type: 'log',
level: 'ERROR',
time: Date.now() / 1000,
data: 'SSE Connection failed, retrying in 5 seconds...',
uuid: 'error-' + Date.now()
});
setTimeout(() => {
this.eventSource = null;
this.createEventSource();