From a59235cb8cbc4df6845a69c8ca855fadff87e932 Mon Sep 17 00:00:00 2001 From: laoXong Date: Mon, 20 Oct 2025 22:57:07 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20=E6=9B=B4=E4=BC=98=E9=9B=85?= =?UTF-8?q?=E5=9C=B0=E5=A4=84=E7=90=86=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=96=AD?= =?UTF-8?q?=E5=BC=80=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 改进对 WebSocket 客户端断开连接的处理,以防止日志中出现大量错误。 主要变更: - 将关闭 channel 导致的发送失败日志级别从 error 降为 debug,因为客户端断开是正常操作。 - 在尝试发送消息(包括实时事件和历史事件)之前,主动检查客户端 channel 是否已关闭。 - 当检测到 channel 关闭时,及时终止发送任务,以确保资源被迅速清理并防止任务卡死。 --- src/nostr/messages.rs | 14 +++++++++----- src/ws_logic.rs | 29 +++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/nostr/messages.rs b/src/nostr/messages.rs index 4cc542f..f7458e2 100644 --- a/src/nostr/messages.rs +++ b/src/nostr/messages.rs @@ -1,5 +1,5 @@ use crate::nostr::NostrEvent; -use log::error; +use log::{debug, error}; use serde_json; use tokio::sync::mpsc; pub struct RelayMessage; @@ -41,7 +41,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e); + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e); Err(anyhow!( "Failed to send EVENT message (sub_id: {}): {}", sub_id, @@ -57,7 +58,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e); + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e); Err(anyhow!( "Failed to send EOSE message (sub_id: {}): {}", sub_id, @@ -77,7 +79,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e); + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e); Err(anyhow!( "Failed to send CLOSED message (sub_id: {}): {}", sub_id, @@ -96,7 +99,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!( + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!( "Failed to send NOTICE message (message: {}): {}", message, e ); diff --git a/src/ws_logic.rs b/src/ws_logic.rs index 5769fdb..f09ecb6 100644 --- a/src/ws_logic.rs +++ b/src/ws_logic.rs @@ -75,13 +75,21 @@ pub async fn handle_ws_connection( Ok(event_msg) = event_rx.recv() => { let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent let conn = client_conn_for_send.read().await; + + // 检查 channel 是否已关闭,如果关闭则立即退出 + if conn.sender.is_closed() { + info!("Client {} channel closed, stopping send task", client_id); + break; + } + for (sub_id, subscription) in &conn.subscriptions { if subscription.filters.iter().any(|filter| filter.matches(&event)) { match RelayMessage::send_event(&event, sub_id, &conn.sender).await { Ok(()) => debug!("Sent event to client {}", client_id), - Err(e) => { - error!("Failed to send event to client {}: {}", client_id, e); - break; + Err(_) => { + // Channel 已关闭,立即退出整个循环 + info!("Client {} channel closed during event send, stopping send task", client_id); + return; }, } } @@ -254,16 +262,29 @@ pub async fn handle_message( ); } + // 检查 channel 是否已关闭 + if to_client_msg_tx.is_closed() { + debug!("Client channel closed before sending historical events for sub_id: {}", sub_id); + return Ok(()); + } + // 查询历史事件并发送 for filter in &filters { let events = filter.select(pool).await?; // 使用 FilterExt for event in events { + // 在发送每个事件前检查 channel 状态,避免大量失败尝试 + if to_client_msg_tx.is_closed() { + debug!("Client channel closed during historical event sending for sub_id: {}", sub_id); + return Ok(()); + } RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await?; } } // 发送 EOSE 消息,表示历史事件已发送完毕 - RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?; + if !to_client_msg_tx.is_closed() { + RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?; + } Ok(()) }