Compare commits

...

16 Commits

Author SHA1 Message Date
laoxong 6cc7346522 feat: Improve Tag search NIP-12
Build and Push Docker Image / build_docker_image (push) Failing after 15m2s
modified:   Cargo.lock
modified:   Cargo.toml
modified:   src/database.rs
modified:   src/nostr/event.rs
modified:   src/nostr/filter.rs
2025-12-02 18:48:05 +09:00
laoxong d70fdbd037 feat: Bump version to 1.0.0
Build and Push Docker Image / build_docker_image (push) Failing after 12m28s
2025-12-01 01:35:48 +09:00
laoxong 355ea8963b Fix: Protocol implementation error
Build and Push Docker Image / build_docker_image (push) Has been cancelled
2025-12-01 01:33:50 +09:00
laoxong 61924823a6 Fix: Fitter 'OR' Error 2025-11-23 21:05:08 +09:00
laoxong e8244a50e9 fix: 修正 Docker Hub 仓库名称
Build and Push Docker Image / build_docker_image (push) Successful in 15m16s
2025-11-23 19:50:24 +09:00
laoxong 98aa3060ce fix: 添加缺失的支持NIP 2025-11-23 19:48:44 +09:00
laoxong e09676582d Fix: wrong logic
Build and Push Docker Image / build_docker_image (push) Successful in 16m20s
2025-11-23 19:23:19 +09:00
laoxong 7dbca66b2f Add: Sqlite Connect Options
Build and Push Docker Image / build_docker_image (push) Successful in 13m50s
2025-10-22 13:39:29 +09:00
laoxong a59235cb8c refactor: 更优雅地处理客户端断开连接
Build and Push Docker Image / build_docker_image (push) Successful in 14m39s
改进对 WebSocket 客户端断开连接的处理,以防止日志中出现大量错误。

主要变更:
- 将关闭 channel 导致的发送失败日志级别从 error 降为 debug,因为客户端断开是正常操作。
- 在尝试发送消息(包括实时事件和历史事件)之前,主动检查客户端 channel 是否已关闭。
- 当检测到 channel 关闭时,及时终止发送任务,以确保资源被迅速清理并防止任务卡死。
2025-10-20 22:57:07 +09:00
laoxong b64ec550e0 refactor: Remove unused imports and handle unused results
Build and Push Docker Image / build_docker_image (push) Successful in 16m28s
This commit performs general code cleanup to improve quality and address compiler warnings.

Unused import statements have been removed from several files, including `HashSet`, `anyhow`, `log::error`, `serde::Deserialize`, and `serde_json::Value`. This declutters the code and makes dependencies clearer.

