From c9f9f4318e390e2a99d07c2dfcb2e10e0e73eec3 Mon Sep 17 00:00:00 2001 From: Yinghao Fan Date: Fri, 31 Oct 2025 02:06:20 +0800 Subject: [PATCH 1/9] fix: Correct error handling in decision parsing Changes: - Updated error handling in `GetFullDecision` and `parseFullDecisionResponse` functions to return the decision object even when an error occurs, improving the clarity of error messages. This ensures that the decision object is consistently returned, allowing for better debugging and handling of errors in the decision-making process. --- decision/engine.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/decision/engine.go b/decision/engine.go index 76bcffca..97181572 100644 --- a/decision/engine.go +++ b/decision/engine.go @@ -109,7 +109,7 @@ func GetFullDecision(ctx *Context, mcpClient *mcp.Client) (*FullDecision, error) // 4. 解析AI响应 decision, err := parseFullDecisionResponse(aiResponse, ctx.Account.TotalEquity, ctx.BTCETHLeverage, ctx.AltcoinLeverage) if err != nil { - return nil, fmt.Errorf("解析AI响应失败: %w", err) + return decision, fmt.Errorf("解析AI响应失败: %w", err) } decision.Timestamp = time.Now() @@ -427,7 +427,7 @@ func parseFullDecisionResponse(aiResponse string, accountEquity float64, btcEthL return &FullDecision{ CoTTrace: cotTrace, Decisions: []Decision{}, - }, fmt.Errorf("提取决策失败: %w\n\n=== AI思维链分析 ===\n%s", err, cotTrace) + }, fmt.Errorf("提取决策失败: %w", err) } // 3. 验证决策 @@ -435,7 +435,7 @@ func parseFullDecisionResponse(aiResponse string, accountEquity float64, btcEthL return &FullDecision{ CoTTrace: cotTrace, Decisions: decisions, - }, fmt.Errorf("决策验证失败: %w\n\n=== AI思维链分析 ===\n%s", err, cotTrace) + }, fmt.Errorf("决策验证失败: %w", err) } return &FullDecision{ From f6539eb7505708ebd5c9cbfcb3c07fbbac71fa85 Mon Sep 17 00:00:00 2001 From: yuanshi2016 <103150111@qq.com> Date: Sat, 1 Nov 2025 15:58:54 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=86=85=E7=BD=AEAI?= =?UTF-8?q?=E8=AF=84=E5=88=86=20=E4=BF=AE=E6=94=B9market/data.go=20Get?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E8=8E=B7=E5=8F=96K=E7=BA=BF=E4=B8=BA?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E8=8E=B7=E5=8F=96(=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BC=A0=E5=85=A5=E5=B8=81=E7=A7=8D=E6=AF=94?= =?UTF-8?q?=E8=BE=83=E5=A4=9A=E7=9A=84=E6=83=85=E5=86=B5=E4=B8=8B=E8=80=97?= =?UTF-8?q?=E6=97=B6=E9=97=AE=E9=A2=98)=20market=E7=9B=AE=E5=BD=95?= =?UTF-8?q?=E4=B8=8B=E6=96=B0=E5=A2=9E=E6=96=87=E4=BB=B6=20main.go=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=BF=90=E8=A1=8C=E5=85=A5=E5=8F=A3=20?= =?UTF-8?q?=E9=80=9A=E8=BF=87inside=5Fcoins=3Dtrue=E6=8E=A7=E5=88=B6=20?= =?UTF-8?q?=E8=AF=A5=E8=AF=84=E5=88=86=E9=BB=98=E8=AE=A4=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E5=A4=A7=E7=BA=A6=E9=9C=80=E8=A6=812=E5=88=86?= =?UTF-8?q?=E9=92=9F=E5=B7=A6=E5=8F=B3(=E5=9B=A0=E4=B8=BA=E5=B8=81?= =?UTF-8?q?=E7=A7=8D=E5=88=97=E8=A1=A8=E6=AF=94=E8=BE=83=E5=A4=9A=EF=BC=8C?= =?UTF-8?q?api=E6=9C=89=E9=99=90=E9=80=9F)=20=E4=BD=BF=E7=94=A8=E6=97=B6?= =?UTF-8?q?=E5=BA=94=E8=AF=A5=E6=B3=A8=E6=84=8Fengine.go=E4=B8=8B=E7=9A=84?= =?UTF-8?q?=E6=B5=81=E5=8A=A8=E6=80=A7=E8=BF=87=E6=BB=A4=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json.example | 1 + config/config.go | 3 +- main.go | 81 +++--- market/api_client.go | 150 +++++++++++ market/combined_streams.go | 202 ++++++++++++++ market/data.go | 105 +------- market/feature_engine.go | 229 ++++++++++++++++ market/monitor.go | 526 +++++++++++++++++++++++++++++++++++++ market/types.go | 157 +++++++++++ market/websocket_client.go | 231 ++++++++++++++++ 10 files changed, 1550 insertions(+), 135 deletions(-) create mode 100644 market/api_client.go create mode 100644 market/combined_streams.go create mode 100644 market/feature_engine.go create mode 100644 market/monitor.go create mode 100644 market/types.go create mode 100644 market/websocket_client.go diff --git a/config.json.example b/config.json.example index ac9d5ac6..87b01edd 100644 --- a/config.json.example +++ b/config.json.example @@ -5,6 +5,7 @@ "altcoin_leverage": 5 }, "use_default_coins": true, + "inside_coins": true, "default_coins": [ "BTCUSDT", "ETHUSDT", diff --git a/config/config.go b/config/config.go index 97fcc84d..3b736d0e 100644 --- a/config/config.go +++ b/config/config.go @@ -11,7 +11,7 @@ import ( type TraderConfig struct { ID string `json:"id"` Name string `json:"name"` - Enabled bool `json:"enabled"` // 是否启用该trader + Enabled bool `json:"enabled"` // 是否启用该trader AIModel string `json:"ai_model"` // "qwen" or "deepseek" // 交易平台选择(二选一) @@ -54,6 +54,7 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 + InsideCoins bool `json:"inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/main.go b/main.go index 1d9631a9..7929072b 100644 --- a/main.go +++ b/main.go @@ -8,12 +8,14 @@ import ( "nofx/auth" "nofx/config" "nofx/manager" + "nofx/market" "nofx/pool" "os" "os/signal" "strconv" "strings" "syscall" + "time" ) // LeverageConfig 杠杆配置 @@ -28,6 +30,7 @@ type ConfigFile struct { APIServerPort int `json:"api_server_port"` UseDefaultCoins bool `json:"use_default_coins"` DefaultCoins []string `json:"default_coins"` + InsideCoins bool `json:"inside_coins"` CoinPoolAPIURL string `json:"coin_pool_api_url"` OITopAPIURL string `json:"oi_top_api_url"` MaxDailyLoss float64 `json:"max_daily_loss"` @@ -35,6 +38,7 @@ type ConfigFile struct { StopTradingMinutes int `json:"stop_trading_minutes"` Leverage LeverageConfig `json:"leverage"` JWTSecret string `json:"jwt_secret"` + DataKLineTime string `json:"data_k_line_time"` } // syncConfigToDatabase 从config.json读取配置并同步到数据库 @@ -61,14 +65,15 @@ func syncConfigToDatabase(database *config.Database) error { // 同步各配置项到数据库 configs := map[string]string{ - "admin_mode": fmt.Sprintf("%t", configFile.AdminMode), - "api_server_port": strconv.Itoa(configFile.APIServerPort), - "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), - "coin_pool_api_url": configFile.CoinPoolAPIURL, - "oi_top_api_url": configFile.OITopAPIURL, - "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), - "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), - "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), + "admin_mode": fmt.Sprintf("%t", configFile.AdminMode), + "api_server_port": strconv.Itoa(configFile.APIServerPort), + "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), + "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), + "coin_pool_api_url": configFile.CoinPoolAPIURL, + "oi_top_api_url": configFile.OITopAPIURL, + "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), + "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), + "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), } // 同步default_coins(转换为JSON字符串存储) @@ -132,12 +137,14 @@ func main() { // 获取系统配置 useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins") useDefaultCoins := useDefaultCoinsStr == "true" + InsideCoinsStr, _ := database.GetSystemConfig("inside_coins") + insideCoins := InsideCoinsStr == "true" apiPortStr, _ := database.GetSystemConfig("api_server_port") - + // 获取管理员模式配置 adminModeStr, _ := database.GetSystemConfig("admin_mode") adminMode := adminModeStr != "false" // 默认为true - + // 设置JWT密钥 jwtSecret, _ := database.GetSystemConfig("jwt_secret") if jwtSecret == "" { @@ -145,7 +152,7 @@ func main() { log.Printf("⚠️ 使用默认JWT密钥,建议在生产环境中配置") } auth.SetJWTSecret(jwtSecret) - + // 在管理员模式下,确保admin用户存在 if adminMode { err := database.EnsureAdminUser() @@ -156,7 +163,7 @@ func main() { } auth.SetAdminMode(true) } - + log.Printf("✓ 配置数据库初始化成功") fmt.Println() @@ -180,6 +187,25 @@ func main() { pool.SetDefaultCoins(defaultCoins) + //内置AI评分 + if insideCoins { + log.Printf("✓ 启用内置AI评分币种列表") + monitor := market.NewWSMonitor(150) + go func() { + monitor.Start() + // 定时器设置默认的币种列表 - 覆蓋defaultCoins设置 + for { + if len(monitor.FilterSymbol) > 0 { + for _, coin := range defaultCoins { + monitor.FilterSymbol = append(monitor.FilterSymbol, coin) + } + pool.SetDefaultCoins(monitor.FilterSymbol) + monitor.FilterSymbol = nil + } + time.Sleep(1 * time.Minute) + } + }() + } // 设置是否使用默认主流币种 pool.SetUseDefaultCoins(useDefaultCoins) if useDefaultCoins { @@ -192,7 +218,7 @@ func main() { pool.SetCoinPoolAPI(coinPoolAPIURL) log.Printf("✓ 已配置AI500币种池API") } - + oiTopAPIURL, _ := database.GetSystemConfig("oi_top_api_url") if oiTopAPIURL != "" { pool.SetOITopAPI(oiTopAPIURL) @@ -208,37 +234,26 @@ func main() { log.Fatalf("❌ 加载交易员失败: %v", err) } - // 获取所有用户的交易员配置(用于显示) - userIDs, err := database.GetAllUsers() + // 获取数据库中的所有交易员配置(用于显示,使用default用户) + traders, err := database.GetTraders("default") if err != nil { - log.Printf("⚠️ 获取用户列表失败: %v", err) - userIDs = []string{"default"} // 回退到default用户 - } - - var allTraders []*config.TraderRecord - for _, userID := range userIDs { - traders, err := database.GetTraders(userID) - if err != nil { - log.Printf("⚠️ 获取用户 %s 的交易员失败: %v", userID, err) - continue - } - allTraders = append(allTraders, traders...) + log.Fatalf("❌ 获取交易员列表失败: %v", err) } // 显示加载的交易员信息 fmt.Println() fmt.Println("🤖 数据库中的AI交易员配置:") - if len(allTraders) == 0 { + if len(traders) == 0 { fmt.Println(" • 暂无配置的交易员,请通过Web界面创建") } else { - for _, trader := range allTraders { + for _, trader := range traders { status := "停止" if trader.IsRunning { status = "运行中" } - fmt.Printf(" • %s (%s + %s) - 用户: %s - 初始资金: %.0f USDT [%s]\n", - trader.Name, strings.ToUpper(trader.AIModelID), strings.ToUpper(trader.ExchangeID), - trader.UserID, trader.InitialBalance, status) + fmt.Printf(" • %s (%s + %s) - 初始资金: %.0f USDT [%s]\n", + trader.Name, strings.ToUpper(trader.AIModelID), strings.ToUpper(trader.ExchangeID), + trader.InitialBalance, status) } } @@ -256,7 +271,7 @@ func main() { fmt.Println() // 获取API服务器端口 - apiPort := 8080 // 默认端口 + apiPort := 8080 // 默认端口 if apiPortStr != "" { if port, err := strconv.Atoi(apiPortStr); err == nil { apiPort = port diff --git a/market/api_client.go b/market/api_client.go new file mode 100644 index 00000000..70bb1150 --- /dev/null +++ b/market/api_client.go @@ -0,0 +1,150 @@ +package market + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strconv" + "time" +) + +const ( + baseURL = "https://fapi.binance.com" +) + +type APIClient struct { + client *http.Client +} + +func NewAPIClient() *APIClient { + return &APIClient{ + client: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +func (c *APIClient) GetExchangeInfo() (*ExchangeInfo, error) { + url := fmt.Sprintf("%s/fapi/v1/exchangeInfo", baseURL) + resp, err := c.client.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var exchangeInfo ExchangeInfo + err = json.Unmarshal(body, &exchangeInfo) + if err != nil { + return nil, err + } + + return &exchangeInfo, nil +} + +func (c *APIClient) GetKlines(symbol, interval string, limit int) ([]Kline, error) { + url := fmt.Sprintf("%s/fapi/v1/klines", baseURL) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + q := req.URL.Query() + q.Add("symbol", symbol) + q.Add("interval", interval) + q.Add("limit", strconv.Itoa(limit)) + req.URL.RawQuery = q.Encode() + + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var klineResponses []KlineResponse + err = json.Unmarshal(body, &klineResponses) + if err != nil { + return nil, err + } + + var klines []Kline + for _, kr := range klineResponses { + kline, err := parseKline(kr) + if err != nil { + log.Printf("解析K线数据失败: %v", err) + continue + } + klines = append(klines, kline) + } + + return klines, nil +} + +func parseKline(kr KlineResponse) (Kline, error) { + var kline Kline + + if len(kr) < 11 { + return kline, fmt.Errorf("invalid kline data") + } + + // 解析各个字段 + kline.OpenTime = int64(kr[0].(float64)) + kline.Open, _ = strconv.ParseFloat(kr[1].(string), 64) + kline.High, _ = strconv.ParseFloat(kr[2].(string), 64) + kline.Low, _ = strconv.ParseFloat(kr[3].(string), 64) + kline.Close, _ = strconv.ParseFloat(kr[4].(string), 64) + kline.Volume, _ = strconv.ParseFloat(kr[5].(string), 64) + kline.CloseTime = int64(kr[6].(float64)) + kline.QuoteVolume, _ = strconv.ParseFloat(kr[7].(string), 64) + kline.Trades = int(kr[8].(float64)) + kline.TakerBuyBaseVolume, _ = strconv.ParseFloat(kr[9].(string), 64) + kline.TakerBuyQuoteVolume, _ = strconv.ParseFloat(kr[10].(string), 64) + + return kline, nil +} + +func (c *APIClient) GetCurrentPrice(symbol string) (float64, error) { + url := fmt.Sprintf("%s/fapi/v1/ticker/price", baseURL) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, err + } + + q := req.URL.Query() + q.Add("symbol", symbol) + req.URL.RawQuery = q.Encode() + + resp, err := c.client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + var ticker PriceTicker + err = json.Unmarshal(body, &ticker) + if err != nil { + return 0, err + } + + price, err := strconv.ParseFloat(ticker.Price, 64) + if err != nil { + return 0, err + } + + return price, nil +} diff --git a/market/combined_streams.go b/market/combined_streams.go new file mode 100644 index 00000000..801d423e --- /dev/null +++ b/market/combined_streams.go @@ -0,0 +1,202 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type CombinedStreamsClient struct { + conn *websocket.Conn + mu sync.RWMutex + subscribers map[string]chan []byte + reconnect bool + done chan struct{} + batchSize int // 每批订阅的流数量 +} + +func NewCombinedStreamsClient(batchSize int) *CombinedStreamsClient { + return &CombinedStreamsClient{ + subscribers: make(map[string]chan []byte), + reconnect: true, + done: make(chan struct{}), + batchSize: batchSize, + } +} + +func (c *CombinedStreamsClient) Connect() error { + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + // 组合流使用不同的端点 + conn, _, err := dialer.Dial("wss://fstream.binance.com/stream", nil) + if err != nil { + return fmt.Errorf("组合流WebSocket连接失败: %v", err) + } + + c.mu.Lock() + c.conn = conn + c.mu.Unlock() + + log.Println("组合流WebSocket连接成功") + go c.readMessages() + + return nil +} + +// BatchSubscribeKlines 批量订阅K线 +func (c *CombinedStreamsClient) BatchSubscribeKlines(symbols []string, interval string) error { + // 将symbols分批处理 + batches := c.splitIntoBatches(symbols, c.batchSize) + + for i, batch := range batches { + log.Printf("订阅第 %d 批, 数量: %d", i+1, len(batch)) + + streams := make([]string, len(batch)) + for j, symbol := range batch { + streams[j] = fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), interval) + } + + if err := c.subscribeStreams(streams); err != nil { + return fmt.Errorf("第 %d 批订阅失败: %v", i+1, err) + } + + // 批次间延迟,避免被限制 + if i < len(batches)-1 { + time.Sleep(100 * time.Millisecond) + } + } + + return nil +} + +// splitIntoBatches 将切片分成指定大小的批次 +func (c *CombinedStreamsClient) splitIntoBatches(symbols []string, batchSize int) [][]string { + var batches [][]string + + for i := 0; i < len(symbols); i += batchSize { + end := i + batchSize + if end > len(symbols) { + end = len(symbols) + } + batches = append(batches, symbols[i:end]) + } + + return batches +} + +// subscribeStreams 订阅多个流 +func (c *CombinedStreamsClient) subscribeStreams(streams []string) error { + subscribeMsg := map[string]interface{}{ + "method": "SUBSCRIBE", + "params": streams, + "id": time.Now().UnixNano(), + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if c.conn == nil { + return fmt.Errorf("WebSocket未连接") + } + + log.Printf("订阅流: %v", streams) + return c.conn.WriteJSON(subscribeMsg) +} + +func (c *CombinedStreamsClient) readMessages() { + for { + select { + case <-c.done: + return + default: + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn == nil { + time.Sleep(1 * time.Second) + continue + } + + _, message, err := conn.ReadMessage() + if err != nil { + log.Printf("读取组合流消息失败: %v", err) + c.handleReconnect() + return + } + + c.handleCombinedMessage(message) + } + } +} + +func (c *CombinedStreamsClient) handleCombinedMessage(message []byte) { + var combinedMsg struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` + } + + if err := json.Unmarshal(message, &combinedMsg); err != nil { + log.Printf("解析组合消息失败: %v", err) + return + } + + c.mu.RLock() + ch, exists := c.subscribers[combinedMsg.Stream] + c.mu.RUnlock() + + if exists { + select { + case ch <- combinedMsg.Data: + default: + log.Printf("订阅者通道已满: %s", combinedMsg.Stream) + } + } +} + +func (c *CombinedStreamsClient) AddSubscriber(stream string, bufferSize int) <-chan []byte { + ch := make(chan []byte, bufferSize) + c.mu.Lock() + c.subscribers[stream] = ch + c.mu.Unlock() + return ch +} + +func (c *CombinedStreamsClient) handleReconnect() { + if !c.reconnect { + return + } + + log.Println("组合流尝试重新连接...") + time.Sleep(3 * time.Second) + + if err := c.Connect(); err != nil { + log.Printf("组合流重新连接失败: %v", err) + go c.handleReconnect() + } +} + +func (c *CombinedStreamsClient) Close() { + c.reconnect = false + close(c.done) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + for stream, ch := range c.subscribers { + close(ch) + delete(c.subscribers, stream) + } +} diff --git a/market/data.go b/market/data.go index 97812e64..cd40be75 100644 --- a/market/data.go +++ b/market/data.go @@ -10,72 +10,20 @@ import ( "strings" ) -// Data 市场数据结构 -type Data struct { - Symbol string - CurrentPrice float64 - PriceChange1h float64 // 1小时价格变化百分比 - PriceChange4h float64 // 4小时价格变化百分比 - CurrentEMA20 float64 - CurrentMACD float64 - CurrentRSI7 float64 - OpenInterest *OIData - FundingRate float64 - IntradaySeries *IntradayData - LongerTermContext *LongerTermData -} - -// OIData Open Interest数据 -type OIData struct { - Latest float64 - Average float64 -} - -// IntradayData 日内数据(3分钟间隔) -type IntradayData struct { - MidPrices []float64 - EMA20Values []float64 - MACDValues []float64 - RSI7Values []float64 - RSI14Values []float64 -} - -// LongerTermData 长期数据(4小时时间框架) -type LongerTermData struct { - EMA20 float64 - EMA50 float64 - ATR3 float64 - ATR14 float64 - CurrentVolume float64 - AverageVolume float64 - MACDValues []float64 - RSI14Values []float64 -} - -// Kline K线数据 -type Kline struct { - OpenTime int64 - Open float64 - High float64 - Low float64 - Close float64 - Volume float64 - CloseTime int64 -} - // Get 获取指定代币的市场数据 func Get(symbol string) (*Data, error) { + var klines3m, klines4h []Kline + var err error // 标准化symbol symbol = Normalize(symbol) - // 获取3分钟K线数据 (最近10个) - klines3m, err := getKlines(symbol, "3m", 40) // 多获取一些用于计算 + klines3m, err = WSMonitorCli.GetCurrentKlines(symbol, "3m") // 多获取一些用于计算 if err != nil { return nil, fmt.Errorf("获取3分钟K线失败: %v", err) } // 获取4小时K线数据 (最近10个) - klines4h, err := getKlines(symbol, "4h", 60) // 多获取用于计算指标 + klines4h, err = WSMonitorCli.GetCurrentKlines(symbol, "4h") // 多获取用于计算指标 if err != nil { return nil, fmt.Errorf("获取4小时K线失败: %v", err) } @@ -136,51 +84,6 @@ func Get(symbol string) (*Data, error) { }, nil } -// getKlines 从Binance获取K线数据 -func getKlines(symbol, interval string, limit int) ([]Kline, error) { - url := fmt.Sprintf("https://fapi.binance.com/fapi/v1/klines?symbol=%s&interval=%s&limit=%d", - symbol, interval, limit) - - resp, err := http.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - var rawData [][]interface{} - if err := json.Unmarshal(body, &rawData); err != nil { - return nil, err - } - - klines := make([]Kline, len(rawData)) - for i, item := range rawData { - openTime := int64(item[0].(float64)) - open, _ := parseFloat(item[1]) - high, _ := parseFloat(item[2]) - low, _ := parseFloat(item[3]) - close, _ := parseFloat(item[4]) - volume, _ := parseFloat(item[5]) - closeTime := int64(item[6].(float64)) - - klines[i] = Kline{ - OpenTime: openTime, - Open: open, - High: high, - Low: low, - Close: close, - Volume: volume, - CloseTime: closeTime, - } - } - - return klines, nil -} - // calculateEMA 计算EMA func calculateEMA(klines []Kline, period int) float64 { if len(klines) < period { diff --git a/market/feature_engine.go b/market/feature_engine.go new file mode 100644 index 00000000..91540a29 --- /dev/null +++ b/market/feature_engine.go @@ -0,0 +1,229 @@ +package market + +import ( + "fmt" + "math" + "time" +) + +type FeatureEngine struct { + alertThresholds AlertThresholds +} + +func NewFeatureEngine(thresholds AlertThresholds) *FeatureEngine { + return &FeatureEngine{ + alertThresholds: thresholds, + } +} + +func (e *FeatureEngine) CalculateFeatures(symbol string, klines []Kline) *SymbolFeatures { + if len(klines) < 20 { + return nil + } + + features := &SymbolFeatures{ + Symbol: symbol, + Timestamp: time.Now(), + } + + // 提取价格和交易量数据 + closes := make([]float64, len(klines)) + volumes := make([]float64, len(klines)) + highs := make([]float64, len(klines)) + lows := make([]float64, len(klines)) + + for i, k := range klines { + closes[i] = k.Close + volumes[i] = k.Volume + highs[i] = k.High + lows[i] = k.Low + } + + // 价格特征 + features.Price = closes[len(closes)-1] + features.PriceChange15Min = (closes[len(closes)-1] - closes[len(closes)-2]) / closes[len(closes)-2] + + if len(closes) >= 5 { + features.PriceChange1H = (closes[len(closes)-1] - closes[len(closes)-5]) / closes[len(closes)-5] + } + if len(closes) >= 17 { + features.PriceChange4H = (closes[len(closes)-1] - closes[len(closes)-17]) / closes[len(closes)-17] + } + + // 交易量特征 + currentVolume := volumes[len(volumes)-1] + features.Volume = currentVolume + + // 5周期平均交易量 + if len(volumes) >= 6 { + avgVolume5 := e.calculateAverage(volumes[len(volumes)-6 : len(volumes)-1]) + features.VolumeRatio5 = currentVolume / avgVolume5 + } + + // 20周期平均交易量 + if len(volumes) >= 21 { + avgVolume20 := e.calculateAverage(volumes[len(volumes)-21 : len(volumes)-1]) + features.VolumeRatio20 = currentVolume / avgVolume20 + } + + // 交易量趋势 + if features.VolumeRatio20 > 0 { + features.VolumeTrend = features.VolumeRatio5 / features.VolumeRatio20 + } + + // 技术指标 + features.RSI14 = e.calculateRSI(closes, 14) + features.SMA5 = e.calculateSMA(closes, 5) + features.SMA10 = e.calculateSMA(closes, 10) + features.SMA20 = e.calculateSMA(closes, 20) + + // 波动特征 + currentHigh := highs[len(highs)-1] + currentLow := lows[len(lows)-1] + features.HighLowRatio = (currentHigh - currentLow) / features.Price + features.Volatility20 = e.calculateVolatility(closes, 20) + + // 价格在区间中的位置 + if currentHigh != currentLow { + features.PositionInRange = (features.Price - currentLow) / (currentHigh - currentLow) + } else { + features.PositionInRange = 0.5 + } + + return features +} + +func (e *FeatureEngine) calculateAverage(values []float64) float64 { + sum := 0.0 + for _, v := range values { + sum += v + } + return sum / float64(len(values)) +} + +func (e *FeatureEngine) calculateSMA(prices []float64, period int) float64 { + if len(prices) < period { + return 0 + } + return e.calculateAverage(prices[len(prices)-period:]) +} + +func (e *FeatureEngine) calculateRSI(prices []float64, period int) float64 { + if len(prices) <= period { + return 50 + } + + gains := make([]float64, 0) + losses := make([]float64, 0) + + for i := 1; i < len(prices); i++ { + change := prices[i] - prices[i-1] + if change > 0 { + gains = append(gains, change) + losses = append(losses, 0) + } else { + gains = append(gains, 0) + losses = append(losses, -change) + } + } + + // 只取最近period个数据点 + if len(gains) > period { + gains = gains[len(gains)-period:] + losses = losses[len(losses)-period:] + } + + avgGain := e.calculateAverage(gains) + avgLoss := e.calculateAverage(losses) + + if avgLoss == 0 { + return 100 + } + + rs := avgGain / avgLoss + return 100 - (100 / (1 + rs)) +} + +func (e *FeatureEngine) calculateVolatility(prices []float64, period int) float64 { + if len(prices) < period { + return 0 + } + + periodPrices := prices[len(prices)-period:] + mean := e.calculateAverage(periodPrices) + + variance := 0.0 + for _, price := range periodPrices { + variance += math.Pow(price-mean, 2) + } + variance /= float64(len(periodPrices)) + + return math.Sqrt(variance) / mean +} + +func (e *FeatureEngine) DetectAlerts(features *SymbolFeatures) []Alert { + var alerts []Alert + + // 交易量放大检测 + if features.VolumeRatio5 > e.alertThresholds.VolumeSpike { + alerts = append(alerts, Alert{ + Type: "VOLUME_SPIKE", + Symbol: features.Symbol, + Value: features.VolumeRatio5, + Threshold: e.alertThresholds.VolumeSpike, + Message: fmt.Sprintf("%s 交易量放大 %.2f 倍", features.Symbol, features.VolumeRatio5), + Timestamp: time.Now(), + }) + } + + // 15分钟价格异动 + if math.Abs(features.PriceChange15Min) > e.alertThresholds.PriceChange15Min { + direction := "上涨" + if features.PriceChange15Min < 0 { + direction = "下跌" + } + alerts = append(alerts, Alert{ + Type: "PRICE_CHANGE_15MIN", + Symbol: features.Symbol, + Value: features.PriceChange15Min, + Threshold: e.alertThresholds.PriceChange15Min, + Message: fmt.Sprintf("%s 15分钟%s %.2f%%", features.Symbol, direction, features.PriceChange15Min*100), + Timestamp: time.Now(), + }) + } + + // 交易量趋势 + if features.VolumeTrend > e.alertThresholds.VolumeTrend { + alerts = append(alerts, Alert{ + Type: "VOLUME_TREND", + Symbol: features.Symbol, + Value: features.VolumeTrend, + Threshold: e.alertThresholds.VolumeTrend, + Message: fmt.Sprintf("%s 交易量趋势增强 %.2f 倍", features.Symbol, features.VolumeTrend), + Timestamp: time.Now(), + }) + } + + // RSI超买超卖 + if features.RSI14 > e.alertThresholds.RSIOverbought { + alerts = append(alerts, Alert{ + Type: "RSI_OVERBOUGHT", + Symbol: features.Symbol, + Value: features.RSI14, + Threshold: e.alertThresholds.RSIOverbought, + Message: fmt.Sprintf("%s RSI超买: %.2f", features.Symbol, features.RSI14), + Timestamp: time.Now(), + }) + } else if features.RSI14 < e.alertThresholds.RSIOversold { + alerts = append(alerts, Alert{ + Type: "RSI_OVERSOLD", + Symbol: features.Symbol, + Value: features.RSI14, + Threshold: e.alertThresholds.RSIOversold, + Message: fmt.Sprintf("%s RSI超卖: %.2f", features.Symbol, features.RSI14), + Timestamp: time.Now(), + }) + } + + return alerts +} diff --git a/market/monitor.go b/market/monitor.go new file mode 100644 index 00000000..9837623e --- /dev/null +++ b/market/monitor.go @@ -0,0 +1,526 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "math" + "sort" + "strings" + "sync" + "time" +) + +type WSMonitor struct { + wsClient *WSClient + combinedClient *CombinedStreamsClient + featureEngine *FeatureEngine + symbols []string + featuresMap sync.Map + alertsChan chan Alert + klineDataMap3m sync.Map // 存储每个交易对的K线历史数据 + klineDataMap4h sync.Map // 存储每个交易对的K线历史数据 + tickerDataMap sync.Map // 存储每个交易对的ticker数据 + batchSize int + filterSymbols sync.Map // 使用sync.Map来存储需要监控的币种和其状态 + symbolStats sync.Map // 存储币种统计信息 + FilterSymbol []string //经过筛选的币种 +} +type SymbolStats struct { + LastActiveTime time.Time + AlertCount int + VolumeSpikeCount int + LastAlertTime time.Time + Score float64 // 综合评分 +} + +var WSMonitorCli *WSMonitor + +func NewWSMonitor(batchSize int) *WSMonitor { + WSMonitorCli = &WSMonitor{ + wsClient: NewWSClient(), + combinedClient: NewCombinedStreamsClient(batchSize), + featureEngine: NewFeatureEngine(config.AlertThresholds), + alertsChan: make(chan Alert, 1000), + batchSize: batchSize, + } + return WSMonitorCli +} + +func (m *WSMonitor) Initialize() error { + log.Println("初始化WebSocket监控器...") + + // 获取交易对信息 + apiClient := NewAPIClient() + exchangeInfo, err := apiClient.GetExchangeInfo() + if err != nil { + return err + } + + // 筛选永续合约交易对 --仅测试时使用 + //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] + for _, symbol := range exchangeInfo.Symbols { + if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" { + m.symbols = append(m.symbols, Normalize(symbol.Symbol)) + } + } + log.Printf("找到 %d 个交易对", len(m.symbols)) + // 初始化历史数据 + if err := m.initializeHistoricalData(); err != nil { + log.Printf("初始化历史数据失败: %v", err) + } + + return nil +} + +func (m *WSMonitor) initializeHistoricalData() error { + apiClient := NewAPIClient() + + var wg sync.WaitGroup + semaphore := make(chan struct{}, 5) // 限制并发数 + + for _, symbol := range m.symbols { + wg.Add(1) + semaphore <- struct{}{} + + go func(s string) { + defer wg.Done() + defer func() { <-semaphore }() + + // 获取历史K线数据 + klines, err := apiClient.GetKlines(s, "3m", 100) + if err != nil { + log.Printf("获取 %s 历史数据失败: %v", s, err) + return + } + if len(klines) > 0 { + m.klineDataMap3m.Store(s, klines) + log.Printf("已加载 %s 的历史K线数据-3m: %d 条", s, len(klines)) + } + // 获取历史K线数据 + klines4h, err := apiClient.GetKlines(s, "4h", 100) + if err != nil { + log.Printf("获取 %s 历史数据失败: %v", s, err) + return + } + if len(klines4h) > 0 { + m.klineDataMap4h.Store(s, klines) + log.Printf("已加载 %s 的历史K线数据-4h: %d 条", s, len(klines)) + } + }(symbol) + } + + wg.Wait() + return nil +} + +func (m *WSMonitor) Start() { + log.Printf("启动WebSocket实时监控...") + // 初始化交易对 + err := m.Initialize() + if err != nil { + log.Fatalf("❌ 初始化币种: %v", err) + return + } + + err = m.combinedClient.Connect() + if err != nil { + log.Fatalf("❌ 批量订阅流: %v", err) + return + } + // 启动警报处理器 + go m.handleAlerts() + // 启动定期清理任务 + go m.cleanupInactiveSymbols() + // 输出监控统计 - 评分前十名 + go m.printFilterStats(50) + // 订阅所有交易对 + err = m.subscribeAll() + + if err != nil { + log.Fatalf("❌ 订阅币种交易对: %v", err) + return + } +} + +func (m *WSMonitor) subscribeAll() error { + // 执行批量订阅 + log.Println("开始订阅所有交易对...") + for _, symbol := range m.symbols { + stream3m := fmt.Sprintf("%s@kline_3m", strings.ToLower(symbol)) + ch3m := m.combinedClient.AddSubscriber(stream3m, 100) + go m.handleKlineData(symbol, ch3m, "3m") + + stream4h := fmt.Sprintf("%s@kline_4h", strings.ToLower(symbol)) + ch4h := m.combinedClient.AddSubscriber(stream4h, 100) + go m.handleKlineData(symbol, ch4h, "4h") + } + + err := m.combinedClient.BatchSubscribeKlines(m.symbols, "3m") + if err != nil { + log.Fatalf("❌ 订阅3m K线: %v", err) + return err + } + err = m.combinedClient.BatchSubscribeKlines(m.symbols, "4h") + if err != nil { + log.Fatalf("❌ 订阅4h K线: %v", err) + return err + } + log.Println("所有交易对订阅完成") + return nil +} + +func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time string) { + for data := range ch { + var klineData KlineWSData + if err := json.Unmarshal(data, &klineData); err != nil { + log.Printf("解析Kline数据失败: %v", err) + continue + } + m.processKlineUpdate(symbol, klineData, _time) + } +} + +func (m *WSMonitor) handleTickerData(symbol string, ch <-chan []byte) { + for data := range ch { + var tickerData TickerWSData + if err := json.Unmarshal(data, &tickerData); err != nil { + log.Printf("解析Ticker数据失败: %v", err) + continue + } + + m.processTickerUpdate(symbol, tickerData) + } +} +func (m *WSMonitor) handleTickerDatas(ch <-chan []byte) { + for data := range ch { + var tickerData []TickerWSData + if err := json.Unmarshal(data, &tickerData); err != nil { + log.Printf("解析Ticker数据失败: %v", err) + continue + } + log.Fatalln(tickerData) + //m.processTickerUpdate(symbol, tickerData) + } +} +func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map { + var klineDataMap *sync.Map + if _time == "3m" { + klineDataMap = &m.klineDataMap3m + } else { + klineDataMap = &m.klineDataMap4h + } + return klineDataMap +} +func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time string) { + // 转换WebSocket数据为Kline结构 + kline := Kline{ + OpenTime: wsData.Kline.StartTime, + CloseTime: wsData.Kline.CloseTime, + Trades: wsData.Kline.NumberOfTrades, + } + kline.Open, _ = parseFloat(wsData.Kline.OpenPrice) + kline.High, _ = parseFloat(wsData.Kline.HighPrice) + kline.Low, _ = parseFloat(wsData.Kline.LowPrice) + kline.Close, _ = parseFloat(wsData.Kline.ClosePrice) + kline.Volume, _ = parseFloat(wsData.Kline.Volume) + kline.High, _ = parseFloat(wsData.Kline.HighPrice) + kline.QuoteVolume, _ = parseFloat(wsData.Kline.QuoteVolume) + kline.TakerBuyBaseVolume, _ = parseFloat(wsData.Kline.TakerBuyBaseVolume) + kline.TakerBuyQuoteVolume, _ = parseFloat(wsData.Kline.TakerBuyQuoteVolume) + // 更新K线数据 + var klineDataMap = m.getKlineDataMap(_time) + value, exists := klineDataMap.Load(symbol) + var klines []Kline + if exists { + klines = value.([]Kline) + + // 检查是否是新的K线 + if len(klines) > 0 && klines[len(klines)-1].OpenTime == kline.OpenTime { + // 更新当前K线 + klines[len(klines)-1] = kline + } else { + // 添加新K线 + klines = append(klines, kline) + + // 保持数据长度 + if len(klines) > 100 { + klines = klines[1:] + } + } + } else { + klines = []Kline{kline} + } + + klineDataMap.Store(symbol, klines) + // 计算特征并检测警报 + if len(klines) >= 20 { + features := m.featureEngine.CalculateFeatures(symbol, klines) + if features != nil { + m.featuresMap.Store(symbol, features) + + alerts := m.featureEngine.DetectAlerts(features) + hasAlert := len(alerts) > 0 + + // 更新统计信息 + m.updateSymbolStats(symbol, features, hasAlert) + + for _, alert := range alerts { + m.alertsChan <- alert + } + + // 实时日志输出重要特征 + if len(alerts) > 0 || features.VolumeRatio5 > 2.0 || math.Abs(features.PriceChange15Min) > 0.02 { + //log.Printf("📊 %s - 价格: %.4f, 15分钟变动: %.2f%%, 交易量倍数: %.2f, RSI: %.1f", + // symbol, features.Price, features.PriceChange15Min*100, + // features.VolumeRatio5, features.RSI14) + } + } + } +} + +func (m *WSMonitor) processTickerUpdate(symbol string, tickerData TickerWSData) { + // 存储ticker数据 + m.tickerDataMap.Store(symbol, tickerData) +} + +func (m *WSMonitor) handleAlerts() { + alertCounts := make(map[string]int) + lastReset := time.Now() + + for alert := range m.alertsChan { + // 重置计数器(每小时) + if time.Since(lastReset) > time.Hour { + alertCounts = make(map[string]int) + lastReset = time.Now() + } + + // 警报去重和频率控制 + alertKey := fmt.Sprintf("%s_%s", alert.Symbol, alert.Type) + alertCounts[alertKey]++ + m.filterSymbols.Store(alert.Symbol, true) + + //log.Printf("✅ 自动添加监控: %s (因警报: %s)", alert.Symbol, alert.Message) + if alertCounts[alertKey] <= 3 { // 每小时最多3次相同警报 + //log.Printf("🚨 实时警报: %s", alert.Message) + + // 这里可以添加其他警报处理逻辑 + } + } +} + +func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { + value, exists := m.getKlineDataMap(_time).Load(symbol) + if !exists { + // 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行) + apiClient := NewAPIClient() + klines, err := apiClient.GetKlines(symbol, _time, 40) + if err != nil { + return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err) + } + return klines, fmt.Errorf("symbol不存在") + } + return value.([]Kline), nil +} + +func (m *WSMonitor) GetCurrentFeatures(symbol string) (*SymbolFeatures, bool) { + value, exists := m.featuresMap.Load(symbol) + if !exists { + return nil, false + } + return value.(*SymbolFeatures), true +} + +func (m *WSMonitor) GetAllFeatures() map[string]*SymbolFeatures { + features := make(map[string]*SymbolFeatures) + m.featuresMap.Range(func(key, value interface{}) bool { + features[key.(string)] = value.(*SymbolFeatures) + return true + }) + return features +} + +func (m *WSMonitor) Close() { + m.wsClient.Close() + close(m.alertsChan) +} +func (m *WSMonitor) printFilterStats(nember int) { + ticker := time.NewTicker(2 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + var monitoredSymbols []string + m.filterSymbols.Range(func(key, value interface{}) bool { + monitoredSymbols = append(monitoredSymbols, key.(string)) + return true + }) + + log.Printf("🎯 监控统计 - 总数: %d, 币种: %v", + len(monitoredSymbols), monitoredSymbols) + + // 打印前5个评分最高的币种 + type symbolScore struct { + symbol string + score float64 + } + var topScores []symbolScore + + m.symbolStats.Range(func(key, value interface{}) bool { + symbol := key.(string) + stats := value.(*SymbolStats) + topScores = append(topScores, symbolScore{symbol, stats.Score}) + return true + }) + + // 按评分排序 + sort.Slice(topScores, func(i, j int) bool { + return topScores[i].score > topScores[j].score + }) + m.FilterSymbol = nil + if len(topScores) > 0 { + log.Printf("🏆 评分TOP%v:", nember) + for i := 0; i < len(topScores) && i < nember; i++ { + m.FilterSymbol = append(m.FilterSymbol, topScores[i].symbol) + log.Printf(" %d. %s: %.1f分", i+1, topScores[i].symbol, topScores[i].score) + } + } + } +} + +// evaluateSymbolScore 评估币种得分,决定是否保留 +func (m *WSMonitor) evaluateSymbolScore(symbol string, features *SymbolFeatures) float64 { + score := 0.0 + + // 交易量活跃度评分 (权重: 40%) + if features.VolumeRatio5 > 1.5 { + score += 40 * math.Min(features.VolumeRatio5/5.0, 1.0) + } + + // 价格波动评分 (权重: 30%) + volatilityScore := math.Abs(features.PriceChange15Min) * 1000 // 放大系数 + score += 30 * math.Min(volatilityScore/10.0, 1.0) // 最大10%波动得满分 + + // RSI活跃度评分 (权重: 20%) + if features.RSI14 < 30 || features.RSI14 > 70 { + score += 20 // RSI在极端区域 + } else if features.RSI14 < 40 || features.RSI14 > 60 { + score += 10 // RSI在活跃区域 + } + + // 交易量趋势评分 (权重: 10%) + if features.VolumeTrend > 1.2 { + score += 10 * math.Min(features.VolumeTrend/3.0, 1.0) + } + + return score +} + +// shouldRemoveFromFilter 判断是否应该从FilterSymbols中移除 +func (m *WSMonitor) shouldRemoveFromFilter(symbol string) bool { + value, exists := m.symbolStats.Load(symbol) + if !exists { + return true // 没有统计信息,移除 + } + + stats := value.(*SymbolStats) + + // 规则1: 超过30分钟没有活跃迹象 + if time.Since(stats.LastActiveTime) > 30*time.Minute { + log.Printf("🔻 %s 因长时间不活跃被移除", symbol) + return true + } + + // 规则2: 评分持续低于阈值 (最近5次评分平均) + if stats.Score < 15 { // 调整这个阈值 + log.Printf("🔻 %s 因评分过低(%.1f)被移除", symbol, stats.Score) + return true + } + + // 规则3: 超过2小时没有产生警报 + if time.Since(stats.LastAlertTime) > 2*time.Hour && stats.AlertCount > 0 { + log.Printf("🔻 %s 因长时间无新警报被移除", symbol) + return true + } + + return false +} + +// updateSymbolStats 更新币种统计信息 +func (m *WSMonitor) updateSymbolStats(symbol string, features *SymbolFeatures, hasAlert bool) { + now := time.Now() + + value, exists := m.symbolStats.Load(symbol) + var stats *SymbolStats + + if !exists { + stats = &SymbolStats{ + LastActiveTime: now, + Score: m.evaluateSymbolScore(symbol, features), + } + } else { + stats = value.(*SymbolStats) + stats.LastActiveTime = now + + // 平滑更新评分 (指数移动平均) + newScore := m.evaluateSymbolScore(symbol, features) + stats.Score = 0.7*stats.Score + 0.3*newScore + } + + if hasAlert { + stats.AlertCount++ + stats.LastAlertTime = now + } + + if features.VolumeRatio5 > 2.0 { + stats.VolumeSpikeCount++ + } + + m.symbolStats.Store(symbol, stats) +} + +// removeFromFilter 从FilterSymbols中移除币种 +func (m *WSMonitor) removeFromFilter(symbol string) { + + // 从filterSymbols中移除 + m.filterSymbols.Delete(symbol) + m.symbolStats.Delete(symbol) + + log.Printf("🗑️ 已移除币种监控: %s", symbol) +} + +// cleanupInactiveSymbols 定期清理不活跃的币种 +func (m *WSMonitor) cleanupInactiveSymbols() { + ticker := time.NewTicker(5 * time.Minute) // 每5分钟检查一次 + defer ticker.Stop() + + for range ticker.C { + var symbolsToRemove []string + + // 收集需要移除的币种 + m.filterSymbols.Range(func(key, value interface{}) bool { + symbol := key.(string) + if m.shouldRemoveFromFilter(symbol) { + symbolsToRemove = append(symbolsToRemove, symbol) + } + return true + }) + + // 执行移除操作 + for _, symbol := range symbolsToRemove { + m.removeFromFilter(symbol) + } + + if len(symbolsToRemove) > 0 { + log.Printf("🧹 清理完成,移除了 %d 个不活跃币种", len(symbolsToRemove)) + } + } +} + +// getSymbolScore 获取币种当前评分 +func (m *WSMonitor) getSymbolScore(symbol string) float64 { + value, exists := m.symbolStats.Load(symbol) + if !exists { + return 0 + } + return value.(*SymbolStats).Score +} diff --git a/market/types.go b/market/types.go new file mode 100644 index 00000000..82f44415 --- /dev/null +++ b/market/types.go @@ -0,0 +1,157 @@ +package market + +import "time" + +// Data 市场数据结构 +type Data struct { + Symbol string + CurrentPrice float64 + PriceChange1h float64 // 1小时价格变化百分比 + PriceChange4h float64 // 4小时价格变化百分比 + CurrentEMA20 float64 + CurrentMACD float64 + CurrentRSI7 float64 + OpenInterest *OIData + FundingRate float64 + IntradaySeries *IntradayData + LongerTermContext *LongerTermData +} + +// OIData Open Interest数据 +type OIData struct { + Latest float64 + Average float64 +} + +// IntradayData 日内数据(3分钟间隔) +type IntradayData struct { + MidPrices []float64 + EMA20Values []float64 + MACDValues []float64 + RSI7Values []float64 + RSI14Values []float64 +} + +// LongerTermData 长期数据(4小时时间框架) +type LongerTermData struct { + EMA20 float64 + EMA50 float64 + ATR3 float64 + ATR14 float64 + CurrentVolume float64 + AverageVolume float64 + MACDValues []float64 + RSI14Values []float64 +} + +// Binance API 响应结构 +type ExchangeInfo struct { + Symbols []SymbolInfo `json:"symbols"` +} + +type SymbolInfo struct { + Symbol string `json:"symbol"` + Status string `json:"status"` + BaseAsset string `json:"baseAsset"` + QuoteAsset string `json:"quoteAsset"` + ContractType string `json:"contractType"` + PricePrecision int `json:"pricePrecision"` + QuantityPrecision int `json:"quantityPrecision"` +} + +type Kline struct { + OpenTime int64 `json:"openTime"` + Open float64 `json:"open"` + High float64 `json:"high"` + Low float64 `json:"low"` + Close float64 `json:"close"` + Volume float64 `json:"volume"` + CloseTime int64 `json:"closeTime"` + QuoteVolume float64 `json:"quoteVolume"` + Trades int `json:"trades"` + TakerBuyBaseVolume float64 `json:"takerBuyBaseVolume"` + TakerBuyQuoteVolume float64 `json:"takerBuyQuoteVolume"` +} + +type KlineResponse []interface{} + +type PriceTicker struct { + Symbol string `json:"symbol"` + Price string `json:"price"` +} + +type Ticker24hr struct { + Symbol string `json:"symbol"` + PriceChange string `json:"priceChange"` + PriceChangePercent string `json:"priceChangePercent"` + Volume string `json:"volume"` + QuoteVolume string `json:"quoteVolume"` +} + +// 特征数据结构 +type SymbolFeatures struct { + Symbol string `json:"symbol"` + Timestamp time.Time `json:"timestamp"` + Price float64 `json:"price"` + PriceChange15Min float64 `json:"price_change_15min"` + PriceChange1H float64 `json:"price_change_1h"` + PriceChange4H float64 `json:"price_change_4h"` + Volume float64 `json:"volume"` + VolumeRatio5 float64 `json:"volume_ratio_5"` + VolumeRatio20 float64 `json:"volume_ratio_20"` + VolumeTrend float64 `json:"volume_trend"` + RSI14 float64 `json:"rsi_14"` + SMA5 float64 `json:"sma_5"` + SMA10 float64 `json:"sma_10"` + SMA20 float64 `json:"sma_20"` + HighLowRatio float64 `json:"high_low_ratio"` + Volatility20 float64 `json:"volatility_20"` + PositionInRange float64 `json:"position_in_range"` +} + +// 警报数据结构 +type Alert struct { + Type string `json:"type"` + Symbol string `json:"symbol"` + Value float64 `json:"value"` + Threshold float64 `json:"threshold"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` +} + +type Config struct { + AlertThresholds AlertThresholds `json:"alert_thresholds"` + UpdateInterval int `json:"update_interval"` // seconds + CleanupConfig CleanupConfig `json:"cleanup_config"` +} + +type AlertThresholds struct { + VolumeSpike float64 `json:"volume_spike"` + PriceChange15Min float64 `json:"price_change_15min"` + VolumeTrend float64 `json:"volume_trend"` + RSIOverbought float64 `json:"rsi_overbought"` + RSIOversold float64 `json:"rsi_oversold"` +} +type CleanupConfig struct { + InactiveTimeout time.Duration `json:"inactive_timeout"` // 不活跃超时时间 + MinScoreThreshold float64 `json:"min_score_threshold"` // 最低评分阈值 + NoAlertTimeout time.Duration `json:"no_alert_timeout"` // 无警报超时时间 + CheckInterval time.Duration `json:"check_interval"` // 检查间隔 +} + +var config = Config{ + AlertThresholds: AlertThresholds{ + VolumeSpike: 3.0, + PriceChange15Min: 0.05, + VolumeTrend: 2.0, + RSIOverbought: 70, + RSIOversold: 30, + }, + CleanupConfig: CleanupConfig{ + InactiveTimeout: 30 * time.Minute, + MinScoreThreshold: 15.0, + NoAlertTimeout: 20 * time.Minute, + CheckInterval: 5 * time.Minute, + }, + UpdateInterval: 60, // 1 minute +} diff --git a/market/websocket_client.go b/market/websocket_client.go new file mode 100644 index 00000000..ce151691 --- /dev/null +++ b/market/websocket_client.go @@ -0,0 +1,231 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type WSClient struct { + conn *websocket.Conn + mu sync.RWMutex + subscribers map[string]chan []byte + reconnect bool + done chan struct{} +} + +type WSMessage struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` +} + +type KlineWSData struct { + EventType string `json:"e"` + EventTime int64 `json:"E"` + Symbol string `json:"s"` + Kline struct { + StartTime int64 `json:"t"` + CloseTime int64 `json:"T"` + Symbol string `json:"s"` + Interval string `json:"i"` + FirstTradeID int64 `json:"f"` + LastTradeID int64 `json:"L"` + OpenPrice string `json:"o"` + ClosePrice string `json:"c"` + HighPrice string `json:"h"` + LowPrice string `json:"l"` + Volume string `json:"v"` + NumberOfTrades int `json:"n"` + IsFinal bool `json:"x"` + QuoteVolume string `json:"q"` + TakerBuyBaseVolume string `json:"V"` + TakerBuyQuoteVolume string `json:"Q"` + } `json:"k"` +} + +type TickerWSData struct { + EventType string `json:"e"` + EventTime int64 `json:"E"` + Symbol string `json:"s"` + PriceChange string `json:"p"` + PriceChangePercent string `json:"P"` + WeightedAvgPrice string `json:"w"` + LastPrice string `json:"c"` + LastQty string `json:"Q"` + OpenPrice string `json:"o"` + HighPrice string `json:"h"` + LowPrice string `json:"l"` + Volume string `json:"v"` + QuoteVolume string `json:"q"` + OpenTime int64 `json:"O"` + CloseTime int64 `json:"C"` + FirstID int64 `json:"F"` + LastID int64 `json:"L"` + Count int `json:"n"` +} + +func NewWSClient() *WSClient { + return &WSClient{ + subscribers: make(map[string]chan []byte), + reconnect: true, + done: make(chan struct{}), + } +} + +func (w *WSClient) Connect() error { + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + conn, _, err := dialer.Dial("wss://ws-fapi.binance.com/ws-fapi/v1", nil) + if err != nil { + return fmt.Errorf("WebSocket连接失败: %v", err) + } + + w.mu.Lock() + w.conn = conn + w.mu.Unlock() + + log.Println("WebSocket连接成功") + + // 启动消息读取循环 + go w.readMessages() + + return nil +} + +func (w *WSClient) SubscribeKline(symbol, interval string) error { + stream := fmt.Sprintf("%s@kline_%s", symbol, interval) + return w.subscribe(stream) +} + +func (w *WSClient) SubscribeTicker(symbol string) error { + stream := fmt.Sprintf("%s@ticker", symbol) + return w.subscribe(stream) +} + +func (w *WSClient) SubscribeMiniTicker(symbol string) error { + stream := fmt.Sprintf("%s@miniTicker", symbol) + return w.subscribe(stream) +} + +func (w *WSClient) subscribe(stream string) error { + subscribeMsg := map[string]interface{}{ + "method": "SUBSCRIBE", + "params": []string{stream}, + "id": time.Now().Unix(), + } + + w.mu.RLock() + defer w.mu.RUnlock() + + if w.conn == nil { + return fmt.Errorf("WebSocket未连接") + } + + err := w.conn.WriteJSON(subscribeMsg) + if err != nil { + return err + } + + log.Printf("订阅流: %s", stream) + return nil +} + +func (w *WSClient) readMessages() { + for { + select { + case <-w.done: + return + default: + w.mu.RLock() + conn := w.conn + w.mu.RUnlock() + + if conn == nil { + time.Sleep(1 * time.Second) + continue + } + + _, message, err := conn.ReadMessage() + if err != nil { + log.Printf("读取WebSocket消息失败: %v", err) + w.handleReconnect() + return + } + + w.handleMessage(message) + } + } +} + +func (w *WSClient) handleMessage(message []byte) { + var wsMsg WSMessage + if err := json.Unmarshal(message, &wsMsg); err != nil { + // 可能是其他格式的消息 + return + } + + w.mu.RLock() + ch, exists := w.subscribers[wsMsg.Stream] + w.mu.RUnlock() + + if exists { + select { + case ch <- wsMsg.Data: + default: + log.Printf("订阅者通道已满: %s", wsMsg.Stream) + } + } +} + +func (w *WSClient) handleReconnect() { + if !w.reconnect { + return + } + + log.Println("尝试重新连接...") + time.Sleep(3 * time.Second) + + if err := w.Connect(); err != nil { + log.Printf("重新连接失败: %v", err) + go w.handleReconnect() + } +} + +func (w *WSClient) AddSubscriber(stream string, bufferSize int) <-chan []byte { + ch := make(chan []byte, bufferSize) + w.mu.Lock() + w.subscribers[stream] = ch + w.mu.Unlock() + return ch +} + +func (w *WSClient) RemoveSubscriber(stream string) { + w.mu.Lock() + delete(w.subscribers, stream) + w.mu.Unlock() +} + +func (w *WSClient) Close() { + w.reconnect = false + close(w.done) + + w.mu.Lock() + defer w.mu.Unlock() + + if w.conn != nil { + w.conn.Close() + w.conn = nil + } + + // 关闭所有订阅者通道 + for stream, ch := range w.subscribers { + close(ch) + delete(w.subscribers, stream) + } +} From 3af9f3e37678ea12dc414d5f491f479482606c06 Mon Sep 17 00:00:00 2001 From: zbhan Date: Sat, 1 Nov 2025 22:25:32 -0400 Subject: [PATCH 3/9] fix: github workflow permission --- .github/workflows/pr-checks-advisory.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/pr-checks-advisory.yml b/.github/workflows/pr-checks-advisory.yml index 1c352233..9cab882d 100644 --- a/.github/workflows/pr-checks-advisory.yml +++ b/.github/workflows/pr-checks-advisory.yml @@ -8,12 +8,16 @@ on: # These checks are advisory only - they won't block PR merging # Results will be posted as comments to help contributors improve their PRs +permissions: + contents: read + pull-requests: write + checks: write + issues: write + jobs: pr-info: name: PR Information runs-on: ubuntu-latest - permissions: - pull-requests: write steps: - name: Check PR title format id: check-title @@ -98,8 +102,6 @@ jobs: backend-checks: name: Backend Checks (Advisory) runs-on: ubuntu-latest - permissions: - pull-requests: write continue-on-error: true steps: - uses: actions/checkout@v4 @@ -208,8 +210,6 @@ jobs: frontend-checks: name: Frontend Checks (Advisory) runs-on: ubuntu-latest - permissions: - pull-requests: write continue-on-error: true steps: - uses: actions/checkout@v4 From 48ee2e3eca4efab88ef9a36023ebd483ddf066ef Mon Sep 17 00:00:00 2001 From: Xeron Date: Sun, 2 Nov 2025 10:56:24 +0800 Subject: [PATCH 4/9] Fix broken DashScope link in README files (fixes #128) --- README.md | 4 ++-- docs/i18n/ru/README.md | 4 ++-- docs/i18n/uk/README.md | 2 +- docs/i18n/zh-CN/README.md | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0fa6210e..4c706a85 100644 --- a/README.md +++ b/README.md @@ -406,7 +406,7 @@ Before configuring the system, you need to obtain AI API keys. Choose one of the **How to get Qwen API Key:** -1. **Visit**: [https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **Visit**: [https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **Register**: Sign up with Alibaba Cloud account 3. **Enable Service**: Activate DashScope service 4. **Create API Key**: @@ -1271,7 +1271,7 @@ We welcome contributions from the community! See our comprehensive guides: - [Binance API](https://binance-docs.github.io/apidocs/futures/en/) - Binance Futures API - [DeepSeek](https://platform.deepseek.com/) - DeepSeek AI API -- [Qwen](https://dashscope.aliyuncs.com/) - Alibaba Cloud Qwen +- [Qwen](https://dashscope.console.aliyun.com/) - Alibaba Cloud Qwen - [TA-Lib](https://ta-lib.org/) - Technical indicator library - [Recharts](https://recharts.org/) - React chart library diff --git a/docs/i18n/ru/README.md b/docs/i18n/ru/README.md index 2feaa824..fa2153ab 100644 --- a/docs/i18n/ru/README.md +++ b/docs/i18n/ru/README.md @@ -399,7 +399,7 @@ cd .. **Как получить Qwen API ключ:** -1. **Посетите**: [https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **Посетите**: [https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **Зарегистрируйтесь**: Используя аккаунт Alibaba Cloud 3. **Активируйте сервис**: Активируйте DashScope сервис 4. **Создайте API ключ**: @@ -1094,7 +1094,7 @@ sudo apt-get install libta-lib0-dev - [Binance API](https://binance-docs.github.io/apidocs/futures/en/) - Binance Futures API - [DeepSeek](https://platform.deepseek.com/) - DeepSeek AI API -- [Qwen](https://dashscope.aliyuncs.com/) - Alibaba Cloud Qwen +- [Qwen](https://dashscope.console.aliyun.com/) - Alibaba Cloud Qwen - [TA-Lib](https://ta-lib.org/) - Библиотека технических индикаторов - [Recharts](https://recharts.org/) - Библиотека графиков React diff --git a/docs/i18n/uk/README.md b/docs/i18n/uk/README.md index 19d506ef..4d3622e2 100644 --- a/docs/i18n/uk/README.md +++ b/docs/i18n/uk/README.md @@ -402,7 +402,7 @@ cd .. **Як отримати Qwen API ключ:** -1. **Відвідайте**: [https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **Відвідайте**: [https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **Зареєструйтеся**: Використовуючи акаунт Alibaba Cloud 3. **Активуйте сервіс**: Активуйте DashScope сервіс 4. **Створіть API ключ**: diff --git a/docs/i18n/zh-CN/README.md b/docs/i18n/zh-CN/README.md index 29d69c8e..8e3aedcf 100644 --- a/docs/i18n/zh-CN/README.md +++ b/docs/i18n/zh-CN/README.md @@ -398,7 +398,7 @@ cd .. **如何获取Qwen API密钥:** -1. **访问**:[https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **访问**:[https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **注册**:使用阿里云账户注册 3. **开通服务**:激活DashScope服务 4. **创建API密钥**: @@ -1290,7 +1290,7 @@ MIT License - 详见 [LICENSE](LICENSE) 文件 - [Binance API](https://binance-docs.github.io/apidocs/futures/cn/) - 币安合约API - [DeepSeek](https://platform.deepseek.com/) - DeepSeek AI API -- [Qwen](https://dashscope.aliyuncs.com/) - 阿里云通义千问 +- [Qwen](https://dashscope.console.aliyun.com/) - 阿里云通义千问 - [TA-Lib](https://ta-lib.org/) - 技术指标库 - [Recharts](https://recharts.org/) - React图表库 From 4a8d4d92840889c43e4a05a218bc86440a4f9550 Mon Sep 17 00:00:00 2001 From: tinkle-community Date: Sun, 2 Nov 2025 12:15:40 +0800 Subject: [PATCH 5/9] update aster exchange guide --- README.md | 22 ++++++++++++---------- README.ru.md | 22 ++++++++++++---------- README.uk.md | 22 ++++++++++++---------- README.zh-CN.md | 22 ++++++++++++---------- 4 files changed, 48 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 05b209e4..783a1495 100644 --- a/README.md +++ b/README.md @@ -98,11 +98,12 @@ A Binance-compatible decentralized perpetual futures exchange! - 🌐 **Multi-chain support** - trade on your preferred EVM chain **Quick Start:** -1. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Connect your main wallet and create an API wallet -3. Copy the API Signer address and Private Key -4. Set `"exchange": "aster"` in config.json -5. Add `"aster_user"`, `"aster_signer"`, and `"aster_private_key"` +1. Register via [Aster Referral Link](https://www.asterdex.com/en/referral/fdfc0e) (get fee discounts!) +2. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Connect your main wallet and create an API wallet +4. Copy the API Signer address and Private Key +5. Set `"exchange": "aster"` in config.json +6. Add `"aster_user"`, `"aster_signer"`, and `"aster_private_key"` --- @@ -535,12 +536,13 @@ cp config.json.example config.json - 🌐 Multi-chain support (ETH, BSC, Polygon) - 🌍 No KYC required -**Step 1**: Create Aster API Wallet +**Step 1**: Register and Create Aster API Wallet -1. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Connect your main wallet (MetaMask, WalletConnect, etc.) -3. Click "Create API Wallet" -4. **Save these 3 items immediately:** +1. Register via [Aster Referral Link](https://www.asterdex.com/en/referral/fdfc0e) (get fee discounts!) +2. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Connect your main wallet (MetaMask, WalletConnect, etc.) +4. Click "Create API Wallet" +5. **Save these 3 items immediately:** - Main Wallet address (User) - API Wallet address (Signer) - API Wallet Private Key (⚠️ shown only once!) diff --git a/README.ru.md b/README.ru.md index 7a06c10a..b3dc8e63 100644 --- a/README.ru.md +++ b/README.ru.md @@ -98,11 +98,12 @@ NOFX теперь поддерживает **три основные биржи* - 🌐 **Поддержка нескольких цепей** - торгуйте на вашей любимой EVM цепи **Быстрый старт:** -1. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Подключите основной кошелек и создайте API кошелек -3. Скопируйте адрес API Signer и приватный ключ -4. Установите `"exchange": "aster"` в config.json -5. Добавьте `"aster_user"`, `"aster_signer"` и `"aster_private_key"` +1. Зарегистрируйтесь по [реферальной ссылке Aster](https://www.asterdex.com/en/referral/fdfc0e) (получите скидку на комиссии!) +2. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Подключите основной кошелек и создайте API кошелек +4. Скопируйте адрес API Signer и приватный ключ +5. Установите `"exchange": "aster"` в config.json +6. Добавьте `"aster_user"`, `"aster_signer"` и `"aster_private_key"` --- @@ -462,12 +463,13 @@ cp config.json.example config.json - 🌐 Поддержка нескольких цепей (ETH, BSC, Polygon) - 🌍 Не нужна KYC -**Шаг 1**: Создайте Aster API кошелек +**Шаг 1**: Зарегистрируйтесь и создайте Aster API кошелек -1. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Подключите основной кошелек (MetaMask, WalletConnect и т.д.) -3. Нажмите "Создать API кошелек" -4. **Сохраните эти 3 элемента немедленно:** +1. Зарегистрируйтесь по [реферальной ссылке Aster](https://www.asterdex.com/en/referral/fdfc0e) (получите скидку на комиссии!) +2. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Подключите основной кошелек (MetaMask, WalletConnect и т.д.) +4. Нажмите "Создать API кошелек" +5. **Сохраните эти 3 элемента немедленно:** - Адрес основного кошелька (User) - Адрес API кошелька (Signer) - Приватный ключ API кошелька (⚠️ показывается только один раз!) diff --git a/README.uk.md b/README.uk.md index a54eef1f..3a667f7e 100644 --- a/README.uk.md +++ b/README.uk.md @@ -98,11 +98,12 @@ NOFX тепер підтримує **три основні біржі**: Binance - 🌐 **Підтримка кількох ланцюгів** - торгуйте на вашому улюбленому EVM ланцюзі **Швидкий старт:** -1. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Підключіть основний гаманець і створіть API гаманець -3. Скопіюйте адресу API Signer та приватний ключ -4. Встановіть `"exchange": "aster"` в config.json -5. Додайте `"aster_user"`, `"aster_signer"` та `"aster_private_key"` +1. Зареєструйтеся за [реферальним посиланням Aster](https://www.asterdex.com/en/referral/fdfc0e) (отримайте знижку на комісії!) +2. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Підключіть основний гаманець і створіть API гаманець +4. Скопіюйте адресу API Signer та приватний ключ +5. Встановіть `"exchange": "aster"` в config.json +6. Додайте `"aster_user"`, `"aster_signer"` та `"aster_private_key"` --- @@ -462,12 +463,13 @@ cp config.json.example config.json - 🌐 Підтримка кількох ланцюгів (ETH, BSC, Polygon) - 🌍 Не потрібна KYC -**Крок 1**: Створіть Aster API гаманець +**Крок 1**: Зареєструйтеся та створіть Aster API гаманець -1. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Підключіть основний гаманець (MetaMask, WalletConnect тощо) -3. Натисніть "Створити API гаманець" -4. **Збережіть ці 3 елементи негайно:** +1. Зареєструйтеся за [реферальним посиланням Aster](https://www.asterdex.com/en/referral/fdfc0e) (отримайте знижку на комісії!) +2. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Підключіть основний гаманець (MetaMask, WalletConnect тощо) +4. Натисніть "Створити API гаманець" +5. **Збережіть ці 3 елементи негайно:** - Адреса основного гаманця (User) - Адреса API гаманця (Signer) - Приватний ключ API гаманця (⚠️ показується лише один раз!) diff --git a/README.zh-CN.md b/README.zh-CN.md index 1a28811d..412632b8 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -98,11 +98,12 @@ NOFX现已支持**三大交易所**:Binance、Hyperliquid和Aster DEX! - 🌐 **多链支持** - 在你喜欢的EVM链上交易 **快速开始:** -1. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) -2. 连接你的主钱包并创建API钱包 -3. 复制API Signer地址和私钥 -4. 在config.json中设置`"exchange": "aster"` -5. 添加`"aster_user"`、`"aster_signer"`和`"aster_private_key"` +1. 通过[推荐链接注册Aster](https://www.asterdex.com/en/referral/fdfc0e)(享手续费优惠) +2. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) +3. 连接你的主钱包并创建API钱包 +4. 复制API Signer地址和私钥 +5. 在config.json中设置`"exchange": "aster"` +6. 添加`"aster_user"`、`"aster_signer"`和`"aster_private_key"` --- @@ -531,12 +532,13 @@ cp config.json.example config.json - 🌐 多链支持(ETH、BSC、Polygon) - 🌍 无需KYC -**步骤1**:创建Aster API钱包 +**步骤1**:注册并创建Aster API钱包 -1. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) -2. 连接你的主钱包(MetaMask、WalletConnect等) -3. 点击"创建API钱包" -4. **立即保存这3项:** +1. 通过[推荐链接注册Aster](https://www.asterdex.com/en/referral/fdfc0e)(享手续费优惠) +2. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) +3. 连接你的主钱包(MetaMask、WalletConnect等) +4. 点击"创建API钱包" +5. **立即保存这3项:** - 主钱包地址(User) - API钱包地址(Signer) - API钱包私钥(⚠️ 仅显示一次!) From 7302f96e8e33fbcff83691ebd66e178d857fe420 Mon Sep 17 00:00:00 2001 From: yuanshi2016 <103150111@qq.com> Date: Sun, 2 Nov 2025 14:03:13 +0800 Subject: [PATCH 6/9] =?UTF-8?q?K=E7=BA=BF=E8=8E=B7=E5=8F=96=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E6=94=B9=E4=B8=BAwebsocket=E7=BB=84=E5=90=88=E6=B5=81?= =?UTF-8?q?.=20=E5=B8=A6=E9=87=8D=E6=8B=A8=E6=9C=BA=E5=88=B6=20=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E4=B8=BA=E4=B8=8B=EF=BC=9A=201.=20=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E6=97=B6=E4=BD=BF=E7=94=A8=E6=89=80=E6=9C=89=E4=BA=A4=E6=98=93?= =?UTF-8?q?=E5=91=98=E8=AE=BE=E7=BD=AE=E7=9A=84=E5=B8=81=E7=A7=8D(?= =?UTF-8?q?=E5=8E=BB=E9=87=8D)=20=E5=A6=82=E6=9E=9C=E4=BA=A4=E6=98=93?= =?UTF-8?q?=E5=91=98=E6=9C=AA=E9=85=8D=E7=BD=AE=EF=BC=8C=E5=88=99=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=B3=BB=E7=BB=9F=E9=BB=98=E8=AE=A4=202.=20=E5=9C=A8?= =?UTF-8?q?=E5=86=B3=E7=AD=96=E8=8E=B7=E5=8F=96K=E7=BA=BF=E6=97=B6=20?= =?UTF-8?q?=E5=A6=82=E6=9E=9C=E6=B2=A1=E6=9C=89=E7=BC=93=E5=AD=98=20?= =?UTF-8?q?=E5=88=99=E5=85=88=E5=AE=9E=E6=97=B6=E8=8E=B7=E5=8F=96=E5=90=8E?= =?UTF-8?q?=E5=86=8D=E6=B7=BB=E5=8A=A0=E8=AE=A2=E9=98=85.=20ps:=20?= =?UTF-8?q?=E9=80=82=E7=94=A8=E4=BA=8EApi=E6=96=B9=E5=BC=8F=E7=9A=84?= =?UTF-8?q?=E5=B8=81=E7=A7=8D=E5=88=97=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 2 +- config/database.go | 215 +++++++++++++++++++++++++-------------------- main.go | 29 +----- market/monitor.go | 112 +++++++++++------------ 4 files changed, 179 insertions(+), 179 deletions(-) diff --git a/config/config.go b/config/config.go index 3b736d0e..430e428c 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,7 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 - InsideCoins bool `json:"inside_coins"` // 是否使用内置AI评分币种列表 + UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/config/database.go b/config/database.go index c5eef755..1102c6fb 100644 --- a/config/database.go +++ b/config/database.go @@ -4,8 +4,11 @@ import ( "crypto/rand" "database/sql" "encoding/base32" + "encoding/json" "fmt" "log" + "nofx/market" + "slices" "strings" "time" @@ -177,17 +180,18 @@ func (d *Database) createTables() error { `ALTER TABLE exchanges ADD COLUMN aster_private_key TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN custom_prompt TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN override_base_prompt BOOLEAN DEFAULT 0`, - `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 - `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 - `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) - `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 - `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 - `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 - `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 - `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 + `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 + `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 + `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) + `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 + `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 + `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 + `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 + `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 + `ALTER TABLE traders ADD COLUMN use_inside_coins BOOLEAN DEFAULT 0`, // 是否使用内置AI评分信号源 `ALTER TABLE traders ADD COLUMN system_prompt_template TEXT DEFAULT 'default'`, // 系统提示词模板名称 - `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 - `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 + `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 + `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 } for _, query := range alterQueries { @@ -245,16 +249,16 @@ func (d *Database) initDefaultData() error { // 初始化系统配置 - 创建所有字段,设置默认值,后续由config.json同步更新 systemConfigs := map[string]string{ - "admin_mode": "true", // 默认开启管理员模式,便于首次使用 - "api_server_port": "8080", // 默认API端口 - "use_default_coins": "true", // 默认使用内置币种列表 - "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式) - "max_daily_loss": "10.0", // 最大日损失百分比 - "max_drawdown": "20.0", // 最大回撤百分比 - "stop_trading_minutes": "60", // 停止交易时间(分钟) - "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 - "altcoin_leverage": "5", // 山寨币杠杆倍数 - "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 + "admin_mode": "true", // 默认开启管理员模式,便于首次使用 + "api_server_port": "8080", // 默认API端口 + "use_default_coins": "true", // 默认使用内置币种列表 + "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式) + "max_daily_loss": "10.0", // 最大日损失百分比 + "max_drawdown": "20.0", // 最大回撤百分比 + "stop_trading_minutes": "60", // 停止交易时间(分钟) + "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 + "altcoin_leverage": "5", // 山寨币杠杆倍数 + "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 } for key, value := range systemConfigs { @@ -281,14 +285,14 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return err } - + // 如果已经迁移过,直接返回 if count > 0 { return nil } - + log.Printf("🔄 开始迁移exchanges表...") - + // 创建新的exchanges表,使用复合主键 _, err = d.db.Exec(` CREATE TABLE exchanges_new ( @@ -313,7 +317,7 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建新exchanges表失败: %w", err) } - + // 复制数据到新表 _, err = d.db.Exec(` INSERT INTO exchanges_new @@ -322,19 +326,19 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("复制数据失败: %w", err) } - + // 删除旧表 _, err = d.db.Exec(`DROP TABLE exchanges`) if err != nil { return fmt.Errorf("删除旧表失败: %w", err) } - + // 重命名新表 _, err = d.db.Exec(`ALTER TABLE exchanges_new RENAME TO exchanges`) if err != nil { return fmt.Errorf("重命名表失败: %w", err) } - + // 重新创建触发器 _, err = d.db.Exec(` CREATE TRIGGER IF NOT EXISTS update_exchanges_updated_at @@ -347,20 +351,20 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建触发器失败: %w", err) } - + log.Printf("✅ exchanges表迁移完成") return nil } // User 用户配置 type User struct { - ID string `json:"id"` - Email string `json:"email"` - PasswordHash string `json:"-"` // 不返回到前端 - OTPSecret string `json:"-"` // 不返回到前端 - OTPVerified bool `json:"otp_verified"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Email string `json:"email"` + PasswordHash string `json:"-"` // 不返回到前端 + OTPSecret string `json:"-"` // 不返回到前端 + OTPVerified bool `json:"otp_verified"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // AIModelConfig AI模型配置 @@ -379,39 +383,40 @@ type AIModelConfig struct { // ExchangeConfig 交易所配置 type ExchangeConfig struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - Type string `json:"type"` - Enabled bool `json:"enabled"` - APIKey string `json:"apiKey"` - SecretKey string `json:"secretKey"` - Testnet bool `json:"testnet"` + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + Type string `json:"type"` + Enabled bool `json:"enabled"` + APIKey string `json:"apiKey"` + SecretKey string `json:"secretKey"` + Testnet bool `json:"testnet"` // Hyperliquid 特定字段 HyperliquidWalletAddr string `json:"hyperliquidWalletAddr"` // Aster 特定字段 - AsterUser string `json:"asterUser"` - AsterSigner string `json:"asterSigner"` - AsterPrivateKey string `json:"asterPrivateKey"` + AsterUser string `json:"asterUser"` + AsterSigner string `json:"asterSigner"` + AsterPrivateKey string `json:"asterPrivateKey"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // TraderRecord 交易员配置(数据库实体) type TraderRecord struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - AIModelID string `json:"ai_model_id"` - ExchangeID string `json:"exchange_id"` - InitialBalance float64 `json:"initial_balance"` - ScanIntervalMinutes int `json:"scan_interval_minutes"` - IsRunning bool `json:"is_running"` - BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 - AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + AIModelID string `json:"ai_model_id"` + ExchangeID string `json:"exchange_id"` + InitialBalance float64 `json:"initial_balance"` + ScanIntervalMinutes int `json:"scan_interval_minutes"` + IsRunning bool `json:"is_running"` + BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 + AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 TradingSymbols string `json:"trading_symbols"` // 交易币种,逗号分隔 UseCoinPool bool `json:"use_coin_pool"` // 是否使用COIN POOL信号源 UseOITop bool `json:"use_oi_top"` // 是否使用OI TOP信号源 + UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置评分信号源 CustomPrompt string `json:"custom_prompt"` // 自定义交易策略prompt OverrideBasePrompt bool `json:"override_base_prompt"` // 是否覆盖基础prompt SystemPromptTemplate string `json:"system_prompt_template"` // 系统提示词模板名称 @@ -422,12 +427,12 @@ type TraderRecord struct { // UserSignalSource 用户信号源配置 type UserSignalSource struct { - ID int `json:"id"` - UserID string `json:"user_id"` - CoinPoolURL string `json:"coin_pool_url"` - OITopURL string `json:"oi_top_url"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID int `json:"id"` + UserID string `json:"user_id"` + CoinPoolURL string `json:"coin_pool_url"` + OITopURL string `json:"oi_top_url"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // GenerateOTPSecret 生成OTP密钥 @@ -457,12 +462,12 @@ func (d *Database) EnsureAdminUser() error { if err != nil { return err } - + // 如果已存在,直接返回 if count > 0 { return nil } - + // 创建admin用户(密码为空,因为管理员模式下不需要密码) adminUser := &User{ ID: "admin", @@ -471,7 +476,7 @@ func (d *Database) EnsureAdminUser() error { OTPSecret: "", OTPVerified: true, } - + return d.CreateUser(adminUser) } @@ -482,7 +487,7 @@ func (d *Database) GetUserByEmail(email string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE email = ? `, email).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -498,7 +503,7 @@ func (d *Database) GetUserByID(userID string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE id = ? `, userID).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -668,7 +673,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { err := rows.Scan( &exchange.ID, &exchange.UserID, &exchange.Name, &exchange.Type, &exchange.Enabled, &exchange.APIKey, &exchange.SecretKey, &exchange.Testnet, - &exchange.HyperliquidWalletAddr, &exchange.AsterUser, + &exchange.HyperliquidWalletAddr, &exchange.AsterUser, &exchange.AsterSigner, &exchange.AsterPrivateKey, &exchange.CreatedAt, &exchange.UpdatedAt, ) @@ -684,7 +689,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { // UpdateExchange 更新交易所配置,如果不存在则创建用户特定配置 func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secretKey string, testnet bool, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey string) error { log.Printf("🔧 UpdateExchange: userID=%s, id=%s, enabled=%v", userID, id, enabled) - + // 首先尝试更新现有的用户配置 result, err := d.db.Exec(` UPDATE exchanges SET enabled = ?, api_key = ?, secret_key = ?, testnet = ?, @@ -695,20 +700,20 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre log.Printf("❌ UpdateExchange: 更新失败: %v", err) return err } - + // 检查是否有行被更新 rowsAffected, err := result.RowsAffected() if err != nil { log.Printf("❌ UpdateExchange: 获取影响行数失败: %v", err) return err } - + log.Printf("📊 UpdateExchange: 影响行数 = %d", rowsAffected) - + // 如果没有行被更新,说明用户没有这个交易所的配置,需要创建 if rowsAffected == 0 { log.Printf("💡 UpdateExchange: 没有现有记录,创建新记录") - + // 根据交易所ID确定基本信息 var name, typ string if id == "binance" { @@ -724,16 +729,16 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre name = id + " Exchange" typ = "cex" } - + log.Printf("🆕 UpdateExchange: 创建新记录 ID=%s, name=%s, type=%s", id, name, typ) - + // 创建用户特定的配置,使用原始的交易所ID _, err = d.db.Exec(` INSERT INTO exchanges (id, user_id, name, type, enabled, api_key, secret_key, testnet, hyperliquid_wallet_addr, aster_user, aster_signer, aster_private_key, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) `, id, userID, name, typ, enabled, apiKey, secretKey, testnet, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey) - + if err != nil { log.Printf("❌ UpdateExchange: 创建记录失败: %v", err) } else { @@ -741,7 +746,7 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre } return err } - + log.Printf("✅ UpdateExchange: 更新现有记录成功") return nil } @@ -767,9 +772,9 @@ func (d *Database) CreateExchange(userID, id, name, typ string, enabled bool, ap // CreateTrader 创建交易员 func (d *Database) CreateTrader(trader *TraderRecord) error { _, err := d.db.Exec(` - INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin) + INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, use_inside_coins, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?) + `, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.UseInsideCoins, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin) return err } @@ -779,7 +784,7 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) { SELECT id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, COALESCE(btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(altcoin_leverage, 5) as altcoin_leverage, COALESCE(trading_symbols, '') as trading_symbols, - COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top, + COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top,COALESCE(use_inside_coins, 0) as use_inside_coins, COALESCE(custom_prompt, '') as custom_prompt, COALESCE(override_base_prompt, 0) as override_base_prompt, COALESCE(system_prompt_template, 'default') as system_prompt_template, COALESCE(is_cross_margin, 1) as is_cross_margin, created_at, updated_at @@ -790,14 +795,14 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) { } defer rows.Close() - var traders []*TraderRecord + var traders []*TraderRecord for rows.Next() { - var trader TraderRecord + var trader TraderRecord err := rows.Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, - &trader.UseCoinPool, &trader.UseOITop, + &trader.UseCoinPool, &trader.UseOITop, &trader.UseInsideCoins, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.SystemPromptTemplate, &trader.IsCrossMargin, &trader.CreatedAt, &trader.UpdatedAt, @@ -847,18 +852,13 @@ func (d *Database) DeleteTrader(userID, id string) error { // GetTraderConfig 获取交易员完整配置(包含AI模型和交易所信息) func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIModelConfig, *ExchangeConfig, error) { - var trader TraderRecord + var trader TraderRecord var aiModel AIModelConfig var exchange ExchangeConfig err := d.db.QueryRow(` SELECT - t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, - COALESCE(t.btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(t.altcoin_leverage, 5) as altcoin_leverage, - COALESCE(t.trading_symbols, '') as trading_symbols, COALESCE(t.use_coin_pool, 0) as use_coin_pool, - COALESCE(t.use_oi_top, 0) as use_oi_top, COALESCE(t.custom_prompt, '') as custom_prompt, - COALESCE(t.override_base_prompt, 0) as override_base_prompt, COALESCE(t.is_cross_margin, 1) as is_cross_margin, - t.created_at, t.updated_at, + t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, t.created_at, t.updated_at, a.id, a.user_id, a.name, a.provider, a.enabled, a.api_key, a.created_at, a.updated_at, e.id, e.user_id, e.name, e.type, e.enabled, e.api_key, e.secret_key, e.testnet, COALESCE(e.hyperliquid_wallet_addr, '') as hyperliquid_wallet_addr, @@ -873,8 +873,6 @@ func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIM `, traderID, userID).Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, - &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, &trader.UseCoinPool, - &trader.UseOITop, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.IsCrossMargin, &trader.CreatedAt, &trader.UpdatedAt, &aiModel.ID, &aiModel.UserID, &aiModel.Name, &aiModel.Provider, &aiModel.Enabled, &aiModel.APIKey, &aiModel.CreatedAt, &aiModel.UpdatedAt, @@ -940,7 +938,36 @@ func (d *Database) UpdateUserSignalSource(userID, coinPoolURL, oiTopURL string) return err } +// GetCustomCoins 获取所有交易员自定义币种 / Get all trader-customized currencies +func (d *Database) GetCustomCoins() []string { + var symbol string + var symbols []string + _ = d.db.QueryRow(` + SELECT GROUP_CONCAT(custom_coins , ',') as symbol + FROM main.traders where custom_coins != '' + `).Scan(&symbol) + // 检测用户是否未配置币种 - 兼容性 + if symbol == "" { + symbolJSON, _ := d.GetSystemConfig("default_coins") + if err := json.Unmarshal([]byte(symbolJSON), &symbols); err != nil { + log.Printf("⚠️ 解析default_coins配置失败: %v,使用硬编码默认值", err) + symbols = []string{"BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT"} + } + } + // filter Symbol + for _, s := range strings.Split(symbol, ",") { + if s == "" { + continue + } + coin := market.Normalize(s) + if !slices.Contains(symbols, coin) { + symbols = append(symbols, coin) + } + } + return symbols +} + // Close 关闭数据库连接 func (d *Database) Close() error { return d.db.Close() -} \ No newline at end of file +} diff --git a/main.go b/main.go index 7929072b..1ffc562e 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,6 @@ import ( "strconv" "strings" "syscall" - "time" ) // LeverageConfig 杠杆配置 @@ -30,9 +29,9 @@ type ConfigFile struct { APIServerPort int `json:"api_server_port"` UseDefaultCoins bool `json:"use_default_coins"` DefaultCoins []string `json:"default_coins"` - InsideCoins bool `json:"inside_coins"` CoinPoolAPIURL string `json:"coin_pool_api_url"` OITopAPIURL string `json:"oi_top_api_url"` + InsideCoins bool `json:"inside_coins"` MaxDailyLoss float64 `json:"max_daily_loss"` MaxDrawdown float64 `json:"max_drawdown"` StopTradingMinutes int `json:"stop_trading_minutes"` @@ -68,9 +67,9 @@ func syncConfigToDatabase(database *config.Database) error { "admin_mode": fmt.Sprintf("%t", configFile.AdminMode), "api_server_port": strconv.Itoa(configFile.APIServerPort), "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), - "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), "coin_pool_api_url": configFile.CoinPoolAPIURL, "oi_top_api_url": configFile.OITopAPIURL, + "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), @@ -137,8 +136,6 @@ func main() { // 获取系统配置 useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins") useDefaultCoins := useDefaultCoinsStr == "true" - InsideCoinsStr, _ := database.GetSystemConfig("inside_coins") - insideCoins := InsideCoinsStr == "true" apiPortStr, _ := database.GetSystemConfig("api_server_port") // 获取管理员模式配置 @@ -186,26 +183,6 @@ func main() { } pool.SetDefaultCoins(defaultCoins) - - //内置AI评分 - if insideCoins { - log.Printf("✓ 启用内置AI评分币种列表") - monitor := market.NewWSMonitor(150) - go func() { - monitor.Start() - // 定时器设置默认的币种列表 - 覆蓋defaultCoins设置 - for { - if len(monitor.FilterSymbol) > 0 { - for _, coin := range defaultCoins { - monitor.FilterSymbol = append(monitor.FilterSymbol, coin) - } - pool.SetDefaultCoins(monitor.FilterSymbol) - monitor.FilterSymbol = nil - } - time.Sleep(1 * time.Minute) - } - }() - } // 设置是否使用默认主流币种 pool.SetUseDefaultCoins(useDefaultCoins) if useDefaultCoins { @@ -286,6 +263,8 @@ func main() { } }() + // 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认 + go market.NewWSMonitor(150).Start(database.GetCustomCoins()) // 设置优雅退出 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/market/monitor.go b/market/monitor.go index 9837623e..b751dfe8 100644 --- a/market/monitor.go +++ b/market/monitor.go @@ -35,6 +35,7 @@ type SymbolStats struct { } var WSMonitorCli *WSMonitor +var subKlineTime = []string{"3m", "4h"} // 管理订阅流的K线周期 func NewWSMonitor(batchSize int) *WSMonitor { WSMonitorCli = &WSMonitor{ @@ -47,23 +48,27 @@ func NewWSMonitor(batchSize int) *WSMonitor { return WSMonitorCli } -func (m *WSMonitor) Initialize() error { +func (m *WSMonitor) Initialize(coins []string) error { log.Println("初始化WebSocket监控器...") - // 获取交易对信息 apiClient := NewAPIClient() - exchangeInfo, err := apiClient.GetExchangeInfo() - if err != nil { - return err + // 如果不指定交易对,则使用market市场的所有交易对币种 + if len(coins) == 0 { + exchangeInfo, err := apiClient.GetExchangeInfo() + if err != nil { + return err + } + // 筛选永续合约交易对 --仅测试时使用 + //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] + for _, symbol := range exchangeInfo.Symbols { + if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" { + m.symbols = append(m.symbols, symbol.Symbol) + } + } + } else { + m.symbols = coins } - // 筛选永续合约交易对 --仅测试时使用 - //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] - for _, symbol := range exchangeInfo.Symbols { - if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" { - m.symbols = append(m.symbols, Normalize(symbol.Symbol)) - } - } log.Printf("找到 %d 个交易对", len(m.symbols)) // 初始化历史数据 if err := m.initializeHistoricalData(); err != nil { @@ -114,10 +119,10 @@ func (m *WSMonitor) initializeHistoricalData() error { return nil } -func (m *WSMonitor) Start() { +func (m *WSMonitor) Start(coins []string) { log.Printf("启动WebSocket实时监控...") // 初始化交易对 - err := m.Initialize() + err := m.Initialize(coins) if err != nil { log.Fatalf("❌ 初始化币种: %v", err) return @@ -129,42 +134,43 @@ func (m *WSMonitor) Start() { return } // 启动警报处理器 - go m.handleAlerts() + //go m.handleAlerts() // 启动定期清理任务 - go m.cleanupInactiveSymbols() + //go m.cleanupInactiveSymbols() // 输出监控统计 - 评分前十名 - go m.printFilterStats(50) + //go m.printFilterStats(20) // 订阅所有交易对 err = m.subscribeAll() - if err != nil { log.Fatalf("❌ 订阅币种交易对: %v", err) return } } +// subscribeSymbol 注册监听 +func (m *WSMonitor) subscribeSymbol(symbol, st string) []string { + var streams []string + stream := fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), st) + ch := m.combinedClient.AddSubscriber(stream, 100) + streams = append(streams, stream) + go m.handleKlineData(symbol, ch, st) + + return streams +} func (m *WSMonitor) subscribeAll() error { // 执行批量订阅 log.Println("开始订阅所有交易对...") for _, symbol := range m.symbols { - stream3m := fmt.Sprintf("%s@kline_3m", strings.ToLower(symbol)) - ch3m := m.combinedClient.AddSubscriber(stream3m, 100) - go m.handleKlineData(symbol, ch3m, "3m") - - stream4h := fmt.Sprintf("%s@kline_4h", strings.ToLower(symbol)) - ch4h := m.combinedClient.AddSubscriber(stream4h, 100) - go m.handleKlineData(symbol, ch4h, "4h") + for _, st := range subKlineTime { + m.subscribeSymbol(symbol, st) + } } - - err := m.combinedClient.BatchSubscribeKlines(m.symbols, "3m") - if err != nil { - log.Fatalf("❌ 订阅3m K线: %v", err) - return err - } - err = m.combinedClient.BatchSubscribeKlines(m.symbols, "4h") - if err != nil { - log.Fatalf("❌ 订阅4h K线: %v", err) - return err + for _, st := range subKlineTime { + err := m.combinedClient.BatchSubscribeKlines(m.symbols, st) + if err != nil { + log.Fatalf("❌ 订阅3m K线: %v", err) + return err + } } log.Println("所有交易对订阅完成") return nil @@ -181,34 +187,14 @@ func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time strin } } -func (m *WSMonitor) handleTickerData(symbol string, ch <-chan []byte) { - for data := range ch { - var tickerData TickerWSData - if err := json.Unmarshal(data, &tickerData); err != nil { - log.Printf("解析Ticker数据失败: %v", err) - continue - } - - m.processTickerUpdate(symbol, tickerData) - } -} -func (m *WSMonitor) handleTickerDatas(ch <-chan []byte) { - for data := range ch { - var tickerData []TickerWSData - if err := json.Unmarshal(data, &tickerData); err != nil { - log.Printf("解析Ticker数据失败: %v", err) - continue - } - log.Fatalln(tickerData) - //m.processTickerUpdate(symbol, tickerData) - } -} func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map { var klineDataMap *sync.Map if _time == "3m" { klineDataMap = &m.klineDataMap3m - } else { + } else if _time == "4h" { klineDataMap = &m.klineDataMap4h + } else { + klineDataMap = &sync.Map{} } return klineDataMap } @@ -310,11 +296,19 @@ func (m *WSMonitor) handleAlerts() { } func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { + // 对每一个进来的symbol检测是否存在内类 是否的话就订阅它 value, exists := m.getKlineDataMap(_time).Load(symbol) if !exists { // 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行) apiClient := NewAPIClient() - klines, err := apiClient.GetKlines(symbol, _time, 40) + klines, err := apiClient.GetKlines(symbol, _time, 100) + m.getKlineDataMap(_time).Store(strings.ToUpper(symbol), klines) //动态缓存进缓存 + subStr := m.subscribeSymbol(symbol, _time) + subErr := m.combinedClient.subscribeStreams(subStr) + log.Printf("动态订阅流: %v", subStr) + if subErr != nil { + return nil, fmt.Errorf("动态订阅%v分钟K线失败: %v", _time, subErr) + } if err != nil { return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err) } From d8582475d320e92170913b992344a5e6b876a95e Mon Sep 17 00:00:00 2001 From: yuanshi2016 <103150111@qq.com> Date: Sun, 2 Nov 2025 17:59:19 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9Kline=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E4=B8=BAWebsocket=E7=BC=93=E5=AD=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json.example | 1 - config/config.go | 1 - docs/architecture/README.md | 7 +- docs/architecture/README.zh-CN.md | 5 + go.mod | 10 +- main.go | 1 + market/feature_engine.go | 229 -------------------------- market/monitor.go | 262 +----------------------------- 8 files changed, 21 insertions(+), 495 deletions(-) delete mode 100644 market/feature_engine.go diff --git a/config.json.example b/config.json.example index 87b01edd..ac9d5ac6 100644 --- a/config.json.example +++ b/config.json.example @@ -5,7 +5,6 @@ "altcoin_leverage": 5 }, "use_default_coins": true, - "inside_coins": true, "default_coins": [ "BTCUSDT", "ETHUSDT", diff --git a/config/config.go b/config/config.go index 430e428c..37a537db 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,6 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 - UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 2c1a2f6f..fb233a31 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -51,7 +51,12 @@ nofx/ │ ├── market/ # Market data fetching │ └── data.go # Market data & technical indicators (TA-Lib) -│ +│ └── api_client.go # Market data acquisition API +│ └── websocket_client.go # Market data acquisition WebSocket interface +│ └── combined_streams.go # Market data acquisition: Combined streaming (single link to subscribe to multiple cryptocurrencies) +│ └── monitor.go # Market data cache +│ └── types.go # market structure + ├── pool/ # Coin pool management │ └── coin_pool.go # AI500 + OI Top merged pool │ diff --git a/docs/architecture/README.zh-CN.md b/docs/architecture/README.zh-CN.md index 36732b09..4acc0f90 100644 --- a/docs/architecture/README.zh-CN.md +++ b/docs/architecture/README.zh-CN.md @@ -51,6 +51,11 @@ nofx/ │ ├── market/ # 市场数据获取 │ └── data.go # 市场数据与技术指标(TA-Lib) +│ └── api_client.go # 行情获取 Api接口 +│ └── websocket_client.go # 行情获取 Websocket接口 +│ └── combined_streams.go # 行情获取 组合流式(单链接订阅多个币种) +│ └── monitor.go # 行情数据缓存 +│ └── types.go # market结构体 │ ├── pool/ # 币种池管理 │ └── coin_pool.go # AI500 + OI Top 合并池 diff --git a/go.mod b/go.mod index 0c6dcfde..067172fd 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,8 @@ require ( github.com/gin-gonic/gin v1.11.0 github.com/golang-jwt/jwt/v5 v5.2.0 github.com/google/uuid v1.6.0 - github.com/mattn/go-sqlite3 v1.14.32 + github.com/gorilla/websocket v1.5.3 + github.com/mattn/go-sqlite3 v1.14.16 github.com/pquerna/otp v1.4.0 github.com/sonirico/go-hyperliquid v0.17.0 golang.org/x/crypto v0.42.0 @@ -26,6 +27,7 @@ require ( github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/go-sysinfo v1.15.4 // indirect github.com/elastic/go-windows v1.0.2 // indirect github.com/ethereum/c-kzg-4844/v2 v2.1.5 // indirect @@ -37,7 +39,6 @@ require ( github.com/go-playground/validator/v10 v10.27.0 // indirect github.com/goccy/go-json v0.10.4 // indirect github.com/goccy/go-yaml v1.18.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect github.com/holiman/uint256 v1.3.2 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -55,6 +56,7 @@ require ( github.com/prometheus/procfs v0.17.0 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rs/zerolog v1.34.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/sonirico/vago v0.9.0 // indirect @@ -78,4 +80,8 @@ require ( golang.org/x/tools v0.36.0 // indirect google.golang.org/protobuf v1.36.9 // indirect howett.net/plist v1.0.1 // indirect + modernc.org/libc v1.37.6 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.28.0 // indirect ) diff --git a/main.go b/main.go index 1ffc562e..36537b50 100644 --- a/main.go +++ b/main.go @@ -265,6 +265,7 @@ func main() { // 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认 go market.NewWSMonitor(150).Start(database.GetCustomCoins()) + //go market.NewWSMonitor(150).Start([]string{}) //这里是一个使用方式 传入空的话 则使用market市场的所有币种 // 设置优雅退出 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/market/feature_engine.go b/market/feature_engine.go deleted file mode 100644 index 91540a29..00000000 --- a/market/feature_engine.go +++ /dev/null @@ -1,229 +0,0 @@ -package market - -import ( - "fmt" - "math" - "time" -) - -type FeatureEngine struct { - alertThresholds AlertThresholds -} - -func NewFeatureEngine(thresholds AlertThresholds) *FeatureEngine { - return &FeatureEngine{ - alertThresholds: thresholds, - } -} - -func (e *FeatureEngine) CalculateFeatures(symbol string, klines []Kline) *SymbolFeatures { - if len(klines) < 20 { - return nil - } - - features := &SymbolFeatures{ - Symbol: symbol, - Timestamp: time.Now(), - } - - // 提取价格和交易量数据 - closes := make([]float64, len(klines)) - volumes := make([]float64, len(klines)) - highs := make([]float64, len(klines)) - lows := make([]float64, len(klines)) - - for i, k := range klines { - closes[i] = k.Close - volumes[i] = k.Volume - highs[i] = k.High - lows[i] = k.Low - } - - // 价格特征 - features.Price = closes[len(closes)-1] - features.PriceChange15Min = (closes[len(closes)-1] - closes[len(closes)-2]) / closes[len(closes)-2] - - if len(closes) >= 5 { - features.PriceChange1H = (closes[len(closes)-1] - closes[len(closes)-5]) / closes[len(closes)-5] - } - if len(closes) >= 17 { - features.PriceChange4H = (closes[len(closes)-1] - closes[len(closes)-17]) / closes[len(closes)-17] - } - - // 交易量特征 - currentVolume := volumes[len(volumes)-1] - features.Volume = currentVolume - - // 5周期平均交易量 - if len(volumes) >= 6 { - avgVolume5 := e.calculateAverage(volumes[len(volumes)-6 : len(volumes)-1]) - features.VolumeRatio5 = currentVolume / avgVolume5 - } - - // 20周期平均交易量 - if len(volumes) >= 21 { - avgVolume20 := e.calculateAverage(volumes[len(volumes)-21 : len(volumes)-1]) - features.VolumeRatio20 = currentVolume / avgVolume20 - } - - // 交易量趋势 - if features.VolumeRatio20 > 0 { - features.VolumeTrend = features.VolumeRatio5 / features.VolumeRatio20 - } - - // 技术指标 - features.RSI14 = e.calculateRSI(closes, 14) - features.SMA5 = e.calculateSMA(closes, 5) - features.SMA10 = e.calculateSMA(closes, 10) - features.SMA20 = e.calculateSMA(closes, 20) - - // 波动特征 - currentHigh := highs[len(highs)-1] - currentLow := lows[len(lows)-1] - features.HighLowRatio = (currentHigh - currentLow) / features.Price - features.Volatility20 = e.calculateVolatility(closes, 20) - - // 价格在区间中的位置 - if currentHigh != currentLow { - features.PositionInRange = (features.Price - currentLow) / (currentHigh - currentLow) - } else { - features.PositionInRange = 0.5 - } - - return features -} - -func (e *FeatureEngine) calculateAverage(values []float64) float64 { - sum := 0.0 - for _, v := range values { - sum += v - } - return sum / float64(len(values)) -} - -func (e *FeatureEngine) calculateSMA(prices []float64, period int) float64 { - if len(prices) < period { - return 0 - } - return e.calculateAverage(prices[len(prices)-period:]) -} - -func (e *FeatureEngine) calculateRSI(prices []float64, period int) float64 { - if len(prices) <= period { - return 50 - } - - gains := make([]float64, 0) - losses := make([]float64, 0) - - for i := 1; i < len(prices); i++ { - change := prices[i] - prices[i-1] - if change > 0 { - gains = append(gains, change) - losses = append(losses, 0) - } else { - gains = append(gains, 0) - losses = append(losses, -change) - } - } - - // 只取最近period个数据点 - if len(gains) > period { - gains = gains[len(gains)-period:] - losses = losses[len(losses)-period:] - } - - avgGain := e.calculateAverage(gains) - avgLoss := e.calculateAverage(losses) - - if avgLoss == 0 { - return 100 - } - - rs := avgGain / avgLoss - return 100 - (100 / (1 + rs)) -} - -func (e *FeatureEngine) calculateVolatility(prices []float64, period int) float64 { - if len(prices) < period { - return 0 - } - - periodPrices := prices[len(prices)-period:] - mean := e.calculateAverage(periodPrices) - - variance := 0.0 - for _, price := range periodPrices { - variance += math.Pow(price-mean, 2) - } - variance /= float64(len(periodPrices)) - - return math.Sqrt(variance) / mean -} - -func (e *FeatureEngine) DetectAlerts(features *SymbolFeatures) []Alert { - var alerts []Alert - - // 交易量放大检测 - if features.VolumeRatio5 > e.alertThresholds.VolumeSpike { - alerts = append(alerts, Alert{ - Type: "VOLUME_SPIKE", - Symbol: features.Symbol, - Value: features.VolumeRatio5, - Threshold: e.alertThresholds.VolumeSpike, - Message: fmt.Sprintf("%s 交易量放大 %.2f 倍", features.Symbol, features.VolumeRatio5), - Timestamp: time.Now(), - }) - } - - // 15分钟价格异动 - if math.Abs(features.PriceChange15Min) > e.alertThresholds.PriceChange15Min { - direction := "上涨" - if features.PriceChange15Min < 0 { - direction = "下跌" - } - alerts = append(alerts, Alert{ - Type: "PRICE_CHANGE_15MIN", - Symbol: features.Symbol, - Value: features.PriceChange15Min, - Threshold: e.alertThresholds.PriceChange15Min, - Message: fmt.Sprintf("%s 15分钟%s %.2f%%", features.Symbol, direction, features.PriceChange15Min*100), - Timestamp: time.Now(), - }) - } - - // 交易量趋势 - if features.VolumeTrend > e.alertThresholds.VolumeTrend { - alerts = append(alerts, Alert{ - Type: "VOLUME_TREND", - Symbol: features.Symbol, - Value: features.VolumeTrend, - Threshold: e.alertThresholds.VolumeTrend, - Message: fmt.Sprintf("%s 交易量趋势增强 %.2f 倍", features.Symbol, features.VolumeTrend), - Timestamp: time.Now(), - }) - } - - // RSI超买超卖 - if features.RSI14 > e.alertThresholds.RSIOverbought { - alerts = append(alerts, Alert{ - Type: "RSI_OVERBOUGHT", - Symbol: features.Symbol, - Value: features.RSI14, - Threshold: e.alertThresholds.RSIOverbought, - Message: fmt.Sprintf("%s RSI超买: %.2f", features.Symbol, features.RSI14), - Timestamp: time.Now(), - }) - } else if features.RSI14 < e.alertThresholds.RSIOversold { - alerts = append(alerts, Alert{ - Type: "RSI_OVERSOLD", - Symbol: features.Symbol, - Value: features.RSI14, - Threshold: e.alertThresholds.RSIOversold, - Message: fmt.Sprintf("%s RSI超卖: %.2f", features.Symbol, features.RSI14), - Timestamp: time.Now(), - }) - } - - return alerts -} diff --git a/market/monitor.go b/market/monitor.go index b751dfe8..337640d8 100644 --- a/market/monitor.go +++ b/market/monitor.go @@ -4,8 +4,6 @@ import ( "encoding/json" "fmt" "log" - "math" - "sort" "strings" "sync" "time" @@ -14,7 +12,6 @@ import ( type WSMonitor struct { wsClient *WSClient combinedClient *CombinedStreamsClient - featureEngine *FeatureEngine symbols []string featuresMap sync.Map alertsChan chan Alert @@ -41,7 +38,6 @@ func NewWSMonitor(batchSize int) *WSMonitor { WSMonitorCli = &WSMonitor{ wsClient: NewWSClient(), combinedClient: NewCombinedStreamsClient(batchSize), - featureEngine: NewFeatureEngine(config.AlertThresholds), alertsChan: make(chan Alert, 1000), batchSize: batchSize, } @@ -63,6 +59,7 @@ func (m *WSMonitor) Initialize(coins []string) error { for _, symbol := range exchangeInfo.Symbols { if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" { m.symbols = append(m.symbols, symbol.Symbol) + m.filterSymbols.Store(symbol.Symbol, true) } } } else { @@ -133,12 +130,6 @@ func (m *WSMonitor) Start(coins []string) { log.Fatalf("❌ 批量订阅流: %v", err) return } - // 启动警报处理器 - //go m.handleAlerts() - // 启动定期清理任务 - //go m.cleanupInactiveSymbols() - // 输出监控统计 - 评分前十名 - //go m.printFilterStats(20) // 订阅所有交易对 err = m.subscribeAll() if err != nil { @@ -239,60 +230,6 @@ func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time } klineDataMap.Store(symbol, klines) - // 计算特征并检测警报 - if len(klines) >= 20 { - features := m.featureEngine.CalculateFeatures(symbol, klines) - if features != nil { - m.featuresMap.Store(symbol, features) - - alerts := m.featureEngine.DetectAlerts(features) - hasAlert := len(alerts) > 0 - - // 更新统计信息 - m.updateSymbolStats(symbol, features, hasAlert) - - for _, alert := range alerts { - m.alertsChan <- alert - } - - // 实时日志输出重要特征 - if len(alerts) > 0 || features.VolumeRatio5 > 2.0 || math.Abs(features.PriceChange15Min) > 0.02 { - //log.Printf("📊 %s - 价格: %.4f, 15分钟变动: %.2f%%, 交易量倍数: %.2f, RSI: %.1f", - // symbol, features.Price, features.PriceChange15Min*100, - // features.VolumeRatio5, features.RSI14) - } - } - } -} - -func (m *WSMonitor) processTickerUpdate(symbol string, tickerData TickerWSData) { - // 存储ticker数据 - m.tickerDataMap.Store(symbol, tickerData) -} - -func (m *WSMonitor) handleAlerts() { - alertCounts := make(map[string]int) - lastReset := time.Now() - - for alert := range m.alertsChan { - // 重置计数器(每小时) - if time.Since(lastReset) > time.Hour { - alertCounts = make(map[string]int) - lastReset = time.Now() - } - - // 警报去重和频率控制 - alertKey := fmt.Sprintf("%s_%s", alert.Symbol, alert.Type) - alertCounts[alertKey]++ - m.filterSymbols.Store(alert.Symbol, true) - - //log.Printf("✅ 自动添加监控: %s (因警报: %s)", alert.Symbol, alert.Message) - if alertCounts[alertKey] <= 3 { // 每小时最多3次相同警报 - //log.Printf("🚨 实时警报: %s", alert.Message) - - // 这里可以添加其他警报处理逻辑 - } - } } func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { @@ -317,204 +254,7 @@ func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, erro return value.([]Kline), nil } -func (m *WSMonitor) GetCurrentFeatures(symbol string) (*SymbolFeatures, bool) { - value, exists := m.featuresMap.Load(symbol) - if !exists { - return nil, false - } - return value.(*SymbolFeatures), true -} - -func (m *WSMonitor) GetAllFeatures() map[string]*SymbolFeatures { - features := make(map[string]*SymbolFeatures) - m.featuresMap.Range(func(key, value interface{}) bool { - features[key.(string)] = value.(*SymbolFeatures) - return true - }) - return features -} - func (m *WSMonitor) Close() { m.wsClient.Close() close(m.alertsChan) } -func (m *WSMonitor) printFilterStats(nember int) { - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - - for range ticker.C { - var monitoredSymbols []string - m.filterSymbols.Range(func(key, value interface{}) bool { - monitoredSymbols = append(monitoredSymbols, key.(string)) - return true - }) - - log.Printf("🎯 监控统计 - 总数: %d, 币种: %v", - len(monitoredSymbols), monitoredSymbols) - - // 打印前5个评分最高的币种 - type symbolScore struct { - symbol string - score float64 - } - var topScores []symbolScore - - m.symbolStats.Range(func(key, value interface{}) bool { - symbol := key.(string) - stats := value.(*SymbolStats) - topScores = append(topScores, symbolScore{symbol, stats.Score}) - return true - }) - - // 按评分排序 - sort.Slice(topScores, func(i, j int) bool { - return topScores[i].score > topScores[j].score - }) - m.FilterSymbol = nil - if len(topScores) > 0 { - log.Printf("🏆 评分TOP%v:", nember) - for i := 0; i < len(topScores) && i < nember; i++ { - m.FilterSymbol = append(m.FilterSymbol, topScores[i].symbol) - log.Printf(" %d. %s: %.1f分", i+1, topScores[i].symbol, topScores[i].score) - } - } - } -} - -// evaluateSymbolScore 评估币种得分,决定是否保留 -func (m *WSMonitor) evaluateSymbolScore(symbol string, features *SymbolFeatures) float64 { - score := 0.0 - - // 交易量活跃度评分 (权重: 40%) - if features.VolumeRatio5 > 1.5 { - score += 40 * math.Min(features.VolumeRatio5/5.0, 1.0) - } - - // 价格波动评分 (权重: 30%) - volatilityScore := math.Abs(features.PriceChange15Min) * 1000 // 放大系数 - score += 30 * math.Min(volatilityScore/10.0, 1.0) // 最大10%波动得满分 - - // RSI活跃度评分 (权重: 20%) - if features.RSI14 < 30 || features.RSI14 > 70 { - score += 20 // RSI在极端区域 - } else if features.RSI14 < 40 || features.RSI14 > 60 { - score += 10 // RSI在活跃区域 - } - - // 交易量趋势评分 (权重: 10%) - if features.VolumeTrend > 1.2 { - score += 10 * math.Min(features.VolumeTrend/3.0, 1.0) - } - - return score -} - -// shouldRemoveFromFilter 判断是否应该从FilterSymbols中移除 -func (m *WSMonitor) shouldRemoveFromFilter(symbol string) bool { - value, exists := m.symbolStats.Load(symbol) - if !exists { - return true // 没有统计信息,移除 - } - - stats := value.(*SymbolStats) - - // 规则1: 超过30分钟没有活跃迹象 - if time.Since(stats.LastActiveTime) > 30*time.Minute { - log.Printf("🔻 %s 因长时间不活跃被移除", symbol) - return true - } - - // 规则2: 评分持续低于阈值 (最近5次评分平均) - if stats.Score < 15 { // 调整这个阈值 - log.Printf("🔻 %s 因评分过低(%.1f)被移除", symbol, stats.Score) - return true - } - - // 规则3: 超过2小时没有产生警报 - if time.Since(stats.LastAlertTime) > 2*time.Hour && stats.AlertCount > 0 { - log.Printf("🔻 %s 因长时间无新警报被移除", symbol) - return true - } - - return false -} - -// updateSymbolStats 更新币种统计信息 -func (m *WSMonitor) updateSymbolStats(symbol string, features *SymbolFeatures, hasAlert bool) { - now := time.Now() - - value, exists := m.symbolStats.Load(symbol) - var stats *SymbolStats - - if !exists { - stats = &SymbolStats{ - LastActiveTime: now, - Score: m.evaluateSymbolScore(symbol, features), - } - } else { - stats = value.(*SymbolStats) - stats.LastActiveTime = now - - // 平滑更新评分 (指数移动平均) - newScore := m.evaluateSymbolScore(symbol, features) - stats.Score = 0.7*stats.Score + 0.3*newScore - } - - if hasAlert { - stats.AlertCount++ - stats.LastAlertTime = now - } - - if features.VolumeRatio5 > 2.0 { - stats.VolumeSpikeCount++ - } - - m.symbolStats.Store(symbol, stats) -} - -// removeFromFilter 从FilterSymbols中移除币种 -func (m *WSMonitor) removeFromFilter(symbol string) { - - // 从filterSymbols中移除 - m.filterSymbols.Delete(symbol) - m.symbolStats.Delete(symbol) - - log.Printf("🗑️ 已移除币种监控: %s", symbol) -} - -// cleanupInactiveSymbols 定期清理不活跃的币种 -func (m *WSMonitor) cleanupInactiveSymbols() { - ticker := time.NewTicker(5 * time.Minute) // 每5分钟检查一次 - defer ticker.Stop() - - for range ticker.C { - var symbolsToRemove []string - - // 收集需要移除的币种 - m.filterSymbols.Range(func(key, value interface{}) bool { - symbol := key.(string) - if m.shouldRemoveFromFilter(symbol) { - symbolsToRemove = append(symbolsToRemove, symbol) - } - return true - }) - - // 执行移除操作 - for _, symbol := range symbolsToRemove { - m.removeFromFilter(symbol) - } - - if len(symbolsToRemove) > 0 { - log.Printf("🧹 清理完成,移除了 %d 个不活跃币种", len(symbolsToRemove)) - } - } -} - -// getSymbolScore 获取币种当前评分 -func (m *WSMonitor) getSymbolScore(symbol string) float64 { - value, exists := m.symbolStats.Load(symbol) - if !exists { - return 0 - } - return value.(*SymbolStats).Score -} From a8d517108780c569e340275df64b274d264138ca Mon Sep 17 00:00:00 2001 From: Liu Xiang Qian Date: Sun, 2 Nov 2025 18:08:25 +0800 Subject: [PATCH 8/9] fix: Update model validation in handleSaveModelConfig to support both configured and supported models - Change validation to check allModels first, then supportedModels - This allows saving new model configurations without "model does not exist" error - Fixes issue where users couldn't save AI model config after selecting from dropdown Fixes #245 Co-Authored-By: tinkle-community --- web/src/components/AITradersPage.tsx | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/web/src/components/AITradersPage.tsx b/web/src/components/AITradersPage.tsx index 3a947df3..9361fc80 100644 --- a/web/src/components/AITradersPage.tsx +++ b/web/src/components/AITradersPage.tsx @@ -277,17 +277,17 @@ export function AITradersPage({ onTraderSelect }: AITradersPageProps) { const handleSaveModelConfig = async (modelId: string, apiKey: string, customApiUrl?: string, customModelName?: string) => { try { - // 找到要配置的模型(从supportedModels中) - const modelToUpdate = supportedModels?.find(m => m.id === modelId); + // 创建或更新用户的模型配置 + const existingModel = allModels?.find(m => m.id === modelId); + let updatedModels; + + // 找到要配置的模型(优先从已配置列表,其次从支持列表) + const modelToUpdate = existingModel || supportedModels?.find(m => m.id === modelId); if (!modelToUpdate) { alert(t('modelNotExist', language)); return; } - // 创建或更新用户的模型配置 - const existingModel = allModels?.find(m => m.id === modelId); - let updatedModels; - if (existingModel) { // 更新现有配置 updatedModels = allModels?.map(m => From 3d6d1e4104ceb73d9504663f6be0ec516d4ce771 Mon Sep 17 00:00:00 2001 From: SkywalkerJi Date: Sun, 2 Nov 2025 21:44:53 +0800 Subject: [PATCH 9/9] Google Tag Manager --- web/index.html | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/web/index.html b/web/index.html index badfe608..574bc83a 100644 --- a/web/index.html +++ b/web/index.html @@ -2,11 +2,22 @@ + + + NOFX - AI Auto Trading Dashboard + + +