Add: HTML UI, Add: More Query

This commit is contained in:
2025-06-15 22:19:33 +08:00
parent 3fe12a5fbd
commit 1bc3afbff1
+358 -112
View File
@@ -2,6 +2,7 @@ use anyhow::Result;
use anyhow::anyhow;
use futures_util::{SinkExt, StreamExt};
use hex;
use httparse::{EMPTY_HEADER, Request};
use log::{Level, debug, error, info, log_enabled};
use secp256k1::{Message as SecpMessage, PublicKey, Secp256k1, ecdsa::Signature};
use serde::{Deserialize, Serialize};
@@ -13,7 +14,10 @@ use sqlx::Row;
use sqlx::query;
use sqlx::sqlite::SqlitePool;
use std::env;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use std::{collections::HashMap, sync::Arc};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::{
net::{TcpListener, TcpStream},
@@ -21,8 +25,16 @@ use tokio::{
};
use tokio_tungstenite::{WebSocketStream, accept_async, tungstenite::protocol::Message};
use uuid::Uuid;
use httparse::{Request, EMPTY_HEADER};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
const DEFAULT_BIND_ADDR: &str = "0.0.0.0:8080";
const DEFAULT_DB_PATH: &str = "nostr.db";
const MAX_EVENT_TAGS: u32 = 5000;
const MAX_LIMIT: u64 = 500;
const DEFAULT_LIMIT: u64 = 10;
const MAX_FILTERS_PER_REQ: usize = 100;
const BROADCAST_CHANNEL_SIZE: usize = 100;
const CLIENT_CHANNEL_SIZE: usize = 32;
const MAX_SUBSCRIPTIONS: usize = 20;
#[derive(Debug, Serialize, Deserialize)]
pub struct NostrEvent {
@@ -38,11 +50,25 @@ pub struct NostrEvent {
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum RelayMessage {
Event { sub_id: String, event: NostrEvent, }, // ["EVENT", <sub_id>, <event>]
Ok { event_id: String, accept: bool, message: String, }, // ["OK", <event_id>, bool, <message>]
Eose { sub_id: String, }, // ["EOSE", <sub_id>]
Closed { sub_id: String, message: String, }, // ["CLOSED", <sub_id>, <message>]
Notice { message: String, }, // ["NOTICE", <message>]
Event {
sub_id: String,
event: NostrEvent,
}, // ["EVENT", <sub_id>, <event>]
Ok {
event_id: String,
accept: bool,
message: String,
}, // ["OK", <event_id>, bool, <message>]
Eose {
sub_id: String,
}, // ["EOSE", <sub_id>]
Closed {
sub_id: String,
message: String,
}, // ["CLOSED", <sub_id>, <message>]
Notice {
message: String,
}, // ["NOTICE", <message>]
}
#[derive(Debug, Deserialize)]
@@ -90,36 +116,32 @@ struct ServerInfo {
version: &'static str,
}
struct ClientSub {
sub_id: String,
filter: Filter,
tx: broadcast::Sender<String>,
}
#[tokio::main]
async fn main() {
async fn main() -> Result<()> {
env_logger::init();
let db_path = env::var("DB_PATH").expect("DB_PATH must be set");
let db_path = env::var("DB_PATH").unwrap_or_else(|_| "nostr.db".to_string());
let db_url = format!("sqlite://{}", db_path);
let pool = SqlitePool::connect(&db_url).await.unwrap();
let pool = SqlitePool::connect(&db_url).await?;
init_database(&pool).await?;
let pool = Arc::new(pool);
info!("Connected to SQLite");
let addr = "0.0.0.0:8080";
let addr = env::var("BIND_ADDR").unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_string());
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
info!("Listening on: {}", addr);
let (tx, _) = broadcast::channel::<String>(100);
let (event_tx, _) = broadcast::channel::<String>(100);
while let Ok((stream, addr)) = listener.accept().await {
let tx = tx.clone();
let rx = tx.subscribe();
while let Ok((stream, client_addr)) = listener.accept().await {
let event_tx = event_tx.clone();
let event_rx = event_tx.subscribe();
let pool = pool.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection_multiplex(stream, tx, rx, pool).await {
if let Err(e) = handle_connection_multiplex(stream, event_tx, event_rx, pool).await {
error!("Error handling connection: {}", e);
}
});
}
Ok(())
}
async fn handle_connection_multiplex(
@@ -149,17 +171,17 @@ async fn handle_connection_multiplex(
Some((name, val))
})
.collect::<std::collections::HashMap<_, _>>();
if method == "GET"
&& header_map
.get("upgrade")
.map(|&v| v.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
&& header_map
.get("upgrade")
.map(|&v| v.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
{
handle_ws_connection(stream, tx, rx, pool).await;
}else {
} else {
if let Some(accept) = header_map.get("accept") {
if accept.contains("application/json") {
if accept.contains("application/json") || accept.contains("application/json") {
handle_http_info(stream).await?;
} else if accept.contains("text/html") {
handle_http(stream).await?;
@@ -197,45 +219,187 @@ async fn handle_http_info(mut stream: TcpStream) -> Result<(), anyhow::Error> {
supported_nips: vec![1, 2, 5, 65],
version: env!("CARGO_PKG_VERSION"),
};
let json = serde_json::to_string(&info)
.expect("Failed to serialize server info");
let json = serde_json::to_string(&info).expect("Failed to serialize server info");
let response = format!(
"HTTP/1.1 200 OK\r\n\
"HTTP/1.1 200 OK\r\n\
Content-Type: application/nostr+json\r\n\
Access-Control-Allow-Origin: *\r\n\
\r\n\
{}",
json);
json
);
stream.write_all(response.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
async fn handle_http(mut stream: TcpStream) -> Result<(), anyhow::Error> {
let mut buffer = vec![0; 1024];
stream.read(&mut buffer).await?;
let html = format!(
r#"<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Nostr Relay Dashboard</title>
<style>
:root {{
--bg: #fff;
--fg: #333;
--card: #f9f9f9;
--accent: #0052cc;
--accent-light: #e6ebff;
}}
[data-theme="dark"] {{
--bg: #1e1e1e;
--fg: #ddd;
--card: #2a2a2a;
--accent: #4e8cff;
--accent-light: #3a3f58;
}}
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{
background: var(--bg);
color: var(--fg);
font-family: "Segoe UI", Roboto, sans-serif;
line-height: 1.6;
padding: 20px;
transition: background 0.3s, color 0.3s;
}}
.toggle {{
position: fixed;
top: 20px; right: 20px;
background: var(--card);
border: none;
padding: 8px 12px;
border-radius: 4px;
cursor: pointer;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
transition: background 0.3s;
}}
.container {{
max-width: 600px;
margin: 60px auto 0;
background: var(--card);
border-radius: 8px;
padding: 30px;
box-shadow: 0 4px 10px rgba(0,0,0,0.05);
transition: background 0.3s;
}}
h1 {{
font-size: 2rem;
margin-bottom: 10px;
display: flex;
align-items: center;
}}
h1 .rocket {{
font-size: 1.5rem;
margin-right: 8px;
}}
.badge {{
background: var(--accent);
color: #fff;
font-size: 0.8rem;
padding: 2px 8px;
border-radius: 12px;
margin-left: auto;
}}
.info {{
margin: 20px 0;
display: grid;
grid-template-columns: 1fr 1fr;
gap: 12px;
}}
.info p {{
background: var(--accent-light);
padding: 12px;
border-radius: 6px;
}}
.info p strong {{
display: block;
margin-bottom: 4px;
}}
.connect-btn {{
display: inline-block;
background: var(--accent);
color: #fff;
text-decoration: none;
padding: 10px 20px;
border-radius: 6px;
font-weight: bold;
transition: background 0.3s;
}}
.connect-btn:hover {{
background: #003ba1;
}}
@media (max-width: 480px) {{
.info {{
grid-template-columns: 1fr;
}}
.toggle {{
top: auto;
bottom: 20px;
}}
}}
</style>
</head>
<body data-theme="light">
<button class="toggle" onclick="toggleTheme()">切换 夜/日 间模式</button>
<div class="container">
<h1><span class="rocket">🚀</span>Nostr Relay Server<span class="badge">Ver.{}</span></h1>
<div class="info">
<p><strong>WebSocket URL</strong><br>wss://nostr-relay.moe.gift</p>
<p><strong>Status</strong><br>✅ Running</p>
</div>
<p>使用任意兼容 Nostr 协议的客户端连接到上面的 WebSocket 地址,即可发布和接收事件。</p>
</div>
<script>
function toggleTheme() {{
const html = document.body;
const next = html.getAttribute("data-theme") === "light" ? "dark" : "light";
html.setAttribute("data-theme", next);
localStorage.setItem("theme", next);
}}
// 页面加载时恢复上次主题
(function(){{
const saved = localStorage.getItem("theme");
if (saved) document.body.setAttribute("data-theme", saved);
}})();
</script>
</body>
</html>
"#,
env!("CARGO_PKG_VERSION")
);
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/html\r\n\
Access-Control-Allow-Origin: *\r\n\
\r\n\
A Simple Nostr Relay by laoXong Version:{}",
env!("CARGO_PKG_VERSION"));
"HTTP/1.1 200 OK\r\n\
Content-Type: text/html; charset=utf-8\r\n\
Content-Length: {}\r\n\
Access-Control-Allow-Origin: *\r\n\
\r\n\
{}",
html.len(),
html
);
stream.write_all(response.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
async fn handle_ws_connection(
stream: TcpStream,
tx: broadcast::Sender<String>,
mut rx: broadcast::Receiver<String>,
event_tx: broadcast::Sender<String>,
mut event_rx: broadcast::Receiver<String>,
pool: Arc<SqlitePool>,
) {
let mut ws_stream = match accept_async(stream).await {
let ws_stream = match accept_async(stream).await {
Ok(stream) => stream,
Err(e) => {
error!("WebSocket handshake failed: {}", e);
@@ -243,12 +407,12 @@ async fn handle_ws_connection(
}
};
let (mut ws_sender, mut ws_reciver) = ws_stream.split();
let (to_client_msg_tx, mut to_client_msg_rx) = mpsc::channel::<String>(32);
let (client_tx, mut client_rx) = mpsc::channel::<String>(32);
let send_task = tokio::spawn(async move {
loop {
tokio::select! {
//Subscribed events
Ok(msg) = rx.recv() => {
Ok(msg) = event_rx.recv() => {
if ws_sender
.send(Message::Text(msg.into()))
.await
@@ -258,7 +422,7 @@ async fn handle_ws_connection(
}
}
//Send message to client
Some(msg) = to_client_msg_rx.recv() => {
Some(msg) = client_rx.recv() => {
debug!("Sending message to client: {}", msg);
if ws_sender
.send(Message::Text(msg.into()))
@@ -279,13 +443,13 @@ async fn handle_ws_connection(
Ok(Message::Text(text)) => {
debug!("Received text: {}", text);
let pool = pool.clone();
match handle_message(&text, &pool, &to_client_msg_tx).await {
match handle_message(&text, &pool, &client_tx).await {
Ok(_) => {
debug!("Message handled successfully");
}
Err(e) => {
error!("Error handling message: {}", e);
to_client_msg_tx
client_tx
.send(format!("Error: {}", e))
.await
.unwrap();
@@ -323,7 +487,11 @@ async fn handle_message(
{
"REQ" => {
if arr.len() < 3 {
RelayMessage::any("Not enough array elements".to_string(), to_client_msg_tx).await;
RelayMessage::send_message(
"Not enough array elements".to_string(),
to_client_msg_tx,
)
.await;
return Ok(());
}
let sub_id = arr
@@ -334,11 +502,11 @@ async fn handle_message(
let filters: Filter = serde_json::from_value(arr[2].clone())
.map_err(|e| anyhow!("Filter parse error: {}", e))?;
debug!("REQ subscription: {}, filters: {:?}", sub_id, filters);
let r = filters.select(&sub_id, pool).await?;
let r = filters.select(pool).await?;
for event in r {
RelayMessage::EVENT(&event, &sub_id, to_client_msg_tx).await;
RelayMessage::send_event(&event, &sub_id, to_client_msg_tx).await;
}
RelayMessage::EOSE(&sub_id, to_client_msg_tx).await;
RelayMessage::send_eose(&sub_id, to_client_msg_tx).await;
Ok(())
}
@@ -351,14 +519,22 @@ async fn handle_message(
let to_client_msg_tx = to_client_msg_tx.clone();
tokio::spawn(async move {
if let Err(e) = event.save(&pool).await {
RelayMessage::OK(&event, false, e.to_string(), &to_client_msg_tx).await;
RelayMessage::send_ok(&event, false, e.to_string(), &to_client_msg_tx)
.await;
error!("Failed to save event: {}", e);
} else {
RelayMessage::OK(&event, true, "Saved".to_string(), &to_client_msg_tx).await;
RelayMessage::send_ok(&event, true, "Saved".to_string(), &to_client_msg_tx)
.await;
}
});
} else {
RelayMessage::OK(&event, false, "Invalid signature".to_string(), to_client_msg_tx).await;
RelayMessage::send_ok(
&event,
false,
"Invalid signature".to_string(),
to_client_msg_tx,
)
.await;
}
Ok(())
}
@@ -378,6 +554,37 @@ async fn handle_message(
}
}
async fn init_database(pool: &SqlitePool) -> Result<()> {
let create_events_table = r#"
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
pubkey TEXT NOT NULL,
created_at INTEGER NOT NULL,
kind INTEGER NOT NULL,
tags TEXT NOT NULL,
content TEXT NOT NULL,
sig TEXT NOT NULL,
indexed_at INTEGER DEFAULT (unixepoch())
);
"#;
let create_indexes = vec![
"CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey);",
"CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind);",
"CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);",
"CREATE INDEX IF NOT EXISTS idx_events_pubkey_kind ON events(pubkey, kind);",
];
query(create_events_table).execute(pool).await?;
for index_sql in create_indexes {
query(index_sql).execute(pool).await?;
}
info!("Database initialized successfully");
Ok(())
}
impl NostrEvent {
fn serialize(&self) -> String {
let serialized = json!([
@@ -396,11 +603,24 @@ impl NostrEvent {
let mut hasher = Sha256::new();
hasher.update(&self.serialize());
let result = hasher.finalize();
if hex::encode(result) == self.id {
return true;
} else {
if hex::encode(result) != self.id {
return false;
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
if self.created_at > now + 900 {
return false;
}
// 验证标签数量
if self.tags.len() > MAX_EVENT_TAGS as usize {
return false;
}
return true;
}
async fn save(&self, pool: &SqlitePool) -> Result<(), sqlx::Error> {
@@ -408,10 +628,7 @@ impl NostrEvent {
match self.kind {
3 => {
let sql = "DELETE FROM events WHERE id = ? AND kind = 3";
sqlx::query(sql)
.bind(&self.id)
.execute(pool)
.await?;
sqlx::query(sql).bind(&self.id).execute(pool).await?;
}
5 => {
@@ -419,10 +636,7 @@ impl NostrEvent {
for tag in &self.tags {
if tag.get(0).map(|s| s == "e").unwrap_or(false) && tag.len() > 1 {
let sql = "DELETE FROM events WHERE id = ?";
sqlx::query(sql)
.bind(&tag[1])
.execute(pool)
.await?;
sqlx::query(sql).bind(&tag[1]).execute(pool).await?;
}
}
}
@@ -430,9 +644,9 @@ impl NostrEvent {
10002 => {
if !self.tags.is_empty() {
sqlx::query("DELETE FROM events WHERE id = ? AND kind = 10002")
.bind(&self.id)
.execute(pool)
.await?;
.bind(&self.id)
.execute(pool)
.await?;
}
}
_ => {}
@@ -453,49 +667,74 @@ impl NostrEvent {
}
impl Filter {
async fn select(
&self,
sub_id: &str,
pool: &SqlitePool,
) -> Result<Vec<NostrEvent>, sqlx::Error> {
let mut sql: QueryBuilder<sqlx::Sqlite> = QueryBuilder::new(r#"SELECT id, pubkey, created_at, kind, tags, content, sig
FROM events
WHERE 1=1
"#);
async fn select(&self, pool: &SqlitePool) -> Result<Vec<NostrEvent>, sqlx::Error> {
let mut sql = QueryBuilder::new(
"SELECT id, pubkey, created_at, kind, tags, content, sig FROM events WHERE 1=1",
);
// ID过滤
if let Some(ids) = &self.ids {
if let Some(pubkey) = &self.authors {
sql.push(" AND pubkey in (");
if !ids.is_empty() {
sql.push(" AND id IN (");
let mut separated = sql.separated(",");
for pubkey in pubkey {
separated.push_bind(pubkey);
for id in ids {
separated.push_bind(id);
}
separated.push_unseparated(")");
}
}
if let Some(pubkeys) = &self.authors {
if let Some(kinds) = &self.kinds {
if !kinds.is_empty() {
sql.push(" AND kind in (");
let mut separated = sql.separated(",");
for kind in kinds {
separated.push_bind(kind);
}
separated.push_unseparated(")");
// 作者过滤
if let Some(authors) = &self.authors {
if !authors.is_empty() {
sql.push(" AND pubkey IN (");
let mut separated = sql.separated(",");
for author in authors {
separated.push_bind(author);
}
separated.push_unseparated(")");
}
}
// 类型过滤
if let Some(kinds) = &self.kinds {
if !kinds.is_empty() {
sql.push(" AND kind IN (");
let mut separated = sql.separated(",");
for kind in kinds {
separated.push_bind(kind);
}
separated.push_unseparated(")");
}
}
// 时间过滤
if let Some(since) = self.since {
sql.push(" AND created_at >= ").push_bind(since as i64);
}
if let Some(until) = self.until {
sql.push(" AND created_at <= ").push_bind(until as i64);
}
// TODO: 实现标签过滤
// for (tag_name, tag_values) in &self.tag_filters {
// // 实现 #e, #p 等标签过滤
// }
sql.push(" ORDER BY created_at DESC");
let max_limit = self.limit.unwrap_or(10).min(500);
sql.push(" LIMIT ").push_bind(max_limit as i64);
let limit = self.limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT);
sql.push(" LIMIT ").push_bind(limit as i64);
let query = sql.build();
debug!("SQL: {}", query.sql());
let result = query.fetch_all(pool).await?;
debug!("Selected {} events", result.len());
let events: Vec<NostrEvent> = result
.iter()
let rows = query.fetch_all(pool).await?;
debug!("Selected {} events", rows.len());
let events: Vec<NostrEvent> = rows
.into_iter()
.map(|row| NostrEvent {
id: row.get(0),
pubkey: row.get(1),
@@ -507,11 +746,11 @@ impl Filter {
sig: row.get(6),
})
.collect();
debug!("Selected {:?} event", events);
Ok(events)
}
fn check(&self, event: &NostrEvent) -> bool {
fn matchs(&self, event: &NostrEvent) -> bool {
if let Some(ids) = &self.ids {
if ids.contains(&event.id) {
return true;
@@ -532,18 +771,25 @@ impl Filter {
}
impl RelayMessage {
async fn OK(event: &NostrEvent, accept: bool, message: String, to_client_msg_tx: &mpsc::Sender<String>) {
let msg:String = "[\"OK\", \"".to_string() + &event.id + "\", " + &accept.to_string() + ", \"" + &message + "\"]";
async fn send_ok(
event: &NostrEvent,
accept: bool,
message: String,
to_client_msg_tx: &mpsc::Sender<String>,
) {
let msg: String = "[\"OK\", \"".to_string()
+ &event.id
+ "\", "
+ &accept.to_string()
+ ", \""
+ &message
+ "\"]";
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send message: {}", e);
}
}
async fn EVENT(
event: &NostrEvent,
sub_id: &str,
to_client_msg_tx: &mpsc::Sender<String>,
) {
async fn send_event(event: &NostrEvent, sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) {
let event = serde_json::to_string(event).unwrap();
let msg = "[\"EVENT\", \"".to_string() + sub_id + "\", " + &event + "]";
if let Err(e) = to_client_msg_tx.send(msg).await {
@@ -551,16 +797,16 @@ impl RelayMessage {
}
}
async fn EOSE(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) {
async fn send_eose(sub_id: &str, to_client_msg_tx: &mpsc::Sender<String>) {
let msg = "[\"EOSE\", \"".to_string() + sub_id + "\"]";
if let Err(e) = to_client_msg_tx.send(msg).await {
error!("Failed to send message: {}", e);
}
}
async fn any(message: String, to_client_msg_tx: &mpsc::Sender<String>) {
async fn send_message(message: String, to_client_msg_tx: &mpsc::Sender<String>) {
if let Err(e) = to_client_msg_tx.send(message).await {
error!("Failed to send message: {}", e);
}
}
}
}