Additionally, the `Result` returned by `RelayMessage::send_ok` and `RelayMessage::send_notice` is now explicitly ignored. This resolves `must_use` warnings for these fire-and-forget message sending functions, where error handling is not critical to the application's flow.
2025-09-22 16:56:00 +08:00
laoxong 7ac67102ef Update README.md
Build and Push Docker Image / build_docker_image (push) Successful in 8m15s
2025-08-07 00:41:23 +08:00
laoxong 70da823c43 Update README.md
Build and Push Docker Image / build_docker_image (push) Has been cancelled
2025-08-07 00:38:44 +08:00
laoxong e9bfb2b667 Update README.md
Build and Push Docker Image / build_docker_image (push) Has been cancelled
2025-08-07 00:35:14 +08:00
laoxong d7de8b8f8f Feat: Adjust the log level
Build and Push Docker Image / build_docker_image (push) Successful in 8m31s
2025-08-07 00:15:23 +08:00
laoxong fb7a633d14 Fix auth_required comparison 2025-08-04 03:18:48 +08:00
laoxong 8ba270156a Update README.md 2025-08-04 03:01:53 +08:00
15 changed files with 580 additions and 144 deletions
+1 -1
View File
@@ -34,7 +34,7 @@ jobs:
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ secrets.DOCKER_USERNAME }}/nostr_demo # 你的 Docker Hub 仓库名称,替换为你的用户名
images: ${{ secrets.DOCKER_USERNAME }}/nostr
tags: |
type=schedule
type=ref,event=branch
+1
View File
@@ -22,3 +22,4 @@ target/
.idea/
.vscode/
.zed/
.env
@@ -0,0 +1,44 @@
{
"db_name": "SQLite",
"query": "SELECT * FROM deleted_events WHERE event_id = ? AND pubkey = ?",
"describe": {
"columns": [
{
"name": "event_id",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "pubkey",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "kind",
"ordinal": 2,
"type_info": "Integer"
},
{
"name": "d_tag",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "deleted_at",
"ordinal": 4,
"type_info": "Integer"
}
],
"parameters": {
"Right": 2
},
"nullable": [
true,
false,
true,
true,
true
]
},
"hash": "20cd6a8251512848aaf6b0a5ea721da5d37b0d946f7a0e083afaa9e8fb296e3f"
}
Generated
+1 -1
View File
@@ -891,7 +891,7 @@ dependencies = [
[[package]]
name = "nostr-relay"
version = "0.0.4"
version = "1.0.1"
dependencies = [
"anyhow",
"env_logger",
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nostr-relay"
version = "0.0.4"
version = "1.0.1"
edition = "2024"
[dependencies]
+36 -1
View File
@@ -3,9 +3,44 @@ This is a nostr relay running on [rust](https://www.rust-lang.org/) and [tokio](
A Rust learning project with poor code quality.
# Usage
Run `cargo run` whit environment variable `DB_PATH` set to the path of the database file.
## Running Locally
Run `cargo run` with environment variable `DB_PATH` set to the path of the database file.
`RUST_LOG=info BIND_ADDR="0.0.0.0:8080" DB_PATH=./nostr.db AUTH_REQUIRED=True cargo run`
## Docker
# Run with a persistent database volume
`docker run -p 8080:8080 -v /path/to/your/db:/usr/local/bin/nostr.db laoxong/nostr_demo:master`
`docker run -p 8080:8080 -e AUTH_REQUIRED=true laoxong/nostr_demo:master`
## Docker Compose
```
nostr-relay:
image: laoxong/nostr_demo:master
container_name: nostr-relay
volumes:
- ./nostr-relay/:/etc/nostr
environment:
- DB_PATH=/etc/nostr/nostr.db
- AUTH_REQUIRED=False
```
# Env
- DB_PATH: Path to the SQLite database file. Defaults to `nostr.db`.
- BIND_ADDR: The address and port to bind the server to. Defaults to `0.0.0.0:8080`.
- RUST_LOG: Log level. Defaults to `warn`
- AUTH_REQUIRED: Set to `true` to enable authentication for publishing events (NIP-42). Defaults to `false`. If enabled, only authenticated clients whose pubkey is in the relay's trust list can publish events.
# Features
- NIP-1
- NIP-2
- NIP-5
- NIP-9
- NIP-12
- NIP-42
- NIP-65
+18 -9
View File
@@ -6,15 +6,18 @@ pub const DEFAULT_BIND_ADDR: &str = "0.0.0.0:8080";
pub const DEFAULT_DB_PATH: &str = "nostr.db";
pub const MAX_EVENT_TAGS: u32 = 5000;
pub const MAX_LIMIT: u64 = 500;
pub const DEFAULT_LIMIT: u64 = 10;
pub const DEFAULT_LIMIT: u64 = 50;
pub const MAX_FILTERS_PER_REQ: usize = 100;
pub const BROADCAST_CHANNEL_SIZE: usize = 100;
pub const CLIENT_CHANNEL_SIZE: usize = 32;
pub const MAX_SUBSCRIPTIONS: usize = 20;
pub static RELAY_URL: Lazy<String> =
Lazy::new(|| env::var("RELAY_URL").unwrap_or_else(|_| "".to_string()));
pub static SERVER_INFO: Lazy<ServerInfo> = Lazy::new(|| ServerInfo {
contact: "https://www.moec.top/",
description: "Powered by laoXong.",
contact: env::var("RELAY_CONTACT").unwrap_or_else(|_| "https://www.moec.top/".to_string()),
description: env::var("RELAY_DESCRIPTION")
.unwrap_or_else(|_| "Powered by laoXong.".to_string()),
limitation: Limitation {
max_event_tags: MAX_EVENT_TAGS,
max_event_time_newer_than_now: 900,
@@ -26,10 +29,16 @@ pub static SERVER_INFO: Lazy<ServerInfo> = Lazy::new(|| ServerInfo {
max_subscriptions: MAX_SUBSCRIPTIONS as u32, // usize to u32 cast
min_prefix: 10,
},
name: "A rust nostr relay by laoXong",
pubkey: "63abd4f817e39cca4e6abb6e6cf3e133bb718cf8ec28b38c1645e84d7a6190c6",
software: "https://git.moe.gift/laoxong/nostr-relay",
supported_nips: vec![1, 2, 5, 42, 65],
version: env!("CARGO_PKG_VERSION"), // 从 Cargo.toml 获取版本
auth_required: env::var("AUTH_REQUIRED").unwrap_or_else(|_| "False".to_string()) == "True",
name: env::var("RELAY_NAME").unwrap_or_else(|_| "A rust nostr relay by laoXong".to_string()),
pubkey: env::var("RELAY_PUBKEY").unwrap_or_else(|_| {
"63abd4f817e39cca4e6abb6e6cf3e133bb718cf8ec28b38c1645e84d7a6190c6".to_string()
}),
software: env::var("RELAY_SOFTWARE")
.unwrap_or_else(|_| "https://git.moe.gift/laoxong/nostr-relay".to_string()),
supported_nips: vec![1, 2, 5, 9, 11, 42, 50, 65],
version: env!("CARGO_PKG_VERSION").to_string(), // 从 Cargo.toml 获取版本
auth_required: env::var("AUTH_REQUIRED")
.unwrap_or_else(|_| "false".to_string())
.to_lowercase()
== "true",
});
+30 -2
View File
@@ -4,7 +4,6 @@ use anyhow::Result;
use log::error;
use sqlx::sqlite::SqlitePool;
use sqlx::{Row, query};
use std::collections::HashSet;
pub async fn init_database(pool: &SqlitePool) -> Result<()> {
let create_events_table = r#"
@@ -16,19 +15,48 @@ pub async fn init_database(pool: &SqlitePool) -> Result<()> {
tags TEXT NOT NULL,
content TEXT NOT NULL,
sig TEXT NOT NULL,
d_tag TEXT,
indexed_at INTEGER DEFAULT (unixepoch())
);
"#;
let create_deleted_events_table = r#"
CREATE TABLE IF NOT EXISTS deleted_events (
event_id TEXT,
pubkey TEXT NOT NULL,
kind INTEGER,
d_tag TEXT,
deleted_at INTEGER DEFAULT (unixepoch())
);
CREATE TABLE IF NOT EXISTS event_tags (
event_id TEXT NOT NULL,
tag_name TEXT NOT NULL,
tag_value TEXT NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY(event_id) REFERENCES events(id) ON DELETE CASCADE
);
"#;
let create_indexes = vec![
"CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey);",
"CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);",
"CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);",
"CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind ON events(pubkey, kind);",
"CREATE INDEX IF NOT EXISTS idx_events_kind_created_at_desc ON events(kind, created_at DESC);",
"CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind_d_tag ON events(pubkey, kind, d_tag);",
"CREATE INDEX IF NOT EXISTS idx_deleted_events_pubkey ON deleted_events(pubkey, event_id);",
"CREATE INDEX IF NOT EXISTS idx_deleted_events_kind_d_tag ON deleted_events(kind, pubkey, d_tag);",
"CREATE INDEX IF NOT EXISTS idx_event_tags_name_value_created ON event_tags(tag_name, tag_value, created_at DESC);",
];
query(create_events_table).execute(pool).await?;
// 尝试添加 d_tag 列,如果已存在则忽略错误
let _ = query("ALTER TABLE events ADD COLUMN d_tag TEXT DEFAULT ''")
.execute(pool)
.await;
query(create_deleted_events_table).execute(pool).await?;
for index_sql in create_indexes {
query(index_sql).execute(pool).await?;
@@ -40,7 +68,7 @@ pub async fn init_database(pool: &SqlitePool) -> Result<()> {
// 获取信任账户列表
pub async fn get_trust_accounts(pool: &SqlitePool) -> Vec<String> {
let pubkey = SERVER_INFO.pubkey; // 获取服务器公钥
let pubkey = &SERVER_INFO.pubkey; // 获取服务器公钥
// 查询最新的 kind 3 事件(联系人列表),获取其中的 p 标签作为信任账户
let sql =
"SELECT tags FROM events WHERE kind = 3 AND pubkey = ? ORDER BY created_at DESC LIMIT 1";
+1 -2
View File
@@ -1,6 +1,5 @@
use anyhow::{Result, anyhow};
use anyhow::Result;
use httparse::{EMPTY_HEADER, Request};
use log::{error, info};
use sqlx::SqlitePool;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
+13 -3
View File
@@ -11,6 +11,7 @@ use log::info;
use sqlx::sqlite::SqlitePool;
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::{RwLock, broadcast};
@@ -27,9 +28,18 @@ async fn main() -> Result<()> {
// 数据库初始化
let db_path = env::var("DB_PATH").unwrap_or_else(|_| DEFAULT_DB_PATH.to_string());
let db_url = format!("sqlite://{}", db_path);
let pool = SqlitePool::connect(&db_url).await?;
info!("Database pool connected successfully to {}", db_url);
let pool = sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(100)
.acquire_timeout(std::time::Duration::from_secs(30))
.connect_with(
sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true)
.foreign_keys(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal),
)
.await?;
info!("Database pool connected successfully to {}", db_path);
database::init_database(&pool).await?; // 使用 database 模块的函数
let pool = Arc::new(pool); // 将 SqlitePool 包裹在 Arc 中,以便多线程共享
info!("Connected to SQLite");
+7 -7
View File
@@ -1,5 +1,5 @@
use crate::nostr::Filter;
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::collections::HashMap;
use tokio::sync::mpsc;
use uuid::Uuid;
@@ -7,14 +7,14 @@ use uuid::Uuid;
// 服务器信息结构体(用于 NIP-11)
#[derive(Serialize)]
pub struct ServerInfo {
pub contact: &'static str,
pub description: &'static str,
pub contact: String,
pub description: String,
pub limitation: Limitation,
pub name: &'static str,
pub pubkey: &'static str,
pub software: &'static str,
pub name: String,
pub pubkey: String,
pub software: String,
pub supported_nips: Vec<u32>,
pub version: &'static str,
pub version: String,
pub auth_required: bool,
}
+244 -32
View File
@@ -1,18 +1,19 @@
use anyhow::Result;
use log::{debug, error, info};
use log::{debug, info};
use secp256k1::{
Message as SecpMessage, Secp256k1, XOnlyPublicKey, schnorr::Signature as SchnorrSignature,
};
use serde_json::{Value, json};
use serde_json::json;
use sha2::{Digest, Sha256};
use sqlx::SqlitePool;
use std::{
result,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::{RwLock, mpsc};
use crate::constants::SERVER_INFO;
use crate::constants::{RELAY_URL, SERVER_INFO};
use crate::models::ClientConnection;
use crate::nostr::NostrEvent;
use crate::nostr::messages::RelayMessage;
@@ -20,7 +21,7 @@ use crate::nostr::messages::RelayMessage;
pub trait NostrEventExt {
fn serialize_for_id(&self) -> String; // 用于计算事件 ID 的序列化
fn verify(&self) -> bool; // 验证 ID、时间戳、标签数量和签名
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error>; // 保存事件到数据库
async fn save(&self, pool: &SqlitePool) -> Result<(), anyhow::Error>; // 保存事件到数据库
async fn handle_auth_event(
&self,
client_conn: &Arc<RwLock<ClientConnection>>,
@@ -31,7 +32,7 @@ pub trait NostrEventExt {
impl NostrEventExt for NostrEvent {
// 根据 Nostr 协议规则序列化事件,用于计算事件 ID
fn serialize_for_id(&self) -> String {
let serialized = json!([
json!([
0, // 用于 ID 计算的协议版本,Nostr 当前为 0
self.pubkey,
self.created_at,
@@ -39,8 +40,7 @@ impl NostrEventExt for NostrEvent {
self.tags,
self.content
])
.to_string();
serialized
.to_string()
}
// 验证事件的有效性(ID、时间戳、标签数量、签名)
@@ -90,6 +90,11 @@ impl NostrEventExt for NostrEvent {
);
return false;
}
// 验证只有一个e标签
if self.tags.iter().filter(|tag| tag[0] == "e").count() > 1 {
debug!("Event has more than one e tag");
return false;
}
// 4. 解析公钥:从十六进制字符串解析 XOnlyPublicKey
let pubkey_bytes: Vec<u8> = match hex::decode(&self.pubkey) {
@@ -175,8 +180,17 @@ impl NostrEventExt for NostrEvent {
}
// 保存事件到数据库,并处理可替换事件和删除事件
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
async fn save(&self, pool: &SqlitePool) -> Result<(), anyhow::Error> {
let tags_json = serde_json::to_string(&self.tags).unwrap();
let mut tx = pool.begin().await?;
// 提取 d_tag
let d_tag = self
.tags
.iter()
.find(|tag| tag.len() >= 2 && tag[0] == "d")
.map(|tag| tag[1].clone())
.unwrap_or_default(); // 默认为空字符串
match self.kind {
// NIP-09 事件删除 (Event Deletion)
@@ -189,11 +203,24 @@ impl NostrEventExt for NostrEvent {
"Attempting to delete event with ID: {} by request of pubkey {}",
event_id_to_delete, self.pubkey
);
let sql = "DELETE FROM events WHERE id = ? AND pubkey = ?";
let result = sqlx::query(sql)
let result =
sqlx::query("DELETE FROM events WHERE id = ? AND pubkey = ?")
.bind(event_id_to_delete)
.bind(&self.pubkey)
.execute(pool)
.execute(&mut *tx)
.await?;
if result.rows_affected() > 0 {
info!(
"Deleted event {} by pubkey (kind 5): {}",
event_id_to_delete, self.pubkey
);
let result = sqlx::query(
"INSERT INTO deleted_events (event_id, pubkey, deleted_at) VALUES (?, ?, ?)",
)
.bind(event_id_to_delete)
.bind(&self.pubkey)
.bind(self.created_at as i64)
.execute(&mut *tx)
.await?;
if result.rows_affected() > 0 {
info!(
@@ -202,42 +229,189 @@ impl NostrEventExt for NostrEvent {
);
} else {
debug!(
"Could not delete event {} for pubkey {}. It might not exist or unauthorized.",
"Could not delete event {} for pubkey {}.",
event_id_to_delete, self.pubkey
);
}
} else {
debug!(
"Could not delete event {} for pubkey {}.",
event_id_to_delete, self.pubkey
);
}
} else if tag.get(0).map(|s| s == "a").unwrap_or(false) {
let d_tag_value = &tag[1];
let tag_d_vector = d_tag_value.splitn(3, ':').collect::<Vec<&str>>();
if tag_d_vector.len() != 3 {
debug!("Invalid a-tag format: {}", d_tag_value);
continue;
}
let result = sqlx::query(
"DELETE FROM events WHERE kind = ? AND pubkey = ? AND d_tag = ? AND created_at <= ?;",
)
.bind(tag_d_vector[0].parse::<i64>().unwrap())
.bind(&self.pubkey)
.bind(tag_d_vector[2])
.bind(self.created_at as i64)
.execute(&mut *tx)
.await?;
if result.rows_affected() > 0 {
info!(
"Deleted event {:?} by pubkey (kind 5): {}",
tag_d_vector, self.pubkey
);
let result = sqlx::query(
"INSERT INTO deleted_events (kind, pubkey, d_tag, deleted_at) VALUES (?, ?, ?, ?)",
)
.bind(tag_d_vector[0])
.bind(&self.pubkey)
.bind(tag_d_vector[2])
.bind(self.created_at as i64)
.execute(&mut *tx)
.await?;
if result.rows_affected() > 0 {
info!(
"Deleted event {:?} by pubkey (kind 5): {}",
tag_d_vector, self.pubkey
);
} else {
debug!(
"Could not delete event {:?} for pubkey {}.",
tag_d_vector, self.pubkey
);
}
} else {
debug!(
"Could not delete event {:?} for pubkey {}.",
tag_d_vector, self.pubkey
);
}
}
}
}
}
// NIP-01 可替换事件 (Replaceable Events): kind 0 (Metadata), kind 3 (Contact List)
// NIP-16 可替换事件 (Replaceable Events): kinds 10000 to 19999
// NIP-33 参数化可替换事件 (Parameterized Replaceable Events): kinds 30000 to 39999
// 对于所有这些可替换事件,新事件会替换掉旧事件。
// 这里的简化处理是,对于所有可替换事件,都基于 (pubkey, kind) 来删除旧事件。
// 对于 NIP-33 事件,严格来说还需要匹配 'd' tag。
// 但考虑到数据库操作的复杂性以及原始代码的实现,这里仍采用 (pubkey, kind) 的方式。
// 一个更健壮的 NIP-33 实现可能需要单独的字段来存储 'd' tag 或更复杂的 SQL。
0 | 3 | _
if (self.kind >= 10000 && self.kind < 20000)
|| (self.kind >= 30000 && self.kind < 40000) =>
{
// 对于这些可替换事件,新事件会替换掉旧事件。
0 | 3 | _ if (self.kind >= 10000 && self.kind < 20000) => {
debug!(
"Attempting to delete previous replaceable event for pubkey: {}, kind: {}",
self.pubkey, self.kind
);
let is_deleted = sqlx::query(
"SELECT 1 FROM deleted_events WHERE pubkey = ? AND kind = ? AND deleted_at > ?",
)
.bind(&self.pubkey)
.bind(self.kind)
.bind(self.created_at as i64)
.fetch_optional(&mut *tx)
.await?;
if is_deleted.is_some() {
return Err(anyhow::anyhow!("Event already deleted"));
}
let sql = "SELECT created_at FROM events WHERE pubkey = ? AND kind = ? ORDER BY created_at DESC LIMIT 1";
let created_at: Option<u64> = sqlx::query_scalar(sql)
.bind(&self.pubkey)
.bind(self.kind)
.fetch_optional(&mut *tx)
.await?;
if let Some(prev_created_at) = created_at {
if prev_created_at <= self.created_at {
let sql = "DELETE FROM events WHERE pubkey = ? AND kind = ?";
sqlx::query(sql)
.bind(&self.pubkey)
.bind(self.kind)
.execute(pool)
.execute(&mut *tx)
.await?;
debug!(
"Deleted previous replaceable event for pubkey: {}, kind: {}",
self.pubkey, self.kind
);
} else {
debug!(
"Previous replaceable event for pubkey: {}, kind: {} is not older than current event",
self.pubkey, self.kind
);
// 如果新事件不比旧事件新,则报错
anyhow::bail!("Event is not newer than existing replaceable event");
}
}
}
// NIP-33 参数化可替换事件 (Parameterized Replaceable Events): kinds 30000 to 39999
_ if (self.kind >= 30000 && self.kind < 40000) => {
debug!(
"Attempting to delete previous parameterized replaceable event for pubkey: {}, kind: {}, d_tag: {}",
self.pubkey, self.kind, d_tag
);
let result = sqlx::query(
"SELECT * FROM deleted_events WHERE pubkey = ? AND kind = ? AND d_tag = ? AND deleted_at > ?",
)
.bind(&self.pubkey)
.bind(self.kind)
.bind(&d_tag)
.bind(self.created_at as i64)
.fetch_optional(&mut *tx)
.await?;
if let Some(_) = result {
debug!(
"Event {} already deleted by pubkey {}",
self.id, self.pubkey
);
return Err(anyhow::anyhow!("Event already deleted"));
}
let sql = "SELECT created_at FROM events WHERE pubkey = ? AND kind = ? AND d_tag = ? ORDER BY created_at DESC LIMIT 1";
let created_at: Option<u64> = sqlx::query_scalar(sql)
.bind(&self.pubkey)
.bind(self.kind)
.bind(&d_tag)
.fetch_optional(&mut *tx)
.await?;
if let Some(prev_created_at) = created_at {
if prev_created_at < self.created_at {
let sql = "DELETE FROM events WHERE pubkey = ? AND kind = ? AND d_tag = ?";
sqlx::query(sql)
.bind(&self.pubkey)
.bind(self.kind)
.bind(&d_tag)
.execute(&mut *tx)
.await?;
debug!(
"Deleted previous parameterized replaceable event for pubkey: {}, kind: {}, d_tag: {}",
self.pubkey, self.kind, d_tag
);
} else {
debug!(
"Previous parameterized replaceable event for pubkey: {}, kind: {}, d_tag: {} is not older than current event",
self.pubkey, self.kind, d_tag
);
anyhow::bail!(
"Event is not newer than existing parameterized replaceable event"
);
}
}
}
_ => { /* 非可替换事件不需要在插入前删除 */ }
}
// 插入新事件
let sql = "INSERT OR IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?);";
if !(self.kind >= 20000 && self.kind < 30000) {
let result =
sqlx::query("SELECT * FROM deleted_events WHERE event_id = ? AND pubkey = ?")
.bind(&self.id)
.bind(&self.pubkey)
.fetch_optional(&mut *tx)
.await?;
if let Some(_) = result {
debug!(
"Event {} already deleted by pubkey {}",
self.id, self.pubkey
);
return Err(anyhow::anyhow!("Event already deleted"));
}
let sql = "INSERT OR IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig, d_tag) VALUES (?, ?, ?, ?, ?, ?, ?, ?);";
sqlx::query(sql)
.bind(&self.id)
.bind(&self.pubkey)
@@ -246,8 +420,27 @@ impl NostrEventExt for NostrEvent {
.bind(tags_json)
.bind(&self.content)
.bind(&self.sig)
.execute(pool)
.bind(&d_tag)
.execute(&mut *tx)
.await?;
for tag in &self.tags {
if tag.len() >= 2 {
let tag_name = &tag[0];
let tag_value = &tag[1];
if tag_name.len() == 1 {
sqlx::query("INSERT INTO event_tags (event_id, tag_name, tag_value, created_at) VALUES (?, ?, ?, ?)")
.bind(&self.id)
.bind(tag_name)
.bind(tag_value)
.bind(self.created_at as i64)
.execute(&mut *tx)
.await?;
}
}
}
}
tx.commit().await?;
Ok(())
}
@@ -259,7 +452,7 @@ impl NostrEventExt for NostrEvent {
) -> Result<(), anyhow::Error> {
// 1. 验证事件类型是否为 NIP-42 认证事件 (kind 22242)
if self.kind != 22242 {
RelayMessage::send_ok(
let _ = RelayMessage::send_ok(
&self.id,
false,
"AUTH event must be kind 22242".to_string(), // 根据协议,OK 消息应包含事件 ID
@@ -271,7 +464,10 @@ impl NostrEventExt for NostrEvent {
// 2. 验证事件签名是否有效
if !self.verify() {
RelayMessage::send_notice("Invalid AUTH event signature".to_string(), to_client_msg_tx)
let _ = RelayMessage::send_notice(
"Invalid AUTH event signature".to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
@@ -293,7 +489,7 @@ impl NostrEventExt for NostrEvent {
let challenge = match challenge {
Some(c) => c,
None => {
RelayMessage::send_notice(
let _ = RelayMessage::send_notice(
"AUTH event missing challenge tag".to_string(),
to_client_msg_tx,
)
@@ -302,10 +498,23 @@ impl NostrEventExt for NostrEvent {
}
};
let relay_url = match relay_url {
Some(r) => r,
None => {
let _ = RelayMessage::send_notice(
"AUTH event missing relay tag".to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
};
// 4. 验证 challenge 是否匹配客户端连接 ID,并且未过期
let is_valid_challenge = {
let conn = client_conn.read().await;
if conn.id.to_string() == *challenge {
if relay_url.as_str() != RELAY_URL.as_str() {
false
} else if conn.id.to_string() == *challenge {
// 检查 challenge 是否在 15 分钟内有效 (可配置)
let connected_at_duration = UNIX_EPOCH + Duration::from_secs(conn.connected_at);
match SystemTime::now().duration_since(connected_at_duration) {
@@ -318,7 +527,10 @@ impl NostrEventExt for NostrEvent {
};
if !is_valid_challenge {
RelayMessage::send_notice("Invalid or expired challenge".to_string(), to_client_msg_tx)
let _ = RelayMessage::send_notice(
"Invalid or expired challenge".to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
@@ -329,7 +541,7 @@ impl NostrEventExt for NostrEvent {
.unwrap_or(Duration::ZERO)
.as_secs();
if self.created_at < now.saturating_sub(60) || self.created_at > now + 60 {
RelayMessage::send_notice(
let _ = RelayMessage::send_notice(
"AUTH event timestamp out of acceptable range (must be within 60s of now)"
.to_string(),
to_client_msg_tx,
@@ -347,7 +559,7 @@ impl NostrEventExt for NostrEvent {
info!("Client authenticated with pubkey: {}", self.pubkey);
// 根据 NIP-42,认证成功也应该回复 OK 消息
RelayMessage::send_ok(
let _ = RelayMessage::send_ok(
&self.id,
true,
"Authentication successful.".to_string(),
+32 -22
View File
@@ -2,7 +2,6 @@ use anyhow::Result;
use log::debug;
use sqlx::sqlite::SqlitePool;
use sqlx::{Execute, QueryBuilder, Row};
use std::collections::HashMap;
use crate::constants::{DEFAULT_LIMIT, MAX_LIMIT};
use crate::nostr::Filter;
@@ -25,34 +24,45 @@ impl FilterExt for Filter {
if let Some(ids) = &self.ids {
if !ids.is_empty() {
sql.push(" AND (");
let mut separated = sql.separated(" OR ");
let mut first = true;
for id_prefix in ids {
if id_prefix.len() < 64 {
// 如果是 ID 前缀,使用 LIKE
separated
.push("id LIKE ")
.push_bind(format!("{}%", id_prefix));
if !first {
sql.push(" OR ");
}
if id_prefix.len() == 64 {
sql.push("id = ");
sql.push_bind(id_prefix);
} else {
// 否则进行精确匹配
separated.push("id = ").push_bind(id_prefix);
sql.push("id LIKE ");
sql.push_bind(format!("{}%", id_prefix));
}
first = false;
}
separated.push_unseparated(")");
sql.push(")");
}
}
// 作者过滤 (NIP-01 支持前缀匹配)
if let Some(authors) = &self.authors {
if !authors.is_empty() {
sql.push(" AND pubkey IN (");
let mut separated = sql.separated(",");
sql.push(" AND (");
let mut first = true;
for author in authors {
separated.push_bind(author);
if !first {
sql.push(" OR ");
}
separated.push_unseparated(")");
if author.len() == 64 {
sql.push("pubkey = ");
sql.push_bind(author);
} else {
sql.push("pubkey LIKE ");
sql.push_bind(format!("{}%", author));
}
first = false;
}
sql.push(")");
}
}
// 类型过滤
if let Some(kinds) = &self.kinds {
if !kinds.is_empty() {
@@ -67,12 +77,13 @@ impl FilterExt for Filter {
// 时间过滤 (since)
if let Some(since) = self.since {
sql.push(" AND created_at >= ").push_bind(since as i64);
sql.push(" AND created_at >= ");
sql.push_bind(since as i64);
}
// 时间过滤 (until)
if let Some(until) = self.until {
sql.push(" AND created_at <= ").push_bind(until as i64);
sql.push(" AND created_at <= ");
sql.push_bind(until as i64);
}
// 标签过滤 (NIP-12 Generic Tag Queries)
@@ -80,11 +91,10 @@ impl FilterExt for Filter {
if tag_name.starts_with('#') && !tag_values.is_empty() {
let tag_letter = &tag_name[1..]; // 提取标签字母 (e.g., 'e', 'p', 'd')
// 使用 JSON 函数查询 tags 数组
// SELECT 1 FROM json_each(tags) AS tag_arr WHERE tag_arr.value->>0 = 'p' AND tag_arr.value->>1 IN ('...', '...')
sql.push(" AND EXISTS (SELECT 1 FROM json_each(tags) AS tag_arr WHERE json_array_length(tag_arr.value) > 1 AND json_extract(tag_arr.value, '$[0]') = ");
// 使用 event_tags 表查询
sql.push(" AND EXISTS (SELECT 1 FROM event_tags WHERE event_id = events.id AND tag_name = ");
sql.push_bind(tag_letter);
sql.push(" AND json_extract(tag_arr.value, '$[1]') IN (");
sql.push(" AND tag_value IN (");
let mut separated = sql.separated(",");
for value in tag_values {
separated.push_bind(value);
+58 -15
View File
@@ -1,8 +1,9 @@
use crate::nostr::NostrEvent;
use log::error;
use log::{debug, error};
use serde_json;
use tokio::sync::mpsc;
pub struct RelayMessage;
use anyhow::{Result, anyhow};
impl RelayMessage {
// 发送 OK 消息 (NIP-20)
@@ -23,27 +24,48 @@ impl RelayMessage {
event: &NostrEvent,
sub_id: &str,
to_client_msg_tx: &mpsc::Sender<String>,
) {
) -> Result<()> {
// 尝试将 NostrEvent 序列化为 JSON 字符串
let event_json = match serde_json::to_string(event) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize event {} for sending: {}", event.id, e);
return; // 序列化失败则不发送
return Err(anyhow!(
"Failed to serialize event {} for sending: {}",
event.id,
e
));
}
};
let msg = format!("[\"EVENT\", \"{}\", {}]", sub_id, event_json);
if let Err(e) = to_client_msg_tx.send(msg).await {
// 此错误通常表示客户端的接收端已关闭(客户端已断开)
error!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e);
match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// Channel closed 是正常的客户端断连,使用 debug 级别
debug!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e);
Err(anyhow!(
"Failed to send EVENT message (sub_id: {}): {}",
sub_id,
e
))
}
}
}
// 发送 EOSE 消息 (NIP-15)
pub async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) {
pub async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) -> Result<()> {
let msg = format!("[\"EOSE\", \"{}\"]", sub_id);
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e);
match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// Channel closed 是正常的客户端断连,使用 debug 级别
debug!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e);
Err(anyhow!(
"Failed to send EOSE message (sub_id: {}): {}",
sub_id,
e
))
}
}
}
@@ -52,21 +74,42 @@ impl RelayMessage {
sub_id: &str,
message: String,
to_client_msg_tx: &mpsc::Sender<String>,
) {
) -> Result<()> {
let msg = format!("[\"CLOSED\", \"{}\", \"{}\"]", sub_id, message);
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e);
match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// Channel closed 是正常的客户端断连,使用 debug 级别
debug!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e);
Err(anyhow!(
"Failed to send CLOSED message (sub_id: {}): {}",
sub_id,
e
))
}
}
}
// 发送 NOTICE 消息 (NIP-01)
pub async fn send_notice(message: String, to_client_msg_tx: &mpsc::Sender<String>) {
pub async fn send_notice(
message: String,
to_client_msg_tx: &mpsc::Sender<String>,
) -> Result<()> {
let msg = format!("[\"NOTICE\", \"{}\"]", message);
if let Err(e) = to_client_msg_tx.send(msg).await {
error!(
match to_client_msg_tx.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// Channel closed 是正常的客户端断连,使用 debug 级别
debug!(
"Failed to send NOTICE message (message: {}): {}",
message, e
);
Err(anyhow!(
"Failed to send NOTICE message (message: {}): {}",
message,
e
))
}
}
}
}
+73 -28
View File
@@ -9,7 +9,7 @@ use std::{
};
use tokio::net::TcpStream;
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio_tungstenite::{WebSocketStream, accept_async, tungstenite::protocol::Message};
use tokio_tungstenite::{accept_async, tungstenite::protocol::Message};
use uuid::Uuid;
use crate::constants::{CLIENT_CHANNEL_SIZE, MAX_FILTERS_PER_REQ, MAX_SUBSCRIPTIONS, SERVER_INFO};
@@ -18,7 +18,7 @@ use crate::nostr::event::NostrEventExt;
use crate::nostr::filter::FilterExt;
use crate::nostr::messages::RelayMessage;
use crate::nostr::utils::extract_p_tags_from_vec;
use crate::nostr::{ClientMessage, Filter, NostrEvent};
use crate::nostr::{Filter, NostrEvent};
pub async fn handle_ws_connection(
stream: TcpStream,
@@ -75,9 +75,23 @@ pub async fn handle_ws_connection(
Ok(event_msg) = event_rx.recv() => {
let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent
let conn = client_conn_for_send.read().await;
// 检查 channel 是否已关闭,如果关闭则立即退出
if conn.sender.is_closed() {
info!("Client {} channel closed, stopping send task", client_id);
break;
}
for (sub_id, subscription) in &conn.subscriptions {
if subscription.filters.iter().any(|filter| filter.matches(&event)) {
RelayMessage::send_event(&event, sub_id, &conn.sender).await;
match RelayMessage::send_event(&event, sub_id, &conn.sender).await {
Ok(()) => debug!("Sent event to client {}", client_id),
Err(_) => {
// Channel 已关闭,立即退出整个循环
info!("Client {} channel closed during event send, stopping send task", client_id);
return;
},
}
}
}
}
@@ -124,17 +138,30 @@ pub async fn handle_ws_connection(
.await
{
error!("Error handling message for client {}: {}", client_id, e);
RelayMessage::send_notice(format!("Invalid message: {}", e), &client_tx)
.await;
match RelayMessage::send_notice(
format!("Invalid message: {}", e),
&client_tx,
)
.await
{
Ok(()) => debug!("Sent error notice to client {}", client_id),
Err(e) => {
debug!(
"Failed to send error notice to client {}: {}",
client_id, e
);
return;
}
};
}
});
}
Ok(Message::Close(_)) => {
info!("Client {} disconnected via CLOSE message", client_id);
debug!("Client {} disconnected via CLOSE message", client_id);
break;
}
Err(e) => {
error!("Client {} WebSocket error: {}", client_id, e);
debug!("Client {} WebSocket error: {}", client_id, e);
break;
}
_ => {}
@@ -185,7 +212,7 @@ pub async fn handle_message(
.to_string(),
to_client_msg_tx,
)
.await;
.await?;
return Ok(());
}
let sub_id = arr
@@ -203,7 +230,7 @@ pub async fn handle_message(
format!("Maximum subscriptions ({}) exceeded", MAX_SUBSCRIPTIONS),
to_client_msg_tx,
)
.await;
.await?;
return Ok(());
}
}
@@ -215,7 +242,7 @@ pub async fn handle_message(
format!("Maximum filters ({}) exceeded", MAX_FILTERS_PER_REQ),
to_client_msg_tx,
)
.await;
.await?;
return Ok(());
}
let filter: Filter = serde_json::from_value(arr[i].clone())
@@ -235,16 +262,35 @@ pub async fn handle_message(
);
}
// 检查 channel 是否已关闭
if to_client_msg_tx.is_closed() {
debug!(
"Client channel closed before sending historical events for sub_id: {}",
sub_id
);
return Ok(());
}
// 查询历史事件并发送
for filter in &filters {
let events = filter.select(pool).await?; // 使用 FilterExt
for event in events {
RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await;
// 在发送每个事件前检查 channel 状态,避免大量失败尝试
if to_client_msg_tx.is_closed() {
debug!(
"Client channel closed during historical event sending for sub_id: {}",
sub_id
);
return Ok(());
}
RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await?;
}
}
// 发送 EOSE 消息,表示历史事件已发送完毕
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await;
if !to_client_msg_tx.is_closed() {
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await?;
}
Ok(())
}
@@ -278,27 +324,26 @@ pub async fn handle_message(
// 权限检查:是否需要认证,以及是否在信任列表中
if SERVER_INFO.auth_required {
let conn = client_conn_clone.read().await;
if !conn.authenticated {
RelayMessage::send_closed(
// NIP-42 Auth: Forbid unauthenticated EVENT
&conn.id.to_string(), // 使用 client_id 作为 sub_id 来匹配 AUTH 挑战
"auth-required: Authentication required to publish events".to_string(),
&to_client_msg_tx_clone,
)
.await;
debug!("Client {}: Not authenticated, event rejected.", conn.id);
return;
}
let trust_accounts_guard = trust_accounts_clone.read().await;
if !trust_accounts_guard.contains(&event_clone.pubkey) {
RelayMessage::send_closed(
match RelayMessage::send_closed(
// NIP-42 Auth: Forbid unauthorized EVENT
&conn.id.to_string(),
"restricted: You are not in the trust list to publish events"
.to_string(),
&to_client_msg_tx_clone,
)
.await;
.await
{
Ok(_) => debug!(
"Client {}: Pubkey {} not in trust list, authentication challenge sent.",
conn.id, event_clone.pubkey
),
Err(e) => {
error!("Failed to send authentication challenge: {}", e);
return;
}
};
debug!(
"Client {}: Pubkey {} not in trust list, event rejected.",
conn.id, event_clone.pubkey
@@ -332,7 +377,7 @@ pub async fn handle_message(
new_trust_accounts.into_iter().collect();
new_trust_accounts_set.insert(SERVER_INFO.pubkey.to_string()); // 确保服务器公钥始终在信任列表中
*ts = new_trust_accounts_set;
debug!("Trust list updated to {} accounts.", ts.len());
info!("Trust list updated to {} accounts.", ts.len());
}
}
Err(e) => {
@@ -369,7 +414,7 @@ pub async fn handle_message(
"Subscription cancelled".to_string(),
to_client_msg_tx,
)
.await;
.await?;
Ok(())
}
@@ -379,7 +424,7 @@ pub async fn handle_message(
"AUTH message requires an event: [\"AUTH\", <auth_event>]".to_string(),
to_client_msg_tx,
)
.await;
.await?;
return Ok(());
}