diff --git a/src/event/client_messages b/src/event/client_messages deleted file mode 100644 index 0915504..0000000 --- a/src/event/client_messages +++ /dev/null @@ -1,20 +0,0 @@ -#[derive(Debug, Deserialize)] -#[serde(untagged)] -pub enum ClientMessage { - REQ(String, String, Vec), //["REQ", , ] - Event(String, NostrEvent), //["EVENT", event] - CLOSE(String, String), //["CLSOE", ""] -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Filter { - pub ids: Option>, // event ids 列表 - pub authors: Option>, // pubkeys 列表,小写字符串 - pub kinds: Option>, // kind 类型列表,数字 - #[serde(flatten)] - pub tag_filters: HashMap>, - // 用 HashMap 表示 # 标签过滤器,例如 #e, #p 等,key 是字符串包含#,value 是对应列表 - pub since: Option, // Unix 时间戳,秒 - pub until: Option, // Unix 时间戳,秒 - pub limit: Option, // 最大返回数 -} \ No newline at end of file diff --git a/src/event/mod.rs b/src/event/mod.rs deleted file mode 100644 index 01f758b..0000000 --- a/src/event/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub use self::nostr_event::{NostrEvent, Filter}; -pub use self::relay_message::{RelayMessage, ClientMessage}; -mod event; \ No newline at end of file diff --git a/src/event/nostr_event.rs b/src/event/nostr_event.rs deleted file mode 100644 index 275be50..0000000 --- a/src/event/nostr_event.rs +++ /dev/null @@ -1,56 +0,0 @@ -use serde::{Deserialize, Serialize}; -use sqlx::Row; -use std::error::Error; -use sha2::{Digest, Sha256}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct NostrEvent { - pub id: String, // 32 bytes hex string of sha256 - pub pubkey: String, // 32 bytes hex pubkey - pub created_at: u64, // unix timestamp seconds - pub kind: u32, // integer 0~65535 - pub tags: Vec>, // 二维字符串数组 - pub content: String, // 事件内容 - pub sig: String, // 64 bytes hex signature -} - -impl NostrEvent { - fn serialize(&self) -> String { - let serialized = json!([ - 0, - self.pubkey, - self.created_at, - self.kind, - self.tags, - self.content - ]).to_string(); - serialized - } - - fn verify(&self) -> bool { - let mut hasher = Sha256::new(); - hasher.update(&self.serialize()); - let result = hasher.finalize(); - if hex::encode(result) == self.id { - return true; - } else { - return false; - } - } - - async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> { - let tags_json = serde_json::to_string(&self.tags).unwrap(); - let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)"; - sqlx::query(sql) - .bind(&self.id) - .bind(&self.pubkey) - .bind(self.created_at as i64) - .bind(self.kind) - .bind(tags_json) - .bind(&self.content) - .bind(&self.sig) - .execute(pool) - .await?; - Ok(()) - } -} \ No newline at end of file diff --git a/src/event/relay_messages b/src/event/relay_messages deleted file mode 100644 index b4e4393..0000000 --- a/src/event/relay_messages +++ /dev/null @@ -1,9 +0,0 @@ -#[derive(Debug, Deserialize)] -#[serde(untagged)] -pub enum RelayMessage { - Event(String, String, NostrEvent), // ["EVENT", , ] - Ok(String, String, bool, String), // ["OK", , bool, ] - Eose(String, String), // ["EOSE", ] - Closed(String, String, String), // ["CLOSED", , ] - Notice(String, String), // ["NOTICE", ] -} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 222c94e..c973eb0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,11 +38,11 @@ pub struct NostrEvent { #[derive(Debug, Deserialize)] #[serde(untagged)] pub enum RelayMessage { - Event(String, String, NostrEvent), // ["EVENT", , ] - Ok(String, String, bool, String), // ["OK", , bool, ] - Eose(String, String), // ["EOSE", ] - Closed(String, String, String), // ["CLOSED", , ] - Notice(String, String), // ["NOTICE", ] + Event { sub_id: String, event: NostrEvent, }, // ["EVENT", , ] + Ok { event_id: String, accept: bool, message: String, }, // ["OK", , bool, ] + Eose { sub_id: String, }, // ["EOSE", ] + Closed { sub_id: String, message: String, }, // ["CLOSED", , ] + Notice { message: String, }, // ["NOTICE", ] } #[derive(Debug, Deserialize)] @@ -90,6 +90,12 @@ struct ServerInfo { version: &'static str, } +struct ClientSub { + sub_id: String, + filter: Filter, + tx: broadcast::Sender, +} + #[tokio::main] async fn main() { env_logger::init(); @@ -192,7 +198,6 @@ async fn handle_http_info(mut stream: TcpStream) -> Result<(), anyhow::Error> { version: env!("CARGO_PKG_VERSION"), }; - // 序列化为 JSON let json = serde_json::to_string(&info) .expect("Failed to serialize server info"); let response = format!( @@ -317,6 +322,10 @@ async fn handle_message( .ok_or_else(|| anyhow!("First element must be a string"))? { "REQ" => { + if arr.len() < 3 { + RelayMessage::any("Not enough array elements".to_string(), to_client_msg_tx).await; + return Ok(()); + } let sub_id = arr .get(1) .and_then(Value::as_str) @@ -327,12 +336,9 @@ async fn handle_message( debug!("REQ subscription: {}, filters: {:?}", sub_id, filters); let r = filters.select(&sub_id, pool).await?; for event in r { - let event = serde_json::to_string(&event).unwrap(); - let msg = "[\"EVENT\", \"".to_string() + &sub_id + "\", " + &event + "]"; - if let Err(e) = to_client_msg_tx.send(msg).await { - error!("Failed to send message: {}", e); - } + RelayMessage::EVENT(&event, &sub_id, to_client_msg_tx).await; } + RelayMessage::EOSE(&sub_id, to_client_msg_tx).await; Ok(()) } @@ -342,11 +348,17 @@ async fn handle_message( debug!("EVENT received: {:?}", event); if event.verify() { let pool = pool.clone(); + let to_client_msg_tx = to_client_msg_tx.clone(); tokio::spawn(async move { if let Err(e) = event.save(&pool).await { + RelayMessage::OK(&event, false, e.to_string(), &to_client_msg_tx).await; error!("Failed to save event: {}", e); + } else { + RelayMessage::OK(&event, true, "".to_string(), &to_client_msg_tx).await; } }); + } else { + RelayMessage::OK(&event, false, "Invalid signature".to_string(), to_client_msg_tx).await; } Ok(()) } @@ -425,7 +437,7 @@ impl NostrEvent { } _ => {} } - let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)"; + let sql = "INSERT IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?);"; sqlx::query(sql) .bind(&self.id) .bind(&self.pubkey) @@ -526,3 +538,36 @@ impl Filter { return false; } } + +impl RelayMessage { + async fn OK(event: &NostrEvent, accept: bool, message: String, to_client_msg_tx: &mpsc::Sender) { + let msg:String = "[\"OK\", \"".to_string() + &event.id + "\", " + &accept.to_string() + ", \"" + &message + "\"]"; + if let Err(e) = to_client_msg_tx.send(msg).await { + error!("Failed to send message: {}", e); + } + } + + async fn EVENT( + event: &NostrEvent, + sub_id: &str, + to_client_msg_tx: &mpsc::Sender, + ) { + let msg = "[\"EVENT\", \"".to_string() + sub_id + "\", " + &event.serialize() + "]"; + if let Err(e) = to_client_msg_tx.send(msg).await { + error!("Failed to send message: {}", e); + } + } + + async fn EOSE(sub_id: &str, to_client_msg_tx: &mpsc::Sender) { + let msg = "[\"EOSE\", \"".to_string() + sub_id + "\"]"; + if let Err(e) = to_client_msg_tx.send(msg).await { + error!("Failed to send message: {}", e); + } + } + + async fn any(message: String, to_client_msg_tx: &mpsc::Sender) { + if let Err(e) = to_client_msg_tx.send(message).await { + error!("Failed to send message: {}", e); + } + } +} \ No newline at end of file