NIP-2
This commit is contained in:
+16
@@ -0,0 +1,16 @@
|
||||
FROM rust:alpine AS builder
|
||||
RUN apk add --no-cache musl-dev musl-utils musl gcc
|
||||
WORKDIR /usr/src/app
|
||||
|
||||
COPY Cargo.toml Cargo.lock ./
|
||||
|
||||
COPY src ./src
|
||||
ENV RUSTFLAGS="-C target-feature=+crt-static"
|
||||
|
||||
RUN rustup target add x86_64-unknown-linux-musl && cargo build --release --target x86_64-unknown-linux-musl
|
||||
|
||||
FROM alpine:latest
|
||||
RUN apk add --no-cache libgcc
|
||||
|
||||
COPY --from=builder /usr/src/app/target/x86_64-unknown-linux-musl/release/nostr-relay /usr/local/bin/
|
||||
CMD ["nostr-relay"]
|
||||
+108
-17
@@ -8,9 +8,14 @@ use secp256k1::{Message as SecpMessage, PublicKey, Secp256k1, ecdsa::Signature};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Value, json};
|
||||
use sha2::{Digest, Sha256};
|
||||
use sqlx::Execute;
|
||||
use sqlx::QueryBuilder;
|
||||
use sqlx::Row;
|
||||
use sqlx::query;
|
||||
use sqlx::sqlite::SqlitePool;
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::result;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::{
|
||||
@@ -65,14 +70,13 @@ pub struct Filter {
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
||||
let pool = SqlitePool::connect("sqlite:///home/laoxong/Code/nostr.db")
|
||||
.await
|
||||
.unwrap();
|
||||
env_logger::init();
|
||||
let db_path = env::var("DB_PATH").expect("DB_PATH must be set");
|
||||
let db_url = format!("sqlite://{}", db_path);
|
||||
let pool = SqlitePool::connect(&db_url).await.unwrap();
|
||||
let pool = Arc::new(pool);
|
||||
|
||||
info!("Connected to SQLite");
|
||||
let addr = "127.0.0.1:8080";
|
||||
let addr = "0.0.0.0:8080";
|
||||
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
|
||||
info!("Listening on: {}", addr);
|
||||
let (tx, _) = broadcast::channel::<String>(100);
|
||||
@@ -116,6 +120,7 @@ async fn handle_connection(
|
||||
}
|
||||
//Send message to client
|
||||
Some(msg) = to_client_msg_rx.recv() => {
|
||||
debug!("Sending message to client: {}", msg);
|
||||
if ws_sender
|
||||
.send(Message::Text(msg.into()))
|
||||
.await
|
||||
@@ -133,14 +138,18 @@ async fn handle_connection(
|
||||
while let Some(msg) = ws_reciver.next().await {
|
||||
match msg {
|
||||
Ok(Message::Text(text)) => {
|
||||
info!("Received text: {}", text);
|
||||
debug!("Received text: {}", text);
|
||||
let pool = pool.clone();
|
||||
match handle_message(&text, &pool).await {
|
||||
match handle_message(&text, &pool, &to_client_msg_tx).await {
|
||||
Ok(_) => {
|
||||
info!("Message handled successfully");
|
||||
debug!("Message handled successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error handling message: {}", e);
|
||||
to_client_msg_tx
|
||||
.send(format!("Error: {}", e))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -153,7 +162,11 @@ async fn handle_connection(
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_message(text: &str, pool: &SqlitePool) -> Result<(), anyhow::Error> {
|
||||
async fn handle_message(
|
||||
text: &str,
|
||||
pool: &SqlitePool,
|
||||
to_client_msg_tx: &mpsc::Sender<String>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let v = serde_json::from_str(text).map_err(|e| anyhow!("Invalid JSON: {}", e))?;
|
||||
|
||||
let arr = match v {
|
||||
@@ -177,15 +190,22 @@ async fn handle_message(text: &str, pool: &SqlitePool) -> Result<(), anyhow::Err
|
||||
.to_string();
|
||||
let filters: Filter = serde_json::from_value(arr[2].clone())
|
||||
.map_err(|e| anyhow!("Filter parse error: {}", e))?;
|
||||
info!("REQ subscription: {}, filters: {:?}", sub_id, filters);
|
||||
|
||||
debug!("REQ subscription: {}, filters: {:?}", sub_id, filters);
|
||||
let r = filters.select(&sub_id, pool).await?;
|
||||
for event in r {
|
||||
let event = serde_json::to_string(&event).unwrap();
|
||||
let msg = "[\"EVENT\", \"".to_string() + &sub_id + "\", " + &event + "]";
|
||||
if let Err(e) = to_client_msg_tx.send(msg).await {
|
||||
error!("Failed to send message: {}", e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
"EVENT" => {
|
||||
let event: NostrEvent = serde_json::from_value(arr[1].clone())
|
||||
.map_err(|e| anyhow!("Event parse error: {}", e))?;
|
||||
info!("EVENT received: {:?}", event);
|
||||
debug!("EVENT received: {:?}", event);
|
||||
if event.verify() {
|
||||
let pool = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
@@ -203,7 +223,7 @@ async fn handle_message(text: &str, pool: &SqlitePool) -> Result<(), anyhow::Err
|
||||
.and_then(Value::as_str)
|
||||
.ok_or_else(|| anyhow!("CLOSE missing sub_id"))?
|
||||
.to_string();
|
||||
info!("CLOSE subscription: {}", sub_id);
|
||||
debug!("CLOSE subscription: {}", sub_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -239,6 +259,13 @@ impl NostrEvent {
|
||||
|
||||
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
|
||||
let tags_json = serde_json::to_string(&self.tags).unwrap();
|
||||
if self.kind == 3 {
|
||||
let sql = "DELETE FROM events WHERE id = ? AND kind = 3";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
let sql = "INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig) VALUES (?, ?, ?, ?, ?, ?, ?)";
|
||||
sqlx::query(sql)
|
||||
.bind(&self.id)
|
||||
@@ -255,8 +282,73 @@ impl NostrEvent {
|
||||
}
|
||||
|
||||
impl Filter {
|
||||
fn select(&self, pool: &SqlitePool) -> Result<Vec<NostrEvent>, sqlx::Error> {
|
||||
todo!()
|
||||
async fn select(
|
||||
&self,
|
||||
sub_id: &str,
|
||||
pool: &SqlitePool,
|
||||
) -> Result<Vec<NostrEvent>, sqlx::Error> {
|
||||
let mut sql: QueryBuilder<sqlx::Sqlite> = QueryBuilder::new(r#"SELECT id, pubkey, created_at, kind, tags, content, sig
|
||||
FROM events
|
||||
WHERE 1=1
|
||||
"#);
|
||||
|
||||
if let Some(ids) = &self.ids {
|
||||
if !&self.ids.is_none() {
|
||||
sql.push(" AND id in (");
|
||||
let mut separated = sql.separated(",");
|
||||
for id in ids {
|
||||
separated.push(&format!("'{}'", id));
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(pubkeys) = &self.authors {
|
||||
if !pubkeys.is_empty() {
|
||||
sql.push(" AND pubkey in (");
|
||||
let mut separated = sql.separated(",");
|
||||
for pubkey in pubkeys {
|
||||
separated.push(&format!("'{}'", pubkey));
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(kinds) = &self.kinds {
|
||||
if !kinds.is_empty() {
|
||||
sql.push(" AND kind in (");
|
||||
let mut separated = sql.separated(",");
|
||||
for kind in kinds {
|
||||
separated.push(&format!("{}", kind));
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
}
|
||||
}
|
||||
|
||||
sql.push(" ORDER BY created_at DESC");
|
||||
if let Some(limit) = &self.limit {
|
||||
sql.push(" LIMIT ").push_bind(*limit as i64);
|
||||
} else {
|
||||
sql.push(" LIMIT 10");
|
||||
}
|
||||
let query = sql.build();
|
||||
debug!("SQL: {}", query.sql());
|
||||
let result = query.fetch_all(pool).await?;
|
||||
let events: Vec<NostrEvent> = result
|
||||
.iter()
|
||||
.map(|row| NostrEvent {
|
||||
id: row.get(0),
|
||||
pubkey: row.get(1),
|
||||
created_at: row.get::<i64, _>(2) as u64,
|
||||
kind: row.get(3),
|
||||
tags: serde_json::from_str::<Vec<Vec<String>>>(&row.get::<String, _>(4))
|
||||
.unwrap_or_default(),
|
||||
content: row.get(5),
|
||||
sig: row.get(6),
|
||||
})
|
||||
.collect();
|
||||
debug!("Selected {:?} event", events);
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
fn check(&self, event: &NostrEvent) -> bool {
|
||||
@@ -278,4 +370,3 @@ impl Filter {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user