diff --git a/src/nostr/messages.rs b/src/nostr/messages.rs index 4cc542f..f7458e2 100644 --- a/src/nostr/messages.rs +++ b/src/nostr/messages.rs @@ -1,5 +1,5 @@ use crate::nostr::NostrEvent; -use log::error; +use log::{debug, error}; use serde_json; use tokio::sync::mpsc; pub struct RelayMessage; @@ -41,7 +41,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e); + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e); Err(anyhow!( "Failed to send EVENT message (sub_id: {}): {}", sub_id, @@ -57,7 +58,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e); + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e); Err(anyhow!( "Failed to send EOSE message (sub_id: {}): {}", sub_id, @@ -77,7 +79,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e); + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e); Err(anyhow!( "Failed to send CLOSED message (sub_id: {}): {}", sub_id, @@ -96,7 +99,8 @@ impl RelayMessage { match to_client_msg_tx.send(msg).await { Ok(_) => Ok(()), Err(e) => { - error!( + // Channel closed 是正常的客户端断连,使用 debug 级别 + debug!( "Failed to send NOTICE message (message: {}): {}", message, e ); diff --git a/src/ws_logic.rs b/src/ws_logic.rs index 5769fdb..f09ecb6 100644 --- a/src/ws_logic.rs +++ b/src/ws_logic.rs @@ -75,13 +75,21 @@ pub async fn handle_ws_connection( Ok(event_msg) = event_rx.recv() => { let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent let conn = client_conn_for_send.read().await; + + // 检查 channel 是否已关闭,如果关闭则立即退出 + if conn.sender.is_closed() { + info!("Client {} channel closed, stopping send task", client_id); + break; + } + for (sub_id, subscription) in &conn.subscriptions { if subscription.filters.iter().any(|filter| filter.matches(&event)) { match RelayMessage::send_event(&event, sub_id, &conn.sender).await { Ok(()) => debug!("Sent event to client {}", client_id), - Err(e) => { - error!("Failed to send event to client {}: {}", client_id, e); - break; + Err(_) => { + // Channel 已关闭,立即退出整个循环 + info!("Client {} channel closed during event send, stopping send task", client_id); + return; }, } } @@ -254,16 +262,29 @@ pub async fn handle_message( ); } + // 检查 channel 是否已关闭 + if to_client_msg_tx.is_closed() { + debug!("Client channel closed before sending historical events for sub_id: {}", sub_id); + return Ok(()); + } + // 查询历史事件并发送 for filter in &filters { let events = filter.select(pool).await?; // 使用 FilterExt for event in events { + // 在发送每个事件前检查 channel 状态,避免大量失败尝试 + if to_client_msg_tx.is_closed() { + debug!("Client channel closed during historical event sending for sub_id: {}", sub_id); + return Ok(()); + } RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await?; } } // 发送 EOSE 消息,表示历史事件已发送完毕 - RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?; + if !to_client_msg_tx.is_closed() { + RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?; + } Ok(()) }