diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..593ee96 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM rust:alpine AS builder +RUN apk add --no-cache musl-dev musl-utils musl gcc +WORKDIR /usr/src/app + +COPY Cargo.toml Cargo.lock ./ + +COPY src ./src +ENV RUSTFLAGS="-C target-feature=+crt-static" + +RUN rustup target add x86_64-unknown-linux-musl && cargo build --release --target x86_64-unknown-linux-musl + +FROM alpine:latest +RUN apk add --no-cache libgcc + +COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/nostr-relay /usr/local/bin/ +CMD ["nostr-relay"] \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5bf706f..a107668 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,9 +8,14 @@ use secp256k1::{Message as SecpMessage, PublicKey, Secp256k1, ecdsa::Signature}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use sha2::{Digest, Sha256}; +use sqlx::Execute; +use sqlx::QueryBuilder; use sqlx::Row; +use sqlx::query; use sqlx::sqlite::SqlitePool; +use std::env; use std::error::Error; +use std::result; use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc; use tokio::{ @@ -65,14 +70,13 @@ pub struct Filter { #[tokio::main] async fn main() { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); - let pool = SqlitePool::connect("sqlite:///home/laoxong/Code/nostr.db") - .await - .unwrap(); + env_logger::init(); + let db_path = env::var("DB_PATH").expect("DB_PATH must be set"); + let db_url = format!("sqlite://{}", db_path); + let pool = SqlitePool::connect(&db_url).await.unwrap(); let pool = Arc::new(pool); - info!("Connected to SQLite"); - let addr = "127.0.0.1:8080"; + let addr = "0.0.0.0:8080"; let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); info!("Listening on: {}", addr); let (tx, _) = broadcast::channel::(100); @@ -116,6 +120,7 @@ async fn handle_connection( } //Send message to client Some(msg) = to_client_msg_rx.recv() => { + debug!("Sending message to client: {}", msg); if ws_sender .send(Message::Text(msg.into())) .await @@ -133,14 +138,18 @@ async fn handle_connection( while let Some(msg) = ws_reciver.next().await { match msg { Ok(Message::Text(text)) => { - info!("Received text: {}", text); + debug!("Received text: {}", text); let pool = pool.clone(); - match handle_message(&text, &pool).await { + match handle_message(&text, &pool, &to_client_msg_tx).await { Ok(_) => { - info!("Message handled successfully"); + debug!("Message handled successfully"); } Err(e) => { error!("Error handling message: {}", e); + to_client_msg_tx + .send(format!("Error: {}", e)) + .await + .unwrap(); } } } @@ -153,7 +162,11 @@ async fn handle_connection( } } -async fn handle_message(text: &str, pool: &SqlitePool) -> Result<(), anyhow::Error> { +async fn handle_message( + text: &str, + pool: &SqlitePool, + to_client_msg_tx: &mpsc::Sender, +) -> Result<(), anyhow::Error> { let v = serde_json::from_str(text).map_err(|e| anyhow!("Invalid JSON: {}", e))?; let arr = match v { @@ -177,15 +190,22 @@ async fn handle_message(text: &str, pool: &SqlitePool) -> Result<(), anyhow::Err .to_string(); let filters: Filter = serde_json::from_value(arr[2].clone()) .map_err(|e| anyhow!("Filter parse error: {}", e))?; - info!("REQ subscription: {}, filters: {:?}", sub_id, filters); - + debug!("REQ subscription: {}, filters: {:?}", sub_id, filters); + let r = filters.select(&sub_id, pool).await?; + for event in r { + let event = serde_json::to_string(&event).unwrap(); + let msg = "[\"EVENT\", \"".to_string() + &sub_id + "\", " + &event + "]"; + if let Err(e) = to_client_msg_tx.send(msg).await { + error!("Failed to send message: {}", e); + } + } Ok(()) } "EVENT" => { let event: NostrEvent = serde_json::from_value(arr[1].clone()) .map_err(|e| anyhow!("Event parse error: {}", e))?; - info!("EVENT received: {:?}", event); + debug!("EVENT received: {:?}", event); if event.verify() { let pool = pool.clone(); tokio::spawn(async move { @@ -203,7 +223,7 @@ async fn handle_message(text: &str, pool: &SqlitePool) -> Result<(), anyhow::Err .and_then(Value::as_str) .ok_or_else(|| anyhow!("CLOSE missing sub_id"))? .to_string(); - info!("CLOSE subscription: {}", sub_id); + debug!("CLOSE subscription: {}", sub_id); Ok(()) } @@ -239,6 +259,13 @@ impl NostrEvent { async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> { let tags_json = serde_json::to_string(&self.tags).unwrap(); + if self.kind == 3 { + let sql = "DELETE FROM events WHERE id = ? AND kind = 3"; + sqlx::query(sql) + .bind(&self.id) + .execute(pool) + .await?; + } let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)"; sqlx::query(sql) .bind(&self.id) @@ -255,8 +282,73 @@ impl NostrEvent { } impl Filter { - fn select(&self, pool: &SqlitePool) -> Result, sqlx::Error> { - todo!() + async fn select( + &self, + sub_id: &str, + pool: &SqlitePool, + ) -> Result, sqlx::Error> { + let mut sql: QueryBuilder = QueryBuilder::new(r#"SELECT id, pubkey, created_at, kind, tags, content, sig + FROM events + WHERE 1=1 + "#); + + if let Some(ids) = &self.ids { + if !&self.ids.is_none() { + sql.push(" AND id in ("); + let mut separated = sql.separated(","); + for id in ids { + separated.push(&format!("'{}'", id)); + } + separated.push_unseparated(")"); + } + } + + if let Some(pubkeys) = &self.authors { + if !pubkeys.is_empty() { + sql.push(" AND pubkey in ("); + let mut separated = sql.separated(","); + for pubkey in pubkeys { + separated.push(&format!("'{}'", pubkey)); + } + separated.push_unseparated(")"); + } + } + + if let Some(kinds) = &self.kinds { + if !kinds.is_empty() { + sql.push(" AND kind in ("); + let mut separated = sql.separated(","); + for kind in kinds { + separated.push(&format!("{}", kind)); + } + separated.push_unseparated(")"); + } + } + + sql.push(" ORDER BY created_at DESC"); + if let Some(limit) = &self.limit { + sql.push(" LIMIT ").push_bind(*limit as i64); + } else { + sql.push(" LIMIT 10"); + } + let query = sql.build(); + debug!("SQL: {}", query.sql()); + let result = query.fetch_all(pool).await?; + let events: Vec = result + .iter() + .map(|row| NostrEvent { + id: row.get(0), + pubkey: row.get(1), + created_at: row.get::(2) as u64, + kind: row.get(3), + tags: serde_json::from_str::>>(&row.get::(4)) + .unwrap_or_default(), + content: row.get(5), + sig: row.get(6), + }) + .collect(); + debug!("Selected {:?} event", events); + Ok(events) } fn check(&self, event: &NostrEvent) -> bool { @@ -278,4 +370,3 @@ impl Filter { return false; } } -