From 6c42d5d654ef45aba2ad0f7733168d532bfdc6b2 Mon Sep 17 00:00:00 2001 From: laoXong Date: Mon, 4 Aug 2025 01:50:00 +0800 Subject: [PATCH] Add: Split main.rs --- .gitignore | 3 +- Cargo.lock | 174 +++--- src/constants.rs | 35 ++ src/database.rs | 62 ++ src/handlers.rs | 257 ++++++++ src/lib.rs | 6 + src/main.rs | 1338 ++--------------------------------------- src/models.rs | 49 ++ src/nostr/event.rs | 360 +++++++++++ src/nostr/filter.rs | 227 +++++++ src/nostr/messages.rs | 72 +++ src/nostr/mod.rs | 69 +++ src/nostr/utils.rs | 60 ++ src/ws_logic.rs | 399 ++++++++++++ 14 files changed, 1741 insertions(+), 1370 deletions(-) create mode 100644 src/constants.rs create mode 100644 src/database.rs create mode 100644 src/handlers.rs create mode 100644 src/models.rs create mode 100644 src/nostr/event.rs create mode 100644 src/nostr/filter.rs create mode 100644 src/nostr/messages.rs create mode 100644 src/nostr/mod.rs create mode 100644 src/nostr/utils.rs create mode 100644 src/ws_logic.rs diff --git a/.gitignore b/.gitignore index 92ab075..ba31e9e 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,5 @@ target/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ -.vscode/ \ No newline at end of file +.vscode/ +.zed/ diff --git a/Cargo.lock b/Cargo.lock index 6ddbbc1..1a559e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,9 +13,9 @@ dependencies = [ [[package]] name = "adler2" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" [[package]] name = "aho-corasick" @@ -34,9 +34,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "anstream" -version = "0.6.18" +version = "0.6.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" dependencies = [ "anstyle", "anstyle-parse", @@ -49,33 +49,33 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" [[package]] name = "anstyle-parse" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" dependencies = [ "windows-sys 0.59.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.8" +version = "3.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6680de5231bd6ee4c6191b8a1325daa282b415391ec9d3a37bd34f2060dc73fa" +checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" dependencies = [ "anstyle", "once_cell_polyfill", @@ -105,9 +105,9 @@ dependencies = [ [[package]] name = "autocfg" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "backtrace" @@ -132,9 +132,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.7.3" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" +checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" [[package]] name = "bitcoin-io" @@ -172,9 +172,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.17.0" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" [[package]] name = "byteorder" @@ -190,24 +190,24 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.25" +version = "1.2.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0fc897dc1e865cc67c0e05a836d9d3f1df3cbe442aa4a9473b18e12624a4951" +checksum = "c3a42d84bb6b69d3a8b3eaacf0d88f179e1929695e1ad012b6cf64d9caaa5fd2" dependencies = [ "shlex", ] [[package]] name = "cfg-if" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" [[package]] name = "colorchoice" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" [[package]] name = "concurrent-queue" @@ -513,7 +513,7 @@ checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi 0.11.1+wasi-snapshot-preview1", ] [[package]] @@ -536,9 +536,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "hashbrown" -version = "0.15.3" +version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" dependencies = [ "allocator-api2", "equivalent", @@ -734,14 +734,25 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown", ] +[[package]] +name = "io-uring" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -756,9 +767,9 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "jiff" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a194df1107f33c79f4f93d02c80798520551949d59dfad22b6157048a88cca93" +checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" dependencies = [ "jiff-static", "log", @@ -769,9 +780,9 @@ dependencies = [ [[package]] name = "jiff-static" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c6e1db7ed32c6c71b759497fae34bf7933636f75a251b9e736555da426f6442" +checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" dependencies = [ "proc-macro2", "quote", @@ -799,9 +810,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.172" +version = "0.2.174" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" [[package]] name = "libm" @@ -854,15 +865,15 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.4" +version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" [[package]] name = "miniz_oxide" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" dependencies = [ "adler2", ] @@ -874,7 +885,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" dependencies = [ "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.59.0", ] @@ -889,7 +900,7 @@ dependencies = [ "httparse", "log", "once_cell", - "rand 0.9.1", + "rand 0.9.2", "secp256k1", "serde", "serde_json", @@ -1054,9 +1065,9 @@ checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" [[package]] name = "portable-atomic" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" [[package]] name = "portable-atomic-util" @@ -1105,9 +1116,9 @@ dependencies = [ [[package]] name = "r-efi" -version = "5.2.0" +version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" [[package]] name = "rand" @@ -1122,9 +1133,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.1" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.3", @@ -1170,9 +1181,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.12" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" +checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" dependencies = [ "bitflags", ] @@ -1242,9 +1253,9 @@ dependencies = [ [[package]] name = "rustc-demangle" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" [[package]] name = "rustls" @@ -1305,7 +1316,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c3c81b43dc2d8877c216a3fccf76677ee1ebccd429566d3e67447290d0c42b2" dependencies = [ "bitcoin_hashes", - "rand 0.9.1", + "rand 0.9.2", "secp256k1-sys", ] @@ -1340,9 +1351,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.140" +version = "1.0.142" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7" dependencies = [ "itoa", "memchr", @@ -1420,30 +1431,27 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" [[package]] name = "smallvec" -version = "1.15.0" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" dependencies = [ "serde", ] [[package]] name = "socket2" -version = "0.5.10" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1680,9 +1688,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.101" +version = "2.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" dependencies = [ "proc-macro2", "quote", @@ -1747,20 +1755,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.1" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", + "slab", "socket2", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1866,9 +1876,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.28" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", @@ -1877,9 +1887,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.33" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", ] @@ -1895,7 +1905,7 @@ dependencies = [ "http", "httparse", "log", - "rand 0.9.1", + "rand 0.9.2", "sha1", "thiserror", "utf-8", @@ -1994,9 +2004,9 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "wasi" -version = "0.11.0+wasi-snapshot-preview1" +version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasi" @@ -2249,9 +2259,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74c7b26e3480b707944fc872477815d29a8e429d2f93a1ce000f5fa84a15cbcd" +checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" dependencies = [ "memchr", ] @@ -2297,18 +2307,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.25" +version = "0.8.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" +checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.25" +version = "0.8.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" +checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" dependencies = [ "proc-macro2", "quote", diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..907cd44 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,35 @@ +use crate::models::{Limitation, ServerInfo}; +use once_cell::sync::Lazy; +use std::env; + +pub const DEFAULT_BIND_ADDR: &str = "0.0.0.0:8080"; +pub const DEFAULT_DB_PATH: &str = "nostr.db"; +pub const MAX_EVENT_TAGS: u32 = 5000; +pub const MAX_LIMIT: u64 = 500; +pub const DEFAULT_LIMIT: u64 = 10; +pub const MAX_FILTERS_PER_REQ: usize = 100; +pub const BROADCAST_CHANNEL_SIZE: usize = 100; +pub const CLIENT_CHANNEL_SIZE: usize = 32; +pub const MAX_SUBSCRIPTIONS: usize = 20; + +pub static SERVER_INFO: Lazy = Lazy::new(|| ServerInfo { + contact: "https://www.moec.top/", + description: "Powered by laoXong.", + limitation: Limitation { + max_event_tags: MAX_EVENT_TAGS, + max_event_time_newer_than_now: 900, + max_event_time_older_than_now: 315576000, + max_filters: MAX_FILTERS_PER_REQ as u32, // usize to u32 cast + max_limit: MAX_LIMIT as u32, // u64 to u32 cast + max_message_length: 524288, + max_subid_length: 100, + max_subscriptions: MAX_SUBSCRIPTIONS as u32, // usize to u32 cast + min_prefix: 10, + }, + name: "A rust nostr relay by laoXong", + pubkey: "63abd4f817e39cca4e6abb6e6cf3e133bb718cf8ec28b38c1645e84d7a6190c6", + software: "https://git.moe.gift/laoxong/nostr-relay", + supported_nips: vec![1, 2, 5, 42, 65], + version: env!("CARGO_PKG_VERSION"), // 从 Cargo.toml 获取版本 + auth_required: env::var("AUTH_REQUIRED").unwrap_or_else(|_| "False".to_string()) == "True", +}); diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..d3f377b --- /dev/null +++ b/src/database.rs @@ -0,0 +1,62 @@ +use crate::constants::SERVER_INFO; +use crate::nostr::utils::extract_p_tags_from_json; +use anyhow::Result; +use log::error; +use sqlx::sqlite::SqlitePool; +use sqlx::{Row, query}; +use std::collections::HashSet; + +pub async fn init_database(pool: &SqlitePool) -> Result<()> { + let create_events_table = r#" + CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, + pubkey TEXT NOT NULL, + created_at INTEGER NOT NULL, + kind INTEGER NOT NULL, + tags TEXT NOT NULL, + content TEXT NOT NULL, + sig TEXT NOT NULL, + indexed_at INTEGER DEFAULT (unixepoch()) + ); + "#; + + let create_indexes = vec![ + "CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey);", + "CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);", + "CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);", + "CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind ON events(pubkey, kind);", + "CREATE INDEX IF NOT EXISTS idx_events_kind_created_at_desc ON events(kind, created_at DESC);", + ]; + + query(create_events_table).execute(pool).await?; + + for index_sql in create_indexes { + query(index_sql).execute(pool).await?; + } + + log::info!("Database initialized successfully"); + Ok(()) +} + +// 获取信任账户列表 +pub async fn get_trust_accounts(pool: &SqlitePool) -> Vec { + let pubkey = SERVER_INFO.pubkey; // 获取服务器公钥 + // 查询最新的 kind 3 事件(联系人列表),获取其中的 p 标签作为信任账户 + let sql = + "SELECT tags FROM events WHERE kind = 3 AND pubkey = ? ORDER BY created_at DESC LIMIT 1"; + + let row = match sqlx::query(sql).bind(pubkey).fetch_optional(pool).await { + Ok(row) => row, + Err(e) => { + error!("Failed to execute query for trust accounts: {}", e); + return Vec::new(); + } + }; + match row { + Some(row) => { + let tags_json: String = row.get(0); + extract_p_tags_from_json(&tags_json).unwrap_or_else(|_| Vec::new()) + } + None => Vec::new(), + } +} diff --git a/src/handlers.rs b/src/handlers.rs new file mode 100644 index 0000000..a8c3b6d --- /dev/null +++ b/src/handlers.rs @@ -0,0 +1,257 @@ +use anyhow::{Result, anyhow}; +use httparse::{EMPTY_HEADER, Request}; +use log::{error, info}; +use sqlx::SqlitePool; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::sync::RwLock; +use tokio::sync::broadcast; + +use crate::constants::SERVER_INFO; +use crate::models::ClientConnection; +use crate::ws_logic::handle_ws_connection; + +// 处理多路复用连接(HTTP 或 WebSocket) +pub async fn handle_connection_multiplex( + stream: TcpStream, + tx: broadcast::Sender, + rx: broadcast::Receiver, + pool: Arc, + clients: Arc>>>>, + trust_accounts: Arc>>, +) -> Result<(), anyhow::Error> { + // 分配足够大的缓冲区,一次读完整个 HTTP 头 + let mut buf = vec![0u8; 4096]; + let n = stream.peek(&mut buf).await?; + let req_bytes = &buf[..n]; + + let mut headers: [httparse::Header; 32] = [EMPTY_HEADER; 32]; + let mut req = Request::new(&mut headers); + let status = req.parse(req_bytes)?; // 解析 HTTP 请求 + if status.is_partial() { + // 如果缓冲区不够大,没能完全解析请求头 + anyhow::bail!("Request headers too large"); + } + let method = req.method.unwrap_or(""); + // 将请求头转换为 HashMap 便于查找 + let header_map = req + .headers + .iter() + .filter_map(|h| { + let name = h.name.to_ascii_lowercase(); + let val = std::str::from_utf8(h.value).ok()?; + Some((name, val)) + }) + .collect::>(); + + // 判断是否是 WebSocket 升级请求 + if method == "GET" + && header_map + .get("upgrade") + .map(|&v| v.eq_ignore_ascii_case("websocket")) + .unwrap_or(false) + { + handle_ws_connection(stream, tx, rx, pool, clients, trust_accounts).await; + } else { + // 处理普通 HTTP 请求 + if let Some(accept) = header_map.get("accept") { + if accept.contains("application/json") || accept.contains("application/nostr+json") { + handle_http_info(stream).await?; // 返回 Nostr Relay Info (NIP-11) + } else if accept.contains("text/html") { + handle_http_dashboard(stream).await?; // 返回 HTML 仪表盘 + } else { + handle_http_info(stream).await?; // 默认返回信息 + } + } else { + handle_http_info(stream).await?; // 默认返回信息 + } + } + + Ok(()) +} + +// 处理 HTTP 信息请求 (NIP-11) +pub async fn handle_http_info(mut stream: TcpStream) -> Result<(), anyhow::Error> { + // 读完请求体的剩余部分,保证连接可以关闭 + let mut buffer = vec![0; 1024]; + let _ = stream.read(&mut buffer).await?; + + let json = serde_json::to_string(&*SERVER_INFO).expect("Failed to serialize server info"); + let response = format!( + "HTTP/1.1 200 OK\r\n\ + Content-Type: application/nostr+json\r\n\ + Access-Control-Allow-Origin: *\r\n\ + Content-Length: {}\r\n\ + \r\n\ + {}", + json.len(), + json + ); + stream.write_all(response.as_bytes()).await?; + stream.flush().await?; + + Ok(()) +} + +// 处理 HTTP 仪表盘请求 +pub async fn handle_http_dashboard(mut stream: TcpStream) -> Result<(), anyhow::Error> { + // 读完请求体的剩余部分,保证连接可以关闭 + let mut buffer = vec![0; 1024]; + let _ = stream.read(&mut buffer).await?; + + // HTML 页面内容 + let html = format!( + r#" + + + + + Nostr Relay Dashboard + + + + +
+

🚀Nostr Relay ServerVer.{}

+
+

WebSocket URL
wss://nostr-relay.moe.gift

+

Status
✅ Running

+
+

使用任意兼容 Nostr 协议的客户端连接到上面的 WebSocket 地址,即可发布和接收事件。

+
+ + + +"#, + env!("CARGO_PKG_VERSION") + ); + + let response = format!( + "HTTP/1.1 200 OK\r\n\ + Content-Type: text/html; charset=utf-8\r\n\ + Content-Length: {}\r\n\ + Access-Control-Allow-Origin: *\r\n\ + \r\n\ + {}", + html.len(), + html + ); + + stream.write_all(response.as_bytes()).await?; + stream.flush().await?; + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index e69de29..83d2fcb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -0,0 +1,6 @@ +mod constants; +mod database; +mod handlers; +mod models; +mod nostr; +mod ws_logic; diff --git a/src/main.rs b/src/main.rs index 9159c3c..4eeea20 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,1316 +1,80 @@ +mod constants; +mod database; +mod handlers; +mod models; +mod ws_logic; + +mod nostr; + use anyhow::Result; -use anyhow::anyhow; -use futures_util::{SinkExt, StreamExt}; -use hex; -use httparse::{EMPTY_HEADER, Request}; -use log::{Level, debug, error, info, log_enabled}; -use once_cell::sync::Lazy; -use secp256k1::{Message as SecpMessage, PublicKey, Secp256k1, ecdsa::Signature, XOnlyPublicKey, schnorr::Signature as SchnorrSignature}; -use serde::de; -use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; -use sha2::{Digest, Sha256}; -use sqlx::Execute; -use sqlx::QueryBuilder; -use sqlx::Row; -use sqlx::query; +use log::info; use sqlx::sqlite::SqlitePool; +use std::collections::HashMap; use std::env; -use std::time::Duration; -use std::time::SystemTime; -use std::time::UNIX_EPOCH; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::Mutex; -use tokio::sync::{RwLock, mpsc}; -use tokio::{ - net::{TcpListener, TcpStream}, - sync::broadcast, -}; -use tokio_tungstenite::{WebSocketStream, accept_async, tungstenite::protocol::Message}; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::sync::{RwLock, broadcast}; + +use constants::{BROADCAST_CHANNEL_SIZE, DEFAULT_BIND_ADDR, DEFAULT_DB_PATH}; +use database::get_trust_accounts; +use models::ClientConnection; use uuid::Uuid; -const DEFAULT_BIND_ADDR: &str = "0.0.0.0:8080"; -const DEFAULT_DB_PATH: &str = "nostr.db"; -const MAX_EVENT_TAGS: u32 = 5000; -const MAX_LIMIT: u64 = 500; -const DEFAULT_LIMIT: u64 = 10; -const MAX_FILTERS_PER_REQ: usize = 100; -const BROADCAST_CHANNEL_SIZE: usize = 100; -const CLIENT_CHANNEL_SIZE: usize = 32; -const MAX_SUBSCRIPTIONS: usize = 20; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct NostrEvent { - pub id: String, // 32 bytes hex string of sha256 - pub pubkey: String, // 32 bytes hex pubkey - pub created_at: u64, // unix timestamp seconds - pub kind: u32, // integer 0~65535 - pub tags: Vec>, // 二维字符串数组 - pub content: String, // 事件内容 - pub sig: String, // 64 bytes hex signature -} - -#[derive(Debug, Deserialize)] -#[serde(untagged)] -pub enum RelayMessage { - Event { - sub_id: String, - event: NostrEvent, - }, // ["EVENT", , ] - Ok { - event_id: String, - accept: bool, - message: String, - }, // ["OK", , bool, ] - Eose { - sub_id: String, - }, // ["EOSE", ] - Closed { - sub_id: String, - message: String, - }, // ["CLOSED", , ] - Notice { - message: String, - }, // ["NOTICE", ] -} - -#[derive(Debug, Deserialize)] -#[serde(untagged)] -pub enum ClientMessage { - REQ { sub_id: String, filter: Vec }, - Event { event: NostrEvent }, - CLOSE { sub_id: String }, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Filter { - pub ids: Option>, // event ids 列表 - pub authors: Option>, // pubkeys 列表,小写字符串 - pub kinds: Option>, // kind 类型列表,数字 - #[serde(flatten)] - pub tag_filters: HashMap>, - // 用 HashMap 表示 # 标签过滤器,例如 #e, #p 等,key 是字符串包含#,value 是对应列表 - pub since: Option, // Unix 时间戳,秒 - pub until: Option, // Unix 时间戳,秒 - pub limit: Option, // 最大返回数 -} -#[derive(Serialize)] -struct Limitation { - max_event_tags: u32, - max_event_time_newer_than_now: u32, - max_event_time_older_than_now: u32, - max_filters: u32, - max_limit: u32, - max_message_length: u32, - max_subid_length: u32, - max_subscriptions: u32, - min_prefix: u32, -} - -#[derive(Serialize)] -struct ServerInfo { - contact: &'static str, - description: &'static str, - limitation: Limitation, - name: &'static str, - pubkey: &'static str, - software: &'static str, - supported_nips: Vec, - version: &'static str, - auth_required: bool, -} - -#[derive(Debug, Clone)] -struct Subscription { - filters: Vec, -} - -struct ClientConnection { - id: Uuid, - connected_at: u64, - subscriptions: HashMap, - sender: mpsc::Sender, - authenticated: bool, - pubkey: Option, -} - -static SERVER_INFO: Lazy = Lazy::new(|| ServerInfo { - contact: "https://www.moec.top/", - description: "Powered by laoXong.", - limitation: Limitation { - max_event_tags: 5000, - max_event_time_newer_than_now: 900, - max_event_time_older_than_now: 315576000, - max_filters: 100, - max_limit: 500, - max_message_length: 524288, - max_subid_length: 100, - max_subscriptions: 20, - min_prefix: 10, - }, - name: "A rust nostr relay by laoXong", - pubkey: "63abd4f817e39cca4e6abb6e6cf3e133bb718cf8ec28b38c1645e84d7a6190c6", - software: "https://git.moe.gift/laoxong/nostr-relay", - supported_nips: vec![1, 2, 5, 42, 65], - version: env!("CARGO_PKG_VERSION"), - auth_required: env::var("AUTH_REQUIRED").unwrap_or_else(|_| "False".to_string()) == "True", -}); - #[tokio::main] async fn main() -> Result<()> { env_logger::init(); - let db_path = env::var("DB_PATH").unwrap_or_else(|_| "nostr.db".to_string()); + info!("Nostr Relay starting up..."); + + // 数据库初始化 + let db_path = env::var("DB_PATH").unwrap_or_else(|_| DEFAULT_DB_PATH.to_string()); let db_url = format!("sqlite://{}", db_path); let pool = SqlitePool::connect(&db_url).await?; - info!("Database pool connected successfully"); - init_database(&pool).await?; - let pool = Arc::new(pool); + info!("Database pool connected successfully to {}", db_url); + database::init_database(&pool).await?; // 使用 database 模块的函数 + let pool = Arc::new(pool); // 将 SqlitePool 包裹在 Arc 中,以便多线程共享 info!("Connected to SQLite"); + + // 服务器监听设置 let addr = env::var("BIND_ADDR").unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_string()); let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); info!("Listening on: {}", addr); - let (event_tx, _) = broadcast::channel::(100); + // 广播通道用于事件分发,以及客户端连接和信任账户的共享状态 + let (event_tx, _) = broadcast::channel::(BROADCAST_CHANNEL_SIZE); let clients: Arc>>>> = Arc::new(RwLock::new(HashMap::new())); - let accounts = get_trust_accounts(&pool).await; - let trust_accounts: HashSet<_> = accounts.into_iter().collect(); - let trust_accounts = Arc::new(RwLock::new(trust_accounts)); + let accounts = get_trust_accounts(&pool).await; + let trust_accounts: std::collections::HashSet<_> = accounts.into_iter().collect(); + let trust_accounts = Arc::new(RwLock::new(trust_accounts)); + info!( + "Loaded {} trust accounts.", + trust_accounts.read().await.len() + ); + + // 接受传入连接并为每个连接 spawn 一个处理任务 while let Ok((stream, client_addr)) = listener.accept().await { - let event_tx = event_tx.clone(); - let event_rx = event_tx.subscribe(); - let pool = pool.clone(); - let clients = clients.clone(); - let trust_accounts = trust_accounts.clone(); + info!("Accepted connection from: {}", client_addr); + let event_tx_clone = event_tx.clone(); + let event_rx_clone = event_tx.subscribe(); + let pool_clone = pool.clone(); + let clients_clone = clients.clone(); + let trust_accounts_clone = trust_accounts.clone(); + tokio::spawn(async move { - if let Err(e) = handle_connection_multiplex( + if let Err(e) = handlers::handle_connection_multiplex( stream, - event_tx, - event_rx, - pool, - clients, - trust_accounts, + event_tx_clone, + event_rx_clone, + pool_clone, + clients_clone, + trust_accounts_clone, ) .await { - error!("Error handling connection: {}", e); + log::error!("Error handling connection from {}: {}", client_addr, e); } }); } Ok(()) } - -async fn handle_connection_multiplex( - stream: TcpStream, - tx: broadcast::Sender, - rx: broadcast::Receiver, - pool: Arc, - clients: Arc>>>>, - trust_accounts: Arc>>, -) -> Result<(), anyhow::Error> { - // 分配足够大的缓冲区,一次读完整个头 - let mut buf = vec![0u8; 4096]; - let n = stream.peek(&mut buf).await?; - let req_bytes = &buf[..n]; - - let mut headers: [httparse::Header; 32] = [EMPTY_HEADER; 32]; - let mut req = Request::new(&mut headers); - let status = req.parse(req_bytes)?; - if status.is_partial() { - anyhow::bail!("Request headers too large"); - } - let method = req.method.unwrap_or(""); - let header_map = req - .headers - .iter() - .filter_map(|h| { - let name = h.name.to_ascii_lowercase(); - let val = std::str::from_utf8(h.value).ok()?; - Some((name, val)) - }) - .collect::>(); - - if method == "GET" - && header_map - .get("upgrade") - .map(|&v| v.eq_ignore_ascii_case("websocket")) - .unwrap_or(false) - { - handle_ws_connection(stream, tx, rx, pool, clients, trust_accounts).await; - } else { - if let Some(accept) = header_map.get("accept") { - if accept.contains("application/json") || accept.contains("application/json") { - handle_http_info(stream).await?; - } else if accept.contains("text/html") { - handle_http(stream).await?; - } else { - handle_http_info(stream).await?; - } - } else { - handle_http_info(stream).await?; - } - } - - Ok(()) -} - -async fn handle_http_info(mut stream: TcpStream) -> Result<(), anyhow::Error> { - let mut buffer = vec![0; 1024]; - stream.read(&mut buffer).await?; - - let json = serde_json::to_string(&*SERVER_INFO).expect("Failed to serialize server info"); - let response = format!( - "HTTP/1.1 200 OK\r\n\ - Content-Type: application/nostr+json\r\n\ - Access-Control-Allow-Origin: *\r\n\ - \r\n\ - {}", - json - ); - stream.write_all(response.as_bytes()).await?; - stream.flush().await?; - - Ok(()) -} - -async fn handle_http(mut stream: TcpStream) -> Result<(), anyhow::Error> { - let mut buffer = vec![0; 1024]; - stream.read(&mut buffer).await?; - let html = format!( - r#" - - - - - Nostr Relay Dashboard - - - - -
-

🚀Nostr Relay ServerVer.{}

-
-

WebSocket URL
wss://nostr-relay.moe.gift

-

Status
✅ Running

-
-

使用任意兼容 Nostr 协议的客户端连接到上面的 WebSocket 地址,即可发布和接收事件。

-
- - - - -"#, - env!("CARGO_PKG_VERSION") - ); - - let response = format!( - "HTTP/1.1 200 OK\r\n\ -Content-Type: text/html; charset=utf-8\r\n\ -Content-Length: {}\r\n\ -Access-Control-Allow-Origin: *\r\n\ -\r\n\ -{}", - html.len(), - html - ); - - stream.write_all(response.as_bytes()).await?; - stream.flush().await?; - - Ok(()) -} - -async fn handle_ws_connection( - stream: TcpStream, - event_tx: broadcast::Sender, - mut event_rx: broadcast::Receiver, - pool: Arc, - clients: Arc>>>>, - trust_accounts: Arc>>, -) { - let ws_stream = match accept_async(stream).await { - Ok(stream) => stream, - Err(e) => { - error!("WebSocket handshake failed: {}", e); - return; - } - }; - let (mut ws_sender, mut ws_reciver) = ws_stream.split(); - let (client_tx, mut client_rx) = mpsc::channel::(CLIENT_CHANNEL_SIZE); - - let client_id = Uuid::new_v4(); - let client_conn = Arc::new(RwLock::new(ClientConnection { - id: client_id.clone(), - connected_at: SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(), - subscriptions: HashMap::new(), - sender: client_tx.clone(), - authenticated: false, - pubkey: None, - })); - // 注册客户端 - { - let mut clients_map = clients.write().await; - clients_map.insert(client_id, client_conn.clone()); - } - - info!("Client {} connected", client_id); - let auth_msg = format!("[\"AUTH\", \"{}\"]", client_id); - if let Err(e) = ws_sender.send(Message::Text(auth_msg.into())).await { - error!("Failed to send AUTH challenge: {}", e); - } - debug!("Sent AUTH challenge to client {}", client_id); - let client_conn_for_send = client_conn.clone(); - let send_task = tokio::spawn(async move { - loop { - tokio::select! { - //Subscribed events - Ok(event_msg) = event_rx.recv() => { - let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); - let conn = client_conn_for_send.read().await; - for (sub_id, subscription) in &conn.subscriptions { - if subscription.filters.iter().any(|filter| filter.matches(&event)) { - let msg = format!( - "[\"EVENT\", \"{}\", {}]", - sub_id, - serde_json::to_string(&event).unwrap() - ); - if ws_sender.send(Message::Text(msg.into())).await.is_err() { - break; - } - } - } - } - //Send message to client - Some(msg) = client_rx.recv() => { - debug!("Sending message to client: {}", msg); - if ws_sender - .send(Message::Text(msg.into())) - .await - .is_err() - { - break; - } - } - else => { - break; - } - } - } - }); - while let Some(msg) = ws_reciver.next().await { - match msg { - Ok(Message::Text(text)) => { - debug!("Received text: {}", text); - let pool = pool.clone(); - let client_conn_for_msg = client_conn.clone(); - let trust_accounts = trust_accounts.clone(); - match handle_message( - &text, - &pool, - &client_tx, - &client_conn_for_msg, - &event_tx, - trust_accounts, - ) - .await - { - Ok(_) => { - debug!("Message handled successfully"); - } - Err(e) => { - error!("Error handling message: {}", e); - client_tx.send(format!("Error: {}", e)).await.unwrap(); - } - } - } - Ok(Message::Close(_)) => { - info!("Client {} disconnected", client_id); - { - let mut clients_map = clients.write().await; - clients_map.remove(&client_id); - } - send_task.abort(); - break; - } - Err(_e) => { - { - let mut clients_map = clients.write().await; - clients_map.remove(&client_id); - } - send_task.abort(); - break; - } - _ => {} - } - } -} - -async fn handle_message( - text: &str, - pool: &SqlitePool, - to_client_msg_tx: &mpsc::Sender, - client_conn: &Arc>, - event_tx: &broadcast::Sender, - trust_accounts: Arc>>, -) -> Result<(), anyhow::Error> { - let v = serde_json::from_str(text).map_err(|e| anyhow!("Invalid JSON: {}", e))?; - - let arr = match v { - Value::Array(a) => a, - _ => return Err(anyhow!("Expected JSON array, got: {}", v)), - }; - - if arr.is_empty() { - return Err(anyhow!("Empty array message")); - } - - match arr[0] - .as_str() - .ok_or_else(|| anyhow!("First element must be a string"))? - { - "REQ" => { - if arr.len() < 3 { - RelayMessage::send_notice( - "Not enough array elements".to_string(), - to_client_msg_tx, - ) - .await; - return Ok(()); - } - let sub_id = arr - .get(1) - .and_then(Value::as_str) - .ok_or_else(|| anyhow!("REQ missing sub_id"))? - .to_string(); - - // Check if the client has exceeded the maximum number of subscriptions - { - let conn = client_conn.read().await; - if conn.subscriptions.len() >= MAX_SUBSCRIPTIONS { - RelayMessage::send_closed( - &sub_id, - format!("Maximum subscriptions ({}) exceeded", MAX_SUBSCRIPTIONS), - to_client_msg_tx, - ) - .await; - return Ok(()); - } - } - let mut filters = Vec::new(); - for i in 2..arr.len() { - if filters.len() >= MAX_FILTERS_PER_REQ { - RelayMessage::send_closed( - &sub_id, - format!("Maximum filters ({}) exceeded", MAX_FILTERS_PER_REQ), - to_client_msg_tx, - ) - .await; - return Ok(()); - } - let filter: Filter = serde_json::from_value(arr[i].clone()) - .map_err(|e| anyhow!("Filter parse error: {}", e))?; - filters.push(filter); - } - - debug!("REQ subscription: {}, filters: {:?}", sub_id, filters); - { - let mut conn = client_conn.write().await; - conn.subscriptions.insert( - sub_id.clone(), - Subscription { - filters: filters.clone(), - }, - ); - } - - // 查询历史事件 - for filter in &filters { - let events = filter.select(pool).await?; - for event in events { - RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await; - } - } - - // 发送 EOSE - RelayMessage::send_eose(&sub_id, to_client_msg_tx).await; - Ok(()) - } - - "EVENT" => { - let event: NostrEvent = serde_json::from_value(arr[1].clone()) - .map_err(|e| anyhow!("Event parse error: {}", e))?; - debug!("EVENT received: {:?}", event); - if event.verify() { - let pool = pool.clone(); - let to_client_msg_tx = to_client_msg_tx.clone(); - let event_tx = event_tx.clone(); - let event = event.clone(); - let conn = client_conn.read().await; - if SERVER_INFO.auth_required { - let trust_accounts = trust_accounts.read().await; - if !trust_accounts.contains(&event.pubkey) { - RelayMessage::send_closed( - &conn.id.to_string(), - "restricted: you are not in the trust list".to_string(), - &to_client_msg_tx, - ) - .await; - debug!( - "Sending message to Client {}:restricted: client not in the trust list", - conn.id - ); - return Ok(()); - } - if !conn.authenticated { - RelayMessage::send_closed( - &conn.id.to_string(), - "auth-required:Authentication required".to_string(), - &to_client_msg_tx, - ) - .await; - debug!("Sending message to Client {}: not authenticated", conn.id); - return Ok(()); - } - } - tokio::spawn(async move { - match event.save(&pool).await { - Ok(_) => { - RelayMessage::send_ok( - &event, - true, - "Saved".to_string(), - &to_client_msg_tx, - ) - .await; - // 广播事件给所有客户端 - if let Err(e) = event_tx.send(serde_json::to_string(&event).unwrap()) { - error!("Failed to broadcast event: {}", e); - } - //Update trust list - if event.kind == 3 && event.pubkey == SERVER_INFO.pubkey { - let mut ts = trust_accounts.write().await; - let new_trust_accounts = extract_p_tags_from_vec(&event.tags); - let mut new_trust_accounts: HashSet = - new_trust_accounts.into_iter().collect(); - new_trust_accounts.insert(SERVER_INFO.pubkey.to_string()); - *ts = new_trust_accounts; - debug!("Trust list updated: {}", ts.len()); - } - } - Err(e) => { - RelayMessage::send_ok(&event, false, e.to_string(), &to_client_msg_tx) - .await; - error!("Failed to save event: {}", e); - } - } - }); - } else { - RelayMessage::send_ok( - &event, - false, - "Failed to save event".to_string(), - to_client_msg_tx, - ) - .await; - } - Ok(()) - } - - "CLOSE" => { - let sub_id = arr - .get(1) - .and_then(Value::as_str) - .ok_or_else(|| anyhow!("CLOSE missing sub_id"))? - .to_string(); - - { - let mut conn = client_conn.write().await; - conn.subscriptions.remove(&sub_id); - } - - debug!("CLOSE subscription: {}", sub_id); - - Ok(()) - } - - "AUTH" => { - if arr.len() < 2 { - RelayMessage::send_notice( - "AUTH message requires an event".to_string(), - to_client_msg_tx, - ) - .await; - return Ok(()); - } - - let auth_event: NostrEvent = serde_json::from_value(arr[1].clone()) - .map_err(|e| anyhow!("AUTH event parse error: {}", e))?; - - auth_event - .handle_auth_event(client_conn, to_client_msg_tx) - .await?; - Ok(()) - } - other => { - return Err(anyhow!("Unknown command: {}", other)); - } - } -} - -async fn init_database(pool: &SqlitePool) -> Result<()> { - let create_events_table = r#" - CREATE TABLE IF NOT EXISTS events ( - id TEXT PRIMARY KEY, - pubkey TEXT NOT NULL, - created_at INTEGER NOT NULL, - kind INTEGER NOT NULL, - tags TEXT NOT NULL, - content TEXT NOT NULL, - sig TEXT NOT NULL, - indexed_at INTEGER DEFAULT (unixepoch()) - ); - "#; - - let create_indexes = vec![ - "CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey);", - "CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);", - "CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);", - "CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind ON events(pubkey, kind);", - ]; - - query(create_events_table).execute(pool).await?; - - for index_sql in create_indexes { - query(index_sql).execute(pool).await?; - } - - info!("Database initialized successfully"); - Ok(()) -} - -impl NostrEvent { - fn serialize(&self) -> String { - let serialized = json!([ - 0, - self.pubkey, - self.created_at, - self.kind, - self.tags, - self.content - ]) - .to_string(); - serialized - } - - fn verify(&self) -> bool { - debug!("Verifying event: {}", self.id); - - // 1. 验证ID - let mut hasher = Sha256::new(); - hasher.update(self.serialize().as_bytes()); - let result = hasher.finalize(); - if hex::encode(result) != self.id { - debug!("ID verification failed"); - return false; - } - - // 2. 验证时间戳 - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); - if self.created_at > now + 900 { - debug!("Timestamp verification failed"); - return false; - } - - // 3. 验证标签数量 - if self.tags.len() > MAX_EVENT_TAGS as usize { - debug!("Too many tags"); - return false; - } - - // 4. 解析公钥 - let pubkey_bytes: Vec = match hex::decode(&self.pubkey) { - Ok(bytes) => bytes, - Err(e) => { - debug!("Failed to decode pubkey: {}", e); - return false; - } - }; - let x_only_public_key: XOnlyPublicKey = match pubkey_bytes.try_into().map(|bytes: [u8; 32]| XOnlyPublicKey::from_byte_array(bytes)) { - Ok(Ok(key)) => key, - _ => { - debug!("Failed to parse pubkey"); - return false; - } - }; - - // 5. 解析签名 - let sig_bytes: Vec = match hex::decode(&self.sig) { - Ok(bytes) => bytes, - Err(_) => { - debug!("Failed to decode signature hex"); - return false; - }, - }; - let signature: SchnorrSignature = match SchnorrSignature::from_slice(&sig_bytes) { - Ok(sig) => sig, - Err(_) => { - debug!("Failed to parse Schnorr signature"); - return false; - }, - }; - - let event_hash: [u8; 32] = match hex::decode(&self.id) { - Ok(bytes) => { - let mut hash = [0u8; 32]; // Note: SHA256 output is 32 bytes - if bytes.len() != 32 { - debug!("Event ID is not 32 bytes after decoding"); - return false; - } - hash.copy_from_slice(&bytes); - hash - } - Err(_) => { - debug!("Failed to decode event ID hex"); - return false; - }, - }; - - // 6. 创建消息对象 - let message: SecpMessage = secp256k1::Message::from_digest(event_hash); - - // 7. 验证签名 - let secp = Secp256k1::verification_only(); - match secp.verify_schnorr(&signature, message.as_ref(), &x_only_public_key) { - Ok(_) => { - debug!("Event verified: {}", self.id); - true - }, - Err(_) => { - debug!("Event verification failed: {}", self.id); - false - }, - } - } - - async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> { - let tags_json = serde_json::to_string(&self.tags).unwrap(); - match self.kind { - 3 => { - let sql = "DELETE FROM events WHERE id = ? AND kind = 3"; - sqlx::query(sql).bind(&self.id).execute(pool).await?; - } - - 5 => { - if !self.tags.is_empty() { - for tag in &self.tags { - if tag.get(0).map(|s| s == "e").unwrap_or(false) && tag.len() > 1 { - let sql = "DELETE FROM events WHERE id = ?"; - sqlx::query(sql).bind(&tag[1]).execute(pool).await?; - } - } - } - } - 10002 => { - if !self.tags.is_empty() { - sqlx::query("DELETE FROM events WHERE id = ? AND kind = 10002") - .bind(&self.id) - .execute(pool) - .await?; - } - } - _ => {} - } - let sql = "INSERT OR IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?);"; - sqlx::query(sql) - .bind(&self.id) - .bind(&self.pubkey) - .bind(self.created_at as i64) - .bind(self.kind) - .bind(tags_json) - .bind(&self.content) - .bind(&self.sig) - .execute(pool) - .await?; - Ok(()) - } - async fn handle_auth_event( - &self, - client_conn: &Arc>, - to_client_msg_tx: &mpsc::Sender, - ) -> Result<(), anyhow::Error> { - // 验证认证事件 - if self.kind != 22242 { - RelayMessage::send_ok( - &self, - false, - "AUTH event must be kind 22242".to_string(), - to_client_msg_tx, - ) - .await; - return Ok(()); - } - - // 验证事件签名 - if !self.verify() { - RelayMessage::send_notice("Invalid AUTH event signature".to_string(), to_client_msg_tx) - .await; - return Ok(()); - } - - // 检查挑战 - let mut relay_url = None; - let mut challenge = None; - - for tag in &self.tags { - if tag.len() >= 2 { - match tag[0].as_str() { - "relay" => relay_url = Some(&tag[1]), - "challenge" => challenge = Some(&tag[1]), - _ => {} - } - } - } - - let challenge = match challenge { - Some(c) => c, - None => { - RelayMessage::send_notice( - "AUTH event missing challenge tag".to_string(), - to_client_msg_tx, - ) - .await; - return Ok(()); - } - }; - - // 验证挑战是否匹配 - - let is_valid_challenge = { - let conn = client_conn.read().await; - if conn.id.to_string() == *challenge { - match SystemTime::now() - .duration_since(UNIX_EPOCH + Duration::from_secs(conn.connected_at)) - { - Ok(elapsed) => elapsed <= Duration::from_secs(15 * 60), - Err(_) => false, - } - } else { - false - } - }; - - if !is_valid_challenge { - RelayMessage::send_notice("Invalid or expired challenge".to_string(), to_client_msg_tx) - .await; - return Ok(()); - } - - // 验证时间戳 - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); - if self.created_at < now - 600 || self.created_at > now + 60 { - RelayMessage::send_notice( - "AUTH event timestamp out of acceptable range".to_string(), - to_client_msg_tx, - ) - .await; - return Ok(()); - } - - // 认证成功,更新客户端状态 - { - let mut conn = client_conn.write().await; - conn.authenticated = true; - conn.pubkey = Some(self.pubkey.clone()); - } - - info!("Client authenticated with pubkey: {}", self.pubkey); - RelayMessage::send_notice("Authentication successful".to_string(), to_client_msg_tx).await; - - Ok(()) - } -} - -impl Filter { - async fn select(&self, pool: &SqlitePool) -> Result, sqlx::Error> { - let mut sql = QueryBuilder::new( - "SELECT id, pubkey, created_at, kind, tags, content, sig FROM events WHERE kind != 22242", - ); - - // ID过滤 - if let Some(ids) = &self.ids { - if !ids.is_empty() { - sql.push(" AND id IN ("); - let mut separated = sql.separated(","); - for id in ids { - separated.push_bind(id); - } - separated.push_unseparated(")"); - } - } - - // 作者过滤 - if let Some(authors) = &self.authors { - if !authors.is_empty() { - sql.push(" AND pubkey IN ("); - let mut separated = sql.separated(","); - for author in authors { - separated.push_bind(author); - } - separated.push_unseparated(")"); - } - } - - // 类型过滤 - if let Some(kinds) = &self.kinds { - if !kinds.is_empty() { - sql.push(" AND kind IN ("); - let mut separated = sql.separated(","); - for kind in kinds { - separated.push_bind(kind); - } - separated.push_unseparated(")"); - } - } - - // 时间过滤 - if let Some(since) = self.since { - sql.push(" AND created_at >= ").push_bind(since as i64); - } - - if let Some(until) = self.until { - sql.push(" AND created_at <= ").push_bind(until as i64); - } - - for (tag_name, tag_values) in &self.tag_filters { - if tag_name.starts_with('#') && !tag_values.is_empty() { - // 使用 JSON 查询来匹配标签 - sql.push(" AND EXISTS ("); - sql.push("SELECT 1 FROM json_each(tags) AS tag_array "); - sql.push("WHERE json_extract(tag_array.value, '$[0]') = "); - sql.push_bind(&tag_name[1..]); // 去掉 # 前缀 - sql.push(" AND json_extract(tag_array.value, '$[1]') IN ("); - let mut separated = sql.separated(","); - for value in tag_values { - separated.push_bind(value); - } - separated.push_unseparated("))"); - } - } - - sql.push(" ORDER BY created_at DESC"); - - let limit = self.limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT); - sql.push(" LIMIT ").push_bind(limit as i64); - - let query = sql.build(); - debug!("SQL: {}", query.sql()); - - let rows = query.fetch_all(pool).await?; - debug!("Selected {} events", rows.len()); - - let events: Vec = rows - .into_iter() - .map(|row| NostrEvent { - id: row.get(0), - pubkey: row.get(1), - created_at: row.get::(2) as u64, - kind: row.get(3), - tags: serde_json::from_str::>>(&row.get::(4)) - .unwrap_or_default(), - content: row.get(5), - sig: row.get(6), - }) - .collect(); - - Ok(events) - } - - fn matches(&self, event: &NostrEvent) -> bool { - // ID 匹配 - if let Some(ids) = &self.ids { - if !ids.is_empty() && !ids.contains(&event.id) { - return false; - } - } - - // 作者匹配 - if let Some(authors) = &self.authors { - if !authors.is_empty() && !authors.contains(&event.pubkey) { - return false; - } - } - - // 类型匹配 - if let Some(kinds) = &self.kinds { - if !kinds.is_empty() && !kinds.contains(&event.kind) { - return false; - } - } - - // 时间匹配 - if let Some(since) = self.since { - if event.created_at < since { - return false; - } - } - - if let Some(until) = self.until { - if event.created_at > until { - return false; - } - } - - // 标签匹配 - for (tag_name, tag_values) in &self.tag_filters { - if tag_name.starts_with('#') && !tag_values.is_empty() { - let tag_letter = &tag_name[1..]; - let mut found = false; - - for tag in &event.tags { - if tag.get(0).map(|s| s == tag_letter).unwrap_or(false) && tag.len() > 1 { - if tag_values.contains(&tag[1]) { - found = true; - break; - } - } - } - - if !found { - return false; - } - } - } - - return true; - } -} - -impl RelayMessage { - async fn send_ok( - event: &NostrEvent, - accept: bool, - message: String, - to_client_msg_tx: &mpsc::Sender, - ) { - let msg: String = "[\"OK\", \"".to_string() - + &event.id - + "\", " - + &accept.to_string() - + ", \"" - + &message - + "\"]"; - if let Err(e) = to_client_msg_tx.send(msg).await { - error!("Failed to send message: {}", e); - } - } - - async fn send_event(event: &NostrEvent, sub_id: &str, to_client_msg_tx: &mpsc::Sender) { - let event = serde_json::to_string(event).unwrap(); - let msg = "[\"EVENT\", \"".to_string() + sub_id + "\", " + &event + "]"; - if let Err(e) = to_client_msg_tx.send(msg).await { - error!("Failed to send message: {}", e); - } - } - - async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender) { - let msg = "[\"EOSE\", \"".to_string() + sub_id + "\"]"; - if let Err(e) = to_client_msg_tx.send(msg).await { - error!("Failed to send message: {}", e); - } - } - - async fn send_closed(sub_id: &str, message: String, to_client_msg_tx: &mpsc::Sender) { - let msg = format!("[\"CLOSED\", \"{}\", \"{}\"]", sub_id, message); - if let Err(e) = to_client_msg_tx.send(msg).await { - error!("Failed to send message: {}", e); - } - } - - async fn send_notice(message: String, to_client_msg_tx: &mpsc::Sender) { - let msg = "[\"NOTICE\", \"".to_string() + &message + "\"]"; - if let Err(e) = to_client_msg_tx.send(msg).await { - error!("Failed to send message: {}", e); - } - } -} - -async fn get_trust_accounts(pool: &SqlitePool) -> Vec { - let pubkey = SERVER_INFO.pubkey; - let sql = - "SELECT tags FROM events WHERE kind = 3 AND pubkey = ? ORDER BY created_at DESC LIMIT 1"; - - let row = match sqlx::query(sql).bind(pubkey).fetch_optional(pool).await { - Ok(row) => row, - Err(e) => { - error!("Failed to execute query: {}", e); - return Vec::new(); - } - }; - match row { - Some(row) => { - let tags_json: String = row.get(0); - extract_p_tags_from_json(&tags_json).unwrap_or_else(|_| Vec::new()) - } - None => Vec::new(), - } -} - -fn extract_p_tags_from_json(tags_json: &str) -> Result, serde_json::Error> { - let parsed: Value = serde_json::from_str(tags_json)?; - - let mut trust_accounts: Vec = parsed - .as_array() - .unwrap_or(&vec![]) - .iter() - .filter_map(|item| { - item.as_array().and_then(|arr| { - // 检查是否是 "p" 标签且至少有两个元素 - if arr.len() >= 2 && arr[0].as_str() == Some("p") { - arr[1].as_str().map(|s| s.to_string()) - } else { - None - } - }) - }) - .collect(); - debug!("Trust accounts: {}", trust_accounts.len()); - trust_accounts.push(SERVER_INFO.pubkey.clone().to_string()); - Ok(trust_accounts) -} - -fn extract_p_tags_from_vec(tags: &[Vec]) -> Vec { - tags.iter() - .filter_map(|tag| { - if tag.len() >= 2 && tag[0] == "p" { - Some(tag[1].clone()) - } else { - None - } - }) - .collect() -} diff --git a/src/models.rs b/src/models.rs new file mode 100644 index 0000000..0f8b66f --- /dev/null +++ b/src/models.rs @@ -0,0 +1,49 @@ +use crate::nostr::Filter; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tokio::sync::mpsc; +use uuid::Uuid; + +// 服务器信息结构体(用于 NIP-11) +#[derive(Serialize)] +pub struct ServerInfo { + pub contact: &'static str, + pub description: &'static str, + pub limitation: Limitation, + pub name: &'static str, + pub pubkey: &'static str, + pub software: &'static str, + pub supported_nips: Vec, + pub version: &'static str, + pub auth_required: bool, +} + +// 限制(NIP-11) +#[derive(Serialize)] +pub struct Limitation { + pub max_event_tags: u32, + pub max_event_time_newer_than_now: u32, + pub max_event_time_older_than_now: u32, + pub max_filters: u32, + pub max_limit: u32, + pub max_message_length: u32, + pub max_subid_length: u32, + pub max_subscriptions: u32, + pub min_prefix: u32, +} + +// 客户端订阅信息 +#[derive(Debug, Clone)] +pub struct Subscription { + pub filters: Vec, +} + +// 客户端连接状态 +pub struct ClientConnection { + pub id: Uuid, + pub connected_at: u64, + pub subscriptions: HashMap, + pub sender: mpsc::Sender, // 用于向该客户端发送消息的通道 + pub authenticated: bool, + pub pubkey: Option, // 认证后存储客户端的公钥 +} diff --git a/src/nostr/event.rs b/src/nostr/event.rs new file mode 100644 index 0000000..6b90a2d --- /dev/null +++ b/src/nostr/event.rs @@ -0,0 +1,360 @@ +use anyhow::Result; +use log::{debug, error, info}; +use secp256k1::{ + Message as SecpMessage, Secp256k1, XOnlyPublicKey, schnorr::Signature as SchnorrSignature, +}; +use serde_json::{Value, json}; +use sha2::{Digest, Sha256}; +use sqlx::SqlitePool; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::sync::{RwLock, mpsc}; + +use crate::constants::SERVER_INFO; +use crate::models::ClientConnection; +use crate::nostr::NostrEvent; +use crate::nostr::messages::RelayMessage; + +pub trait NostrEventExt { + fn serialize_for_id(&self) -> String; // 用于计算事件 ID 的序列化 + fn verify(&self) -> bool; // 验证 ID、时间戳、标签数量和签名 + async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error>; // 保存事件到数据库 + async fn handle_auth_event( + &self, + client_conn: &Arc>, + to_client_msg_tx: &mpsc::Sender, + ) -> Result<(), anyhow::Error>; // 处理 NIP-42 认证事件 +} + +impl NostrEventExt for NostrEvent { + // 根据 Nostr 协议规则序列化事件,用于计算事件 ID + fn serialize_for_id(&self) -> String { + let serialized = json!([ + 0, // 用于 ID 计算的协议版本,Nostr 当前为 0 + self.pubkey, + self.created_at, + self.kind, + self.tags, + self.content + ]) + .to_string(); + serialized + } + + // 验证事件的有效性(ID、时间戳、标签数量、签名) + fn verify(&self) -> bool { + debug!("Verifying event: {}", self.id); + + // 1. 验证 ID:重新计算 ID 并与事件中提供的 ID 比较 + let mut hasher = Sha256::new(); + hasher.update(self.serialize_for_id().as_bytes()); + let computed_id = hex::encode(hasher.finalize()); + if computed_id != self.id { + debug!( + "Event ID verification failed: computed {} != provided {}", + computed_id, self.id + ); + return false; + } + + // 2. 验证时间戳:检查事件时间是否在可接受的范围内 (NIP-1) + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) // 如果 SystemTime 在 UNIX_EPOCH 之前,返回 Duration::ZERO + .as_secs(); + if self.created_at > now + SERVER_INFO.limitation.max_event_time_newer_than_now as u64 { + debug!( + "Timestamp too far in future: created_at={} now={}", + self.created_at, now + ); + return false; + } + if self.created_at + < now.saturating_sub(SERVER_INFO.limitation.max_event_time_older_than_now as u64) + { + debug!( + "Timestamp too far in past: created_at={} now={}", + self.created_at, now + ); + return false; + } + + // 3. 验证标签数量:检查是否超出服务器限制 + if self.tags.len() > SERVER_INFO.limitation.max_event_tags as usize { + debug!( + "Too many tags: {} > {}", + self.tags.len(), + SERVER_INFO.limitation.max_event_tags + ); + return false; + } + + // 4. 解析公钥:从十六进制字符串解析 XOnlyPublicKey + let pubkey_bytes: Vec = match hex::decode(&self.pubkey) { + Ok(bytes) => { + if bytes.len() != 32 { + // 公钥长度必须为 32 字节 + debug!("Pubkey hex is not 32 bytes after decoding"); + return false; + } + bytes + } + Err(e) => { + debug!("Failed to decode pubkey hex: {}", e); + return false; + } + }; + let x_only_public_key: XOnlyPublicKey = match pubkey_bytes + .try_into() // 将 Vec 尝试转换为 [u8; 32] + .map(|bytes: [u8; 32]| XOnlyPublicKey::from_byte_array(bytes)) + { + Ok(Ok(key)) => key, + _ => { + debug!("Failed to parse pubkey as XOnlyPublicKey"); + return false; + } + }; + + // 5. 解析签名:从十六进制字符串解析 SchnorrSignature + let sig_bytes: Vec = match hex::decode(&self.sig) { + Ok(bytes) => { + if bytes.len() != 64 { + // Schnorr 签名长度必须为 64 字节 + debug!("Signature hex is not 64 bytes after decoding"); + return false; + } + bytes + } + Err(e) => { + debug!("Failed to decode signature hex: {}", e); + return false; + } + }; + let signature: SchnorrSignature = match SchnorrSignature::from_slice(&sig_bytes) { + Ok(sig) => sig, + Err(e) => { + debug!("Failed to parse Schnorr signature: {}", e); + return false; + } + }; + + // 6. 准备消息哈希:事件 ID 本身就是消息的哈希 + let event_hash: [u8; 32] = match hex::decode(&self.id) { + Ok(bytes) => { + let mut hash = [0u8; 32]; + if bytes.len() != 32 { + debug!("Event ID is not 32 bytes after decoding"); + return false; + } + hash.copy_from_slice(&bytes); // 将解码后的字节复制到固定大小数组 + hash + } + Err(e) => { + debug!("Failed to decode event ID hex: {}", e); + return false; + } + }; + + // 7. 创建 SecpMessage 对象进行签名验证 + let message: SecpMessage = secp256k1::Message::from_digest(event_hash); + + // 8. 验证签名:使用 secp256k1-rs 库进行验证 + let secp = Secp256k1::verification_only(); + match secp.verify_schnorr(&signature, message.as_ref(), &x_only_public_key) { + Ok(_) => { + debug!("Event verified: {}", self.id); + true + } + Err(e) => { + debug!("Event verification failed for {}: {}", self.id, e); + false + } + } + } + + // 保存事件到数据库,并处理可替换事件和删除事件 + async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> { + let tags_json = serde_json::to_string(&self.tags).unwrap(); + + match self.kind { + // NIP-09 事件删除 (Event Deletion) + 5 => { + if !self.tags.is_empty() { + for tag in &self.tags { + if tag.get(0).map(|s| s == "e").unwrap_or(false) && tag.len() > 1 { + let event_id_to_delete = &tag[1]; // 获取要删除的事件 ID + debug!( + "Attempting to delete event with ID: {} by request of pubkey {}", + event_id_to_delete, self.pubkey + ); + let sql = "DELETE FROM events WHERE id = ? AND pubkey = ?"; + let result = sqlx::query(sql) + .bind(event_id_to_delete) + .bind(&self.pubkey) + .execute(pool) + .await?; + if result.rows_affected() > 0 { + info!( + "Deleted event {} by pubkey (kind 5): {}", + event_id_to_delete, self.pubkey + ); + } else { + debug!( + "Could not delete event {} for pubkey {}. It might not exist or unauthorized.", + event_id_to_delete, self.pubkey + ); + } + } + } + } + } + // NIP-01 可替换事件 (Replaceable Events): kind 0 (Metadata), kind 3 (Contact List) + // NIP-16 可替换事件 (Replaceable Events): kinds 10000 to 19999 + // NIP-33 参数化可替换事件 (Parameterized Replaceable Events): kinds 30000 to 39999 + // 对于所有这些可替换事件,新事件会替换掉旧事件。 + // 这里的简化处理是,对于所有可替换事件,都基于 (pubkey, kind) 来删除旧事件。 + // 对于 NIP-33 事件,严格来说还需要匹配 'd' tag。 + // 但考虑到数据库操作的复杂性以及原始代码的实现,这里仍采用 (pubkey, kind) 的方式。 + // 一个更健壮的 NIP-33 实现可能需要单独的字段来存储 'd' tag 或更复杂的 SQL。 + 0 | 3 | _ + if (self.kind >= 10000 && self.kind < 20000) + || (self.kind >= 30000 && self.kind < 40000) => + { + debug!( + "Attempting to delete previous replaceable event for pubkey: {}, kind: {}", + self.pubkey, self.kind + ); + let sql = "DELETE FROM events WHERE pubkey = ? AND kind = ?"; + sqlx::query(sql) + .bind(&self.pubkey) + .bind(self.kind) + .execute(pool) + .await?; + } + _ => { /* 非可替换事件不需要在插入前删除 */ } + } + + // 插入新事件 + let sql = "INSERT OR IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?);"; + sqlx::query(sql) + .bind(&self.id) + .bind(&self.pubkey) + .bind(self.created_at as i64) + .bind(self.kind) + .bind(tags_json) + .bind(&self.content) + .bind(&self.sig) + .execute(pool) + .await?; + Ok(()) + } + + // 处理 NIP-42 认证事件 + async fn handle_auth_event( + &self, + client_conn: &Arc>, + to_client_msg_tx: &mpsc::Sender, + ) -> Result<(), anyhow::Error> { + // 1. 验证事件类型是否为 NIP-42 认证事件 (kind 22242) + if self.kind != 22242 { + RelayMessage::send_ok( + &self.id, + false, + "AUTH event must be kind 22242".to_string(), // 根据协议,OK 消息应包含事件 ID + to_client_msg_tx, + ) + .await; + return Ok(()); + } + + // 2. 验证事件签名是否有效 + if !self.verify() { + RelayMessage::send_notice("Invalid AUTH event signature".to_string(), to_client_msg_tx) + .await; + return Ok(()); + } + + // 3. 提取 "challenge" 标签 + let mut challenge = None; + let mut relay_url = None; // NIP-42 也包含 relay url,但在这个验证中不是强制的。 + + for tag in &self.tags { + if tag.len() >= 2 { + match tag[0].as_str() { + "relay" => relay_url = Some(&tag[1]), + "challenge" => challenge = Some(&tag[1]), + _ => {} + } + } + } + + let challenge = match challenge { + Some(c) => c, + None => { + RelayMessage::send_notice( + "AUTH event missing challenge tag".to_string(), + to_client_msg_tx, + ) + .await; + return Ok(()); + } + }; + + // 4. 验证 challenge 是否匹配客户端连接 ID,并且未过期 + let is_valid_challenge = { + let conn = client_conn.read().await; + if conn.id.to_string() == *challenge { + // 检查 challenge 是否在 15 分钟内有效 (可配置) + let connected_at_duration = UNIX_EPOCH + Duration::from_secs(conn.connected_at); + match SystemTime::now().duration_since(connected_at_duration) { + Ok(elapsed) => elapsed <= Duration::from_secs(15 * 60), + Err(_) => false, // 如果当前时间在连接时间之前,则视为无效 + } + } else { + false + } + }; + + if !is_valid_challenge { + RelayMessage::send_notice("Invalid or expired challenge".to_string(), to_client_msg_tx) + .await; + return Ok(()); + } + + // 5. 验证时间戳 (NIP-42 规定 `created_at` 必须在当前时间前后 60 秒内) + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_secs(); + if self.created_at < now.saturating_sub(60) || self.created_at > now + 60 { + RelayMessage::send_notice( + "AUTH event timestamp out of acceptable range (must be within 60s of now)" + .to_string(), + to_client_msg_tx, + ) + .await; + return Ok(()); + } + + // 认证成功,更新客户端连接状态 + { + let mut conn = client_conn.write().await; + conn.authenticated = true; + conn.pubkey = Some(self.pubkey.clone()); + } + + info!("Client authenticated with pubkey: {}", self.pubkey); + // 根据 NIP-42,认证成功也应该回复 OK 消息 + RelayMessage::send_ok( + &self.id, + true, + "Authentication successful.".to_string(), + to_client_msg_tx, + ) + .await; + + Ok(()) + } +} diff --git a/src/nostr/filter.rs b/src/nostr/filter.rs new file mode 100644 index 0000000..471bace --- /dev/null +++ b/src/nostr/filter.rs @@ -0,0 +1,227 @@ +use anyhow::Result; +use log::debug; +use sqlx::sqlite::SqlitePool; +use sqlx::{Execute, QueryBuilder, Row}; +use std::collections::HashMap; + +use crate::constants::{DEFAULT_LIMIT, MAX_LIMIT}; +use crate::nostr::Filter; +use crate::nostr::NostrEvent; + +// 定义 Filter 的扩展特性 +pub trait FilterExt { + async fn select(&self, pool: &SqlitePool) -> Result, sqlx::Error>; + fn matches(&self, event: &NostrEvent) -> bool; // 检查事件是否匹配过滤器 +} + +impl FilterExt for Filter { + // 根据过滤器从 SQLite 数据库中查询事件 + async fn select(&self, pool: &SqlitePool) -> Result, sqlx::Error> { + let mut sql = QueryBuilder::new( + "SELECT id, pubkey, created_at, kind, tags, content, sig FROM events WHERE 1=1 AND kind != 22242", // 排除 AUTH 事件 + ); + + // ID 过滤 (NIP-01 支持前缀匹配) + if let Some(ids) = &self.ids { + if !ids.is_empty() { + sql.push(" AND ("); + let mut separated = sql.separated(" OR "); + for id_prefix in ids { + if id_prefix.len() < 64 { + // 如果是 ID 前缀,使用 LIKE + separated + .push("id LIKE ") + .push_bind(format!("{}%", id_prefix)); + } else { + // 否则进行精确匹配 + separated.push("id = ").push_bind(id_prefix); + } + } + separated.push_unseparated(")"); + } + } + + // 作者过滤 (NIP-01 支持前缀匹配) + if let Some(authors) = &self.authors { + if !authors.is_empty() { + sql.push(" AND ("); + let mut separated = sql.separated(" OR "); + for author_prefix in authors { + if author_prefix.len() < 64 { + // 如果是公钥前缀,使用 LIKE + separated + .push("pubkey LIKE ") + .push_bind(format!("{}%", author_prefix)); + } else { + // 否则进行精确匹配 + separated.push("pubkey = ").push_bind(author_prefix); + } + } + separated.push_unseparated(")"); + } + } + + // 类型过滤 + if let Some(kinds) = &self.kinds { + if !kinds.is_empty() { + sql.push(" AND kind IN ("); + let mut separated = sql.separated(","); + for kind in kinds { + separated.push_bind(kind); + } + separated.push_unseparated(")"); + } + } + + // 时间过滤 (since) + if let Some(since) = self.since { + sql.push(" AND created_at >= ").push_bind(since as i64); + } + + // 时间过滤 (until) + if let Some(until) = self.until { + sql.push(" AND created_at <= ").push_bind(until as i64); + } + + // 标签过滤 (NIP-12 Generic Tag Queries) + for (tag_name, tag_values) in &self.tag_filters { + if tag_name.starts_with('#') && !tag_values.is_empty() { + let tag_letter = &tag_name[1..]; // 提取标签字母 (e.g., 'e', 'p', 'd') + + // 使用 JSON 函数查询 tags 数组 + // SELECT 1 FROM json_each(tags) AS tag_arr WHERE tag_arr.value->>0 = 'p' AND tag_arr.value->>1 IN ('...', '...') + sql.push(" AND EXISTS (SELECT 1 FROM json_each(tags) AS tag_arr WHERE json_array_length(tag_arr.value) > 1 AND json_extract(tag_arr.value, '$[0]') = "); + sql.push_bind(tag_letter); + sql.push(" AND json_extract(tag_arr.value, '$[1]') IN ("); + let mut separated = sql.separated(","); + for value in tag_values { + separated.push_bind(value); + } + separated.push_unseparated("))"); + } + } + + sql.push(" ORDER BY created_at DESC"); // 按创建时间降序排序 + + let limit = self.limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT); // 应用限制,不超过最大限制 + sql.push(" LIMIT ").push_bind(limit as i64); + + let query = sql.build(); + debug!("SQL: {}", query.sql()); + + let rows = query.fetch_all(pool).await?; // 执行查询 + debug!("Selected {} events", rows.len()); + + // 将查询结果映射为 NostrEvent 结构体 + let events: Vec = rows + .into_iter() + .filter_map(|row| { + // 尝试解析 tags 字段,如果失败则跳过该行 + let tags_str: String = row.get(4); + let tags_parsed = serde_json::from_str::>>(&tags_str); + match tags_parsed { + Ok(tags_vec) => Some(NostrEvent { + id: row.get(0), + pubkey: row.get(1), + created_at: row.get::(2) as u64, + kind: row.get(3), + tags: tags_vec, + content: row.get(5), + sig: row.get(6), + }), + Err(e) => { + log::error!( + "Failed to parse tags for event ID {}: {}", + row.get::(0), + e + ); + None // 错误的标签数据导致事件无法完整构建,跳过 + } + } + }) + .collect(); + + Ok(events) + } + + // 在内存中检查一个事件是否匹配当前过滤器 + fn matches(&self, event: &NostrEvent) -> bool { + // ID 匹配 (支持 NIP-01 前缀匹配) + if let Some(ids) = &self.ids { + if !ids.is_empty() { + let matched_id = ids.iter().any(|id_prefix| { + if id_prefix.len() < 64 { + // 如果是前缀 + event.id.starts_with(id_prefix) + } else { + // 精确匹配 + event.id == *id_prefix + } + }); + if !matched_id { + return false; + } + } + } + + // 作者匹配 (支持 NIP-01 前缀匹配) + if let Some(authors) = &self.authors { + if !authors.is_empty() { + let matched_author = authors.iter().any(|author_prefix| { + if author_prefix.len() < 64 { + event.pubkey.starts_with(author_prefix) + } else { + event.pubkey == *author_prefix + } + }); + if !matched_author { + return false; + } + } + } + + // 类型匹配 + if let Some(kinds) = &self.kinds { + if !kinds.is_empty() && !kinds.contains(&event.kind) { + return false; + } + } + + // 时间匹配 (since) + if let Some(since) = self.since { + if event.created_at < since { + return false; + } + } + + // 时间匹配 (until) + if let Some(until) = self.until { + if event.created_at > until { + return false; + } + } + + // 标签匹配 (NIP-12 Generic Tag Queries) + for (tag_name, tag_values) in &self.tag_filters { + if tag_name.starts_with('#') && !tag_values.is_empty() { + let tag_letter = &tag_name[1..]; + let mut found_match_for_this_tag_filter = false; + + for event_tag in &event.tags { + if event_tag.len() >= 2 && event_tag[0] == tag_letter { + if tag_values.contains(&event_tag[1]) { + found_match_for_this_tag_filter = true; + break; + } + } + } + + if !found_match_for_this_tag_filter { + return false; // 如果有任何一个 #tag 过滤器不匹配,则事件不匹配 + } + } + } + + true // 所有过滤器都匹配 + } +} diff --git a/src/nostr/messages.rs b/src/nostr/messages.rs new file mode 100644 index 0000000..c7deb65 --- /dev/null +++ b/src/nostr/messages.rs @@ -0,0 +1,72 @@ +use crate::nostr::NostrEvent; +use log::error; +use serde_json; +use tokio::sync::mpsc; +pub struct RelayMessage; + +impl RelayMessage { + // 发送 OK 消息 (NIP-20) + pub async fn send_ok( + event_id: &str, // 接受事件 ID 字符串切片 + accept: bool, + message: String, + to_client_msg_tx: &mpsc::Sender, + ) { + let msg = format!("[\"OK\", \"{}\", {}, \"{}\"]", event_id, accept, message); + if let Err(e) = to_client_msg_tx.send(msg).await { + error!("Failed to send OK message (event_id: {}): {}", event_id, e); + } + } + + // 发送 EVENT 消息 (NIP-01) + pub async fn send_event( + event: &NostrEvent, + sub_id: &str, + to_client_msg_tx: &mpsc::Sender, + ) { + // 尝试将 NostrEvent 序列化为 JSON 字符串 + let event_json = match serde_json::to_string(event) { + Ok(json) => json, + Err(e) => { + error!("Failed to serialize event {} for sending: {}", event.id, e); + return; // 序列化失败则不发送 + } + }; + let msg = format!("[\"EVENT\", \"{}\", {}]", sub_id, event_json); + if let Err(e) = to_client_msg_tx.send(msg).await { + // 此错误通常表示客户端的接收端已关闭(客户端已断开) + error!("Failed to send EVENT message (sub_id: {}): {}", sub_id, e); + } + } + + // 发送 EOSE 消息 (NIP-15) + pub async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender) { + let msg = format!("[\"EOSE\", \"{}\"]", sub_id); + if let Err(e) = to_client_msg_tx.send(msg).await { + error!("Failed to send EOSE message (sub_id: {}): {}", sub_id, e); + } + } + + // 发送 CLOSED 消息 (NIP-20) + pub async fn send_closed( + sub_id: &str, + message: String, + to_client_msg_tx: &mpsc::Sender, + ) { + let msg = format!("[\"CLOSED\", \"{}\", \"{}\"]", sub_id, message); + if let Err(e) = to_client_msg_tx.send(msg).await { + error!("Failed to send CLOSED message (sub_id: {}): {}", sub_id, e); + } + } + + // 发送 NOTICE 消息 (NIP-01) + pub async fn send_notice(message: String, to_client_msg_tx: &mpsc::Sender) { + let msg = format!("[\"NOTICE\", \"{}\"]", message); + if let Err(e) = to_client_msg_tx.send(msg).await { + error!( + "Failed to send NOTICE message (message: {}): {}", + message, e + ); + } + } +} diff --git a/src/nostr/mod.rs b/src/nostr/mod.rs new file mode 100644 index 0000000..98f58c6 --- /dev/null +++ b/src/nostr/mod.rs @@ -0,0 +1,69 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +pub mod event; +pub mod filter; +pub mod messages; +pub mod utils; + +// Nostr 事件结构体 +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct NostrEvent { + pub id: String, // 32 字节 ID 的十六进制字符串 + pub pubkey: String, // 32 字节公钥的十六进制字符串 + pub created_at: u64, // Unix 时间戳(秒) + pub kind: u32, // 事件类型(整数 0~65535) + pub tags: Vec>, // 二维字符串数组,表示事件标签 + pub content: String, // 事件内容 + pub sig: String, // 64 字节签名(Schnorr 签名)的十六进制字符串 +} + +// 从 Relay 发送给 Client 的消息类型(用于反序列化) +#[derive(Debug, Deserialize)] +#[serde(untagged)] // 用于处理 JSON 数组的第一项作为标签的情况 +pub enum RelayMessage { + Event { + sub_id: String, + event: NostrEvent, + }, // ["EVENT", , ] + Ok { + event_id: String, + accept: bool, + message: String, + }, // ["OK", , bool, ] + Eose { + sub_id: String, + }, // ["EOSE", ] + Closed { + sub_id: String, + message: String, + }, // ["CLOSED", , ] + Notice { + message: String, + }, // ["NOTICE", ] +} + +// 从 Client 发送给 Relay 的消息类型(用于反序列化) +#[derive(Debug, Deserialize)] +#[serde(untagged)] +pub enum ClientMessage { + REQ { sub_id: String, filter: Vec }, // ["REQ", , ...] + Event { event: NostrEvent }, // ["EVENT", ] + CLOSE { sub_id: String }, // ["CLOSE", ] + AUTH { event: NostrEvent }, // ["AUTH", ] (NIP-42) +} + +// Nostr 订阅过滤器结构体 +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Filter { + pub ids: Option>, // Event ID 列表s (NIP-01 prefixes allowed) + pub authors: Option>, // Pubkey 列表 (NIP-01 prefixes allowed) + pub kinds: Option>, // Kind 类型列表 + #[serde(flatten)] // 将 HashMap 的键值对直接扁平化到父结构体中 + pub tag_filters: HashMap>, + // 用 HashMap 表示 # 标签过滤器,例如 #e, #p 等。 + // Key 是字符串包含 #,Value 是对应标签值的列表。 + pub since: Option, // Unix 时间戳,表示事件创建时间在此之后 + pub until: Option, // Unix 时间戳,表示事件创建时间在此之前 + pub limit: Option, // 最大返回事件数量 +} diff --git a/src/nostr/utils.rs b/src/nostr/utils.rs new file mode 100644 index 0000000..909c483 --- /dev/null +++ b/src/nostr/utils.rs @@ -0,0 +1,60 @@ +use crate::constants::SERVER_INFO; +use log::debug; +use serde_json::Value; + +// 从 JSON 字符串形式的 tags 中提取所有 'p' 标签的值(公钥) +pub fn extract_p_tags_from_json(tags_json: &str) -> Result, serde_json::Error> { + let parsed: Value = serde_json::from_str(tags_json)?; // 解析 JSON 字符串 + + let mut trust_accounts: Vec = parsed + .as_array() // 确保是 JSON 数组 + .unwrap_or(&vec![]) // 如果不是数组,则返回空 Vec + .iter() + .filter_map(|item| { + item.as_array().and_then(|arr| { + // 检查是否是 "p" 标签且至少有两个元素 (['p', pubkey, ...]) + if arr.len() >= 2 && arr[0].as_str() == Some("p") { + arr[1].as_str().map(|s| s.to_string()) // 提取第二个元素(公钥) + } else { + None + } + }) + }) + .collect(); + debug!( + "Extracted {} trust accounts from JSON.", + trust_accounts.len() + ); + // 确保服务器本身的公钥始终在信任列表中 + let server_pubkey = SERVER_INFO.pubkey.to_string(); + if !trust_accounts.contains(&server_pubkey) { + trust_accounts.push(server_pubkey); + } + Ok(trust_accounts) +} + +// 从 Vec> 形式的 tags 中提取所有 'p' 标签的值(公钥) +pub fn extract_p_tags_from_vec(tags: &[Vec]) -> Vec { + let mut trust_accounts: Vec = tags + .iter() + .filter_map(|tag| { + // 检查标签是否至少有两个元素且第一个是 "p" + if tag.len() >= 2 && tag[0] == "p" { + Some(tag[1].clone()) // 提取第二个元素(公钥) + } else { + None + } + }) + .collect(); + + debug!( + "Extracted {} trust accounts from Vec.", + trust_accounts.len() + ); + // 确保服务器本身的公钥始终在信任列表中 + let server_pubkey = SERVER_INFO.pubkey.to_string(); + if !trust_accounts.contains(&server_pubkey) { + trust_accounts.push(server_pubkey); + } + trust_accounts +} diff --git a/src/ws_logic.rs b/src/ws_logic.rs new file mode 100644 index 0000000..984be51 --- /dev/null +++ b/src/ws_logic.rs @@ -0,0 +1,399 @@ +use anyhow::{Result, anyhow}; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, error, info}; +use serde_json::Value; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::net::TcpStream; +use tokio::sync::{RwLock, broadcast, mpsc}; +use tokio_tungstenite::{WebSocketStream, accept_async, tungstenite::protocol::Message}; +use uuid::Uuid; + +use crate::constants::{CLIENT_CHANNEL_SIZE, MAX_FILTERS_PER_REQ, MAX_SUBSCRIPTIONS, SERVER_INFO}; +use crate::models::{ClientConnection, Subscription}; +use crate::nostr::event::NostrEventExt; +use crate::nostr::filter::FilterExt; +use crate::nostr::messages::RelayMessage; +use crate::nostr::utils::extract_p_tags_from_vec; +use crate::nostr::{ClientMessage, Filter, NostrEvent}; + +pub async fn handle_ws_connection( + stream: TcpStream, + event_tx: broadcast::Sender, + mut event_rx: broadcast::Receiver, // 客户端专属的事件接收器 + pool: Arc, + clients: Arc>>>>, + trust_accounts: Arc>>, +) { + let ws_stream = match accept_async(stream).await { + Ok(stream) => stream, + Err(e) => { + error!("WebSocket handshake failed: {}", e); + return; + } + }; + let (mut ws_sender, mut ws_reciver) = ws_stream.split(); + let (client_tx, mut client_rx) = mpsc::channel::(CLIENT_CHANNEL_SIZE); // 创建用于向此客户端发送消息的 MPSC 通道 + + let client_id = Uuid::new_v4(); + let client_conn = Arc::new(RwLock::new(ClientConnection { + id: client_id, + connected_at: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_secs(), + subscriptions: HashMap::new(), + sender: client_tx.clone(), + authenticated: false, + pubkey: None, + })); + // 注册客户端到全局的客户端列表中 + { + let mut clients_map = clients.write().await; + clients_map.insert(client_id, client_conn.clone()); + } + + info!("Client {} connected (WS)", client_id); + let auth_msg = format!("[\"AUTH\", \"{}\"]", client_id); // 发送 AUTH 挑战 (NIP-42) + if let Err(e) = ws_sender.send(Message::Text(auth_msg.into())).await { + error!( + "Failed to send AUTH challenge to client {}: {}", + client_id, e + ); + } + debug!("Sent AUTH challenge to client {}", client_id); + + let client_conn_for_send = client_conn.clone(); + // 启动一个独立的 Task 用于向客户端发送消息 + let send_task = tokio::spawn(async move { + loop { + tokio::select! { + // 处理订阅事件:从广播通道接收事件,检查是否匹配客户端的订阅过滤器 + Ok(event_msg) = event_rx.recv() => { + let event: NostrEvent = serde_json::from_str(&event_msg).unwrap(); // 假设广播的消息总是有效的 NostrEvent + let conn = client_conn_for_send.read().await; + for (sub_id, subscription) in &conn.subscriptions { + if subscription.filters.iter().any(|filter| filter.matches(&event)) { + RelayMessage::send_event(&event, sub_id, &conn.sender).await; + } + } + } + // 处理直接发送给客户端的消息:从 client_rx 接收消息 + Some(msg) = client_rx.recv() => { + debug!("Client {} sending message: {}", client_id, msg); + if ws_sender + .send(Message::Text(msg.into())) + .await + .is_err() + { + info!("Client {} sender disconnected", client_id); + break; + } + } + else => { + info!("Client {} send task terminated unexpectedly", client_id); + break; + } + } + } + }); + + // 循环接收来自客户端的 WebSocket 消息 + while let Some(msg) = ws_reciver.next().await { + match msg { + Ok(Message::Text(text)) => { + debug!("Client {} received text: {}", client_id, text); + let pool_clone = pool.clone(); + let client_conn_for_msg = client_conn.clone(); + let trust_accounts_clone = trust_accounts.clone(); + let client_tx = client_tx.clone(); + let event_tx = event_tx.clone(); + // 在一个新的 Task 中处理消息,避免阻塞主循环 + tokio::spawn(async move { + if let Err(e) = handle_message( + &text, + &pool_clone, + &client_tx, + &client_conn_for_msg, + &event_tx, + trust_accounts_clone, + ) + .await + { + error!("Error handling message for client {}: {}", client_id, e); + RelayMessage::send_notice(format!("Invalid message: {}", e), &client_tx) + .await; + } + }); + } + Ok(Message::Close(_)) => { + info!("Client {} disconnected via CLOSE message", client_id); + break; + } + Err(e) => { + error!("Client {} WebSocket error: {}", client_id, e); + break; + } + _ => {} + } + } + + // 清理:客户端断开连接时,从全局列表中移除并中止发送任务 + { + let mut clients_map = clients.write().await; + clients_map.remove(&client_id); + } + send_task.abort(); + info!( + "Client {} connection fully closed and cleaned up.", + client_id + ); +} + +// 处理从客户端接收到的 Nostr 协议消息(REQ, EVENT, CLOSE, AUTH) +pub async fn handle_message( + text: &str, + pool: &sqlx::SqlitePool, + to_client_msg_tx: &mpsc::Sender, + client_conn: &Arc>, + event_tx: &broadcast::Sender, + trust_accounts: Arc>>, +) -> Result<(), anyhow::Error> { + let v: Value = serde_json::from_str(text).map_err(|e| anyhow!("Invalid JSON: {}", e))?; + + let arr = match v { + Value::Array(a) => a, + _ => return Err(anyhow!("Expected JSON array, got: {}", v)), + }; + + if arr.is_empty() { + return Err(anyhow!("Empty array message")); + } + + let command = arr[0] + .as_str() + .ok_or_else(|| anyhow!("First element must be a string"))?; + + match command { + "REQ" => { + if arr.len() < 3 { + RelayMessage::send_notice( + "Not enough array elements for REQ: REQ [filter2...]" + .to_string(), + to_client_msg_tx, + ) + .await; + return Ok(()); + } + let sub_id = arr + .get(1) + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("REQ missing sub_id"))? + .to_string(); + + // 检查客户端是否超出最大订阅数量 + { + let conn = client_conn.read().await; + if conn.subscriptions.len() >= MAX_SUBSCRIPTIONS { + RelayMessage::send_closed( + &sub_id, + format!("Maximum subscriptions ({}) exceeded", MAX_SUBSCRIPTIONS), + to_client_msg_tx, + ) + .await; + return Ok(()); + } + } + let mut filters = Vec::new(); + for i in 2..arr.len() { + if filters.len() >= MAX_FILTERS_PER_REQ { + RelayMessage::send_closed( + &sub_id, + format!("Maximum filters ({}) exceeded", MAX_FILTERS_PER_REQ), + to_client_msg_tx, + ) + .await; + return Ok(()); + } + let filter: Filter = serde_json::from_value(arr[i].clone()) + .map_err(|e| anyhow!("Filter parse error: {}", e))?; + filters.push(filter); + } + + debug!("REQ subscription: {}, filters: {:?}", sub_id, filters); + // 将订阅信息添加到客户端连接中 + { + let mut conn = client_conn.write().await; + conn.subscriptions.insert( + sub_id.clone(), + Subscription { + filters: filters.clone(), + }, + ); + } + + // 查询历史事件并发送 + for filter in &filters { + let events = filter.select(pool).await?; // 使用 FilterExt + for event in events { + RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await; + } + } + + // 发送 EOSE 消息,表示历史事件已发送完毕 + RelayMessage::send_eose(&sub_id, to_client_msg_tx).await; + Ok(()) + } + + "EVENT" => { + let event: NostrEvent = serde_json::from_value(arr[1].clone()) + .map_err(|e| anyhow!("Event parse error: {}", e))?; + debug!("EVENT received: {:?}", event); + + // 验证事件签名和基本有效性 (时间戳、标签数量等) + if !event.verify() { + // 使用 NostrEventExt + RelayMessage::send_ok( + &event.id, + false, + "invalid: signature verification failed or event malformed".to_string(), + to_client_msg_tx, + ) + .await; + return Ok(()); + } + + let pool_clone = pool.clone(); + let to_client_msg_tx_clone = to_client_msg_tx.clone(); + let event_tx_clone = event_tx.clone(); + let event_clone = event.clone(); + let client_conn_clone = client_conn.clone(); + let trust_accounts_clone = trust_accounts.clone(); + + // 在一个新的 Task 中处理事件的保存和广播,避免阻塞 WebSocket 接收循环 + tokio::spawn(async move { + // 权限检查:是否需要认证,以及是否在信任列表中 + if SERVER_INFO.auth_required { + let conn = client_conn_clone.read().await; + if !conn.authenticated { + RelayMessage::send_closed( + // NIP-42 Auth: Forbid unauthenticated EVENT + &conn.id.to_string(), // 使用 client_id 作为 sub_id 来匹配 AUTH 挑战 + "auth-required: Authentication required to publish events".to_string(), + &to_client_msg_tx_clone, + ) + .await; + debug!("Client {}: Not authenticated, event rejected.", conn.id); + return; + } + let trust_accounts_guard = trust_accounts_clone.read().await; + if !trust_accounts_guard.contains(&event_clone.pubkey) { + RelayMessage::send_closed( + // NIP-42 Auth: Forbid unauthorized EVENT + &conn.id.to_string(), + "restricted: You are not in the trust list to publish events" + .to_string(), + &to_client_msg_tx_clone, + ) + .await; + debug!( + "Client {}: Pubkey {} not in trust list, event rejected.", + conn.id, event_clone.pubkey + ); + return; + } + } + + // 保存事件到数据库,并发送 OK 响应 + match event_clone.save(&pool_clone).await { + // 使用 NostrEventExt + Ok(_) => { + RelayMessage::send_ok( + &event_clone.id, + true, + "Event received and saved".to_string(), + &to_client_msg_tx_clone, + ) + .await; + // 广播事件给所有订阅的客户端 + if let Err(e) = + event_tx_clone.send(serde_json::to_string(&event_clone).unwrap()) + { + error!("Failed to broadcast event {}: {}", event_clone.id, e); + } + // 如果是 Kind 3 事件(联系人列表)且来自服务器公钥,则更新信任列表 + if event_clone.kind == 3 && event_clone.pubkey == SERVER_INFO.pubkey { + let mut ts = trust_accounts_clone.write().await; + let new_trust_accounts = extract_p_tags_from_vec(&event_clone.tags); // 使用 Nostr 工具函数 + let mut new_trust_accounts_set: HashSet = + new_trust_accounts.into_iter().collect(); + new_trust_accounts_set.insert(SERVER_INFO.pubkey.to_string()); // 确保服务器公钥始终在信任列表中 + *ts = new_trust_accounts_set; + debug!("Trust list updated to {} accounts.", ts.len()); + } + } + Err(e) => { + RelayMessage::send_ok( + &event_clone.id, + false, + e.to_string(), + &to_client_msg_tx_clone, + ) + .await; + error!("Failed to save event {}: {}", event_clone.id, e); + } + } + }); + Ok(()) + } + + "CLOSE" => { + let sub_id = arr + .get(1) + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("CLOSE missing sub_id"))? + .to_string(); + + // 从客户端连接中移除订阅 + { + let mut conn = client_conn.write().await; + conn.subscriptions.remove(&sub_id); + } + + debug!("CLOSE subscription: {}", sub_id); + RelayMessage::send_closed( + &sub_id, + "Subscription cancelled".to_string(), + to_client_msg_tx, + ) + .await; + Ok(()) + } + + "AUTH" => { + if arr.len() < 2 { + RelayMessage::send_notice( + "AUTH message requires an event: [\"AUTH\", ]".to_string(), + to_client_msg_tx, + ) + .await; + return Ok(()); + } + + let auth_event: NostrEvent = serde_json::from_value(arr[1].clone()) + .map_err(|e| anyhow!("AUTH event parse error: {}", e))?; + + // 处理认证事件 + auth_event + .handle_auth_event(client_conn, to_client_msg_tx) // 使用 NostrEventExt + .await?; + Ok(()) + } + other => { + return Err(anyhow!("Unknown command: {}", other)); + } + } +}