refactor: 更优雅地处理客户端断开连接
Build and Push Docker Image / build_docker_image (push) Successful in 14m39s

改进对 WebSocket 客户端断开连接的处理,以防止日志中出现大量错误。

主要变更:
- 将关闭 channel 导致的发送失败日志级别从 error 降为 debug,因为客户端断开是正常操作。
- 在尝试发送消息(包括实时事件和历史事件)之前,主动检查客户端 channel 是否已关闭。
- 当检测到 channel 关闭时,及时终止发送任务,以确保资源被迅速清理并防止任务卡死。
This commit is contained in:
2025-10-20 22:57:07 +09:00
parent b64ec550e0
commit a59235cb8c
2 changed files with 34 additions and 9 deletions
+9 -5
View File
@@ -1,5 +1,5 @@
use crate::nostr::NostrEvent; use crate::nostr::NostrEvent;
use log::error; use log::{debug, error};
use serde_json; use serde_json;
use tokio::sync::mpsc; use tokio::sync::mpsc;
pub struct RelayMessage; pub struct RelayMessage;
@@ -41,7 +41,8 @@ impl RelayMessage {
match to_client_msg_tx.send(msg).await { match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => { Err(e) => {
error!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e); // Channel closed 是正常的客户端断连,使用 debug 级别
debug!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e);
Err(anyhow!( Err(anyhow!(
"Failed to send EVENT message (sub_id: {}): {}", "Failed to send EVENT message (sub_id: {}): {}",
sub_id, sub_id,
@@ -57,7 +58,8 @@ impl RelayMessage {
match to_client_msg_tx.send(msg).await { match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => { Err(e) => {
error!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e); // Channel closed 是正常的客户端断连,使用 debug 级别
debug!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e);
Err(anyhow!( Err(anyhow!(
"Failed to send EOSE message (sub_id: {}): {}", "Failed to send EOSE message (sub_id: {}): {}",
sub_id, sub_id,
@@ -77,7 +79,8 @@ impl RelayMessage {
match to_client_msg_tx.send(msg).await { match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => { Err(e) => {
error!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e); // Channel closed 是正常的客户端断连,使用 debug 级别
debug!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e);
Err(anyhow!( Err(anyhow!(
"Failed to send CLOSED message (sub_id: {}): {}", "Failed to send CLOSED message (sub_id: {}): {}",
sub_id, sub_id,
@@ -96,7 +99,8 @@ impl RelayMessage {
match to_client_msg_tx.send(msg).await { match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => { Err(e) => {
error!( // Channel closed 是正常的客户端断连,使用 debug 级别
debug!(
"Failed to send NOTICE message (message: {}): {}", "Failed to send NOTICE message (message: {}): {}",
message, e message, e
); );
+25 -4
View File
@@ -75,13 +75,21 @@ pub async fn handle_ws_connection(
Ok(event_msg) = event_rx.recv() => { Ok(event_msg) = event_rx.recv() => {
let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent
let conn = client_conn_for_send.read().await; let conn = client_conn_for_send.read().await;
// 检查 channel 是否已关闭,如果关闭则立即退出
if conn.sender.is_closed() {
info!("Client {} channel closed, stopping send task", client_id);
break;
}
for (sub_id, subscription) in &conn.subscriptions { for (sub_id, subscription) in &conn.subscriptions {
if subscription.filters.iter().any(|filter| filter.matches(&event)) { if subscription.filters.iter().any(|filter| filter.matches(&event)) {
match RelayMessage::send_event(&event, sub_id, &conn.sender).await { match RelayMessage::send_event(&event, sub_id, &conn.sender).await {
Ok(()) => debug!("Sent event to client {}", client_id), Ok(()) => debug!("Sent event to client {}", client_id),
Err(e) => { Err(_) => {
error!("Failed to send event to client {}: {}", client_id, e); // Channel 已关闭,立即退出整个循环
break; info!("Client {} channel closed during event send, stopping send task", client_id);
return;
}, },
} }
} }
@@ -254,16 +262,29 @@ pub async fn handle_message(
); );
} }
// 检查 channel 是否已关闭
if to_client_msg_tx.is_closed() {
debug!("Client channel closed before sending historical events for sub_id: {}", sub_id);
return Ok(());
}
// 查询历史事件并发送 // 查询历史事件并发送
for filter in &filters { for filter in &filters {
let events = filter.select(pool).await?; // 使用 FilterExt let events = filter.select(pool).await?; // 使用 FilterExt
for event in events { for event in events {
// 在发送每个事件前检查 channel 状态,避免大量失败尝试
if to_client_msg_tx.is_closed() {
debug!("Client channel closed during historical event sending for sub_id: {}", sub_id);
return Ok(());
}
RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await?; RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await?;
} }
} }
// 发送 EOSE 消息,表示历史事件已发送完毕 // 发送 EOSE 消息,表示历史事件已发送完毕
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?; if !to_client_msg_tx.is_closed() {
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?;
}
Ok(()) Ok(())
} }