mirror of
https://github.com/laoxong/nofx.git
synced 2026-06-04 01:48:22 +08:00
3ca95b294d
* feat: integrate NOFXi agent into dev * Enhance NOFXi agent workflow and diagnostics
344 lines
10 KiB
Go
344 lines
10 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"nofx/safe"
|
|
"regexp"
|
|
"time"
|
|
)
|
|
|
|
type storeUserIDContextKey struct{}
|
|
|
|
// WithStoreUserID annotates an HTTP request context with the authenticated store user ID.
|
|
func WithStoreUserID(ctx context.Context, storeUserID string) context.Context {
|
|
return context.WithValue(ctx, storeUserIDContextKey{}, storeUserID)
|
|
}
|
|
|
|
func storeUserIDFromContext(ctx context.Context) string {
|
|
if v, ok := ctx.Value(storeUserIDContextKey{}).(string); ok && v != "" {
|
|
return v
|
|
}
|
|
return "default"
|
|
}
|
|
|
|
// validSymbolRe matches only alphanumeric trading symbols (e.g. BTCUSDT, ETH-USD).
|
|
var validSymbolRe = regexp.MustCompile(`^[A-Za-z0-9\-_]{1,20}$`)
|
|
|
|
// validIntervalRe matches only valid kline intervals (e.g. 1m, 5m, 1h, 4h, 1d, 1w).
|
|
var validIntervalRe = regexp.MustCompile(`^[0-9]{1,2}[mhHdDwWM]$`)
|
|
|
|
// binanceClient is a shared HTTP client for proxying Binance API requests.
|
|
// Reused across requests to benefit from connection pooling.
|
|
var binanceClient = &http.Client{
|
|
Timeout: 10 * time.Second,
|
|
Transport: &http.Transport{
|
|
MaxIdleConns: 20,
|
|
MaxIdleConnsPerHost: 10,
|
|
IdleConnTimeout: 90 * time.Second,
|
|
},
|
|
}
|
|
|
|
// WebHandler provides HTTP endpoints for the NOFXi agent.
|
|
type WebHandler struct {
|
|
agent *Agent
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func NewWebHandler(agent *Agent, logger *slog.Logger) *WebHandler {
|
|
return &WebHandler{agent: agent, logger: logger}
|
|
}
|
|
|
|
// HandleHealth handles GET /api/agent/health.
|
|
func (w *WebHandler) HandleHealth(rw http.ResponseWriter, r *http.Request) {
|
|
writeJSON(rw, 200, map[string]string{"status": "ok", "agent": "NOFXi", "time": time.Now().Format(time.RFC3339)})
|
|
}
|
|
|
|
// HandleChat handles POST /api/agent/chat.
|
|
func (w *WebHandler) HandleChat(rw http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(rw, "method not allowed", 405)
|
|
return
|
|
}
|
|
var req struct {
|
|
Message string `json:"message"`
|
|
UserID int64 `json:"user_id"`
|
|
UserKey string `json:"user_key"`
|
|
Lang string `json:"lang"`
|
|
}
|
|
// Limit request body to 64KB to prevent abuse
|
|
if err := json.NewDecoder(io.LimitReader(r.Body, 64*1024)).Decode(&req); err != nil {
|
|
writeJSON(rw, 400, map[string]string{"error": "invalid request"})
|
|
return
|
|
}
|
|
if req.Message == "" {
|
|
writeJSON(rw, 400, map[string]string{"error": "message required"})
|
|
return
|
|
}
|
|
if req.UserID == 0 {
|
|
req.UserID = SessionUserIDFromKey(req.UserKey)
|
|
}
|
|
msg := req.Message
|
|
if req.Lang != "" {
|
|
msg = "[lang:" + req.Lang + "] " + msg
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 55*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := w.agent.HandleMessageForStoreUser(ctx, storeUserIDFromContext(r.Context()), req.UserID, msg)
|
|
if err != nil {
|
|
w.logger.Error("agent HandleMessage failed", "error", err, "user_id", req.UserID)
|
|
writeJSON(rw, 500, map[string]string{"error": "Failed to process message. Please try again."})
|
|
return
|
|
}
|
|
writeJSON(rw, 200, map[string]string{"response": resp})
|
|
}
|
|
|
|
// HandleChatStream handles POST /api/agent/chat/stream — SSE streaming chat.
|
|
// Sends server-sent events with types including planning, plan, step_start,
|
|
// step_complete, replan, tool, delta, done, error.
|
|
func (w *WebHandler) HandleChatStream(rw http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(rw, "method not allowed", 405)
|
|
return
|
|
}
|
|
var req struct {
|
|
Message string `json:"message"`
|
|
UserID int64 `json:"user_id"`
|
|
UserKey string `json:"user_key"`
|
|
Lang string `json:"lang"`
|
|
}
|
|
if err := json.NewDecoder(io.LimitReader(r.Body, 64*1024)).Decode(&req); err != nil {
|
|
writeJSON(rw, 400, map[string]string{"error": "invalid request"})
|
|
return
|
|
}
|
|
if req.Message == "" {
|
|
writeJSON(rw, 400, map[string]string{"error": "message required"})
|
|
return
|
|
}
|
|
if req.UserID == 0 {
|
|
req.UserID = SessionUserIDFromKey(req.UserKey)
|
|
}
|
|
msg := req.Message
|
|
if req.Lang != "" {
|
|
msg = "[lang:" + req.Lang + "] " + msg
|
|
}
|
|
|
|
// Set SSE headers
|
|
rw.Header().Set("Content-Type", "text/event-stream")
|
|
rw.Header().Set("Cache-Control", "no-cache")
|
|
rw.Header().Set("Connection", "keep-alive")
|
|
rw.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering
|
|
rw.WriteHeader(200)
|
|
|
|
flusher, ok := rw.(http.Flusher)
|
|
if !ok {
|
|
writeSSE(rw, nil, "error", "streaming not supported")
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := w.agent.HandleMessageStreamForStoreUser(ctx, storeUserIDFromContext(r.Context()), req.UserID, msg, func(event, data string) {
|
|
writeSSE(rw, flusher, event, data)
|
|
})
|
|
if err != nil {
|
|
w.logger.Error("agent HandleMessageStream failed", "error", err, "user_id", req.UserID)
|
|
writeSSE(rw, flusher, "error", "Failed to process message. Please try again.")
|
|
return
|
|
}
|
|
// Send final done event with complete response
|
|
writeSSE(rw, flusher, "done", resp)
|
|
}
|
|
|
|
// writeSSE writes a single SSE event.
|
|
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event, data string) {
|
|
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, sseEscape(data))
|
|
if flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
|
|
// sseEscape escapes newlines in SSE data (each line needs a "data: " prefix).
|
|
func sseEscape(s string) string {
|
|
// SSE spec: multi-line data uses multiple "data:" lines
|
|
// But we use JSON encoding to avoid this complexity
|
|
b, _ := json.Marshal(s)
|
|
return string(b)
|
|
}
|
|
|
|
// HandleKlines proxies kline data from Binance.
|
|
func (w *WebHandler) HandleKlines(rw http.ResponseWriter, r *http.Request) {
|
|
symbol := r.URL.Query().Get("symbol")
|
|
if symbol == "" {
|
|
symbol = "BTCUSDT"
|
|
}
|
|
interval := r.URL.Query().Get("interval")
|
|
if interval == "" {
|
|
interval = "1h"
|
|
}
|
|
|
|
if !validSymbolRe.MatchString(symbol) {
|
|
writeJSON(rw, 400, map[string]string{"error": "invalid symbol"})
|
|
return
|
|
}
|
|
if !validIntervalRe.MatchString(interval) {
|
|
writeJSON(rw, 400, map[string]string{"error": "invalid interval"})
|
|
return
|
|
}
|
|
|
|
proxyBinance(rw, r.Context(), fmt.Sprintf("https://fapi.binance.com/fapi/v1/klines?symbol=%s&interval=%s&limit=300", symbol, interval))
|
|
}
|
|
|
|
// HandleTicker proxies ticker data from Binance.
|
|
func (w *WebHandler) HandleTicker(rw http.ResponseWriter, r *http.Request) {
|
|
symbol := r.URL.Query().Get("symbol")
|
|
if symbol == "" {
|
|
symbol = "BTCUSDT"
|
|
}
|
|
|
|
if !validSymbolRe.MatchString(symbol) {
|
|
writeJSON(rw, 400, map[string]string{"error": "invalid symbol"})
|
|
return
|
|
}
|
|
|
|
proxyBinance(rw, r.Context(), fmt.Sprintf("https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=%s", symbol))
|
|
}
|
|
|
|
// HandleTickers handles GET /api/agent/tickers?symbols=BTCUSDT,ETHUSDT,SOLUSDT
|
|
// Batch endpoint: fetches multiple tickers concurrently, returns array.
|
|
func (w *WebHandler) HandleTickers(rw http.ResponseWriter, r *http.Request) {
|
|
symbolsParam := r.URL.Query().Get("symbols")
|
|
if symbolsParam == "" {
|
|
symbolsParam = "BTCUSDT,ETHUSDT,SOLUSDT"
|
|
}
|
|
|
|
// Validate symbols
|
|
var symbols []string
|
|
for _, s := range splitComma(symbolsParam) {
|
|
if validSymbolRe.MatchString(s) {
|
|
symbols = append(symbols, s)
|
|
}
|
|
}
|
|
if len(symbols) == 0 {
|
|
writeJSON(rw, 400, map[string]string{"error": "no valid symbols"})
|
|
return
|
|
}
|
|
if len(symbols) > 20 {
|
|
writeJSON(rw, 400, map[string]string{"error": "max 20 symbols"})
|
|
return
|
|
}
|
|
|
|
// Fetch all tickers concurrently with context propagation
|
|
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
type result struct {
|
|
idx int
|
|
data json.RawMessage
|
|
}
|
|
results := make(chan result, len(symbols))
|
|
for i, sym := range symbols {
|
|
idx, s := i, sym
|
|
safe.GoNamed("ticker-fetch-"+s, func() {
|
|
req, err := http.NewRequestWithContext(ctx, "GET",
|
|
fmt.Sprintf("https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=%s", s), nil)
|
|
if err != nil {
|
|
results <- result{idx: idx}
|
|
return
|
|
}
|
|
resp, err := binanceClient.Do(req)
|
|
if err != nil {
|
|
results <- result{idx: idx}
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != 200 {
|
|
results <- result{idx: idx}
|
|
return
|
|
}
|
|
body, err := safe.ReadAllLimited(resp.Body, 16*1024)
|
|
if err != nil {
|
|
results <- result{idx: idx}
|
|
return
|
|
}
|
|
results <- result{idx: idx, data: body}
|
|
})
|
|
}
|
|
|
|
// Collect results in order
|
|
ordered := make([]json.RawMessage, len(symbols))
|
|
for range symbols {
|
|
r := <-results
|
|
if r.data != nil {
|
|
ordered[r.idx] = r.data
|
|
}
|
|
}
|
|
|
|
// Filter out nil entries and write response
|
|
out := make([]json.RawMessage, 0, len(ordered))
|
|
for _, d := range ordered {
|
|
if d != nil {
|
|
out = append(out, d)
|
|
}
|
|
}
|
|
rw.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(rw).Encode(out)
|
|
}
|
|
|
|
// commaRe is pre-compiled for splitComma — avoids recompiling on every call.
|
|
var commaRe = regexp.MustCompile(`\s*,\s*`)
|
|
|
|
// splitComma splits a comma-separated string, trims whitespace, skips empty.
|
|
func splitComma(s string) []string {
|
|
var parts []string
|
|
for _, p := range commaRe.Split(s, -1) {
|
|
if p != "" {
|
|
parts = append(parts, p)
|
|
}
|
|
}
|
|
return parts
|
|
}
|
|
|
|
func proxyBinance(rw http.ResponseWriter, ctx context.Context, url string) {
|
|
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
|
if err != nil {
|
|
writeJSON(rw, 500, map[string]string{"error": "failed to create request"})
|
|
return
|
|
}
|
|
resp, err := binanceClient.Do(req)
|
|
if err != nil {
|
|
// Distinguish client cancellation from upstream failures
|
|
if ctx.Err() != nil {
|
|
return // Client disconnected, no point writing response
|
|
}
|
|
writeJSON(rw, 502, map[string]string{"error": "upstream request failed"})
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Forward upstream error status codes instead of silently proxying bad data
|
|
if resp.StatusCode != http.StatusOK {
|
|
writeJSON(rw, 502, map[string]string{"error": fmt.Sprintf("upstream returned status %d", resp.StatusCode)})
|
|
return
|
|
}
|
|
|
|
rw.Header().Set("Content-Type", "application/json")
|
|
// CORS is handled by the gin middleware — no need to set it here
|
|
// Limit response body to 2MB to prevent memory exhaustion
|
|
io.Copy(rw, io.LimitReader(resp.Body, 2*1024*1024))
|
|
}
|
|
|
|
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
// CORS is handled by the gin middleware — no need to set it here
|
|
w.WriteHeader(status)
|
|
json.NewEncoder(w).Encode(v)
|
|
}
|