Add: Split main.rs
Build and Push Docker Image / build_docker_image (push) Failing after 9m40s

This commit is contained in:
2025-08-04 01:50:00 +08:00
parent 02a59d2bbc
commit 6c42d5d654
14 changed files with 1741 additions and 1370 deletions
+2 -1
View File
@@ -20,4 +20,5 @@ target/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.vscode/
.vscode/
.zed/
Generated
+92 -82
View File
@@ -13,9 +13,9 @@ dependencies = [
[[package]]
name = "adler2"
version = "2.0.0"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aho-corasick"
@@ -34,9 +34,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "anstream"
version = "0.6.18"
version = "0.6.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -49,33 +49,33 @@ dependencies = [
[[package]]
name = "anstyle"
version = "1.0.10"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.8"
version = "3.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6680de5231bd6ee4c6191b8a1325daa282b415391ec9d3a37bd34f2060dc73fa"
checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882"
dependencies = [
"anstyle",
"once_cell_polyfill",
@@ -105,9 +105,9 @@ dependencies = [
[[package]]
name = "autocfg"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "backtrace"
@@ -132,9 +132,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "base64ct"
version = "1.7.3"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3"
checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
[[package]]
name = "bitcoin-io"
@@ -172,9 +172,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.17.0"
version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
[[package]]
name = "byteorder"
@@ -190,24 +190,24 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "cc"
version = "1.2.25"
version = "1.2.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0fc897dc1e865cc67c0e05a836d9d3f1df3cbe442aa4a9473b18e12624a4951"
checksum = "c3a42d84bb6b69d3a8b3eaacf0d88f179e1929695e1ad012b6cf64d9caaa5fd2"
dependencies = [
"shlex",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268"
[[package]]
name = "colorchoice"
version = "1.0.3"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "concurrent-queue"
@@ -513,7 +513,7 @@ checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [
"cfg-if",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasi 0.11.1+wasi-snapshot-preview1",
]
[[package]]
@@ -536,9 +536,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "hashbrown"
version = "0.15.3"
version = "0.15.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3"
checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
dependencies = [
"allocator-api2",
"equivalent",
@@ -734,14 +734,25 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.9.0"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661"
dependencies = [
"equivalent",
"hashbrown",
]
[[package]]
name = "io-uring"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4"
dependencies = [
"bitflags",
"cfg-if",
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
@@ -756,9 +767,9 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jiff"
version = "0.2.14"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a194df1107f33c79f4f93d02c80798520551949d59dfad22b6157048a88cca93"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
@@ -769,9 +780,9 @@ dependencies = [
[[package]]
name = "jiff-static"
version = "0.2.14"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c6e1db7ed32c6c71b759497fae34bf7933636f75a251b9e736555da426f6442"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
@@ -799,9 +810,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.172"
version = "0.2.174"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776"
[[package]]
name = "libm"
@@ -854,15 +865,15 @@ dependencies = [
[[package]]
name = "memchr"
version = "2.7.4"
version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "miniz_oxide"
version = "0.8.8"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a"
checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316"
dependencies = [
"adler2",
]
@@ -874,7 +885,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c"
dependencies = [
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasi 0.11.1+wasi-snapshot-preview1",
"windows-sys 0.59.0",
]
@@ -889,7 +900,7 @@ dependencies = [
"httparse",
"log",
"once_cell",
"rand 0.9.1",
"rand 0.9.2",
"secp256k1",
"serde",
"serde_json",
@@ -1054,9 +1065,9 @@ checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "portable-atomic"
version = "1.11.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
@@ -1105,9 +1116,9 @@ dependencies = [
[[package]]
name = "r-efi"
version = "5.2.0"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rand"
@@ -1122,9 +1133,9 @@ dependencies = [
[[package]]
name = "rand"
version = "0.9.1"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
@@ -1170,9 +1181,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.5.12"
version = "0.5.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af"
checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77"
dependencies = [
"bitflags",
]
@@ -1242,9 +1253,9 @@ dependencies = [
[[package]]
name = "rustc-demangle"
version = "0.1.24"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
[[package]]
name = "rustls"
@@ -1305,7 +1316,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c3c81b43dc2d8877c216a3fccf76677ee1ebccd429566d3e67447290d0c42b2"
dependencies = [
"bitcoin_hashes",
"rand 0.9.1",
"rand 0.9.2",
"secp256k1-sys",
]
@@ -1340,9 +1351,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.140"
version = "1.0.142"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7"
dependencies = [
"itoa",
"memchr",
@@ -1420,30 +1431,27 @@ dependencies = [
[[package]]
name = "slab"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d"
[[package]]
name = "smallvec"
version = "1.15.0"
version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
dependencies = [
"serde",
]
[[package]]
name = "socket2"
version = "0.5.10"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807"
dependencies = [
"libc",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -1680,9 +1688,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.101"
version = "2.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf"
checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40"
dependencies = [
"proc-macro2",
"quote",
@@ -1747,20 +1755,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.45.1"
version = "1.47.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779"
checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -1866,9 +1876,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.28"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
dependencies = [
"proc-macro2",
"quote",
@@ -1877,9 +1887,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.33"
version = "0.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
dependencies = [
"once_cell",
]
@@ -1895,7 +1905,7 @@ dependencies = [
"http",
"httparse",
"log",
"rand 0.9.1",
"rand 0.9.2",
"sha1",
"thiserror",
"utf-8",
@@ -1994,9 +2004,9 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
version = "0.11.1+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]]
name = "wasi"
@@ -2249,9 +2259,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winnow"
version = "0.7.11"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74c7b26e3480b707944fc872477815d29a8e429d2f93a1ce000f5fa84a15cbcd"
checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95"
dependencies = [
"memchr",
]
@@ -2297,18 +2307,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.25"
version = "0.8.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb"
checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.25"
version = "0.8.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef"
checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181"
dependencies = [
"proc-macro2",
"quote",
+35
View File
@@ -0,0 +1,35 @@
use crate::models::{Limitation, ServerInfo};
use once_cell::sync::Lazy;
use std::env;
pub const DEFAULT_BIND_ADDR: &str = "0.0.0.0:8080";
pub const DEFAULT_DB_PATH: &str = "nostr.db";
pub const MAX_EVENT_TAGS: u32 = 5000;
pub const MAX_LIMIT: u64 = 500;
pub const DEFAULT_LIMIT: u64 = 10;
pub const MAX_FILTERS_PER_REQ: usize = 100;
pub const BROADCAST_CHANNEL_SIZE: usize = 100;
pub const CLIENT_CHANNEL_SIZE: usize = 32;
pub const MAX_SUBSCRIPTIONS: usize = 20;
pub static SERVER_INFO: Lazy<ServerInfo> = Lazy::new(|| ServerInfo {
contact: "https://www.moec.top/",
description: "Powered by laoXong.",
limitation: Limitation {
max_event_tags: MAX_EVENT_TAGS,
max_event_time_newer_than_now: 900,
max_event_time_older_than_now: 315576000,
max_filters: MAX_FILTERS_PER_REQ as u32, // usize to u32 cast
max_limit: MAX_LIMIT as u32, // u64 to u32 cast
max_message_length: 524288,
max_subid_length: 100,
max_subscriptions: MAX_SUBSCRIPTIONS as u32, // usize to u32 cast
min_prefix: 10,
},
name: "A rust nostr relay by laoXong",
pubkey: "63abd4f817e39cca4e6abb6e6cf3e133bb718cf8ec28b38c1645e84d7a6190c6",
software: "https://git.moe.gift/laoxong/nostr-relay",
supported_nips: vec![1, 2, 5, 42, 65],
version: env!("CARGO_PKG_VERSION"), // 从 Cargo.toml 获取版本
auth_required: env::var("AUTH_REQUIRED").unwrap_or_else(|_| "False".to_string()) == "True",
});
+62
View File
@@ -0,0 +1,62 @@
use crate::constants::SERVER_INFO;
use crate::nostr::utils::extract_p_tags_from_json;
use anyhow::Result;
use log::error;
use sqlx::sqlite::SqlitePool;
use sqlx::{Row, query};
use std::collections::HashSet;
pub async fn init_database(pool: &SqlitePool) -> Result<()> {
let create_events_table = r#"
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
pubkey TEXT NOT NULL,
created_at INTEGER NOT NULL,
kind INTEGER NOT NULL,
tags TEXT NOT NULL,
content TEXT NOT NULL,
sig TEXT NOT NULL,
indexed_at INTEGER DEFAULT (unixepoch())
);
"#;
let create_indexes = vec![
"CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey);",
"CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);",
"CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);",
"CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind ON events(pubkey, kind);",
"CREATE INDEX IF NOT EXISTS idx_events_kind_created_at_desc ON events(kind, created_at DESC);",
];
query(create_events_table).execute(pool).await?;
for index_sql in create_indexes {
query(index_sql).execute(pool).await?;
}
log::info!("Database initialized successfully");
Ok(())
}
// 获取信任账户列表
pub async fn get_trust_accounts(pool: &SqlitePool) -> Vec<String> {
let pubkey = SERVER_INFO.pubkey; // 获取服务器公钥
// 查询最新的 kind 3 事件(联系人列表),获取其中的 p 标签作为信任账户
let sql =
"SELECT tags FROM events WHERE kind = 3 AND pubkey = ? ORDER BY created_at DESC LIMIT 1";
let row = match sqlx::query(sql).bind(pubkey).fetch_optional(pool).await {
Ok(row) => row,
Err(e) => {
error!("Failed to execute query for trust accounts: {}", e);
return Vec::new();
}
};
match row {
Some(row) => {
let tags_json: String = row.get(0);
extract_p_tags_from_json(&tags_json).unwrap_or_else(|_| Vec::new())
}
None => Vec::new(),
}
}
+257
View File
@@ -0,0 +1,257 @@
use anyhow::{Result, anyhow};
use httparse::{EMPTY_HEADER, Request};
use log::{error, info};
use sqlx::SqlitePool;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::RwLock;
use tokio::sync::broadcast;
use crate::constants::SERVER_INFO;
use crate::models::ClientConnection;
use crate::ws_logic::handle_ws_connection;
// 处理多路复用连接(HTTP 或 WebSocket
pub async fn handle_connection_multiplex(
stream: TcpStream,
tx: broadcast::Sender<String>,
rx: broadcast::Receiver<String>,
pool: Arc<SqlitePool>,
clients: Arc<RwLock<HashMap<uuid::Uuid, Arc<RwLock<ClientConnection>>>>>,
trust_accounts: Arc<RwLock<HashSet<String>>>,
) -> Result<(), anyhow::Error> {
// 分配足够大的缓冲区,一次读完整个 HTTP 头
let mut buf = vec![0u8; 4096];
let n = stream.peek(&mut buf).await?;
let req_bytes = &buf[..n];
let mut headers: [httparse::Header; 32] = [EMPTY_HEADER; 32];
let mut req = Request::new(&mut headers);
let status = req.parse(req_bytes)?; // 解析 HTTP 请求
if status.is_partial() {
// 如果缓冲区不够大,没能完全解析请求头
anyhow::bail!("Request headers too large");
}
let method = req.method.unwrap_or("");
// 将请求头转换为 HashMap 便于查找
let header_map = req
.headers
.iter()
.filter_map(|h| {
let name = h.name.to_ascii_lowercase();
let val = std::str::from_utf8(h.value).ok()?;
Some((name, val))
})
.collect::<std::collections::HashMap<_, _>>();
// 判断是否是 WebSocket 升级请求
if method == "GET"
&& header_map
.get("upgrade")
.map(|&v| v.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
{
handle_ws_connection(stream, tx, rx, pool, clients, trust_accounts).await;
} else {
// 处理普通 HTTP 请求
if let Some(accept) = header_map.get("accept") {
if accept.contains("application/json") || accept.contains("application/nostr+json") {
handle_http_info(stream).await?; // 返回 Nostr Relay Info (NIP-11)
} else if accept.contains("text/html") {
handle_http_dashboard(stream).await?; // 返回 HTML 仪表盘
} else {
handle_http_info(stream).await?; // 默认返回信息
}
} else {
handle_http_info(stream).await?; // 默认返回信息
}
}
Ok(())
}
// 处理 HTTP 信息请求 (NIP-11)
pub async fn handle_http_info(mut stream: TcpStream) -> Result<(), anyhow::Error> {
// 读完请求体的剩余部分,保证连接可以关闭
let mut buffer = vec![0; 1024];
let _ = stream.read(&mut buffer).await?;
let json = serde_json::to_string(&*SERVER_INFO).expect("Failed to serialize server info");
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: application/nostr+json\r\n\
Access-Control-Allow-Origin: *\r\n\
Content-Length: {}\r\n\
\r\n\
{}",
json.len(),
json
);
stream.write_all(response.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
// 处理 HTTP 仪表盘请求
pub async fn handle_http_dashboard(mut stream: TcpStream) -> Result<(), anyhow::Error> {
// 读完请求体的剩余部分,保证连接可以关闭
let mut buffer = vec![0; 1024];
let _ = stream.read(&mut buffer).await?;
// HTML 页面内容
let html = format!(
r#"<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Nostr Relay Dashboard</title>
<style>
:root {{
--bg: #fff;
--fg: #333;
--card: #f9f9f9;
--accent: #0052cc;
--accent-light: #e6ebff;
}}
[data-theme="dark"] {{
--bg: #1e1e1e;
--fg: #ddd;
--card: #2a2a2a;
--accent: #4e8cff;
--accent-light: #3a3f58;
}}
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{
background: var(--bg);
color: var(--fg);
font-family: "Segoe UI", Roboto, sans-serif;
line-height: 1.6;
padding: 20px;
transition: background 0.3s, color 0.3s;
}}
.toggle {{
position: fixed;
top: 20px; right: 20px;
background: var(--card);
border: none;
padding: 8px 12px;
border-radius: 4px;
cursor: pointer;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
transition: background 0.3s;
}}
.container {{
max-width: 600px;
margin: 60px auto 0;
background: var(--card);
border-radius: 8px;
padding: 30px;
box-shadow: 0 4px 10px rgba(0,0,0,0.05);
transition: background 0.3s;
}}
h1 {{
font-size: 2rem;
margin-bottom: 10px;
display: flex;
align-items: center;
}}
h1 .rocket {{
font-size: 1.5rem;
margin-right: 8px;
}}
.badge {{
background: var(--accent);
color: #fff;
font-size: 0.8rem;
padding: 2px 8px;
border-radius: 12px;
margin-left: auto;
}}
.info {{
margin: 20px 0;
display: grid;
grid-template-columns: 1fr 1fr;
gap: 12px;
}}
.info p {{
background: var(--accent-light);
padding: 12px;
border-radius: 6px;
}}
.info p strong {{
display: block;
margin-bottom: 4px;
}}
.connect-btn {{
display: inline-block;
background: var(--accent);
color: #fff;
text-decoration: none;
padding: 10px 20px;
border-radius: 6px;
font-weight: bold;
transition: background 0.3s;
}}
.connect-btn:hover {{
background: #003ba1;
}}
@media (max-width: 480px) {{
.info {{
grid-template-columns: 1fr;
}}
.toggle {{
top: auto;
bottom: 20px;
}}
}}
</style>
</head>
<body data-theme="light">
<button class="toggle" onclick="toggleTheme()">切换 夜/日 间模式</button>
<div class="container">
<h1><span class="rocket">🚀</span>Nostr Relay Server<span class="badge">Ver.{}</span></h1>
<div class="info">
<p><strong>WebSocket URL</strong><br>wss://nostr-relay.moe.gift</p>
<p><strong>Status</strong><br>✅ Running</p>
</div>
<p>使用任意兼容 Nostr 协议的客户端连接到上面的 WebSocket 地址,即可发布和接收事件。</p>
</div>
<script>
function toggleTheme() {{
const html = document.body;
const next = html.getAttribute("data-theme") === "light" ? "dark" : "light";
html.setAttribute("data-theme", next);
localStorage.setItem("theme", next);
}}
// 页面加载时恢复上次主题
(function(){{
const saved = localStorage.getItem("theme");
if (saved) document.body.setAttribute("data-theme", saved);
}})();
</script>
</body>
</html>
"#,
env!("CARGO_PKG_VERSION")
);
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/html; charset=utf-8\r\n\
Content-Length: {}\r\n\
Access-Control-Allow-Origin: *\r\n\
\r\n\
{}",
html.len(),
html
);
stream.write_all(response.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
+6
View File
@@ -0,0 +1,6 @@
mod constants;
mod database;
mod handlers;
mod models;
mod nostr;
mod ws_logic;
+51 -1287
View File
File diff suppressed because it is too large Load Diff
+49
View File
@@ -0,0 +1,49 @@
use crate::nostr::Filter;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::mpsc;
use uuid::Uuid;
// 服务器信息结构体(用于 NIP-11)
#[derive(Serialize)]
pub struct ServerInfo {
pub contact: &'static str,
pub description: &'static str,
pub limitation: Limitation,
pub name: &'static str,
pub pubkey: &'static str,
pub software: &'static str,
pub supported_nips: Vec<u32>,
pub version: &'static str,
pub auth_required: bool,
}
// 限制(NIP-11
#[derive(Serialize)]
pub struct Limitation {
pub max_event_tags: u32,
pub max_event_time_newer_than_now: u32,
pub max_event_time_older_than_now: u32,
pub max_filters: u32,
pub max_limit: u32,
pub max_message_length: u32,
pub max_subid_length: u32,
pub max_subscriptions: u32,
pub min_prefix: u32,
}
// 客户端订阅信息
#[derive(Debug, Clone)]
pub struct Subscription {
pub filters: Vec<Filter>,
}
// 客户端连接状态
pub struct ClientConnection {
pub id: Uuid,
pub connected_at: u64,
pub subscriptions: HashMap<String, Subscription>,
pub sender: mpsc::Sender<String>, // 用于向该客户端发送消息的通道
pub authenticated: bool,
pub pubkey: Option<String>, // 认证后存储客户端的公钥
}
+360
View File
@@ -0,0 +1,360 @@
use anyhow::Result;
use log::{debug, error, info};
use secp256k1::{
Message as SecpMessage, Secp256k1, XOnlyPublicKey, schnorr::Signature as SchnorrSignature,
};
use serde_json::{Value, json};
use sha2::{Digest, Sha256};
use sqlx::SqlitePool;
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::{RwLock, mpsc};
use crate::constants::SERVER_INFO;
use crate::models::ClientConnection;
use crate::nostr::NostrEvent;
use crate::nostr::messages::RelayMessage;
pub trait NostrEventExt {
fn serialize_for_id(&self) -> String; // 用于计算事件 ID 的序列化
fn verify(&self) -> bool; // 验证 ID、时间戳、标签数量和签名
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error>; // 保存事件到数据库
async fn handle_auth_event(
&self,
client_conn: &Arc<RwLock<ClientConnection>>,
to_client_msg_tx: &mpsc::Sender<String>,
) -> Result<(), anyhow::Error>; // 处理 NIP-42 认证事件
}
impl NostrEventExt for NostrEvent {
// 根据 Nostr 协议规则序列化事件,用于计算事件 ID
fn serialize_for_id(&self) -> String {
let serialized = json!([
0, // 用于 ID 计算的协议版本,Nostr 当前为 0
self.pubkey,
self.created_at,
self.kind,
self.tags,
self.content
])
.to_string();
serialized
}
// 验证事件的有效性(ID、时间戳、标签数量、签名)
fn verify(&self) -> bool {
debug!("Verifying event: {}", self.id);
// 1. 验证 ID:重新计算 ID 并与事件中提供的 ID 比较
let mut hasher = Sha256::new();
hasher.update(self.serialize_for_id().as_bytes());
let computed_id = hex::encode(hasher.finalize());
if computed_id != self.id {
debug!(
"Event ID verification failed: computed {} != provided {}",
computed_id, self.id
);
return false;
}
// 2. 验证时间戳:检查事件时间是否在可接受的范围内 (NIP-1)
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO) // 如果 SystemTime 在 UNIX_EPOCH 之前,返回 Duration::ZERO
.as_secs();
if self.created_at > now + SERVER_INFO.limitation.max_event_time_newer_than_now as u64 {
debug!(
"Timestamp too far in future: created_at={} now={}",
self.created_at, now
);
return false;
}
if self.created_at
< now.saturating_sub(SERVER_INFO.limitation.max_event_time_older_than_now as u64)
{
debug!(
"Timestamp too far in past: created_at={} now={}",
self.created_at, now
);
return false;
}
// 3. 验证标签数量:检查是否超出服务器限制
if self.tags.len() > SERVER_INFO.limitation.max_event_tags as usize {
debug!(
"Too many tags: {} > {}",
self.tags.len(),
SERVER_INFO.limitation.max_event_tags
);
return false;
}
// 4. 解析公钥:从十六进制字符串解析 XOnlyPublicKey
let pubkey_bytes: Vec<u8> = match hex::decode(&self.pubkey) {
Ok(bytes) => {
if bytes.len() != 32 {
// 公钥长度必须为 32 字节
debug!("Pubkey hex is not 32 bytes after decoding");
return false;
}
bytes
}
Err(e) => {
debug!("Failed to decode pubkey hex: {}", e);
return false;
}
};
let x_only_public_key: XOnlyPublicKey = match pubkey_bytes
.try_into() // 将 Vec<u8> 尝试转换为 [u8; 32]
.map(|bytes: [u8; 32]| XOnlyPublicKey::from_byte_array(bytes))
{
Ok(Ok(key)) => key,
_ => {
debug!("Failed to parse pubkey as XOnlyPublicKey");
return false;
}
};
// 5. 解析签名:从十六进制字符串解析 SchnorrSignature
let sig_bytes: Vec<u8> = match hex::decode(&self.sig) {
Ok(bytes) => {
if bytes.len() != 64 {
// Schnorr 签名长度必须为 64 字节
debug!("Signature hex is not 64 bytes after decoding");
return false;
}
bytes
}
Err(e) => {
debug!("Failed to decode signature hex: {}", e);
return false;
}
};
let signature: SchnorrSignature = match SchnorrSignature::from_slice(&sig_bytes) {
Ok(sig) => sig,
Err(e) => {
debug!("Failed to parse Schnorr signature: {}", e);
return false;
}
};
// 6. 准备消息哈希:事件 ID 本身就是消息的哈希
let event_hash: [u8; 32] = match hex::decode(&self.id) {
Ok(bytes) => {
let mut hash = [0u8; 32];
if bytes.len() != 32 {
debug!("Event ID is not 32 bytes after decoding");
return false;
}
hash.copy_from_slice(&bytes); // 将解码后的字节复制到固定大小数组
hash
}
Err(e) => {
debug!("Failed to decode event ID hex: {}", e);
return false;
}
};
// 7. 创建 SecpMessage 对象进行签名验证
let message: SecpMessage = secp256k1::Message::from_digest(event_hash);
// 8. 验证签名:使用 secp256k1-rs 库进行验证
let secp = Secp256k1::verification_only();
match secp.verify_schnorr(&signature, message.as_ref(), &x_only_public_key) {
Ok(_) => {
debug!("Event verified: {}", self.id);
true
}
Err(e) => {
debug!("Event verification failed for {}: {}", self.id, e);
false
}
}
}
// 保存事件到数据库,并处理可替换事件和删除事件
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
let tags_json = serde_json::to_string(&self.tags).unwrap();
match self.kind {
// NIP-09 事件删除 (Event Deletion)
5 => {
if !self.tags.is_empty() {
for tag in &self.tags {
if tag.get(0).map(|s| s == "e").unwrap_or(false) && tag.len() > 1 {
let event_id_to_delete = &tag[1]; // 获取要删除的事件 ID
debug!(
"Attempting to delete event with ID: {} by request of pubkey {}",
event_id_to_delete, self.pubkey
);
let sql = "DELETE FROM events WHERE id = ? AND pubkey = ?";
let result = sqlx::query(sql)
.bind(event_id_to_delete)
.bind(&self.pubkey)
.execute(pool)
.await?;
if result.rows_affected() > 0 {
info!(
"Deleted event {} by pubkey (kind 5): {}",
event_id_to_delete, self.pubkey
);
} else {
debug!(
"Could not delete event {} for pubkey {}. It might not exist or unauthorized.",
event_id_to_delete, self.pubkey
);
}
}
}
}
}
// NIP-01 可替换事件 (Replaceable Events): kind 0 (Metadata), kind 3 (Contact List)
// NIP-16 可替换事件 (Replaceable Events): kinds 10000 to 19999
// NIP-33 参数化可替换事件 (Parameterized Replaceable Events): kinds 30000 to 39999
// 对于所有这些可替换事件,新事件会替换掉旧事件。
// 这里的简化处理是,对于所有可替换事件,都基于 (pubkey, kind) 来删除旧事件。
// 对于 NIP-33 事件,严格来说还需要匹配 'd' tag。
// 但考虑到数据库操作的复杂性以及原始代码的实现,这里仍采用 (pubkey, kind) 的方式。
// 一个更健壮的 NIP-33 实现可能需要单独的字段来存储 'd' tag 或更复杂的 SQL。
0 | 3 | _
if (self.kind >= 10000 && self.kind < 20000)
|| (self.kind >= 30000 && self.kind < 40000) =>
{
debug!(
"Attempting to delete previous replaceable event for pubkey: {}, kind: {}",
self.pubkey, self.kind
);
let sql = "DELETE FROM events WHERE pubkey = ? AND kind = ?";
sqlx::query(sql)
.bind(&self.pubkey)
.bind(self.kind)
.execute(pool)
.await?;
}
_ => { /* 非可替换事件不需要在插入前删除 */ }
}
// 插入新事件
let sql = "INSERT OR IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?);";
sqlx::query(sql)
.bind(&self.id)
.bind(&self.pubkey)
.bind(self.created_at as i64)
.bind(self.kind)
.bind(tags_json)
.bind(&self.content)
.bind(&self.sig)
.execute(pool)
.await?;
Ok(())
}
// 处理 NIP-42 认证事件
async fn handle_auth_event(
&self,
client_conn: &Arc<RwLock<ClientConnection>>,
to_client_msg_tx: &mpsc::Sender<String>,
) -> Result<(), anyhow::Error> {
// 1. 验证事件类型是否为 NIP-42 认证事件 (kind 22242)
if self.kind != 22242 {
RelayMessage::send_ok(
&self.id,
false,
"AUTH event must be kind 22242".to_string(), // 根据协议,OK 消息应包含事件 ID
to_client_msg_tx,
)
.await;
return Ok(());
}
// 2. 验证事件签名是否有效
if !self.verify() {
RelayMessage::send_notice("Invalid AUTH event signature".to_string(), to_client_msg_tx)
.await;
return Ok(());
}
// 3. 提取 "challenge" 标签
let mut challenge = None;
let mut relay_url = None; // NIP-42 也包含 relay url,但在这个验证中不是强制的。
for tag in &self.tags {
if tag.len() >= 2 {
match tag[0].as_str() {
"relay" => relay_url = Some(&tag[1]),
"challenge" => challenge = Some(&tag[1]),
_ => {}
}
}
}
let challenge = match challenge {
Some(c) => c,
None => {
RelayMessage::send_notice(
"AUTH event missing challenge tag".to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
};
// 4. 验证 challenge 是否匹配客户端连接 ID,并且未过期
let is_valid_challenge = {
let conn = client_conn.read().await;
if conn.id.to_string() == *challenge {
// 检查 challenge 是否在 15 分钟内有效 (可配置)
let connected_at_duration = UNIX_EPOCH + Duration::from_secs(conn.connected_at);
match SystemTime::now().duration_since(connected_at_duration) {
Ok(elapsed) => elapsed <= Duration::from_secs(15 * 60),
Err(_) => false, // 如果当前时间在连接时间之前,则视为无效
}
} else {
false
}
};
if !is_valid_challenge {
RelayMessage::send_notice("Invalid or expired challenge".to_string(), to_client_msg_tx)
.await;
return Ok(());
}
// 5. 验证时间戳 (NIP-42 规定 `created_at` 必须在当前时间前后 60 秒内)
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs();
if self.created_at < now.saturating_sub(60) || self.created_at > now + 60 {
RelayMessage::send_notice(
"AUTH event timestamp out of acceptable range (must be within 60s of now)"
.to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
// 认证成功,更新客户端连接状态
{
let mut conn = client_conn.write().await;
conn.authenticated = true;
conn.pubkey = Some(self.pubkey.clone());
}
info!("Client authenticated with pubkey: {}", self.pubkey);
// 根据 NIP-42,认证成功也应该回复 OK 消息
RelayMessage::send_ok(
&self.id,
true,
"Authentication successful.".to_string(),
to_client_msg_tx,
)
.await;
Ok(())
}
}
+227
View File
@@ -0,0 +1,227 @@
use anyhow::Result;
use log::debug;
use sqlx::sqlite::SqlitePool;
use sqlx::{Execute, QueryBuilder, Row};
use std::collections::HashMap;
use crate::constants::{DEFAULT_LIMIT, MAX_LIMIT};
use crate::nostr::Filter;
use crate::nostr::NostrEvent;
// 定义 Filter 的扩展特性
pub trait FilterExt {
async fn select(&self, pool: &SqlitePool) -> Result<Vec<NostrEvent>, sqlx::Error>;
fn matches(&self, event: &NostrEvent) -> bool; // 检查事件是否匹配过滤器
}
impl FilterExt for Filter {
// 根据过滤器从 SQLite 数据库中查询事件
async fn select(&self, pool: &SqlitePool) -> Result<Vec<NostrEvent>, sqlx::Error> {
let mut sql = QueryBuilder::new(
"SELECT id, pubkey, created_at, kind, tags, content, sig FROM events WHERE 1=1 AND kind != 22242", // 排除 AUTH 事件
);
// ID 过滤 (NIP-01 支持前缀匹配)
if let Some(ids) = &self.ids {
if !ids.is_empty() {
sql.push(" AND (");
let mut separated = sql.separated(" OR ");
for id_prefix in ids {
if id_prefix.len() < 64 {
// 如果是 ID 前缀,使用 LIKE
separated
.push("id LIKE ")
.push_bind(format!("{}%", id_prefix));
} else {
// 否则进行精确匹配
separated.push("id = ").push_bind(id_prefix);
}
}
separated.push_unseparated(")");
}
}
// 作者过滤 (NIP-01 支持前缀匹配)
if let Some(authors) = &self.authors {
if !authors.is_empty() {
sql.push(" AND (");
let mut separated = sql.separated(" OR ");
for author_prefix in authors {
if author_prefix.len() < 64 {
// 如果是公钥前缀,使用 LIKE
separated
.push("pubkey LIKE ")
.push_bind(format!("{}%", author_prefix));
} else {
// 否则进行精确匹配
separated.push("pubkey = ").push_bind(author_prefix);
}
}
separated.push_unseparated(")");
}
}
// 类型过滤
if let Some(kinds) = &self.kinds {
if !kinds.is_empty() {
sql.push(" AND kind IN (");
let mut separated = sql.separated(",");
for kind in kinds {
separated.push_bind(kind);
}
separated.push_unseparated(")");
}
}
// 时间过滤 (since)
if let Some(since) = self.since {
sql.push(" AND created_at >= ").push_bind(since as i64);
}
// 时间过滤 (until)
if let Some(until) = self.until {
sql.push(" AND created_at <= ").push_bind(until as i64);
}
// 标签过滤 (NIP-12 Generic Tag Queries)
for (tag_name, tag_values) in &self.tag_filters {
if tag_name.starts_with('#') && !tag_values.is_empty() {
let tag_letter = &tag_name[1..]; // 提取标签字母 (e.g., 'e', 'p', 'd')
// 使用 JSON 函数查询 tags 数组
// SELECT 1 FROM json_each(tags) AS tag_arr WHERE tag_arr.value->>0 = 'p' AND tag_arr.value->>1 IN ('...', '...')
sql.push(" AND EXISTS (SELECT 1 FROM json_each(tags) AS tag_arr WHERE json_array_length(tag_arr.value) > 1 AND json_extract(tag_arr.value, '$[0]') = ");
sql.push_bind(tag_letter);
sql.push(" AND json_extract(tag_arr.value, '$[1]') IN (");
let mut separated = sql.separated(",");
for value in tag_values {
separated.push_bind(value);
}
separated.push_unseparated("))");
}
}
sql.push(" ORDER BY created_at DESC"); // 按创建时间降序排序
let limit = self.limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT); // 应用限制,不超过最大限制
sql.push(" LIMIT ").push_bind(limit as i64);
let query = sql.build();
debug!("SQL: {}", query.sql());
let rows = query.fetch_all(pool).await?; // 执行查询
debug!("Selected {} events", rows.len());
// 将查询结果映射为 NostrEvent 结构体
let events: Vec<NostrEvent> = rows
.into_iter()
.filter_map(|row| {
// 尝试解析 tags 字段,如果失败则跳过该行
let tags_str: String = row.get(4);
let tags_parsed = serde_json::from_str::<Vec<Vec<String>>>(&tags_str);
match tags_parsed {
Ok(tags_vec) => Some(NostrEvent {
id: row.get(0),
pubkey: row.get(1),
created_at: row.get::<i64, _>(2) as u64,
kind: row.get(3),
tags: tags_vec,
content: row.get(5),
sig: row.get(6),
}),
Err(e) => {
log::error!(
"Failed to parse tags for event ID {}: {}",
row.get::<String, _>(0),
e
);
None // 错误的标签数据导致事件无法完整构建,跳过
}
}
})
.collect();
Ok(events)
}
// 在内存中检查一个事件是否匹配当前过滤器
fn matches(&self, event: &NostrEvent) -> bool {
// ID 匹配 (支持 NIP-01 前缀匹配)
if let Some(ids) = &self.ids {
if !ids.is_empty() {
let matched_id = ids.iter().any(|id_prefix| {
if id_prefix.len() < 64 {
// 如果是前缀
event.id.starts_with(id_prefix)
} else {
// 精确匹配
event.id == *id_prefix
}
});
if !matched_id {
return false;
}
}
}
// 作者匹配 (支持 NIP-01 前缀匹配)
if let Some(authors) = &self.authors {
if !authors.is_empty() {
let matched_author = authors.iter().any(|author_prefix| {
if author_prefix.len() < 64 {
event.pubkey.starts_with(author_prefix)
} else {
event.pubkey == *author_prefix
}
});
if !matched_author {
return false;
}
}
}
// 类型匹配
if let Some(kinds) = &self.kinds {
if !kinds.is_empty() && !kinds.contains(&event.kind) {
return false;
}
}
// 时间匹配 (since)
if let Some(since) = self.since {
if event.created_at < since {
return false;
}
}
// 时间匹配 (until)
if let Some(until) = self.until {
if event.created_at > until {
return false;
}
}
// 标签匹配 (NIP-12 Generic Tag Queries)
for (tag_name, tag_values) in &self.tag_filters {
if tag_name.starts_with('#') && !tag_values.is_empty() {
let tag_letter = &tag_name[1..];
let mut found_match_for_this_tag_filter = false;
for event_tag in &event.tags {
if event_tag.len() >= 2 && event_tag[0] == tag_letter {
if tag_values.contains(&event_tag[1]) {
found_match_for_this_tag_filter = true;
break;
}
}
}
if !found_match_for_this_tag_filter {
return false; // 如果有任何一个 #tag 过滤器不匹配,则事件不匹配
}
}
}
true // 所有过滤器都匹配
}
}
+72
View File
@@ -0,0 +1,72 @@
use crate::nostr::NostrEvent;
use log::error;
use serde_json;
use tokio::sync::mpsc;
pub struct RelayMessage;
impl RelayMessage {
// 发送 OK 消息 (NIP-20)
pub async fn send_ok(
event_id: &str, // 接受事件 ID 字符串切片
accept: bool,
message: String,
to_client_msg_tx: &mpsc::Sender<String>,
) {
let msg = format!("[\"OK\", \"{}\", {}, \"{}\"]", event_id, accept, message);
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send OK message (event_id: {}): {}", event_id, e);
}
}
// 发送 EVENT 消息 (NIP-01)
pub async fn send_event(
event: &NostrEvent,
sub_id: &str,
to_client_msg_tx: &mpsc::Sender<String>,
) {
// 尝试将 NostrEvent 序列化为 JSON 字符串
let event_json = match serde_json::to_string(event) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize event {} for sending: {}", event.id, e);
return; // 序列化失败则不发送
}
};
let msg = format!("[\"EVENT\", \"{}\", {}]", sub_id, event_json);
if let Err(e) = to_client_msg_tx.send(msg).await {
// 此错误通常表示客户端的接收端已关闭(客户端已断开)
error!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e);
}
}
// 发送 EOSE 消息 (NIP-15)
pub async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) {
let msg = format!("[\"EOSE\", \"{}\"]", sub_id);
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e);
}
}
// 发送 CLOSED 消息 (NIP-20)
pub async fn send_closed(
sub_id: &str,
message: String,
to_client_msg_tx: &mpsc::Sender<String>,
) {
let msg = format!("[\"CLOSED\", \"{}\", \"{}\"]", sub_id, message);
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e);
}
}
// 发送 NOTICE 消息 (NIP-01)
pub async fn send_notice(message: String, to_client_msg_tx: &mpsc::Sender<String>) {
let msg = format!("[\"NOTICE\", \"{}\"]", message);
if let Err(e) = to_client_msg_tx.send(msg).await {
error!(
"Failed to send NOTICE message (message: {}): {}",
message, e
);
}
}
}
+69
View File
@@ -0,0 +1,69 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub mod event;
pub mod filter;
pub mod messages;
pub mod utils;
// Nostr 事件结构体
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NostrEvent {
pub id: String, // 32 字节 ID 的十六进制字符串
pub pubkey: String, // 32 字节公钥的十六进制字符串
pub created_at: u64, // Unix 时间戳(秒)
pub kind: u32, // 事件类型(整数 0~65535
pub tags: Vec<Vec<String>>, // 二维字符串数组,表示事件标签
pub content: String, // 事件内容
pub sig: String, // 64 字节签名(Schnorr 签名)的十六进制字符串
}
// 从 Relay 发送给 Client 的消息类型(用于反序列化)
#[derive(Debug, Deserialize)]
#[serde(untagged)] // 用于处理 JSON 数组的第一项作为标签的情况
pub enum RelayMessage {
Event {
sub_id: String,
event: NostrEvent,
}, // ["EVENT", <sub_id>, <event>]
Ok {
event_id: String,
accept: bool,
message: String,
}, // ["OK", <event_id>, bool, <message>]
Eose {
sub_id: String,
}, // ["EOSE", <sub_id>]
Closed {
sub_id: String,
message: String,
}, // ["CLOSED", <sub_id>, <message>]
Notice {
message: String,
}, // ["NOTICE", <message>]
}
// 从 Client 发送给 Relay 的消息类型(用于反序列化)
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum ClientMessage {
REQ { sub_id: String, filter: Vec<Filter> }, // ["REQ", <sub_id>, <filter1>...]
Event { event: NostrEvent }, // ["EVENT", <event>]
CLOSE { sub_id: String }, // ["CLOSE", <sub_id>]
AUTH { event: NostrEvent }, // ["AUTH", <auth_event>] (NIP-42)
}
// Nostr 订阅过滤器结构体
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Filter {
pub ids: Option<Vec<String>>, // Event ID 列表s (NIP-01 prefixes allowed)
pub authors: Option<Vec<String>>, // Pubkey 列表 (NIP-01 prefixes allowed)
pub kinds: Option<Vec<u32>>, // Kind 类型列表
#[serde(flatten)] // 将 HashMap 的键值对直接扁平化到父结构体中
pub tag_filters: HashMap<String, Vec<String>>,
// 用 HashMap 表示 #<letter> 标签过滤器,例如 #e, #p 等。
// Key 是字符串包含 #,Value 是对应标签值的列表。
pub since: Option<u64>, // Unix 时间戳,表示事件创建时间在此之后
pub until: Option<u64>, // Unix 时间戳,表示事件创建时间在此之前
pub limit: Option<u64>, // 最大返回事件数量
}
+60
View File
@@ -0,0 +1,60 @@
use crate::constants::SERVER_INFO;
use log::debug;
use serde_json::Value;
// 从 JSON 字符串形式的 tags 中提取所有 'p' 标签的值(公钥)
pub fn extract_p_tags_from_json(tags_json: &str) -> Result<Vec<String>, serde_json::Error> {
let parsed: Value = serde_json::from_str(tags_json)?; // 解析 JSON 字符串
let mut trust_accounts: Vec<String> = parsed
.as_array() // 确保是 JSON 数组
.unwrap_or(&vec![]) // 如果不是数组,则返回空 Vec
.iter()
.filter_map(|item| {
item.as_array().and_then(|arr| {
// 检查是否是 "p" 标签且至少有两个元素 (['p', pubkey, ...])
if arr.len() >= 2 && arr[0].as_str() == Some("p") {
arr[1].as_str().map(|s| s.to_string()) // 提取第二个元素(公钥)
} else {
None
}
})
})
.collect();
debug!(
"Extracted {} trust accounts from JSON.",
trust_accounts.len()
);
// 确保服务器本身的公钥始终在信任列表中
let server_pubkey = SERVER_INFO.pubkey.to_string();
if !trust_accounts.contains(&server_pubkey) {
trust_accounts.push(server_pubkey);
}
Ok(trust_accounts)
}
// 从 Vec<Vec<String>> 形式的 tags 中提取所有 'p' 标签的值(公钥)
pub fn extract_p_tags_from_vec(tags: &[Vec<String>]) -> Vec<String> {
let mut trust_accounts: Vec<String> = tags
.iter()
.filter_map(|tag| {
// 检查标签是否至少有两个元素且第一个是 "p"
if tag.len() >= 2 && tag[0] == "p" {
Some(tag[1].clone()) // 提取第二个元素(公钥)
} else {
None
}
})
.collect();
debug!(
"Extracted {} trust accounts from Vec.",
trust_accounts.len()
);
// 确保服务器本身的公钥始终在信任列表中
let server_pubkey = SERVER_INFO.pubkey.to_string();
if !trust_accounts.contains(&server_pubkey) {
trust_accounts.push(server_pubkey);
}
trust_accounts
}
+399
View File
@@ -0,0 +1,399 @@
use anyhow::{Result, anyhow};
use futures_util::{SinkExt, StreamExt};
use log::{debug, error, info};
use serde_json::Value;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::net::TcpStream;
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio_tungstenite::{WebSocketStream, accept_async, tungstenite::protocol::Message};
use uuid::Uuid;
use crate::constants::{CLIENT_CHANNEL_SIZE, MAX_FILTERS_PER_REQ, MAX_SUBSCRIPTIONS, SERVER_INFO};
use crate::models::{ClientConnection, Subscription};
use crate::nostr::event::NostrEventExt;
use crate::nostr::filter::FilterExt;
use crate::nostr::messages::RelayMessage;
use crate::nostr::utils::extract_p_tags_from_vec;
use crate::nostr::{ClientMessage, Filter, NostrEvent};
pub async fn handle_ws_connection(
stream: TcpStream,
event_tx: broadcast::Sender<String>,
mut event_rx: broadcast::Receiver<String>, // 客户端专属的事件接收器
pool: Arc<sqlx::SqlitePool>,
clients: Arc<RwLock<HashMap<Uuid, Arc<RwLock<ClientConnection>>>>>,
trust_accounts: Arc<RwLock<HashSet<String>>>,
) {
let ws_stream = match accept_async(stream).await {
Ok(stream) => stream,
Err(e) => {
error!("WebSocket handshake failed: {}", e);
return;
}
};
let (mut ws_sender, mut ws_reciver) = ws_stream.split();
let (client_tx, mut client_rx) = mpsc::channel::<String>(CLIENT_CHANNEL_SIZE); // 创建用于向此客户端发送消息的 MPSC 通道
let client_id = Uuid::new_v4();
let client_conn = Arc::new(RwLock::new(ClientConnection {
id: client_id,
connected_at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs(),
subscriptions: HashMap::new(),
sender: client_tx.clone(),
authenticated: false,
pubkey: None,
}));
// 注册客户端到全局的客户端列表中
{
let mut clients_map = clients.write().await;
clients_map.insert(client_id, client_conn.clone());
}
info!("Client {} connected (WS)", client_id);
let auth_msg = format!("[\"AUTH\", \"{}\"]", client_id); // 发送 AUTH 挑战 (NIP-42)
if let Err(e) = ws_sender.send(Message::Text(auth_msg.into())).await {
error!(
"Failed to send AUTH challenge to client {}: {}",
client_id, e
);
}
debug!("Sent AUTH challenge to client {}", client_id);
let client_conn_for_send = client_conn.clone();
// 启动一个独立的 Task 用于向客户端发送消息
let send_task = tokio::spawn(async move {
loop {
tokio::select! {
// 处理订阅事件:从广播通道接收事件,检查是否匹配客户端的订阅过滤器
Ok(event_msg) = event_rx.recv() => {
let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent
let conn = client_conn_for_send.read().await;
for (sub_id, subscription) in &conn.subscriptions {
if subscription.filters.iter().any(|filter| filter.matches(&event)) {
RelayMessage::send_event(&event, sub_id, &conn.sender).await;
}
}
}
// 处理直接发送给客户端的消息:从 client_rx 接收消息
Some(msg) = client_rx.recv() => {
debug!("Client {} sending message: {}", client_id, msg);
if ws_sender
.send(Message::Text(msg.into()))
.await
.is_err()
{
info!("Client {} sender disconnected", client_id);
break;
}
}
else => {
info!("Client {} send task terminated unexpectedly", client_id);
break;
}
}
}
});
// 循环接收来自客户端的 WebSocket 消息
while let Some(msg) = ws_reciver.next().await {
match msg {
Ok(Message::Text(text)) => {
debug!("Client {} received text: {}", client_id, text);
let pool_clone = pool.clone();
let client_conn_for_msg = client_conn.clone();
let trust_accounts_clone = trust_accounts.clone();
let client_tx = client_tx.clone();
let event_tx = event_tx.clone();
// 在一个新的 Task 中处理消息,避免阻塞主循环
tokio::spawn(async move {
if let Err(e) = handle_message(
&text,
&pool_clone,
&client_tx,
&client_conn_for_msg,
&event_tx,
trust_accounts_clone,
)
.await
{
error!("Error handling message for client {}: {}", client_id, e);
RelayMessage::send_notice(format!("Invalid message: {}", e), &client_tx)
.await;
}
});
}
Ok(Message::Close(_)) => {
info!("Client {} disconnected via CLOSE message", client_id);
break;
}
Err(e) => {
error!("Client {} WebSocket error: {}", client_id, e);
break;
}
_ => {}
}
}
// 清理:客户端断开连接时,从全局列表中移除并中止发送任务
{
let mut clients_map = clients.write().await;
clients_map.remove(&client_id);
}
send_task.abort();
info!(
"Client {} connection fully closed and cleaned up.",
client_id
);
}
// 处理从客户端接收到的 Nostr 协议消息(REQ, EVENT, CLOSE, AUTH
pub async fn handle_message(
text: &str,
pool: &sqlx::SqlitePool,
to_client_msg_tx: &mpsc::Sender<String>,
client_conn: &Arc<RwLock<ClientConnection>>,
event_tx: &broadcast::Sender<String>,
trust_accounts: Arc<RwLock<HashSet<String>>>,
) -> Result<(), anyhow::Error> {
let v: Value = serde_json::from_str(text).map_err(|e| anyhow!("Invalid JSON: {}", e))?;
let arr = match v {
Value::Array(a) => a,
_ => return Err(anyhow!("Expected JSON array, got: {}", v)),
};
if arr.is_empty() {
return Err(anyhow!("Empty array message"));
}
let command = arr[0]
.as_str()
.ok_or_else(|| anyhow!("First element must be a string"))?;
match command {
"REQ" => {
if arr.len() < 3 {
RelayMessage::send_notice(
"Not enough array elements for REQ: REQ <sub_id> <filter1> [filter2...]"
.to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
let sub_id = arr
.get(1)
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("REQ missing sub_id"))?
.to_string();
// 检查客户端是否超出最大订阅数量
{
let conn = client_conn.read().await;
if conn.subscriptions.len() >= MAX_SUBSCRIPTIONS {
RelayMessage::send_closed(
&sub_id,
format!("Maximum subscriptions ({}) exceeded", MAX_SUBSCRIPTIONS),
to_client_msg_tx,
)
.await;
return Ok(());
}
}
let mut filters = Vec::new();
for i in 2..arr.len() {
if filters.len() >= MAX_FILTERS_PER_REQ {
RelayMessage::send_closed(
&sub_id,
format!("Maximum filters ({}) exceeded", MAX_FILTERS_PER_REQ),
to_client_msg_tx,
)
.await;
return Ok(());
}
let filter: Filter = serde_json::from_value(arr[i].clone())
.map_err(|e| anyhow!("Filter parse error: {}", e))?;
filters.push(filter);
}
debug!("REQ subscription: {}, filters: {:?}", sub_id, filters);
// 将订阅信息添加到客户端连接中
{
let mut conn = client_conn.write().await;
conn.subscriptions.insert(
sub_id.clone(),
Subscription {
filters: filters.clone(),
},
);
}
// 查询历史事件并发送
for filter in &filters {
let events = filter.select(pool).await?; // 使用 FilterExt
for event in events {
RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await;
}
}
// 发送 EOSE 消息,表示历史事件已发送完毕
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await;
Ok(())
}
"EVENT" => {
let event: NostrEvent = serde_json::from_value(arr[1].clone())
.map_err(|e| anyhow!("Event parse error: {}", e))?;
debug!("EVENT received: {:?}", event);
// 验证事件签名和基本有效性 (时间戳、标签数量等)
if !event.verify() {
// 使用 NostrEventExt
RelayMessage::send_ok(
&event.id,
false,
"invalid: signature verification failed or event malformed".to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
let pool_clone = pool.clone();
let to_client_msg_tx_clone = to_client_msg_tx.clone();
let event_tx_clone = event_tx.clone();
let event_clone = event.clone();
let client_conn_clone = client_conn.clone();
let trust_accounts_clone = trust_accounts.clone();
// 在一个新的 Task 中处理事件的保存和广播,避免阻塞 WebSocket 接收循环
tokio::spawn(async move {
// 权限检查:是否需要认证,以及是否在信任列表中
if SERVER_INFO.auth_required {
let conn = client_conn_clone.read().await;
if !conn.authenticated {
RelayMessage::send_closed(
// NIP-42 Auth: Forbid unauthenticated EVENT
&conn.id.to_string(), // 使用 client_id 作为 sub_id 来匹配 AUTH 挑战
"auth-required: Authentication required to publish events".to_string(),
&to_client_msg_tx_clone,
)
.await;
debug!("Client {}: Not authenticated, event rejected.", conn.id);
return;
}
let trust_accounts_guard = trust_accounts_clone.read().await;
if !trust_accounts_guard.contains(&event_clone.pubkey) {
RelayMessage::send_closed(
// NIP-42 Auth: Forbid unauthorized EVENT
&conn.id.to_string(),
"restricted: You are not in the trust list to publish events"
.to_string(),
&to_client_msg_tx_clone,
)
.await;
debug!(
"Client {}: Pubkey {} not in trust list, event rejected.",
conn.id, event_clone.pubkey
);
return;
}
}
// 保存事件到数据库,并发送 OK 响应
match event_clone.save(&pool_clone).await {
// 使用 NostrEventExt
Ok(_) => {
RelayMessage::send_ok(
&event_clone.id,
true,
"Event received and saved".to_string(),
&to_client_msg_tx_clone,
)
.await;
// 广播事件给所有订阅的客户端
if let Err(e) =
event_tx_clone.send(serde_json::to_string(&event_clone).unwrap())
{
error!("Failed to broadcast event {}: {}", event_clone.id, e);
}
// 如果是 Kind 3 事件(联系人列表)且来自服务器公钥,则更新信任列表
if event_clone.kind == 3 && event_clone.pubkey == SERVER_INFO.pubkey {
let mut ts = trust_accounts_clone.write().await;
let new_trust_accounts = extract_p_tags_from_vec(&event_clone.tags); // 使用 Nostr 工具函数
let mut new_trust_accounts_set: HashSet<String> =
new_trust_accounts.into_iter().collect();
new_trust_accounts_set.insert(SERVER_INFO.pubkey.to_string()); // 确保服务器公钥始终在信任列表中
*ts = new_trust_accounts_set;
debug!("Trust list updated to {} accounts.", ts.len());
}
}
Err(e) => {
RelayMessage::send_ok(
&event_clone.id,
false,
e.to_string(),
&to_client_msg_tx_clone,
)
.await;
error!("Failed to save event {}: {}", event_clone.id, e);
}
}
});
Ok(())
}
"CLOSE" => {
let sub_id = arr
.get(1)
.and_then(Value::as_str)
.ok_or_else(|| anyhow!("CLOSE missing sub_id"))?
.to_string();
// 从客户端连接中移除订阅
{
let mut conn = client_conn.write().await;
conn.subscriptions.remove(&sub_id);
}
debug!("CLOSE subscription: {}", sub_id);
RelayMessage::send_closed(
&sub_id,
"Subscription cancelled".to_string(),
to_client_msg_tx,
)
.await;
Ok(())
}
"AUTH" => {
if arr.len() < 2 {
RelayMessage::send_notice(
"AUTH message requires an event: [\"AUTH\", <auth_event>]".to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
let auth_event: NostrEvent = serde_json::from_value(arr[1].clone())
.map_err(|e| anyhow!("AUTH event parse error: {}", e))?;
// 处理认证事件
auth_event
.handle_auth_event(client_conn, to_client_msg_tx) // 使用 NostrEventExt
.await?;
Ok(())
}
other => {
return Err(anyhow!("Unknown command: {}", other));
}
}
}