First init
This commit is contained in:
+23
@@ -0,0 +1,23 @@
|
||||
# ---> Rust
|
||||
# Generated by Cargo
|
||||
# will have compiled files and executables
|
||||
debug/
|
||||
target/
|
||||
|
||||
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
|
||||
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
|
||||
Cargo.lock
|
||||
|
||||
# These are backup files generated by rustfmt
|
||||
**/*.rs.bk
|
||||
|
||||
# MSVC Windows builds of rustc generate these, which store debugging information
|
||||
*.pdb
|
||||
|
||||
# RustRover
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
.idea/
|
||||
.vscode/
|
||||
+20
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "nostr-relay"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-tungstenite = "0.26"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
sha2 = "0.10"
|
||||
secp256k1 = { version = "0.31", features = ["rand"] }
|
||||
hex = "0.4"
|
||||
rand = "0.9"
|
||||
futures-util = "0.3"
|
||||
anyhow = "1.0.98"
|
||||
log = "0.4.27"
|
||||
env_logger = "0.11.8"
|
||||
uuid = { version = "1.6.2", features = ["v4"] }
|
||||
sqlx = { version = "0.7.1", features = ["runtime-tokio-rustls", "sqlite"] }
|
||||
@@ -0,0 +1,20 @@
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ClientMessage {
|
||||
REQ(String, String, Vec<Filter>), //["REQ", <sub_id>, <Fittet>]
|
||||
Event(String, NostrEvent), //["EVENT", event]
|
||||
CLOSE(String, String), //["CLSOE", "<sub_id>"]
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Filter {
|
||||
pub ids: Option<Vec<String>>, // event ids 列表
|
||||
pub authors: Option<Vec<String>>, // pubkeys 列表,小写字符串
|
||||
pub kinds: Option<Vec<u32>>, // kind 类型列表,数字
|
||||
#[serde(flatten)]
|
||||
pub tag_filters: HashMap<String, Vec<String>>,
|
||||
// 用 HashMap 表示 #<letter> 标签过滤器,例如 #e, #p 等,key 是字符串包含#,value 是对应列表
|
||||
pub since: Option<u64>, // Unix 时间戳,秒
|
||||
pub until: Option<u64>, // Unix 时间戳,秒
|
||||
pub limit: Option<u64>, // 最大返回数
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
pub use self::nostr_event::{NostrEvent, Filter};
|
||||
pub use self::relay_message::{RelayMessage, ClientMessage};
|
||||
mod event;
|
||||
@@ -0,0 +1,56 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Row;
|
||||
use std::error::Error;
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NostrEvent {
|
||||
pub id: String, // 32 bytes hex string of sha256
|
||||
pub pubkey: String, // 32 bytes hex pubkey
|
||||
pub created_at: u64, // unix timestamp seconds
|
||||
pub kind: u32, // integer 0~65535
|
||||
pub tags: Vec<Vec<String>>, // 二维字符串数组
|
||||
pub content: String, // 事件内容
|
||||
pub sig: String, // 64 bytes hex signature
|
||||
}
|
||||
|
||||
impl NostrEvent {
|
||||
fn serialize(&self) -> String {
|
||||
let serialized = json!([
|
||||
0,
|
||||
self.pubkey,
|
||||
self.created_at,
|
||||
self.kind,
|
||||
self.tags,
|
||||
self.content
|
||||
]).to_string();
|
||||
serialized
|
||||
}
|
||||
|
||||
fn verify(&self) -> bool {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(&self.serialize());
|
||||
let result = hasher.finalize();
|
||||
if hex::encode(result) == self.id {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
|
||||
let tags_json = serde_json::to_string(&self.tags).unwrap();
|
||||
let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.id)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.created_at as i64)
|
||||
.bind(self.kind)
|
||||
.bind(tags_json)
|
||||
.bind(&self.content)
|
||||
.bind(&self.sig)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum RelayMessage {
|
||||
Event(String, String, NostrEvent), // ["EVENT", <sub_id>, <event>]
|
||||
Ok(String, String, bool, String), // ["OK", <event_id>, bool, <message>]
|
||||
Eose(String, String), // ["EOSE", <sub_id>]
|
||||
Closed(String, String, String), // ["CLOSED", <sub_id>, <message>]
|
||||
Notice(String, String), // ["NOTICE", <message>]
|
||||
}
|
||||
+281
@@ -0,0 +1,281 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use hex;
|
||||
use log::{Level, debug, error, info, log_enabled};
|
||||
use rand::rand_core::le;
|
||||
use secp256k1::{Message as SecpMessage, PublicKey, Secp256k1, ecdsa::Signature};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Value, json};
|
||||
use sha2::{Digest, Sha256};
|
||||
use sqlx::Row;
|
||||
use sqlx::sqlite::SqlitePool;
|
||||
use std::error::Error;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::{
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::broadcast,
|
||||
};
|
||||
use tokio_tungstenite::tungstenite::Utf8Bytes;
|
||||
use tokio_tungstenite::{WebSocketStream, accept_async, tungstenite::protocol::Message};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NostrEvent {
|
||||
pub id: String, // 32 bytes hex string of sha256
|
||||
pub pubkey: String, // 32 bytes hex pubkey
|
||||
pub created_at: u64, // unix timestamp seconds
|
||||
pub kind: u32, // integer 0~65535
|
||||
pub tags: Vec<Vec<String>>, // 二维字符串数组
|
||||
pub content: String, // 事件内容
|
||||
pub sig: String, // 64 bytes hex signature
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum RelayMessage {
|
||||
Event(String, String, NostrEvent), // ["EVENT", <sub_id>, <event>]
|
||||
Ok(String, String, bool, String), // ["OK", <event_id>, bool, <message>]
|
||||
Eose(String, String), // ["EOSE", <sub_id>]
|
||||
Closed(String, String, String), // ["CLOSED", <sub_id>, <message>]
|
||||
Notice(String, String), // ["NOTICE", <message>]
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum ClientMessage {
|
||||
REQ { sub_id: String, filter: Vec<Filter> },
|
||||
Event { event: NostrEvent },
|
||||
CLOSE { sub_id: String },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Filter {
|
||||
pub ids: Option<Vec<String>>, // event ids 列表
|
||||
pub authors: Option<Vec<String>>, // pubkeys 列表,小写字符串
|
||||
pub kinds: Option<Vec<u32>>, // kind 类型列表,数字
|
||||
#[serde(flatten)]
|
||||
pub tag_filters: HashMap<String, Vec<String>>,
|
||||
// 用 HashMap 表示 #<letter> 标签过滤器,例如 #e, #p 等,key 是字符串包含#,value 是对应列表
|
||||
pub since: Option<u64>, // Unix 时间戳,秒
|
||||
pub until: Option<u64>, // Unix 时间戳,秒
|
||||
pub limit: Option<u64>, // 最大返回数
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
||||
let pool = SqlitePool::connect("sqlite:///home/laoxong/Code/nostr.db")
|
||||
.await
|
||||
.unwrap();
|
||||
let pool = Arc::new(pool);
|
||||
|
||||
info!("Connected to SQLite");
|
||||
let addr = "127.0.0.1:8080";
|
||||
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
|
||||
info!("Listening on: {}", addr);
|
||||
let (tx, _) = broadcast::channel::<String>(100);
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
let tx = tx.clone();
|
||||
let rx = tx.subscribe();
|
||||
let pool = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
handle_connection(stream, tx, rx, pool).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
stream: TcpStream,
|
||||
tx: broadcast::Sender<String>,
|
||||
mut rx: broadcast::Receiver<String>,
|
||||
pool: Arc<SqlitePool>,
|
||||
) {
|
||||
let mut ws_stream = match accept_async(stream).await {
|
||||
Ok(stream) => stream,
|
||||
Err(e) => {
|
||||
error!("WebSocket handshake failed: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let (mut ws_sender, mut ws_reciver) = ws_stream.split();
|
||||
let (to_client_msg_tx, mut to_client_msg_rx) = mpsc::channel::<String>(32);
|
||||
let send_task = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
//Subscribed events
|
||||
Ok(msg) = rx.recv() => {
|
||||
if ws_sender
|
||||
.send(Message::Text(msg.into()))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
//Send message to client
|
||||
Some(msg) = to_client_msg_rx.recv() => {
|
||||
if ws_sender
|
||||
.send(Message::Text(msg.into()))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
else => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
while let Some(msg) = ws_reciver.next().await {
|
||||
match msg {
|
||||
Ok(Message::Text(text)) => {
|
||||
info!("Received text: {}", text);
|
||||
let pool = pool.clone();
|
||||
match handle_message(&text, &pool).await {
|
||||
Ok(_) => {
|
||||
info!("Message handled successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error handling message: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Message::Close(_)) => {
|
||||
info!("Client disconnected");
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_message(text: &str, pool: &SqlitePool) -> Result<(), anyhow::Error> {
|
||||
let v = serde_json::from_str(text).map_err(|e| anyhow!("Invalid JSON: {}", e))?;
|
||||
|
||||
let arr = match v {
|
||||
Value::Array(a) => a,
|
||||
_ => return Err(anyhow!("Expected JSON array, got: {}", v)),
|
||||
};
|
||||
|
||||
if arr.is_empty() {
|
||||
return Err(anyhow!("Empty array message"));
|
||||
}
|
||||
|
||||
match arr[0]
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow!("First element must be a string"))?
|
||||
{
|
||||
"REQ" => {
|
||||
let sub_id = arr
|
||||
.get(1)
|
||||
.and_then(Value::as_str)
|
||||
.ok_or_else(|| anyhow!("REQ missing sub_id"))?
|
||||
.to_string();
|
||||
let filters: Filter = serde_json::from_value(arr[2].clone())
|
||||
.map_err(|e| anyhow!("Filter parse error: {}", e))?;
|
||||
info!("REQ subscription: {}, filters: {:?}", sub_id, filters);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
"EVENT" => {
|
||||
let event: NostrEvent = serde_json::from_value(arr[1].clone())
|
||||
.map_err(|e| anyhow!("Event parse error: {}", e))?;
|
||||
info!("EVENT received: {:?}", event);
|
||||
if event.verify() {
|
||||
let pool = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = event.save(&pool).await {
|
||||
error!("Failed to save event: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
"CLOSE" => {
|
||||
let sub_id = arr
|
||||
.get(1)
|
||||
.and_then(Value::as_str)
|
||||
.ok_or_else(|| anyhow!("CLOSE missing sub_id"))?
|
||||
.to_string();
|
||||
info!("CLOSE subscription: {}", sub_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
other => Err(anyhow!("Unknown command: {}", other)),
|
||||
}
|
||||
}
|
||||
|
||||
impl NostrEvent {
|
||||
fn serialize(&self) -> String {
|
||||
let serialized = json!([
|
||||
0,
|
||||
self.pubkey,
|
||||
self.created_at,
|
||||
self.kind,
|
||||
self.tags,
|
||||
self.content
|
||||
])
|
||||
.to_string();
|
||||
serialized
|
||||
}
|
||||
|
||||
fn verify(&self) -> bool {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(&self.serialize());
|
||||
let result = hasher.finalize();
|
||||
if hex::encode(result) == self.id {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
|
||||
let tags_json = serde_json::to_string(&self.tags).unwrap();
|
||||
let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.id)
|
||||
.bind(&self.pubkey)
|
||||
.bind(self.created_at as i64)
|
||||
.bind(self.kind)
|
||||
.bind(tags_json)
|
||||
.bind(&self.content)
|
||||
.bind(&self.sig)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Filter {
|
||||
fn select(&self, pool: &SqlitePool) -> Result<Vec<NostrEvent>, sqlx::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn check(&self, event: &NostrEvent) -> bool {
|
||||
if let Some(ids) = &self.ids {
|
||||
if ids.contains(&event.id) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if let Some(authors) = &self.authors {
|
||||
if authors.contains(&event.pubkey) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if let Some(kinds) = &self.kinds {
|
||||
if kinds.contains(&event.kind) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user