Merge pull request #1116 from AstrBotDevs/feat-log-sse

🏗 refactor: log 通信使用 SSE 替代 Websockets
This commit is contained in:
Soulter
2025-04-02 21:07:40 +08:00
committed by GitHub
4 changed files with 138 additions and 44 deletions
+33 -17
View File
@@ -1,5 +1,6 @@
import asyncio
from quart import websocket
import json
from quart import make_response
from astrbot.core import logger, LogBroker
from .route import Route, RouteContext
@@ -8,21 +9,36 @@ class LogRoute(Route):
def __init__(self, context: RouteContext, log_broker: LogBroker) -> None:
super().__init__(context)
self.log_broker = log_broker
self.app.add_url_rule(
"/api/live-log", view_func=self.log, methods=["GET"], websocket=True
)
self.app.add_url_rule("/api/live-log", view_func=self.log, methods=["GET"])
async def log(self):
queue = None
try:
queue = self.log_broker.register()
while True:
message = await queue.get()
await websocket.send(message)
except asyncio.CancelledError:
pass
except BaseException as e:
logger.error(f"WebSocket 连接错误: {e}")
finally:
if queue:
self.log_broker.unregister(queue)
async def stream():
queue = None
try:
queue = self.log_broker.register()
while True:
message = await queue.get()
payload = {
"type": "log",
"data": message,
}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
except asyncio.CancelledError:
pass
except BaseException as e:
logger.error(f"Log SSE 连接错误: {e}")
finally:
if queue:
self.log_broker.unregister(queue)
response = await make_response(
stream(),
{
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Transfer-Encoding": "chunked",
},
)
response.timeout = None
return response
@@ -43,18 +43,36 @@ export default {
}
},
mounted() {
this.historyNum_ = parseInt(this.historyNum)
let i = 0
for (let log of this.logCache) {
if (this.historyNum_ != -1 && i >= this.logCache.length - this.historyNum_) {
this.printLog(log)
++i
} else if (this.historyNum_ == -1) {
this.printLog(log)
}
if (this.logCache.length === 0) {
this.delayInit()
} else {
this.init()
}
},
methods: {
delayInit() {
if (this.logCache.length === 0) {
setTimeout(() => {
this.delayInit()
}, 500)
} else {
this.init()
}
},
init() {
this.historyNum_ = parseInt(this.historyNum)
let i = 0
for (let log of this.logCache) {
if (this.historyNum_ != -1 && i >= this.logCache.length - this.historyNum_) {
this.printLog(log)
++i
} else if (this.historyNum_ == -1) {
this.printLog(log)
}
}
},
toggleAutoScroll() {
this.autoScroll = !this.autoScroll;
},
@@ -184,7 +184,7 @@ function updateDashboard() {
checkUpdate();
const commonStore = useCommonStore();
commonStore.createWebSocket();
commonStore.createEventSource(); // log
commonStore.getStartTime();
+77 -17
View File
@@ -5,8 +5,10 @@ export const useCommonStore = defineStore({
id: 'common',
state: () => ({
// @ts-ignore
websocket: null,
eventSource: null,
log_cache: [],
sse_connected: false,
log_cache_max_len: 1000,
startTime: -1,
@@ -18,28 +20,86 @@ export const useCommonStore = defineStore({
"gewechat": "https://astrbot.app/deploy/platform/gewechat.html",
"lark": "https://astrbot.app/deploy/platform/lark.html",
"telegram": "https://astrbot.app/deploy/platform/telegram.html",
"dingtalk": "https://astrbot.app/deploy/platform/dingtalk.html",
"dingtalk": "https: //astrbot.app/deploy/platform/dingtalk.html",
},
pluginMarketData: []
pluginMarketData: [],
}),
actions: {
createWebSocket() {
if (this.websocket) {
createEventSource() {
if (this.eventSource) {
return
}
let protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
let route = '/api/live-log'
let port = window.location.port
let url = `${protocol}://${window.location.hostname}:${port}${route}`
console.log('websocket url:', url)
this.websocket = new WebSocket(url)
this.websocket.onmessage = (evt) => {
this.log_cache.push(evt.data)
if (this.log_cache.length > this.log_cache_max_len) {
this.log_cache.shift()
const controller = new AbortController();
const { signal } = controller;
const headers = {
'Content-Type': 'multipart/form-data',
'Authorization': 'Bearer ' + localStorage.getItem('token')
};
fetch('/api/live-log', {
method: 'GET',
headers,
signal,
cache: 'no-cache',
}).then(response => {
if (!response.ok) {
throw new Error(`SSE connection failed: ${response.status}`);
}
console.log('SSE stream opened');
this.sse_connected = true;
const reader = response.body.getReader();
const decoder = new TextDecoder();
const processStream = ({ done, value }) => {
if (done) {
console.log('SSE stream closed');
setTimeout(() => {
this.eventSource = null;
this.createEventSource();
}, 2000);
return;
}
const text = decoder.decode(value);
const lines = text.split('\n');
lines.forEach(line => {
if (line.startsWith('data:')) {
const data = line.substring(5).trim();
// {"type":"log","data":"[2021-08-01 00:00:00] INFO: Hello, world!"}
let data_json = JSON.parse(data)
if (data_json.type === 'log') {
let log = data_json.data
this.log_cache.push(log);
if (this.log_cache.length > this.log_cache_max_len) {
this.log_cache.shift();
}
}
}
});
return reader.read().then(processStream);
};
reader.read().then(processStream);
}).catch(error => {
console.error('SSE error:', error);
// Attempt to reconnect after a delay
this.log_cache.push('SSE Connection failed, retrying in 5 seconds...');
setTimeout(() => {
this.eventSource = null;
this.createEventSource();
}, 1000);
});
// Store controller to allow closing the connection
this.eventSource = controller;
},
closeEventSourcet() {
if (this.eventSource) {
this.eventSource.abort();
this.eventSource = null;
}
},
getLogCache() {
@@ -50,7 +110,7 @@ export const useCommonStore = defineStore({
return this.startTime
}
axios.get('/api/stat/start-time').then((res) => {
this.startTime = res.data.data.start_time
this.startTime = res.data.data.start_time
})
},
getTutorialLink(platform) {