RelayMessage

This commit is contained in:
2025-06-02 21:07:42 +08:00
parent d5758404f7
commit 2251bd30e8
5 changed files with 57 additions and 100 deletions
-20
View File
@@ -1,20 +0,0 @@
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum ClientMessage {
REQ(String, String, Vec<Filter>), //["REQ", <sub_id>, <Fittet>]
Event(String, NostrEvent), //["EVENT", event]
CLOSE(String, String), //["CLSOE", "<sub_id>"]
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Filter {
pub ids: Option<Vec<String>>, // event ids 列表
pub authors: Option<Vec<String>>, // pubkeys 列表,小写字符串
pub kinds: Option<Vec<u32>>, // kind 类型列表,数字
#[serde(flatten)]
pub tag_filters: HashMap<String, Vec<String>>,
// 用 HashMap 表示 #<letter> 标签过滤器,例如 #e, #p 等,key 是字符串包含#value 是对应列表
pub since: Option<u64>, // Unix 时间戳,秒
pub until: Option<u64>, // Unix 时间戳,秒
pub limit: Option<u64>, // 最大返回数
}
-3
View File
@@ -1,3 +0,0 @@
pub use self::nostr_event::{NostrEvent, Filter};
pub use self::relay_message::{RelayMessage, ClientMessage};
mod event;
-56
View File
@@ -1,56 +0,0 @@
use serde::{Deserialize, Serialize};
use sqlx::Row;
use std::error::Error;
use sha2::{Digest, Sha256};
#[derive(Debug, Serialize, Deserialize)]
pub struct NostrEvent {
pub id: String, // 32 bytes hex string of sha256
pub pubkey: String, // 32 bytes hex pubkey
pub created_at: u64, // unix timestamp seconds
pub kind: u32, // integer 0~65535
pub tags: Vec<Vec<String>>, // 二维字符串数组
pub content: String, // 事件内容
pub sig: String, // 64 bytes hex signature
}
impl NostrEvent {
fn serialize(&self) -> String {
let serialized = json!([
0,
self.pubkey,
self.created_at,
self.kind,
self.tags,
self.content
]).to_string();
serialized
}
fn verify(&self) -> bool {
let mut hasher = Sha256::new();
hasher.update(&self.serialize());
let result = hasher.finalize();
if hex::encode(result) == self.id {
return true;
} else {
return false;
}
}
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
let tags_json = serde_json::to_string(&self.tags).unwrap();
let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)";
sqlx::query(sql)
.bind(&self.id)
.bind(&self.pubkey)
.bind(self.created_at as i64)
.bind(self.kind)
.bind(tags_json)
.bind(&self.content)
.bind(&self.sig)
.execute(pool)
.await?;
Ok(())
}
}
-9
View File
@@ -1,9 +0,0 @@
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum RelayMessage {
Event(String, String, NostrEvent), // ["EVENT", <sub_id>, <event>]
Ok(String, String, bool, String), // ["OK", <event_id>, bool, <message>]
Eose(String, String), // ["EOSE", <sub_id>]
Closed(String, String, String), // ["CLOSED", <sub_id>, <message>]
Notice(String, String), // ["NOTICE", <message>]
}
+57 -12
View File
@@ -38,11 +38,11 @@ pub struct NostrEvent {
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum RelayMessage {
Event(String, String, NostrEvent), // ["EVENT", <sub_id>, <event>]
Ok(String, String, bool, String), // ["OK", <event_id>, bool, <message>]
Eose(String, String), // ["EOSE", <sub_id>]
Closed(String, String, String), // ["CLOSED", <sub_id>, <message>]
Notice(String, String), // ["NOTICE", <message>]
Event { sub_id: String, event: NostrEvent, }, // ["EVENT", <sub_id>, <event>]
Ok { event_id: String, accept: bool, message: String, }, // ["OK", <event_id>, bool, <message>]
Eose { sub_id: String, }, // ["EOSE", <sub_id>]
Closed { sub_id: String, message: String, }, // ["CLOSED", <sub_id>, <message>]
Notice { message: String, }, // ["NOTICE", <message>]
}
#[derive(Debug, Deserialize)]
@@ -90,6 +90,12 @@ struct ServerInfo {
version: &'static str,
}
struct ClientSub {
sub_id: String,
filter: Filter,
tx: broadcast::Sender<String>,
}
#[tokio::main]
async fn main() {
env_logger::init();
@@ -192,7 +198,6 @@ async fn handle_http_info(mut stream: TcpStream) -> Result<(), anyhow::Error> {
version: env!("CARGO_PKG_VERSION"),
};
// 序列化为 JSON
let json = serde_json::to_string(&info)
.expect("Failed to serialize server info");
let response = format!(
@@ -317,6 +322,10 @@ async fn handle_message(
.ok_or_else(|| anyhow!("First element must be a string"))?
{
"REQ" => {
if arr.len() < 3 {
RelayMessage::any("Not enough array elements".to_string(), to_client_msg_tx).await;
return Ok(());
}
let sub_id = arr
.get(1)
.and_then(Value::as_str)
@@ -327,12 +336,9 @@ async fn handle_message(
debug!("REQ subscription: {}, filters: {:?}", sub_id, filters);
let r = filters.select(&sub_id, pool).await?;
for event in r {
let event = serde_json::to_string(&event).unwrap();
let msg = "[\"EVENT\", \"".to_string() + &sub_id + "\", " + &event + "]";
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send message: {}", e);
}
RelayMessage::EVENT(&event, &sub_id, to_client_msg_tx).await;
}
RelayMessage::EOSE(&sub_id, to_client_msg_tx).await;
Ok(())
}
@@ -342,11 +348,17 @@ async fn handle_message(
debug!("EVENT received: {:?}", event);
if event.verify() {
let pool = pool.clone();
let to_client_msg_tx = to_client_msg_tx.clone();
tokio::spawn(async move {
if let Err(e) = event.save(&pool).await {
RelayMessage::OK(&event, false, e.to_string(), &to_client_msg_tx).await;
error!("Failed to save event: {}", e);
} else {
RelayMessage::OK(&event, true, "".to_string(), &to_client_msg_tx).await;
}
});
} else {
RelayMessage::OK(&event, false, "Invalid signature".to_string(), to_client_msg_tx).await;
}
Ok(())
}
@@ -425,7 +437,7 @@ impl NostrEvent {
}
_ => {}
}
let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)";
let sql = "INSERT IGNORE INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?);";
sqlx::query(sql)
.bind(&self.id)
.bind(&self.pubkey)
@@ -526,3 +538,36 @@ impl Filter {
return false;
}
}
impl RelayMessage {
async fn OK(event: &NostrEvent, accept: bool, message: String, to_client_msg_tx: &mpsc::Sender<String>) {
let msg:String = "[\"OK\", \"".to_string() + &event.id + "\", " + &accept.to_string() + ", \"" + &message + "\"]";
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send message: {}", e);
}
}
async fn EVENT(
event: &NostrEvent,
sub_id: &str,
to_client_msg_tx: &mpsc::Sender<String>,
) {
let msg = "[\"EVENT\", \"".to_string() + sub_id + "\", " + &event.serialize() + "]";
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send message: {}", e);
}
}
async fn EOSE(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) {
let msg = "[\"EOSE\", \"".to_string() + sub_id + "\"]";
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send message: {}", e);
}
}
async fn any(message: String, to_client_msg_tx: &mpsc::Sender<String>) {
if let Err(e) = to_client_msg_tx.send(message).await {
error!("Failed to send message: {}", e);
}
}
}