Compare commits
16 Commits
604603785e
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 6cc7346522 | |||
| d70fdbd037 | |||
| 355ea8963b | |||
| 61924823a6 | |||
| e8244a50e9 | |||
| 98aa3060ce | |||
| e09676582d | |||
| 7dbca66b2f | |||
| a59235cb8c | |||
| b64ec550e0 | |||
| 7ac67102ef | |||
| 70da823c43 | |||
| e9bfb2b667 | |||
| d7de8b8f8f | |||
| fb7a633d14 | |||
| 8ba270156a |
@@ -34,7 +34,7 @@ jobs:
|
||||
id: meta
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: ${{ secrets.DOCKER_USERNAME }}/nostr_demo # 你的 Docker Hub 仓库名称,替换为你的用户名
|
||||
images: ${{ secrets.DOCKER_USERNAME }}/nostr
|
||||
tags: |
|
||||
type=schedule
|
||||
type=ref,event=branch
|
||||
|
||||
@@ -22,3 +22,4 @@ target/
|
||||
.idea/
|
||||
.vscode/
|
||||
.zed/
|
||||
.env
|
||||
+44
@@ -0,0 +1,44 @@
|
||||
{
|
||||
"db_name": "SQLite",
|
||||
"query": "SELECT * FROM deleted_events WHERE event_id = ? AND pubkey = ?",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "event_id",
|
||||
"ordinal": 0,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "pubkey",
|
||||
"ordinal": 1,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "kind",
|
||||
"ordinal": 2,
|
||||
"type_info": "Integer"
|
||||
},
|
||||
{
|
||||
"name": "d_tag",
|
||||
"ordinal": 3,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "deleted_at",
|
||||
"ordinal": 4,
|
||||
"type_info": "Integer"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Right": 2
|
||||
},
|
||||
"nullable": [
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
true
|
||||
]
|
||||
},
|
||||
"hash": "20cd6a8251512848aaf6b0a5ea721da5d37b0d946f7a0e083afaa9e8fb296e3f"
|
||||
}
|
||||
Generated
+1
-1
@@ -891,7 +891,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nostr-relay"
|
||||
version = "0.0.4"
|
||||
version = "1.0.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"env_logger",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nostr-relay"
|
||||
version = "0.0.4"
|
||||
version = "1.0.1"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -3,9 +3,44 @@ This is a nostr relay running on [rust](https://www.rust-lang.org/) and [tokio](
|
||||
A Rust learning project with poor code quality.
|
||||
|
||||
# Usage
|
||||
Run `cargo run` whit environment variable `DB_PATH` set to the path of the database file.
|
||||
## Running Locally
|
||||
Run `cargo run` with environment variable `DB_PATH` set to the path of the database file.
|
||||
`RUST_LOG=info BIND_ADDR="0.0.0.0:8080" DB_PATH=./nostr.db AUTH_REQUIRED=True cargo run`
|
||||
|
||||
## Docker
|
||||
|
||||
# Run with a persistent database volume
|
||||
`docker run -p 8080:8080 -v /path/to/your/db:/usr/local/bin/nostr.db laoxong/nostr_demo:master`
|
||||
|
||||
`docker run -p 8080:8080 -e AUTH_REQUIRED=true laoxong/nostr_demo:master`
|
||||
|
||||
|
||||
## Docker Compose
|
||||
```
|
||||
nostr-relay:
|
||||
image: laoxong/nostr_demo:master
|
||||
container_name: nostr-relay
|
||||
volumes:
|
||||
- ./nostr-relay/:/etc/nostr
|
||||
environment:
|
||||
- DB_PATH=/etc/nostr/nostr.db
|
||||
- AUTH_REQUIRED=False
|
||||
```
|
||||
|
||||
# Env
|
||||
- DB_PATH: Path to the SQLite database file. Defaults to `nostr.db`.
|
||||
|
||||
- BIND_ADDR: The address and port to bind the server to. Defaults to `0.0.0.0:8080`.
|
||||
|
||||
- RUST_LOG: Log level. Defaults to `warn`
|
||||
|
||||
- AUTH_REQUIRED: Set to `true` to enable authentication for publishing events (NIP-42). Defaults to `false`. If enabled, only authenticated clients whose pubkey is in the relay's trust list can publish events.
|
||||
|
||||
# Features
|
||||
- NIP-1
|
||||
- NIP-2
|
||||
- NIP-5
|
||||
- NIP-9
|
||||
- NIP-12
|
||||
- NIP-42
|
||||
- NIP-65
|
||||
|
||||
+18
-9
@@ -6,15 +6,18 @@ pub const DEFAULT_BIND_ADDR: &str = "0.0.0.0:8080";
|
||||
pub const DEFAULT_DB_PATH: &str = "nostr.db";
|
||||
pub const MAX_EVENT_TAGS: u32 = 5000;
|
||||
pub const MAX_LIMIT: u64 = 500;
|
||||
pub const DEFAULT_LIMIT: u64 = 10;
|
||||
pub const DEFAULT_LIMIT: u64 = 50;
|
||||
pub const MAX_FILTERS_PER_REQ: usize = 100;
|
||||
pub const BROADCAST_CHANNEL_SIZE: usize = 100;
|
||||
pub const CLIENT_CHANNEL_SIZE: usize = 32;
|
||||
pub const MAX_SUBSCRIPTIONS: usize = 20;
|
||||
pub static RELAY_URL: Lazy<String> =
|
||||
Lazy::new(|| env::var("RELAY_URL").unwrap_or_else(|_| "".to_string()));
|
||||
|
||||
pub static SERVER_INFO: Lazy<ServerInfo> = Lazy::new(|| ServerInfo {
|
||||
contact: "https://www.moec.top/",
|
||||
description: "Powered by laoXong.",
|
||||
contact: env::var("RELAY_CONTACT").unwrap_or_else(|_| "https://www.moec.top/".to_string()),
|
||||
description: env::var("RELAY_DESCRIPTION")
|
||||
.unwrap_or_else(|_| "Powered by laoXong.".to_string()),
|
||||
limitation: Limitation {
|
||||
max_event_tags: MAX_EVENT_TAGS,
|
||||
max_event_time_newer_than_now: 900,
|
||||
@@ -26,10 +29,16 @@ pub static SERVER_INFO: Lazy<ServerInfo> = Lazy::new(|| ServerInfo {
|
||||
max_subscriptions: MAX_SUBSCRIPTIONS as u32, // usize to u32 cast
|
||||
min_prefix: 10,
|
||||
},
|
||||
name: "A rust nostr relay by laoXong",
|
||||
pubkey: "63abd4f817e39cca4e6abb6e6cf3e133bb718cf8ec28b38c1645e84d7a6190c6",
|
||||
software: "https://git.moe.gift/laoxong/nostr-relay",
|
||||
supported_nips: vec![1, 2, 5, 42, 65],
|
||||
version: env!("CARGO_PKG_VERSION"), // 从 Cargo.toml 获取版本
|
||||
auth_required: env::var("AUTH_REQUIRED").unwrap_or_else(|_| "False".to_string()) == "True",
|
||||
name: env::var("RELAY_NAME").unwrap_or_else(|_| "A rust nostr relay by laoXong".to_string()),
|
||||
pubkey: env::var("RELAY_PUBKEY").unwrap_or_else(|_| {
|
||||
"63abd4f817e39cca4e6abb6e6cf3e133bb718cf8ec28b38c1645e84d7a6190c6".to_string()
|
||||
}),
|
||||
software: env::var("RELAY_SOFTWARE")
|
||||
.unwrap_or_else(|_| "https://git.moe.gift/laoxong/nostr-relay".to_string()),
|
||||
supported_nips: vec![1, 2, 5, 9, 11, 42, 50, 65],
|
||||
version: env!("CARGO_PKG_VERSION").to_string(), // 从 Cargo.toml 获取版本
|
||||
auth_required: env::var("AUTH_REQUIRED")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.to_lowercase()
|
||||
== "true",
|
||||
});
|
||||
|
||||
+30
-2
@@ -4,7 +4,6 @@ use anyhow::Result;
|
||||
use log::error;
|
||||
use sqlx::sqlite::SqlitePool;
|
||||
use sqlx::{Row, query};
|
||||
use std::collections::HashSet;
|
||||
|
||||
pub async fn init_database(pool: &SqlitePool) -> Result<()> {
|
||||
let create_events_table = r#"
|
||||
@@ -16,19 +15,48 @@ pub async fn init_database(pool: &SqlitePool) -> Result<()> {
|
||||
tags TEXT NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
sig TEXT NOT NULL,
|
||||
d_tag TEXT,
|
||||
indexed_at INTEGER DEFAULT (unixepoch())
|
||||
);
|
||||
"#;
|
||||
|
||||
let create_deleted_events_table = r#"
|
||||
CREATE TABLE IF NOT EXISTS deleted_events (
|
||||
event_id TEXT,
|
||||
pubkey TEXT NOT NULL,
|
||||
kind INTEGER,
|
||||
d_tag TEXT,
|
||||
deleted_at INTEGER DEFAULT (unixepoch())
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS event_tags (
|
||||
event_id TEXT NOT NULL,
|
||||
tag_name TEXT NOT NULL,
|
||||
tag_value TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
FOREIGN KEY(event_id) REFERENCES events(id) ON DELETE CASCADE
|
||||
);
|
||||
"#;
|
||||
|
||||
let create_indexes = vec![
|
||||
"CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind ON events(pubkey, kind);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_events_kind_created_at_desc ON events(kind, created_at DESC);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind_d_tag ON events(pubkey, kind, d_tag);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_deleted_events_pubkey ON deleted_events(pubkey, event_id);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_deleted_events_kind_d_tag ON deleted_events(kind, pubkey, d_tag);",
|
||||
"CREATE INDEX IF NOT EXISTS idx_event_tags_name_value_created ON event_tags(tag_name, tag_value, created_at DESC);",
|
||||
];
|
||||
|
||||
query(create_events_table).execute(pool).await?;
|
||||
// 尝试添加 d_tag 列,如果已存在则忽略错误
|
||||
let _ = query("ALTER TABLE events ADD COLUMN d_tag TEXT DEFAULT ''")
|
||||
.execute(pool)
|
||||
.await;
|
||||
|
||||
query(create_deleted_events_table).execute(pool).await?;
|
||||
|
||||
for index_sql in create_indexes {
|
||||
query(index_sql).execute(pool).await?;
|
||||
@@ -40,7 +68,7 @@ pub async fn init_database(pool: &SqlitePool) -> Result<()> {
|
||||
|
||||
// 获取信任账户列表
|
||||
pub async fn get_trust_accounts(pool: &SqlitePool) -> Vec<String> {
|
||||
let pubkey = SERVER_INFO.pubkey; // 获取服务器公钥
|
||||
let pubkey = &SERVER_INFO.pubkey; // 获取服务器公钥
|
||||
// 查询最新的 kind 3 事件(联系人列表),获取其中的 p 标签作为信任账户
|
||||
let sql =
|
||||
"SELECT tags FROM events WHERE kind = 3 AND pubkey = ? ORDER BY created_at DESC LIMIT 1";
|
||||
|
||||
+1
-2
@@ -1,6 +1,5 @@
|
||||
use anyhow::{Result, anyhow};
|
||||
use anyhow::Result;
|
||||
use httparse::{EMPTY_HEADER, Request};
|
||||
use log::{error, info};
|
||||
use sqlx::SqlitePool;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
+13
-3
@@ -11,6 +11,7 @@ use log::info;
|
||||
use sqlx::sqlite::SqlitePool;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::{RwLock, broadcast};
|
||||
@@ -27,9 +28,18 @@ async fn main() -> Result<()> {
|
||||
|
||||
// 数据库初始化
|
||||
let db_path = env::var("DB_PATH").unwrap_or_else(|_| DEFAULT_DB_PATH.to_string());
|
||||
let db_url = format!("sqlite://{}", db_path);
|
||||
let pool = SqlitePool::connect(&db_url).await?;
|
||||
info!("Database pool connected successfully to {}", db_url);
|
||||
let pool = sqlx::sqlite::SqlitePoolOptions::new()
|
||||
.max_connections(100)
|
||||
.acquire_timeout(std::time::Duration::from_secs(30))
|
||||
.connect_with(
|
||||
sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.filename(&db_path)
|
||||
.create_if_missing(true)
|
||||
.foreign_keys(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal),
|
||||
)
|
||||
.await?;
|
||||
info!("Database pool connected successfully to {}", db_path);
|
||||
database::init_database(&pool).await?; // 使用 database 模块的函数
|
||||
let pool = Arc::new(pool); // 将 SqlitePool 包裹在 Arc 中,以便多线程共享
|
||||
info!("Connected to SQLite");
|
||||
|
||||
+7
-7
@@ -1,5 +1,5 @@
|
||||
use crate::nostr::Filter;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::mpsc;
|
||||
use uuid::Uuid;
|
||||
@@ -7,14 +7,14 @@ use uuid::Uuid;
|
||||
// 服务器信息结构体(用于 NIP-11)
|
||||
#[derive(Serialize)]
|
||||
pub struct ServerInfo {
|
||||
pub contact: &'static str,
|
||||
pub description: &'static str,
|
||||
pub contact: String,
|
||||
pub description: String,
|
||||
pub limitation: Limitation,
|
||||
pub name: &'static str,
|
||||
pub pubkey: &'static str,
|
||||
pub software: &'static str,
|
||||
pub name: String,
|
||||
pub pubkey: String,
|
||||
pub software: String,
|
||||
pub supported_nips: Vec<u32>,
|
||||
pub version: &'static str,
|
||||
pub version: String,
|
||||
pub auth_required: bool,
|
||||
}
|
||||
|
||||
|
||||
+260
-48
@@ -1,18 +1,19 @@
|
||||
use anyhow::Result;
|
||||
use log::{debug, error, info};
|
||||
use log::{debug, info};
|
||||
use secp256k1::{
|
||||
Message as SecpMessage, Secp256k1, XOnlyPublicKey, schnorr::Signature as SchnorrSignature,
|
||||
};
|
||||
use serde_json::{Value, json};
|
||||
use serde_json::json;
|
||||
use sha2::{Digest, Sha256};
|
||||
use sqlx::SqlitePool;
|
||||
use std::{
|
||||
result,
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
|
||||
use crate::constants::SERVER_INFO;
|
||||
use crate::constants::{RELAY_URL, SERVER_INFO};
|
||||
use crate::models::ClientConnection;
|
||||
use crate::nostr::NostrEvent;
|
||||
use crate::nostr::messages::RelayMessage;
|
||||
@@ -20,7 +21,7 @@ use crate::nostr::messages::RelayMessage;
|
||||
pub trait NostrEventExt {
|
||||
fn serialize_for_id(&self) -> String; // 用于计算事件 ID 的序列化
|
||||
fn verify(&self) -> bool; // 验证 ID、时间戳、标签数量和签名
|
||||
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error>; // 保存事件到数据库
|
||||
async fn save(&self, pool: &SqlitePool) -> Result<(), anyhow::Error>; // 保存事件到数据库
|
||||
async fn handle_auth_event(
|
||||
&self,
|
||||
client_conn: &Arc<RwLock<ClientConnection>>,
|
||||
@@ -31,7 +32,7 @@ pub trait NostrEventExt {
|
||||
impl NostrEventExt for NostrEvent {
|
||||
// 根据 Nostr 协议规则序列化事件,用于计算事件 ID
|
||||
fn serialize_for_id(&self) -> String {
|
||||
let serialized = json!([
|
||||
json!([
|
||||
0, // 用于 ID 计算的协议版本,Nostr 当前为 0
|
||||
self.pubkey,
|
||||
self.created_at,
|
||||
@@ -39,8 +40,7 @@ impl NostrEventExt for NostrEvent {
|
||||
self.tags,
|
||||
self.content
|
||||
])
|
||||
.to_string();
|
||||
serialized
|
||||
.to_string()
|
||||
}
|
||||
|
||||
// 验证事件的有效性(ID、时间戳、标签数量、签名)
|
||||
@@ -90,6 +90,11 @@ impl NostrEventExt for NostrEvent {
|
||||
);
|
||||
return false;
|
||||
}
|
||||
// 验证只有一个e标签
|
||||
if self.tags.iter().filter(|tag| tag[0] == "e").count() > 1 {
|
||||
debug!("Event has more than one e tag");
|
||||
return false;
|
||||
}
|
||||
|
||||
// 4. 解析公钥:从十六进制字符串解析 XOnlyPublicKey
|
||||
let pubkey_bytes: Vec<u8> = match hex::decode(&self.pubkey) {
|
||||
@@ -175,8 +180,17 @@ impl NostrEventExt for NostrEvent {
|
||||
}
|
||||
|
||||
// 保存事件到数据库,并处理可替换事件和删除事件
|
||||
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
|
||||
async fn save(&self, pool: &SqlitePool) -> Result<(), anyhow::Error> {
|
||||
let tags_json = serde_json::to_string(&self.tags).unwrap();
|
||||
let mut tx = pool.begin().await?;
|
||||
|
||||
// 提取 d_tag
|
||||
let d_tag = self
|
||||
.tags
|
||||
.iter()
|
||||
.find(|tag| tag.len() >= 2 && tag[0] == "d")
|
||||
.map(|tag| tag[1].clone())
|
||||
.unwrap_or_default(); // 默认为空字符串
|
||||
|
||||
match self.kind {
|
||||
// NIP-09 事件删除 (Event Deletion)
|
||||
@@ -189,65 +203,244 @@ impl NostrEventExt for NostrEvent {
|
||||
"Attempting to delete event with ID: {} by request of pubkey {}",
|
||||
event_id_to_delete, self.pubkey
|
||||
);
|
||||
let sql = "DELETE FROM events WHERE id = ? AND pubkey = ?";
|
||||
let result = sqlx::query(sql)
|
||||
.bind(event_id_to_delete)
|
||||
.bind(&self.pubkey)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
let result =
|
||||
sqlx::query("DELETE FROM events WHERE id = ? AND pubkey = ?")
|
||||
.bind(event_id_to_delete)
|
||||
.bind(&self.pubkey)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
if result.rows_affected() > 0 {
|
||||
info!(
|
||||
"Deleted event {} by pubkey (kind 5): {}",
|
||||
event_id_to_delete, self.pubkey
|
||||
);
|
||||
let result = sqlx::query(
|
||||
"INSERT INTO deleted_events (event_id, pubkey, deleted_at) VALUES (?, ?, ?)",
|
||||
)
|
||||
.bind(event_id_to_delete)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.created_at as i64)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
if result.rows_affected() > 0 {
|
||||
info!(
|
||||
"Deleted event {} by pubkey (kind 5): {}",
|
||||
event_id_to_delete, self.pubkey
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Could not delete event {} for pubkey {}.",
|
||||
event_id_to_delete, self.pubkey
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"Could not delete event {} for pubkey {}. It might not exist or unauthorized.",
|
||||
"Could not delete event {} for pubkey {}.",
|
||||
event_id_to_delete, self.pubkey
|
||||
);
|
||||
}
|
||||
} else if tag.get(0).map(|s| s == "a").unwrap_or(false) {
|
||||
let d_tag_value = &tag[1];
|
||||
let tag_d_vector = d_tag_value.splitn(3, ':').collect::<Vec<&str>>();
|
||||
if tag_d_vector.len() != 3 {
|
||||
debug!("Invalid a-tag format: {}", d_tag_value);
|
||||
continue;
|
||||
}
|
||||
let result = sqlx::query(
|
||||
"DELETE FROM events WHERE kind = ? AND pubkey = ? AND d_tag = ? AND created_at <= ?;",
|
||||
)
|
||||
.bind(tag_d_vector[0].parse::<i64>().unwrap())
|
||||
.bind(&self.pubkey)
|
||||
.bind(tag_d_vector[2])
|
||||
.bind(self.created_at as i64)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
if result.rows_affected() > 0 {
|
||||
info!(
|
||||
"Deleted event {:?} by pubkey (kind 5): {}",
|
||||
tag_d_vector, self.pubkey
|
||||
);
|
||||
let result = sqlx::query(
|
||||
"INSERT INTO deleted_events (kind, pubkey, d_tag, deleted_at) VALUES (?, ?, ?, ?)",
|
||||
)
|
||||
.bind(tag_d_vector[0])
|
||||
.bind(&self.pubkey)
|
||||
.bind(tag_d_vector[2])
|
||||
.bind(self.created_at as i64)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
if result.rows_affected() > 0 {
|
||||
info!(
|
||||
"Deleted event {:?} by pubkey (kind 5): {}",
|
||||
tag_d_vector, self.pubkey
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Could not delete event {:?} for pubkey {}.",
|
||||
tag_d_vector, self.pubkey
|
||||
);
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"Could not delete event {:?} for pubkey {}.",
|
||||
tag_d_vector, self.pubkey
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// NIP-01 可替换事件 (Replaceable Events): kind 0 (Metadata), kind 3 (Contact List)
|
||||
// NIP-16 可替换事件 (Replaceable Events): kinds 10000 to 19999
|
||||
// NIP-33 参数化可替换事件 (Parameterized Replaceable Events): kinds 30000 to 39999
|
||||
// 对于所有这些可替换事件,新事件会替换掉旧事件。
|
||||
// 这里的简化处理是,对于所有可替换事件,都基于 (pubkey, kind) 来删除旧事件。
|
||||
// 对于 NIP-33 事件,严格来说还需要匹配 'd' tag。
|
||||
// 但考虑到数据库操作的复杂性以及原始代码的实现,这里仍采用 (pubkey, kind) 的方式。
|
||||
// 一个更健壮的 NIP-33 实现可能需要单独的字段来存储 'd' tag 或更复杂的 SQL。
|
||||
0 | 3 | _
|
||||
if (self.kind >= 10000 && self.kind < 20000)
|
||||
|| (self.kind >= 30000 && self.kind < 40000) =>
|
||||
{
|
||||
// 对于这些可替换事件,新事件会替换掉旧事件。
|
||||
0 | 3 | _ if (self.kind >= 10000 && self.kind < 20000) => {
|
||||
debug!(
|
||||
"Attempting to delete previous replaceable event for pubkey: {}, kind: {}",
|
||||
self.pubkey, self.kind
|
||||
);
|
||||
let sql = "DELETE FROM events WHERE pubkey = ? AND kind = ?";
|
||||
sqlx::query(sql)
|
||||
let is_deleted = sqlx::query(
|
||||
"SELECT 1 FROM deleted_events WHERE pubkey = ? AND kind = ? AND deleted_at > ?",
|
||||
)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.kind)
|
||||
.bind(self.created_at as i64)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
|
||||
if is_deleted.is_some() {
|
||||
return Err(anyhow::anyhow!("Event already deleted"));
|
||||
}
|
||||
let sql = "SELECT created_at FROM events WHERE pubkey = ? AND kind = ? ORDER BY created_at DESC LIMIT 1";
|
||||
let created_at: Option<u64> = sqlx::query_scalar(sql)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.kind)
|
||||
.execute(pool)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
|
||||
if let Some(prev_created_at) = created_at {
|
||||
if prev_created_at <= self.created_at {
|
||||
let sql = "DELETE FROM events WHERE pubkey = ? AND kind = ?";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.kind)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
debug!(
|
||||
"Deleted previous replaceable event for pubkey: {}, kind: {}",
|
||||
self.pubkey, self.kind
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Previous replaceable event for pubkey: {}, kind: {} is not older than current event",
|
||||
self.pubkey, self.kind
|
||||
);
|
||||
// 如果新事件不比旧事件新,则报错
|
||||
anyhow::bail!("Event is not newer than existing replaceable event");
|
||||
}
|
||||
}
|
||||
}
|
||||
// NIP-33 参数化可替换事件 (Parameterized Replaceable Events): kinds 30000 to 39999
|
||||
_ if (self.kind >= 30000 && self.kind < 40000) => {
|
||||
debug!(
|
||||
"Attempting to delete previous parameterized replaceable event for pubkey: {}, kind: {}, d_tag: {}",
|
||||
self.pubkey, self.kind, d_tag
|
||||
);
|
||||
let result = sqlx::query(
|
||||
"SELECT * FROM deleted_events WHERE pubkey = ? AND kind = ? AND d_tag = ? AND deleted_at > ?",
|
||||
)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.kind)
|
||||
.bind(&d_tag)
|
||||
.bind(self.created_at as i64)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if let Some(_) = result {
|
||||
debug!(
|
||||
"Event {} already deleted by pubkey {}",
|
||||
self.id, self.pubkey
|
||||
);
|
||||
return Err(anyhow::anyhow!("Event already deleted"));
|
||||
}
|
||||
let sql = "SELECT created_at FROM events WHERE pubkey = ? AND kind = ? AND d_tag = ? ORDER BY created_at DESC LIMIT 1";
|
||||
let created_at: Option<u64> = sqlx::query_scalar(sql)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.kind)
|
||||
.bind(&d_tag)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
|
||||
if let Some(prev_created_at) = created_at {
|
||||
if prev_created_at < self.created_at {
|
||||
let sql = "DELETE FROM events WHERE pubkey = ? AND kind = ? AND d_tag = ?";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.kind)
|
||||
.bind(&d_tag)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
debug!(
|
||||
"Deleted previous parameterized replaceable event for pubkey: {}, kind: {}, d_tag: {}",
|
||||
self.pubkey, self.kind, d_tag
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
"Previous parameterized replaceable event for pubkey: {}, kind: {}, d_tag: {} is not older than current event",
|
||||
self.pubkey, self.kind, d_tag
|
||||
);
|
||||
anyhow::bail!(
|
||||
"Event is not newer than existing parameterized replaceable event"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => { /* 非可替换事件不需要在插入前删除 */ }
|
||||
}
|
||||
|
||||
// 插入新事件
|
||||
let sql = "INSERT OR IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?);";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.id)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.created_at as i64)
|
||||
.bind(self.kind)
|
||||
.bind(tags_json)
|
||||
.bind(&self.content)
|
||||
.bind(&self.sig)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
if !(self.kind >= 20000 && self.kind < 30000) {
|
||||
let result =
|
||||
sqlx::query("SELECT * FROM deleted_events WHERE event_id = ? AND pubkey = ?")
|
||||
.bind(&self.id)
|
||||
.bind(&self.pubkey)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if let Some(_) = result {
|
||||
debug!(
|
||||
"Event {} already deleted by pubkey {}",
|
||||
self.id, self.pubkey
|
||||
);
|
||||
return Err(anyhow::anyhow!("Event already deleted"));
|
||||
}
|
||||
let sql = "INSERT OR IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig, d_tag) VALUES (?, ?, ?, ?, ?, ?, ?, ?);";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.id)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.created_at as i64)
|
||||
.bind(self.kind)
|
||||
.bind(tags_json)
|
||||
.bind(&self.content)
|
||||
.bind(&self.sig)
|
||||
.bind(&d_tag)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
for tag in &self.tags {
|
||||
if tag.len() >= 2 {
|
||||
let tag_name = &tag[0];
|
||||
let tag_value = &tag[1];
|
||||
if tag_name.len() == 1 {
|
||||
sqlx::query("INSERT INTO event_tags (event_id, tag_name, tag_value, created_at) VALUES (?, ?, ?, ?)")
|
||||
.bind(&self.id)
|
||||
.bind(tag_name)
|
||||
.bind(tag_value)
|
||||
.bind(self.created_at as i64)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -259,7 +452,7 @@ impl NostrEventExt for NostrEvent {
|
||||
) -> Result<(), anyhow::Error> {
|
||||
// 1. 验证事件类型是否为 NIP-42 认证事件 (kind 22242)
|
||||
if self.kind != 22242 {
|
||||
RelayMessage::send_ok(
|
||||
let _ = RelayMessage::send_ok(
|
||||
&self.id,
|
||||
false,
|
||||
"AUTH event must be kind 22242".to_string(), // 根据协议,OK 消息应包含事件 ID
|
||||
@@ -271,8 +464,11 @@ impl NostrEventExt for NostrEvent {
|
||||
|
||||
// 2. 验证事件签名是否有效
|
||||
if !self.verify() {
|
||||
RelayMessage::send_notice("Invalid AUTH event signature".to_string(), to_client_msg_tx)
|
||||
.await;
|
||||
let _ = RelayMessage::send_notice(
|
||||
"Invalid AUTH event signature".to_string(),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -293,7 +489,7 @@ impl NostrEventExt for NostrEvent {
|
||||
let challenge = match challenge {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
RelayMessage::send_notice(
|
||||
let _ = RelayMessage::send_notice(
|
||||
"AUTH event missing challenge tag".to_string(),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
@@ -302,10 +498,23 @@ impl NostrEventExt for NostrEvent {
|
||||
}
|
||||
};
|
||||
|
||||
let relay_url = match relay_url {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
let _ = RelayMessage::send_notice(
|
||||
"AUTH event missing relay tag".to_string(),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
// 4. 验证 challenge 是否匹配客户端连接 ID,并且未过期
|
||||
let is_valid_challenge = {
|
||||
let conn = client_conn.read().await;
|
||||
if conn.id.to_string() == *challenge {
|
||||
if relay_url.as_str() != RELAY_URL.as_str() {
|
||||
false
|
||||
} else if conn.id.to_string() == *challenge {
|
||||
// 检查 challenge 是否在 15 分钟内有效 (可配置)
|
||||
let connected_at_duration = UNIX_EPOCH + Duration::from_secs(conn.connected_at);
|
||||
match SystemTime::now().duration_since(connected_at_duration) {
|
||||
@@ -318,8 +527,11 @@ impl NostrEventExt for NostrEvent {
|
||||
};
|
||||
|
||||
if !is_valid_challenge {
|
||||
RelayMessage::send_notice("Invalid or expired challenge".to_string(), to_client_msg_tx)
|
||||
.await;
|
||||
let _ = RelayMessage::send_notice(
|
||||
"Invalid or expired challenge".to_string(),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -329,7 +541,7 @@ impl NostrEventExt for NostrEvent {
|
||||
.unwrap_or(Duration::ZERO)
|
||||
.as_secs();
|
||||
if self.created_at < now.saturating_sub(60) || self.created_at > now + 60 {
|
||||
RelayMessage::send_notice(
|
||||
let _ = RelayMessage::send_notice(
|
||||
"AUTH event timestamp out of acceptable range (must be within 60s of now)"
|
||||
.to_string(),
|
||||
to_client_msg_tx,
|
||||
@@ -347,7 +559,7 @@ impl NostrEventExt for NostrEvent {
|
||||
|
||||
info!("Client authenticated with pubkey: {}", self.pubkey);
|
||||
// 根据 NIP-42,认证成功也应该回复 OK 消息
|
||||
RelayMessage::send_ok(
|
||||
let _ = RelayMessage::send_ok(
|
||||
&self.id,
|
||||
true,
|
||||
"Authentication successful.".to_string(),
|
||||
|
||||
+33
-23
@@ -2,7 +2,6 @@ use anyhow::Result;
|
||||
use log::debug;
|
||||
use sqlx::sqlite::SqlitePool;
|
||||
use sqlx::{Execute, QueryBuilder, Row};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::constants::{DEFAULT_LIMIT, MAX_LIMIT};
|
||||
use crate::nostr::Filter;
|
||||
@@ -25,34 +24,45 @@ impl FilterExt for Filter {
|
||||
if let Some(ids) = &self.ids {
|
||||
if !ids.is_empty() {
|
||||
sql.push(" AND (");
|
||||
let mut separated = sql.separated(" OR ");
|
||||
let mut first = true;
|
||||
for id_prefix in ids {
|
||||
if id_prefix.len() < 64 {
|
||||
// 如果是 ID 前缀,使用 LIKE
|
||||
separated
|
||||
.push("id LIKE ")
|
||||
.push_bind(format!("{}%", id_prefix));
|
||||
} else {
|
||||
// 否则进行精确匹配
|
||||
separated.push("id = ").push_bind(id_prefix);
|
||||
if !first {
|
||||
sql.push(" OR ");
|
||||
}
|
||||
if id_prefix.len() == 64 {
|
||||
sql.push("id = ");
|
||||
sql.push_bind(id_prefix);
|
||||
} else {
|
||||
sql.push("id LIKE ");
|
||||
sql.push_bind(format!("{}%", id_prefix));
|
||||
}
|
||||
first = false;
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
sql.push(")");
|
||||
}
|
||||
}
|
||||
|
||||
// 作者过滤 (NIP-01 支持前缀匹配)
|
||||
if let Some(authors) = &self.authors {
|
||||
if !authors.is_empty() {
|
||||
sql.push(" AND pubkey IN (");
|
||||
let mut separated = sql.separated(",");
|
||||
sql.push(" AND (");
|
||||
let mut first = true;
|
||||
for author in authors {
|
||||
separated.push_bind(author);
|
||||
if !first {
|
||||
sql.push(" OR ");
|
||||
}
|
||||
if author.len() == 64 {
|
||||
sql.push("pubkey = ");
|
||||
sql.push_bind(author);
|
||||
} else {
|
||||
sql.push("pubkey LIKE ");
|
||||
sql.push_bind(format!("{}%", author));
|
||||
}
|
||||
first = false;
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
sql.push(")");
|
||||
}
|
||||
}
|
||||
|
||||
// 类型过滤
|
||||
if let Some(kinds) = &self.kinds {
|
||||
if !kinds.is_empty() {
|
||||
@@ -67,12 +77,13 @@ impl FilterExt for Filter {
|
||||
|
||||
// 时间过滤 (since)
|
||||
if let Some(since) = self.since {
|
||||
sql.push(" AND created_at >= ").push_bind(since as i64);
|
||||
sql.push(" AND created_at >= ");
|
||||
sql.push_bind(since as i64);
|
||||
}
|
||||
|
||||
// 时间过滤 (until)
|
||||
if let Some(until) = self.until {
|
||||
sql.push(" AND created_at <= ").push_bind(until as i64);
|
||||
sql.push(" AND created_at <= ");
|
||||
sql.push_bind(until as i64);
|
||||
}
|
||||
|
||||
// 标签过滤 (NIP-12 Generic Tag Queries)
|
||||
@@ -80,11 +91,10 @@ impl FilterExt for Filter {
|
||||
if tag_name.starts_with('#') && !tag_values.is_empty() {
|
||||
let tag_letter = &tag_name[1..]; // 提取标签字母 (e.g., 'e', 'p', 'd')
|
||||
|
||||
// 使用 JSON 函数查询 tags 数组
|
||||
// SELECT 1 FROM json_each(tags) AS tag_arr WHERE tag_arr.value->>0 = 'p' AND tag_arr.value->>1 IN ('...', '...')
|
||||
sql.push(" AND EXISTS (SELECT 1 FROM json_each(tags) AS tag_arr WHERE json_array_length(tag_arr.value) > 1 AND json_extract(tag_arr.value, '$[0]') = ");
|
||||
// 使用 event_tags 表查询
|
||||
sql.push(" AND EXISTS (SELECT 1 FROM event_tags WHERE event_id = events.id AND tag_name = ");
|
||||
sql.push_bind(tag_letter);
|
||||
sql.push(" AND json_extract(tag_arr.value, '$[1]') IN (");
|
||||
sql.push(" AND tag_value IN (");
|
||||
let mut separated = sql.separated(",");
|
||||
for value in tag_values {
|
||||
separated.push_bind(value);
|
||||
|
||||
+61
-18
@@ -1,8 +1,9 @@
|
||||
use crate::nostr::NostrEvent;
|
||||
use log::error;
|
||||
use log::{debug, error};
|
||||
use serde_json;
|
||||
use tokio::sync::mpsc;
|
||||
pub struct RelayMessage;
|
||||
use anyhow::{Result, anyhow};
|
||||
|
||||
impl RelayMessage {
|
||||
// 发送 OK 消息 (NIP-20)
|
||||
@@ -23,27 +24,48 @@ impl RelayMessage {
|
||||
event: &NostrEvent,
|
||||
sub_id: &str,
|
||||
to_client_msg_tx: &mpsc::Sender<String>,
|
||||
) {
|
||||
) -> Result<()> {
|
||||
// 尝试将 NostrEvent 序列化为 JSON 字符串
|
||||
let event_json = match serde_json::to_string(event) {
|
||||
Ok(json) => json,
|
||||
Err(e) => {
|
||||
error!("Failed to serialize event {} for sending: {}", event.id, e);
|
||||
return; // 序列化失败则不发送
|
||||
return Err(anyhow!(
|
||||
"Failed to serialize event {} for sending: {}",
|
||||
event.id,
|
||||
e
|
||||
));
|
||||
}
|
||||
};
|
||||
let msg = format!("[\"EVENT\", \"{}\", {}]", sub_id, event_json);
|
||||
if let Err(e) = to_client_msg_tx.send(msg).await {
|
||||
// 此错误通常表示客户端的接收端已关闭(客户端已断开)
|
||||
error!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e);
|
||||
match to_client_msg_tx.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// Channel closed 是正常的客户端断连,使用 debug 级别
|
||||
debug!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e);
|
||||
Err(anyhow!(
|
||||
"Failed to send EVENT message (sub_id: {}): {}",
|
||||
sub_id,
|
||||
e
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 发送 EOSE 消息 (NIP-15)
|
||||
pub async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) {
|
||||
pub async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) -> Result<()> {
|
||||
let msg = format!("[\"EOSE\", \"{}\"]", sub_id);
|
||||
if let Err(e) = to_client_msg_tx.send(msg).await {
|
||||
error!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e);
|
||||
match to_client_msg_tx.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// Channel closed 是正常的客户端断连,使用 debug 级别
|
||||
debug!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e);
|
||||
Err(anyhow!(
|
||||
"Failed to send EOSE message (sub_id: {}): {}",
|
||||
sub_id,
|
||||
e
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,21 +74,42 @@ impl RelayMessage {
|
||||
sub_id: &str,
|
||||
message: String,
|
||||
to_client_msg_tx: &mpsc::Sender<String>,
|
||||
) {
|
||||
) -> Result<()> {
|
||||
let msg = format!("[\"CLOSED\", \"{}\", \"{}\"]", sub_id, message);
|
||||
if let Err(e) = to_client_msg_tx.send(msg).await {
|
||||
error!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e);
|
||||
match to_client_msg_tx.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// Channel closed 是正常的客户端断连,使用 debug 级别
|
||||
debug!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e);
|
||||
Err(anyhow!(
|
||||
"Failed to send CLOSED message (sub_id: {}): {}",
|
||||
sub_id,
|
||||
e
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 发送 NOTICE 消息 (NIP-01)
|
||||
pub async fn send_notice(message: String, to_client_msg_tx: &mpsc::Sender<String>) {
|
||||
pub async fn send_notice(
|
||||
message: String,
|
||||
to_client_msg_tx: &mpsc::Sender<String>,
|
||||
) -> Result<()> {
|
||||
let msg = format!("[\"NOTICE\", \"{}\"]", message);
|
||||
if let Err(e) = to_client_msg_tx.send(msg).await {
|
||||
error!(
|
||||
"Failed to send NOTICE message (message: {}): {}",
|
||||
message, e
|
||||
);
|
||||
match to_client_msg_tx.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// Channel closed 是正常的客户端断连,使用 debug 级别
|
||||
debug!(
|
||||
"Failed to send NOTICE message (message: {}): {}",
|
||||
message, e
|
||||
);
|
||||
Err(anyhow!(
|
||||
"Failed to send NOTICE message (message: {}): {}",
|
||||
message,
|
||||
e
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+73
-28
@@ -9,7 +9,7 @@ use std::{
|
||||
};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::{RwLock, broadcast, mpsc};
|
||||
use tokio_tungstenite::{WebSocketStream, accept_async, tungstenite::protocol::Message};
|
||||
use tokio_tungstenite::{accept_async, tungstenite::protocol::Message};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::constants::{CLIENT_CHANNEL_SIZE, MAX_FILTERS_PER_REQ, MAX_SUBSCRIPTIONS, SERVER_INFO};
|
||||
@@ -18,7 +18,7 @@ use crate::nostr::event::NostrEventExt;
|
||||
use crate::nostr::filter::FilterExt;
|
||||
use crate::nostr::messages::RelayMessage;
|
||||
use crate::nostr::utils::extract_p_tags_from_vec;
|
||||
use crate::nostr::{ClientMessage, Filter, NostrEvent};
|
||||
use crate::nostr::{Filter, NostrEvent};
|
||||
|
||||
pub async fn handle_ws_connection(
|
||||
stream: TcpStream,
|
||||
@@ -75,9 +75,23 @@ pub async fn handle_ws_connection(
|
||||
Ok(event_msg) = event_rx.recv() => {
|
||||
let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent
|
||||
let conn = client_conn_for_send.read().await;
|
||||
|
||||
// 检查 channel 是否已关闭,如果关闭则立即退出
|
||||
if conn.sender.is_closed() {
|
||||
info!("Client {} channel closed, stopping send task", client_id);
|
||||
break;
|
||||
}
|
||||
|
||||
for (sub_id, subscription) in &conn.subscriptions {
|
||||
if subscription.filters.iter().any(|filter| filter.matches(&event)) {
|
||||
RelayMessage::send_event(&event, sub_id, &conn.sender).await;
|
||||
match RelayMessage::send_event(&event, sub_id, &conn.sender).await {
|
||||
Ok(()) => debug!("Sent event to client {}", client_id),
|
||||
Err(_) => {
|
||||
// Channel 已关闭,立即退出整个循环
|
||||
info!("Client {} channel closed during event send, stopping send task", client_id);
|
||||
return;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -124,17 +138,30 @@ pub async fn handle_ws_connection(
|
||||
.await
|
||||
{
|
||||
error!("Error handling message for client {}: {}", client_id, e);
|
||||
RelayMessage::send_notice(format!("Invalid message: {}", e), &client_tx)
|
||||
.await;
|
||||
match RelayMessage::send_notice(
|
||||
format!("Invalid message: {}", e),
|
||||
&client_tx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => debug!("Sent error notice to client {}", client_id),
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"Failed to send error notice to client {}: {}",
|
||||
client_id, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(Message::Close(_)) => {
|
||||
info!("Client {} disconnected via CLOSE message", client_id);
|
||||
debug!("Client {} disconnected via CLOSE message", client_id);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Client {} WebSocket error: {}", client_id, e);
|
||||
debug!("Client {} WebSocket error: {}", client_id, e);
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
@@ -185,7 +212,7 @@ pub async fn handle_message(
|
||||
.to_string(),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
let sub_id = arr
|
||||
@@ -203,7 +230,7 @@ pub async fn handle_message(
|
||||
format!("Maximum subscriptions ({}) exceeded", MAX_SUBSCRIPTIONS),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
@@ -215,7 +242,7 @@ pub async fn handle_message(
|
||||
format!("Maximum filters ({}) exceeded", MAX_FILTERS_PER_REQ),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
let filter: Filter = serde_json::from_value(arr[i].clone())
|
||||
@@ -235,16 +262,35 @@ pub async fn handle_message(
|
||||
);
|
||||
}
|
||||
|
||||
// 检查 channel 是否已关闭
|
||||
if to_client_msg_tx.is_closed() {
|
||||
debug!(
|
||||
"Client channel closed before sending historical events for sub_id: {}",
|
||||
sub_id
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// 查询历史事件并发送
|
||||
for filter in &filters {
|
||||
let events = filter.select(pool).await?; // 使用 FilterExt
|
||||
for event in events {
|
||||
RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await;
|
||||
// 在发送每个事件前检查 channel 状态,避免大量失败尝试
|
||||
if to_client_msg_tx.is_closed() {
|
||||
debug!(
|
||||
"Client channel closed during historical event sending for sub_id: {}",
|
||||
sub_id
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// 发送 EOSE 消息,表示历史事件已发送完毕
|
||||
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await;
|
||||
if !to_client_msg_tx.is_closed() {
|
||||
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -278,27 +324,26 @@ pub async fn handle_message(
|
||||
// 权限检查:是否需要认证,以及是否在信任列表中
|
||||
if SERVER_INFO.auth_required {
|
||||
let conn = client_conn_clone.read().await;
|
||||
if !conn.authenticated {
|
||||
RelayMessage::send_closed(
|
||||
// NIP-42 Auth: Forbid unauthenticated EVENT
|
||||
&conn.id.to_string(), // 使用 client_id 作为 sub_id 来匹配 AUTH 挑战
|
||||
"auth-required: Authentication required to publish events".to_string(),
|
||||
&to_client_msg_tx_clone,
|
||||
)
|
||||
.await;
|
||||
debug!("Client {}: Not authenticated, event rejected.", conn.id);
|
||||
return;
|
||||
}
|
||||
let trust_accounts_guard = trust_accounts_clone.read().await;
|
||||
if !trust_accounts_guard.contains(&event_clone.pubkey) {
|
||||
RelayMessage::send_closed(
|
||||
match RelayMessage::send_closed(
|
||||
// NIP-42 Auth: Forbid unauthorized EVENT
|
||||
&conn.id.to_string(),
|
||||
"restricted: You are not in the trust list to publish events"
|
||||
.to_string(),
|
||||
&to_client_msg_tx_clone,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
Ok(_) => debug!(
|
||||
"Client {}: Pubkey {} not in trust list, authentication challenge sent.",
|
||||
conn.id, event_clone.pubkey
|
||||
),
|
||||
Err(e) => {
|
||||
error!("Failed to send authentication challenge: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
debug!(
|
||||
"Client {}: Pubkey {} not in trust list, event rejected.",
|
||||
conn.id, event_clone.pubkey
|
||||
@@ -332,7 +377,7 @@ pub async fn handle_message(
|
||||
new_trust_accounts.into_iter().collect();
|
||||
new_trust_accounts_set.insert(SERVER_INFO.pubkey.to_string()); // 确保服务器公钥始终在信任列表中
|
||||
*ts = new_trust_accounts_set;
|
||||
debug!("Trust list updated to {} accounts.", ts.len());
|
||||
info!("Trust list updated to {} accounts.", ts.len());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -369,7 +414,7 @@ pub async fn handle_message(
|
||||
"Subscription cancelled".to_string(),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -379,7 +424,7 @@ pub async fn handle_message(
|
||||
"AUTH message requires an event: [\"AUTH\", <auth_event>]".to_string(),
|
||||
to_client_msg_tx,
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user