feat: implement log history retrieval and improve log streaming handling (#2190)
This commit is contained in:
@@ -96,8 +96,6 @@ class LogBroker:
|
||||
Queue: 订阅者的队列, 可用于接收日志消息
|
||||
"""
|
||||
q = Queue(maxsize=CACHED_SIZE + 10)
|
||||
for log in self.log_cache:
|
||||
q.put_nowait(log)
|
||||
self.subscribers.append(q)
|
||||
return q
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import asyncio
|
||||
import json
|
||||
from quart import make_response
|
||||
from astrbot.core import logger, LogBroker
|
||||
from .route import Route, RouteContext
|
||||
from .route import Route, RouteContext, Response
|
||||
|
||||
|
||||
class LogRoute(Route):
|
||||
@@ -10,6 +10,7 @@ class LogRoute(Route):
|
||||
super().__init__(context)
|
||||
self.log_broker = log_broker
|
||||
self.app.add_url_rule("/api/live-log", view_func=self.log, methods=["GET"])
|
||||
self.app.add_url_rule("/api/log-history", view_func=self.log_history, methods=["GET"])
|
||||
|
||||
async def log(self):
|
||||
async def stream():
|
||||
@@ -23,7 +24,6 @@ class LogRoute(Route):
|
||||
**message, # see astrbot/core/log.py
|
||||
}
|
||||
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
||||
await asyncio.sleep(0.07) # 控制发送频率,避免过快
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except BaseException as e:
|
||||
@@ -43,3 +43,14 @@ class LogRoute(Route):
|
||||
)
|
||||
response.timeout = None
|
||||
return response
|
||||
|
||||
async def log_history(self):
|
||||
"""获取日志历史"""
|
||||
try:
|
||||
logs = list(self.log_broker.log_cache)
|
||||
return Response().ok(data={
|
||||
"logs": logs,
|
||||
}).__dict__
|
||||
except BaseException as e:
|
||||
logger.error(f"获取日志历史失败: {e}")
|
||||
return Response().error(f"获取日志历史失败: {e}").__dict__
|
||||
|
||||
@@ -15,7 +15,22 @@ export const useCommonStore = defineStore({
|
||||
pluginMarketData: [],
|
||||
}),
|
||||
actions: {
|
||||
createEventSource() {
|
||||
async createEventSource() {
|
||||
|
||||
const fetchLogHistory = async () => {
|
||||
try {
|
||||
const res = await axios.get('/api/log-history');
|
||||
if (res.data.data.logs) {
|
||||
this.log_cache.push(...res.data.data.logs);
|
||||
} else {
|
||||
this.log_cache = [];
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to fetch log history:', err);
|
||||
}
|
||||
};
|
||||
await fetchLogHistory();
|
||||
|
||||
if (this.eventSource) {
|
||||
return
|
||||
}
|
||||
@@ -40,7 +55,24 @@ export const useCommonStore = defineStore({
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
|
||||
let incompleteLine = ""; // 用于存储不完整的行
|
||||
|
||||
const handleIncompleteLine = (line) => {
|
||||
incompleteLine += line;
|
||||
// if can parse as JSON, return it
|
||||
try {
|
||||
const data_json = JSON.parse(incompleteLine);
|
||||
incompleteLine = ""; // 清空不完整行
|
||||
return data_json;
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const processStream = ({ done, value }) => {
|
||||
// get bytes length
|
||||
const bytesLength = value ? value.byteLength : 0;
|
||||
console.log(`Received ${bytesLength} bytes from live log`);
|
||||
if (done) {
|
||||
console.log('SSE stream closed');
|
||||
setTimeout(() => {
|
||||
@@ -53,6 +85,9 @@ export const useCommonStore = defineStore({
|
||||
const text = decoder.decode(value);
|
||||
const lines = text.split('\n\n');
|
||||
lines.forEach(line => {
|
||||
if (!line.trim()) {
|
||||
return;
|
||||
}
|
||||
if (line.startsWith('data:')) {
|
||||
const data = line.substring(5).trim();
|
||||
// {"type":"log","data":"[2021-08-01 00:00:00] INFO: Hello, world!"}
|
||||
@@ -60,21 +95,29 @@ export const useCommonStore = defineStore({
|
||||
try {
|
||||
data_json = JSON.parse(data);
|
||||
} catch (e) {
|
||||
console.error('Invalid JSON:', data);
|
||||
data_json = {
|
||||
type: 'log',
|
||||
data: data,
|
||||
level: 'INFO',
|
||||
time: new Date().toISOString(),
|
||||
console.warn('Invalid JSON:', data);
|
||||
// 尝试处理不完整的行
|
||||
const parsedData = handleIncompleteLine(data);
|
||||
if (parsedData) {
|
||||
data_json = parsedData;
|
||||
} else {
|
||||
return; // 如果无法解析,跳过当前行
|
||||
}
|
||||
}
|
||||
if (data_json.type === 'log') {
|
||||
// let log = data_json.data
|
||||
this.log_cache.push(data_json);
|
||||
if (this.log_cache.length > this.log_cache_max_len) {
|
||||
this.log_cache.shift();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const parsedData = handleIncompleteLine(line);
|
||||
if (parsedData && parsedData.type === 'log') {
|
||||
this.log_cache.push(parsedData);
|
||||
if (this.log_cache.length > this.log_cache_max_len) {
|
||||
this.log_cache.shift();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return reader.read().then(processStream);
|
||||
|
||||
Reference in New Issue
Block a user