From f6539eb7505708ebd5c9cbfcb3c07fbbac71fa85 Mon Sep 17 00:00:00 2001 From: yuanshi2016 <103150111@qq.com> Date: Sat, 1 Nov 2025 15:58:54 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=86=85=E7=BD=AEAI?= =?UTF-8?q?=E8=AF=84=E5=88=86=20=E4=BF=AE=E6=94=B9market/data.go=20Get?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E8=8E=B7=E5=8F=96K=E7=BA=BF=E4=B8=BA?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E8=8E=B7=E5=8F=96(=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BC=A0=E5=85=A5=E5=B8=81=E7=A7=8D=E6=AF=94?= =?UTF-8?q?=E8=BE=83=E5=A4=9A=E7=9A=84=E6=83=85=E5=86=B5=E4=B8=8B=E8=80=97?= =?UTF-8?q?=E6=97=B6=E9=97=AE=E9=A2=98)=20market=E7=9B=AE=E5=BD=95?= =?UTF-8?q?=E4=B8=8B=E6=96=B0=E5=A2=9E=E6=96=87=E4=BB=B6=20main.go=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=BF=90=E8=A1=8C=E5=85=A5=E5=8F=A3=20?= =?UTF-8?q?=E9=80=9A=E8=BF=87inside=5Fcoins=3Dtrue=E6=8E=A7=E5=88=B6=20?= =?UTF-8?q?=E8=AF=A5=E8=AF=84=E5=88=86=E9=BB=98=E8=AE=A4=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E5=A4=A7=E7=BA=A6=E9=9C=80=E8=A6=812=E5=88=86?= =?UTF-8?q?=E9=92=9F=E5=B7=A6=E5=8F=B3(=E5=9B=A0=E4=B8=BA=E5=B8=81?= =?UTF-8?q?=E7=A7=8D=E5=88=97=E8=A1=A8=E6=AF=94=E8=BE=83=E5=A4=9A=EF=BC=8C?= =?UTF-8?q?api=E6=9C=89=E9=99=90=E9=80=9F)=20=E4=BD=BF=E7=94=A8=E6=97=B6?= =?UTF-8?q?=E5=BA=94=E8=AF=A5=E6=B3=A8=E6=84=8Fengine.go=E4=B8=8B=E7=9A=84?= =?UTF-8?q?=E6=B5=81=E5=8A=A8=E6=80=A7=E8=BF=87=E6=BB=A4=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json.example | 1 + config/config.go | 3 +- main.go | 81 +++--- market/api_client.go | 150 +++++++++++ market/combined_streams.go | 202 ++++++++++++++ market/data.go | 105 +------- market/feature_engine.go | 229 ++++++++++++++++ market/monitor.go | 526 +++++++++++++++++++++++++++++++++++++ market/types.go | 157 +++++++++++ market/websocket_client.go | 231 ++++++++++++++++ 10 files changed, 1550 insertions(+), 135 deletions(-) create mode 100644 market/api_client.go create mode 100644 market/combined_streams.go create mode 100644 market/feature_engine.go create mode 100644 market/monitor.go create mode 100644 market/types.go create mode 100644 market/websocket_client.go diff --git a/config.json.example b/config.json.example index ac9d5ac6..87b01edd 100644 --- a/config.json.example +++ b/config.json.example @@ -5,6 +5,7 @@ "altcoin_leverage": 5 }, "use_default_coins": true, + "inside_coins": true, "default_coins": [ "BTCUSDT", "ETHUSDT", diff --git a/config/config.go b/config/config.go index 97fcc84d..3b736d0e 100644 --- a/config/config.go +++ b/config/config.go @@ -11,7 +11,7 @@ import ( type TraderConfig struct { ID string `json:"id"` Name string `json:"name"` - Enabled bool `json:"enabled"` // 是否启用该trader + Enabled bool `json:"enabled"` // 是否启用该trader AIModel string `json:"ai_model"` // "qwen" or "deepseek" // 交易平台选择(二选一) @@ -54,6 +54,7 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 + InsideCoins bool `json:"inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/main.go b/main.go index 1d9631a9..7929072b 100644 --- a/main.go +++ b/main.go @@ -8,12 +8,14 @@ import ( "nofx/auth" "nofx/config" "nofx/manager" + "nofx/market" "nofx/pool" "os" "os/signal" "strconv" "strings" "syscall" + "time" ) // LeverageConfig 杠杆配置 @@ -28,6 +30,7 @@ type ConfigFile struct { APIServerPort int `json:"api_server_port"` UseDefaultCoins bool `json:"use_default_coins"` DefaultCoins []string `json:"default_coins"` + InsideCoins bool `json:"inside_coins"` CoinPoolAPIURL string `json:"coin_pool_api_url"` OITopAPIURL string `json:"oi_top_api_url"` MaxDailyLoss float64 `json:"max_daily_loss"` @@ -35,6 +38,7 @@ type ConfigFile struct { StopTradingMinutes int `json:"stop_trading_minutes"` Leverage LeverageConfig `json:"leverage"` JWTSecret string `json:"jwt_secret"` + DataKLineTime string `json:"data_k_line_time"` } // syncConfigToDatabase 从config.json读取配置并同步到数据库 @@ -61,14 +65,15 @@ func syncConfigToDatabase(database *config.Database) error { // 同步各配置项到数据库 configs := map[string]string{ - "admin_mode": fmt.Sprintf("%t", configFile.AdminMode), - "api_server_port": strconv.Itoa(configFile.APIServerPort), - "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), - "coin_pool_api_url": configFile.CoinPoolAPIURL, - "oi_top_api_url": configFile.OITopAPIURL, - "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), - "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), - "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), + "admin_mode": fmt.Sprintf("%t", configFile.AdminMode), + "api_server_port": strconv.Itoa(configFile.APIServerPort), + "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), + "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), + "coin_pool_api_url": configFile.CoinPoolAPIURL, + "oi_top_api_url": configFile.OITopAPIURL, + "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), + "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), + "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), } // 同步default_coins(转换为JSON字符串存储) @@ -132,12 +137,14 @@ func main() { // 获取系统配置 useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins") useDefaultCoins := useDefaultCoinsStr == "true" + InsideCoinsStr, _ := database.GetSystemConfig("inside_coins") + insideCoins := InsideCoinsStr == "true" apiPortStr, _ := database.GetSystemConfig("api_server_port") - + // 获取管理员模式配置 adminModeStr, _ := database.GetSystemConfig("admin_mode") adminMode := adminModeStr != "false" // 默认为true - + // 设置JWT密钥 jwtSecret, _ := database.GetSystemConfig("jwt_secret") if jwtSecret == "" { @@ -145,7 +152,7 @@ func main() { log.Printf("⚠️ 使用默认JWT密钥,建议在生产环境中配置") } auth.SetJWTSecret(jwtSecret) - + // 在管理员模式下,确保admin用户存在 if adminMode { err := database.EnsureAdminUser() @@ -156,7 +163,7 @@ func main() { } auth.SetAdminMode(true) } - + log.Printf("✓ 配置数据库初始化成功") fmt.Println() @@ -180,6 +187,25 @@ func main() { pool.SetDefaultCoins(defaultCoins) + //内置AI评分 + if insideCoins { + log.Printf("✓ 启用内置AI评分币种列表") + monitor := market.NewWSMonitor(150) + go func() { + monitor.Start() + // 定时器设置默认的币种列表 - 覆蓋defaultCoins设置 + for { + if len(monitor.FilterSymbol) > 0 { + for _, coin := range defaultCoins { + monitor.FilterSymbol = append(monitor.FilterSymbol, coin) + } + pool.SetDefaultCoins(monitor.FilterSymbol) + monitor.FilterSymbol = nil + } + time.Sleep(1 * time.Minute) + } + }() + } // 设置是否使用默认主流币种 pool.SetUseDefaultCoins(useDefaultCoins) if useDefaultCoins { @@ -192,7 +218,7 @@ func main() { pool.SetCoinPoolAPI(coinPoolAPIURL) log.Printf("✓ 已配置AI500币种池API") } - + oiTopAPIURL, _ := database.GetSystemConfig("oi_top_api_url") if oiTopAPIURL != "" { pool.SetOITopAPI(oiTopAPIURL) @@ -208,37 +234,26 @@ func main() { log.Fatalf("❌ 加载交易员失败: %v", err) } - // 获取所有用户的交易员配置(用于显示) - userIDs, err := database.GetAllUsers() + // 获取数据库中的所有交易员配置(用于显示,使用default用户) + traders, err := database.GetTraders("default") if err != nil { - log.Printf("⚠️ 获取用户列表失败: %v", err) - userIDs = []string{"default"} // 回退到default用户 - } - - var allTraders []*config.TraderRecord - for _, userID := range userIDs { - traders, err := database.GetTraders(userID) - if err != nil { - log.Printf("⚠️ 获取用户 %s 的交易员失败: %v", userID, err) - continue - } - allTraders = append(allTraders, traders...) + log.Fatalf("❌ 获取交易员列表失败: %v", err) } // 显示加载的交易员信息 fmt.Println() fmt.Println("🤖 数据库中的AI交易员配置:") - if len(allTraders) == 0 { + if len(traders) == 0 { fmt.Println(" • 暂无配置的交易员,请通过Web界面创建") } else { - for _, trader := range allTraders { + for _, trader := range traders { status := "停止" if trader.IsRunning { status = "运行中" } - fmt.Printf(" • %s (%s + %s) - 用户: %s - 初始资金: %.0f USDT [%s]\n", - trader.Name, strings.ToUpper(trader.AIModelID), strings.ToUpper(trader.ExchangeID), - trader.UserID, trader.InitialBalance, status) + fmt.Printf(" • %s (%s + %s) - 初始资金: %.0f USDT [%s]\n", + trader.Name, strings.ToUpper(trader.AIModelID), strings.ToUpper(trader.ExchangeID), + trader.InitialBalance, status) } } @@ -256,7 +271,7 @@ func main() { fmt.Println() // 获取API服务器端口 - apiPort := 8080 // 默认端口 + apiPort := 8080 // 默认端口 if apiPortStr != "" { if port, err := strconv.Atoi(apiPortStr); err == nil { apiPort = port diff --git a/market/api_client.go b/market/api_client.go new file mode 100644 index 00000000..70bb1150 --- /dev/null +++ b/market/api_client.go @@ -0,0 +1,150 @@ +package market + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strconv" + "time" +) + +const ( + baseURL = "https://fapi.binance.com" +) + +type APIClient struct { + client *http.Client +} + +func NewAPIClient() *APIClient { + return &APIClient{ + client: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +func (c *APIClient) GetExchangeInfo() (*ExchangeInfo, error) { + url := fmt.Sprintf("%s/fapi/v1/exchangeInfo", baseURL) + resp, err := c.client.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var exchangeInfo ExchangeInfo + err = json.Unmarshal(body, &exchangeInfo) + if err != nil { + return nil, err + } + + return &exchangeInfo, nil +} + +func (c *APIClient) GetKlines(symbol, interval string, limit int) ([]Kline, error) { + url := fmt.Sprintf("%s/fapi/v1/klines", baseURL) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + q := req.URL.Query() + q.Add("symbol", symbol) + q.Add("interval", interval) + q.Add("limit", strconv.Itoa(limit)) + req.URL.RawQuery = q.Encode() + + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var klineResponses []KlineResponse + err = json.Unmarshal(body, &klineResponses) + if err != nil { + return nil, err + } + + var klines []Kline + for _, kr := range klineResponses { + kline, err := parseKline(kr) + if err != nil { + log.Printf("解析K线数据失败: %v", err) + continue + } + klines = append(klines, kline) + } + + return klines, nil +} + +func parseKline(kr KlineResponse) (Kline, error) { + var kline Kline + + if len(kr) < 11 { + return kline, fmt.Errorf("invalid kline data") + } + + // 解析各个字段 + kline.OpenTime = int64(kr[0].(float64)) + kline.Open, _ = strconv.ParseFloat(kr[1].(string), 64) + kline.High, _ = strconv.ParseFloat(kr[2].(string), 64) + kline.Low, _ = strconv.ParseFloat(kr[3].(string), 64) + kline.Close, _ = strconv.ParseFloat(kr[4].(string), 64) + kline.Volume, _ = strconv.ParseFloat(kr[5].(string), 64) + kline.CloseTime = int64(kr[6].(float64)) + kline.QuoteVolume, _ = strconv.ParseFloat(kr[7].(string), 64) + kline.Trades = int(kr[8].(float64)) + kline.TakerBuyBaseVolume, _ = strconv.ParseFloat(kr[9].(string), 64) + kline.TakerBuyQuoteVolume, _ = strconv.ParseFloat(kr[10].(string), 64) + + return kline, nil +} + +func (c *APIClient) GetCurrentPrice(symbol string) (float64, error) { + url := fmt.Sprintf("%s/fapi/v1/ticker/price", baseURL) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, err + } + + q := req.URL.Query() + q.Add("symbol", symbol) + req.URL.RawQuery = q.Encode() + + resp, err := c.client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + var ticker PriceTicker + err = json.Unmarshal(body, &ticker) + if err != nil { + return 0, err + } + + price, err := strconv.ParseFloat(ticker.Price, 64) + if err != nil { + return 0, err + } + + return price, nil +} diff --git a/market/combined_streams.go b/market/combined_streams.go new file mode 100644 index 00000000..801d423e --- /dev/null +++ b/market/combined_streams.go @@ -0,0 +1,202 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type CombinedStreamsClient struct { + conn *websocket.Conn + mu sync.RWMutex + subscribers map[string]chan []byte + reconnect bool + done chan struct{} + batchSize int // 每批订阅的流数量 +} + +func NewCombinedStreamsClient(batchSize int) *CombinedStreamsClient { + return &CombinedStreamsClient{ + subscribers: make(map[string]chan []byte), + reconnect: true, + done: make(chan struct{}), + batchSize: batchSize, + } +} + +func (c *CombinedStreamsClient) Connect() error { + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + // 组合流使用不同的端点 + conn, _, err := dialer.Dial("wss://fstream.binance.com/stream", nil) + if err != nil { + return fmt.Errorf("组合流WebSocket连接失败: %v", err) + } + + c.mu.Lock() + c.conn = conn + c.mu.Unlock() + + log.Println("组合流WebSocket连接成功") + go c.readMessages() + + return nil +} + +// BatchSubscribeKlines 批量订阅K线 +func (c *CombinedStreamsClient) BatchSubscribeKlines(symbols []string, interval string) error { + // 将symbols分批处理 + batches := c.splitIntoBatches(symbols, c.batchSize) + + for i, batch := range batches { + log.Printf("订阅第 %d 批, 数量: %d", i+1, len(batch)) + + streams := make([]string, len(batch)) + for j, symbol := range batch { + streams[j] = fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), interval) + } + + if err := c.subscribeStreams(streams); err != nil { + return fmt.Errorf("第 %d 批订阅失败: %v", i+1, err) + } + + // 批次间延迟,避免被限制 + if i < len(batches)-1 { + time.Sleep(100 * time.Millisecond) + } + } + + return nil +} + +// splitIntoBatches 将切片分成指定大小的批次 +func (c *CombinedStreamsClient) splitIntoBatches(symbols []string, batchSize int) [][]string { + var batches [][]string + + for i := 0; i < len(symbols); i += batchSize { + end := i + batchSize + if end > len(symbols) { + end = len(symbols) + } + batches = append(batches, symbols[i:end]) + } + + return batches +} + +// subscribeStreams 订阅多个流 +func (c *CombinedStreamsClient) subscribeStreams(streams []string) error { + subscribeMsg := map[string]interface{}{ + "method": "SUBSCRIBE", + "params": streams, + "id": time.Now().UnixNano(), + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if c.conn == nil { + return fmt.Errorf("WebSocket未连接") + } + + log.Printf("订阅流: %v", streams) + return c.conn.WriteJSON(subscribeMsg) +} + +func (c *CombinedStreamsClient) readMessages() { + for { + select { + case <-c.done: + return + default: + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn == nil { + time.Sleep(1 * time.Second) + continue + } + + _, message, err := conn.ReadMessage() + if err != nil { + log.Printf("读取组合流消息失败: %v", err) + c.handleReconnect() + return + } + + c.handleCombinedMessage(message) + } + } +} + +func (c *CombinedStreamsClient) handleCombinedMessage(message []byte) { + var combinedMsg struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` + } + + if err := json.Unmarshal(message, &combinedMsg); err != nil { + log.Printf("解析组合消息失败: %v", err) + return + } + + c.mu.RLock() + ch, exists := c.subscribers[combinedMsg.Stream] + c.mu.RUnlock() + + if exists { + select { + case ch <- combinedMsg.Data: + default: + log.Printf("订阅者通道已满: %s", combinedMsg.Stream) + } + } +} + +func (c *CombinedStreamsClient) AddSubscriber(stream string, bufferSize int) <-chan []byte { + ch := make(chan []byte, bufferSize) + c.mu.Lock() + c.subscribers[stream] = ch + c.mu.Unlock() + return ch +} + +func (c *CombinedStreamsClient) handleReconnect() { + if !c.reconnect { + return + } + + log.Println("组合流尝试重新连接...") + time.Sleep(3 * time.Second) + + if err := c.Connect(); err != nil { + log.Printf("组合流重新连接失败: %v", err) + go c.handleReconnect() + } +} + +func (c *CombinedStreamsClient) Close() { + c.reconnect = false + close(c.done) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + for stream, ch := range c.subscribers { + close(ch) + delete(c.subscribers, stream) + } +} diff --git a/market/data.go b/market/data.go index 97812e64..cd40be75 100644 --- a/market/data.go +++ b/market/data.go @@ -10,72 +10,20 @@ import ( "strings" ) -// Data 市场数据结构 -type Data struct { - Symbol string - CurrentPrice float64 - PriceChange1h float64 // 1小时价格变化百分比 - PriceChange4h float64 // 4小时价格变化百分比 - CurrentEMA20 float64 - CurrentMACD float64 - CurrentRSI7 float64 - OpenInterest *OIData - FundingRate float64 - IntradaySeries *IntradayData - LongerTermContext *LongerTermData -} - -// OIData Open Interest数据 -type OIData struct { - Latest float64 - Average float64 -} - -// IntradayData 日内数据(3分钟间隔) -type IntradayData struct { - MidPrices []float64 - EMA20Values []float64 - MACDValues []float64 - RSI7Values []float64 - RSI14Values []float64 -} - -// LongerTermData 长期数据(4小时时间框架) -type LongerTermData struct { - EMA20 float64 - EMA50 float64 - ATR3 float64 - ATR14 float64 - CurrentVolume float64 - AverageVolume float64 - MACDValues []float64 - RSI14Values []float64 -} - -// Kline K线数据 -type Kline struct { - OpenTime int64 - Open float64 - High float64 - Low float64 - Close float64 - Volume float64 - CloseTime int64 -} - // Get 获取指定代币的市场数据 func Get(symbol string) (*Data, error) { + var klines3m, klines4h []Kline + var err error // 标准化symbol symbol = Normalize(symbol) - // 获取3分钟K线数据 (最近10个) - klines3m, err := getKlines(symbol, "3m", 40) // 多获取一些用于计算 + klines3m, err = WSMonitorCli.GetCurrentKlines(symbol, "3m") // 多获取一些用于计算 if err != nil { return nil, fmt.Errorf("获取3分钟K线失败: %v", err) } // 获取4小时K线数据 (最近10个) - klines4h, err := getKlines(symbol, "4h", 60) // 多获取用于计算指标 + klines4h, err = WSMonitorCli.GetCurrentKlines(symbol, "4h") // 多获取用于计算指标 if err != nil { return nil, fmt.Errorf("获取4小时K线失败: %v", err) } @@ -136,51 +84,6 @@ func Get(symbol string) (*Data, error) { }, nil } -// getKlines 从Binance获取K线数据 -func getKlines(symbol, interval string, limit int) ([]Kline, error) { - url := fmt.Sprintf("https://fapi.binance.com/fapi/v1/klines?symbol=%s&interval=%s&limit=%d", - symbol, interval, limit) - - resp, err := http.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - var rawData [][]interface{} - if err := json.Unmarshal(body, &rawData); err != nil { - return nil, err - } - - klines := make([]Kline, len(rawData)) - for i, item := range rawData { - openTime := int64(item[0].(float64)) - open, _ := parseFloat(item[1]) - high, _ := parseFloat(item[2]) - low, _ := parseFloat(item[3]) - close, _ := parseFloat(item[4]) - volume, _ := parseFloat(item[5]) - closeTime := int64(item[6].(float64)) - - klines[i] = Kline{ - OpenTime: openTime, - Open: open, - High: high, - Low: low, - Close: close, - Volume: volume, - CloseTime: closeTime, - } - } - - return klines, nil -} - // calculateEMA 计算EMA func calculateEMA(klines []Kline, period int) float64 { if len(klines) < period { diff --git a/market/feature_engine.go b/market/feature_engine.go new file mode 100644 index 00000000..91540a29 --- /dev/null +++ b/market/feature_engine.go @@ -0,0 +1,229 @@ +package market + +import ( + "fmt" + "math" + "time" +) + +type FeatureEngine struct { + alertThresholds AlertThresholds +} + +func NewFeatureEngine(thresholds AlertThresholds) *FeatureEngine { + return &FeatureEngine{ + alertThresholds: thresholds, + } +} + +func (e *FeatureEngine) CalculateFeatures(symbol string, klines []Kline) *SymbolFeatures { + if len(klines) < 20 { + return nil + } + + features := &SymbolFeatures{ + Symbol: symbol, + Timestamp: time.Now(), + } + + // 提取价格和交易量数据 + closes := make([]float64, len(klines)) + volumes := make([]float64, len(klines)) + highs := make([]float64, len(klines)) + lows := make([]float64, len(klines)) + + for i, k := range klines { + closes[i] = k.Close + volumes[i] = k.Volume + highs[i] = k.High + lows[i] = k.Low + } + + // 价格特征 + features.Price = closes[len(closes)-1] + features.PriceChange15Min = (closes[len(closes)-1] - closes[len(closes)-2]) / closes[len(closes)-2] + + if len(closes) >= 5 { + features.PriceChange1H = (closes[len(closes)-1] - closes[len(closes)-5]) / closes[len(closes)-5] + } + if len(closes) >= 17 { + features.PriceChange4H = (closes[len(closes)-1] - closes[len(closes)-17]) / closes[len(closes)-17] + } + + // 交易量特征 + currentVolume := volumes[len(volumes)-1] + features.Volume = currentVolume + + // 5周期平均交易量 + if len(volumes) >= 6 { + avgVolume5 := e.calculateAverage(volumes[len(volumes)-6 : len(volumes)-1]) + features.VolumeRatio5 = currentVolume / avgVolume5 + } + + // 20周期平均交易量 + if len(volumes) >= 21 { + avgVolume20 := e.calculateAverage(volumes[len(volumes)-21 : len(volumes)-1]) + features.VolumeRatio20 = currentVolume / avgVolume20 + } + + // 交易量趋势 + if features.VolumeRatio20 > 0 { + features.VolumeTrend = features.VolumeRatio5 / features.VolumeRatio20 + } + + // 技术指标 + features.RSI14 = e.calculateRSI(closes, 14) + features.SMA5 = e.calculateSMA(closes, 5) + features.SMA10 = e.calculateSMA(closes, 10) + features.SMA20 = e.calculateSMA(closes, 20) + + // 波动特征 + currentHigh := highs[len(highs)-1] + currentLow := lows[len(lows)-1] + features.HighLowRatio = (currentHigh - currentLow) / features.Price + features.Volatility20 = e.calculateVolatility(closes, 20) + + // 价格在区间中的位置 + if currentHigh != currentLow { + features.PositionInRange = (features.Price - currentLow) / (currentHigh - currentLow) + } else { + features.PositionInRange = 0.5 + } + + return features +} + +func (e *FeatureEngine) calculateAverage(values []float64) float64 { + sum := 0.0 + for _, v := range values { + sum += v + } + return sum / float64(len(values)) +} + +func (e *FeatureEngine) calculateSMA(prices []float64, period int) float64 { + if len(prices) < period { + return 0 + } + return e.calculateAverage(prices[len(prices)-period:]) +} + +func (e *FeatureEngine) calculateRSI(prices []float64, period int) float64 { + if len(prices) <= period { + return 50 + } + + gains := make([]float64, 0) + losses := make([]float64, 0) + + for i := 1; i < len(prices); i++ { + change := prices[i] - prices[i-1] + if change > 0 { + gains = append(gains, change) + losses = append(losses, 0) + } else { + gains = append(gains, 0) + losses = append(losses, -change) + } + } + + // 只取最近period个数据点 + if len(gains) > period { + gains = gains[len(gains)-period:] + losses = losses[len(losses)-period:] + } + + avgGain := e.calculateAverage(gains) + avgLoss := e.calculateAverage(losses) + + if avgLoss == 0 { + return 100 + } + + rs := avgGain / avgLoss + return 100 - (100 / (1 + rs)) +} + +func (e *FeatureEngine) calculateVolatility(prices []float64, period int) float64 { + if len(prices) < period { + return 0 + } + + periodPrices := prices[len(prices)-period:] + mean := e.calculateAverage(periodPrices) + + variance := 0.0 + for _, price := range periodPrices { + variance += math.Pow(price-mean, 2) + } + variance /= float64(len(periodPrices)) + + return math.Sqrt(variance) / mean +} + +func (e *FeatureEngine) DetectAlerts(features *SymbolFeatures) []Alert { + var alerts []Alert + + // 交易量放大检测 + if features.VolumeRatio5 > e.alertThresholds.VolumeSpike { + alerts = append(alerts, Alert{ + Type: "VOLUME_SPIKE", + Symbol: features.Symbol, + Value: features.VolumeRatio5, + Threshold: e.alertThresholds.VolumeSpike, + Message: fmt.Sprintf("%s 交易量放大 %.2f 倍", features.Symbol, features.VolumeRatio5), + Timestamp: time.Now(), + }) + } + + // 15分钟价格异动 + if math.Abs(features.PriceChange15Min) > e.alertThresholds.PriceChange15Min { + direction := "上涨" + if features.PriceChange15Min < 0 { + direction = "下跌" + } + alerts = append(alerts, Alert{ + Type: "PRICE_CHANGE_15MIN", + Symbol: features.Symbol, + Value: features.PriceChange15Min, + Threshold: e.alertThresholds.PriceChange15Min, + Message: fmt.Sprintf("%s 15分钟%s %.2f%%", features.Symbol, direction, features.PriceChange15Min*100), + Timestamp: time.Now(), + }) + } + + // 交易量趋势 + if features.VolumeTrend > e.alertThresholds.VolumeTrend { + alerts = append(alerts, Alert{ + Type: "VOLUME_TREND", + Symbol: features.Symbol, + Value: features.VolumeTrend, + Threshold: e.alertThresholds.VolumeTrend, + Message: fmt.Sprintf("%s 交易量趋势增强 %.2f 倍", features.Symbol, features.VolumeTrend), + Timestamp: time.Now(), + }) + } + + // RSI超买超卖 + if features.RSI14 > e.alertThresholds.RSIOverbought { + alerts = append(alerts, Alert{ + Type: "RSI_OVERBOUGHT", + Symbol: features.Symbol, + Value: features.RSI14, + Threshold: e.alertThresholds.RSIOverbought, + Message: fmt.Sprintf("%s RSI超买: %.2f", features.Symbol, features.RSI14), + Timestamp: time.Now(), + }) + } else if features.RSI14 < e.alertThresholds.RSIOversold { + alerts = append(alerts, Alert{ + Type: "RSI_OVERSOLD", + Symbol: features.Symbol, + Value: features.RSI14, + Threshold: e.alertThresholds.RSIOversold, + Message: fmt.Sprintf("%s RSI超卖: %.2f", features.Symbol, features.RSI14), + Timestamp: time.Now(), + }) + } + + return alerts +} diff --git a/market/monitor.go b/market/monitor.go new file mode 100644 index 00000000..9837623e --- /dev/null +++ b/market/monitor.go @@ -0,0 +1,526 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "math" + "sort" + "strings" + "sync" + "time" +) + +type WSMonitor struct { + wsClient *WSClient + combinedClient *CombinedStreamsClient + featureEngine *FeatureEngine + symbols []string + featuresMap sync.Map + alertsChan chan Alert + klineDataMap3m sync.Map // 存储每个交易对的K线历史数据 + klineDataMap4h sync.Map // 存储每个交易对的K线历史数据 + tickerDataMap sync.Map // 存储每个交易对的ticker数据 + batchSize int + filterSymbols sync.Map // 使用sync.Map来存储需要监控的币种和其状态 + symbolStats sync.Map // 存储币种统计信息 + FilterSymbol []string //经过筛选的币种 +} +type SymbolStats struct { + LastActiveTime time.Time + AlertCount int + VolumeSpikeCount int + LastAlertTime time.Time + Score float64 // 综合评分 +} + +var WSMonitorCli *WSMonitor + +func NewWSMonitor(batchSize int) *WSMonitor { + WSMonitorCli = &WSMonitor{ + wsClient: NewWSClient(), + combinedClient: NewCombinedStreamsClient(batchSize), + featureEngine: NewFeatureEngine(config.AlertThresholds), + alertsChan: make(chan Alert, 1000), + batchSize: batchSize, + } + return WSMonitorCli +} + +func (m *WSMonitor) Initialize() error { + log.Println("初始化WebSocket监控器...") + + // 获取交易对信息 + apiClient := NewAPIClient() + exchangeInfo, err := apiClient.GetExchangeInfo() + if err != nil { + return err + } + + // 筛选永续合约交易对 --仅测试时使用 + //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] + for _, symbol := range exchangeInfo.Symbols { + if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" { + m.symbols = append(m.symbols, Normalize(symbol.Symbol)) + } + } + log.Printf("找到 %d 个交易对", len(m.symbols)) + // 初始化历史数据 + if err := m.initializeHistoricalData(); err != nil { + log.Printf("初始化历史数据失败: %v", err) + } + + return nil +} + +func (m *WSMonitor) initializeHistoricalData() error { + apiClient := NewAPIClient() + + var wg sync.WaitGroup + semaphore := make(chan struct{}, 5) // 限制并发数 + + for _, symbol := range m.symbols { + wg.Add(1) + semaphore <- struct{}{} + + go func(s string) { + defer wg.Done() + defer func() { <-semaphore }() + + // 获取历史K线数据 + klines, err := apiClient.GetKlines(s, "3m", 100) + if err != nil { + log.Printf("获取 %s 历史数据失败: %v", s, err) + return + } + if len(klines) > 0 { + m.klineDataMap3m.Store(s, klines) + log.Printf("已加载 %s 的历史K线数据-3m: %d 条", s, len(klines)) + } + // 获取历史K线数据 + klines4h, err := apiClient.GetKlines(s, "4h", 100) + if err != nil { + log.Printf("获取 %s 历史数据失败: %v", s, err) + return + } + if len(klines4h) > 0 { + m.klineDataMap4h.Store(s, klines) + log.Printf("已加载 %s 的历史K线数据-4h: %d 条", s, len(klines)) + } + }(symbol) + } + + wg.Wait() + return nil +} + +func (m *WSMonitor) Start() { + log.Printf("启动WebSocket实时监控...") + // 初始化交易对 + err := m.Initialize() + if err != nil { + log.Fatalf("❌ 初始化币种: %v", err) + return + } + + err = m.combinedClient.Connect() + if err != nil { + log.Fatalf("❌ 批量订阅流: %v", err) + return + } + // 启动警报处理器 + go m.handleAlerts() + // 启动定期清理任务 + go m.cleanupInactiveSymbols() + // 输出监控统计 - 评分前十名 + go m.printFilterStats(50) + // 订阅所有交易对 + err = m.subscribeAll() + + if err != nil { + log.Fatalf("❌ 订阅币种交易对: %v", err) + return + } +} + +func (m *WSMonitor) subscribeAll() error { + // 执行批量订阅 + log.Println("开始订阅所有交易对...") + for _, symbol := range m.symbols { + stream3m := fmt.Sprintf("%s@kline_3m", strings.ToLower(symbol)) + ch3m := m.combinedClient.AddSubscriber(stream3m, 100) + go m.handleKlineData(symbol, ch3m, "3m") + + stream4h := fmt.Sprintf("%s@kline_4h", strings.ToLower(symbol)) + ch4h := m.combinedClient.AddSubscriber(stream4h, 100) + go m.handleKlineData(symbol, ch4h, "4h") + } + + err := m.combinedClient.BatchSubscribeKlines(m.symbols, "3m") + if err != nil { + log.Fatalf("❌ 订阅3m K线: %v", err) + return err + } + err = m.combinedClient.BatchSubscribeKlines(m.symbols, "4h") + if err != nil { + log.Fatalf("❌ 订阅4h K线: %v", err) + return err + } + log.Println("所有交易对订阅完成") + return nil +} + +func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time string) { + for data := range ch { + var klineData KlineWSData + if err := json.Unmarshal(data, &klineData); err != nil { + log.Printf("解析Kline数据失败: %v", err) + continue + } + m.processKlineUpdate(symbol, klineData, _time) + } +} + +func (m *WSMonitor) handleTickerData(symbol string, ch <-chan []byte) { + for data := range ch { + var tickerData TickerWSData + if err := json.Unmarshal(data, &tickerData); err != nil { + log.Printf("解析Ticker数据失败: %v", err) + continue + } + + m.processTickerUpdate(symbol, tickerData) + } +} +func (m *WSMonitor) handleTickerDatas(ch <-chan []byte) { + for data := range ch { + var tickerData []TickerWSData + if err := json.Unmarshal(data, &tickerData); err != nil { + log.Printf("解析Ticker数据失败: %v", err) + continue + } + log.Fatalln(tickerData) + //m.processTickerUpdate(symbol, tickerData) + } +} +func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map { + var klineDataMap *sync.Map + if _time == "3m" { + klineDataMap = &m.klineDataMap3m + } else { + klineDataMap = &m.klineDataMap4h + } + return klineDataMap +} +func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time string) { + // 转换WebSocket数据为Kline结构 + kline := Kline{ + OpenTime: wsData.Kline.StartTime, + CloseTime: wsData.Kline.CloseTime, + Trades: wsData.Kline.NumberOfTrades, + } + kline.Open, _ = parseFloat(wsData.Kline.OpenPrice) + kline.High, _ = parseFloat(wsData.Kline.HighPrice) + kline.Low, _ = parseFloat(wsData.Kline.LowPrice) + kline.Close, _ = parseFloat(wsData.Kline.ClosePrice) + kline.Volume, _ = parseFloat(wsData.Kline.Volume) + kline.High, _ = parseFloat(wsData.Kline.HighPrice) + kline.QuoteVolume, _ = parseFloat(wsData.Kline.QuoteVolume) + kline.TakerBuyBaseVolume, _ = parseFloat(wsData.Kline.TakerBuyBaseVolume) + kline.TakerBuyQuoteVolume, _ = parseFloat(wsData.Kline.TakerBuyQuoteVolume) + // 更新K线数据 + var klineDataMap = m.getKlineDataMap(_time) + value, exists := klineDataMap.Load(symbol) + var klines []Kline + if exists { + klines = value.([]Kline) + + // 检查是否是新的K线 + if len(klines) > 0 && klines[len(klines)-1].OpenTime == kline.OpenTime { + // 更新当前K线 + klines[len(klines)-1] = kline + } else { + // 添加新K线 + klines = append(klines, kline) + + // 保持数据长度 + if len(klines) > 100 { + klines = klines[1:] + } + } + } else { + klines = []Kline{kline} + } + + klineDataMap.Store(symbol, klines) + // 计算特征并检测警报 + if len(klines) >= 20 { + features := m.featureEngine.CalculateFeatures(symbol, klines) + if features != nil { + m.featuresMap.Store(symbol, features) + + alerts := m.featureEngine.DetectAlerts(features) + hasAlert := len(alerts) > 0 + + // 更新统计信息 + m.updateSymbolStats(symbol, features, hasAlert) + + for _, alert := range alerts { + m.alertsChan <- alert + } + + // 实时日志输出重要特征 + if len(alerts) > 0 || features.VolumeRatio5 > 2.0 || math.Abs(features.PriceChange15Min) > 0.02 { + //log.Printf("📊 %s - 价格: %.4f, 15分钟变动: %.2f%%, 交易量倍数: %.2f, RSI: %.1f", + // symbol, features.Price, features.PriceChange15Min*100, + // features.VolumeRatio5, features.RSI14) + } + } + } +} + +func (m *WSMonitor) processTickerUpdate(symbol string, tickerData TickerWSData) { + // 存储ticker数据 + m.tickerDataMap.Store(symbol, tickerData) +} + +func (m *WSMonitor) handleAlerts() { + alertCounts := make(map[string]int) + lastReset := time.Now() + + for alert := range m.alertsChan { + // 重置计数器(每小时) + if time.Since(lastReset) > time.Hour { + alertCounts = make(map[string]int) + lastReset = time.Now() + } + + // 警报去重和频率控制 + alertKey := fmt.Sprintf("%s_%s", alert.Symbol, alert.Type) + alertCounts[alertKey]++ + m.filterSymbols.Store(alert.Symbol, true) + + //log.Printf("✅ 自动添加监控: %s (因警报: %s)", alert.Symbol, alert.Message) + if alertCounts[alertKey] <= 3 { // 每小时最多3次相同警报 + //log.Printf("🚨 实时警报: %s", alert.Message) + + // 这里可以添加其他警报处理逻辑 + } + } +} + +func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { + value, exists := m.getKlineDataMap(_time).Load(symbol) + if !exists { + // 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行) + apiClient := NewAPIClient() + klines, err := apiClient.GetKlines(symbol, _time, 40) + if err != nil { + return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err) + } + return klines, fmt.Errorf("symbol不存在") + } + return value.([]Kline), nil +} + +func (m *WSMonitor) GetCurrentFeatures(symbol string) (*SymbolFeatures, bool) { + value, exists := m.featuresMap.Load(symbol) + if !exists { + return nil, false + } + return value.(*SymbolFeatures), true +} + +func (m *WSMonitor) GetAllFeatures() map[string]*SymbolFeatures { + features := make(map[string]*SymbolFeatures) + m.featuresMap.Range(func(key, value interface{}) bool { + features[key.(string)] = value.(*SymbolFeatures) + return true + }) + return features +} + +func (m *WSMonitor) Close() { + m.wsClient.Close() + close(m.alertsChan) +} +func (m *WSMonitor) printFilterStats(nember int) { + ticker := time.NewTicker(2 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + var monitoredSymbols []string + m.filterSymbols.Range(func(key, value interface{}) bool { + monitoredSymbols = append(monitoredSymbols, key.(string)) + return true + }) + + log.Printf("🎯 监控统计 - 总数: %d, 币种: %v", + len(monitoredSymbols), monitoredSymbols) + + // 打印前5个评分最高的币种 + type symbolScore struct { + symbol string + score float64 + } + var topScores []symbolScore + + m.symbolStats.Range(func(key, value interface{}) bool { + symbol := key.(string) + stats := value.(*SymbolStats) + topScores = append(topScores, symbolScore{symbol, stats.Score}) + return true + }) + + // 按评分排序 + sort.Slice(topScores, func(i, j int) bool { + return topScores[i].score > topScores[j].score + }) + m.FilterSymbol = nil + if len(topScores) > 0 { + log.Printf("🏆 评分TOP%v:", nember) + for i := 0; i < len(topScores) && i < nember; i++ { + m.FilterSymbol = append(m.FilterSymbol, topScores[i].symbol) + log.Printf(" %d. %s: %.1f分", i+1, topScores[i].symbol, topScores[i].score) + } + } + } +} + +// evaluateSymbolScore 评估币种得分,决定是否保留 +func (m *WSMonitor) evaluateSymbolScore(symbol string, features *SymbolFeatures) float64 { + score := 0.0 + + // 交易量活跃度评分 (权重: 40%) + if features.VolumeRatio5 > 1.5 { + score += 40 * math.Min(features.VolumeRatio5/5.0, 1.0) + } + + // 价格波动评分 (权重: 30%) + volatilityScore := math.Abs(features.PriceChange15Min) * 1000 // 放大系数 + score += 30 * math.Min(volatilityScore/10.0, 1.0) // 最大10%波动得满分 + + // RSI活跃度评分 (权重: 20%) + if features.RSI14 < 30 || features.RSI14 > 70 { + score += 20 // RSI在极端区域 + } else if features.RSI14 < 40 || features.RSI14 > 60 { + score += 10 // RSI在活跃区域 + } + + // 交易量趋势评分 (权重: 10%) + if features.VolumeTrend > 1.2 { + score += 10 * math.Min(features.VolumeTrend/3.0, 1.0) + } + + return score +} + +// shouldRemoveFromFilter 判断是否应该从FilterSymbols中移除 +func (m *WSMonitor) shouldRemoveFromFilter(symbol string) bool { + value, exists := m.symbolStats.Load(symbol) + if !exists { + return true // 没有统计信息,移除 + } + + stats := value.(*SymbolStats) + + // 规则1: 超过30分钟没有活跃迹象 + if time.Since(stats.LastActiveTime) > 30*time.Minute { + log.Printf("🔻 %s 因长时间不活跃被移除", symbol) + return true + } + + // 规则2: 评分持续低于阈值 (最近5次评分平均) + if stats.Score < 15 { // 调整这个阈值 + log.Printf("🔻 %s 因评分过低(%.1f)被移除", symbol, stats.Score) + return true + } + + // 规则3: 超过2小时没有产生警报 + if time.Since(stats.LastAlertTime) > 2*time.Hour && stats.AlertCount > 0 { + log.Printf("🔻 %s 因长时间无新警报被移除", symbol) + return true + } + + return false +} + +// updateSymbolStats 更新币种统计信息 +func (m *WSMonitor) updateSymbolStats(symbol string, features *SymbolFeatures, hasAlert bool) { + now := time.Now() + + value, exists := m.symbolStats.Load(symbol) + var stats *SymbolStats + + if !exists { + stats = &SymbolStats{ + LastActiveTime: now, + Score: m.evaluateSymbolScore(symbol, features), + } + } else { + stats = value.(*SymbolStats) + stats.LastActiveTime = now + + // 平滑更新评分 (指数移动平均) + newScore := m.evaluateSymbolScore(symbol, features) + stats.Score = 0.7*stats.Score + 0.3*newScore + } + + if hasAlert { + stats.AlertCount++ + stats.LastAlertTime = now + } + + if features.VolumeRatio5 > 2.0 { + stats.VolumeSpikeCount++ + } + + m.symbolStats.Store(symbol, stats) +} + +// removeFromFilter 从FilterSymbols中移除币种 +func (m *WSMonitor) removeFromFilter(symbol string) { + + // 从filterSymbols中移除 + m.filterSymbols.Delete(symbol) + m.symbolStats.Delete(symbol) + + log.Printf("🗑️ 已移除币种监控: %s", symbol) +} + +// cleanupInactiveSymbols 定期清理不活跃的币种 +func (m *WSMonitor) cleanupInactiveSymbols() { + ticker := time.NewTicker(5 * time.Minute) // 每5分钟检查一次 + defer ticker.Stop() + + for range ticker.C { + var symbolsToRemove []string + + // 收集需要移除的币种 + m.filterSymbols.Range(func(key, value interface{}) bool { + symbol := key.(string) + if m.shouldRemoveFromFilter(symbol) { + symbolsToRemove = append(symbolsToRemove, symbol) + } + return true + }) + + // 执行移除操作 + for _, symbol := range symbolsToRemove { + m.removeFromFilter(symbol) + } + + if len(symbolsToRemove) > 0 { + log.Printf("🧹 清理完成,移除了 %d 个不活跃币种", len(symbolsToRemove)) + } + } +} + +// getSymbolScore 获取币种当前评分 +func (m *WSMonitor) getSymbolScore(symbol string) float64 { + value, exists := m.symbolStats.Load(symbol) + if !exists { + return 0 + } + return value.(*SymbolStats).Score +} diff --git a/market/types.go b/market/types.go new file mode 100644 index 00000000..82f44415 --- /dev/null +++ b/market/types.go @@ -0,0 +1,157 @@ +package market + +import "time" + +// Data 市场数据结构 +type Data struct { + Symbol string + CurrentPrice float64 + PriceChange1h float64 // 1小时价格变化百分比 + PriceChange4h float64 // 4小时价格变化百分比 + CurrentEMA20 float64 + CurrentMACD float64 + CurrentRSI7 float64 + OpenInterest *OIData + FundingRate float64 + IntradaySeries *IntradayData + LongerTermContext *LongerTermData +} + +// OIData Open Interest数据 +type OIData struct { + Latest float64 + Average float64 +} + +// IntradayData 日内数据(3分钟间隔) +type IntradayData struct { + MidPrices []float64 + EMA20Values []float64 + MACDValues []float64 + RSI7Values []float64 + RSI14Values []float64 +} + +// LongerTermData 长期数据(4小时时间框架) +type LongerTermData struct { + EMA20 float64 + EMA50 float64 + ATR3 float64 + ATR14 float64 + CurrentVolume float64 + AverageVolume float64 + MACDValues []float64 + RSI14Values []float64 +} + +// Binance API 响应结构 +type ExchangeInfo struct { + Symbols []SymbolInfo `json:"symbols"` +} + +type SymbolInfo struct { + Symbol string `json:"symbol"` + Status string `json:"status"` + BaseAsset string `json:"baseAsset"` + QuoteAsset string `json:"quoteAsset"` + ContractType string `json:"contractType"` + PricePrecision int `json:"pricePrecision"` + QuantityPrecision int `json:"quantityPrecision"` +} + +type Kline struct { + OpenTime int64 `json:"openTime"` + Open float64 `json:"open"` + High float64 `json:"high"` + Low float64 `json:"low"` + Close float64 `json:"close"` + Volume float64 `json:"volume"` + CloseTime int64 `json:"closeTime"` + QuoteVolume float64 `json:"quoteVolume"` + Trades int `json:"trades"` + TakerBuyBaseVolume float64 `json:"takerBuyBaseVolume"` + TakerBuyQuoteVolume float64 `json:"takerBuyQuoteVolume"` +} + +type KlineResponse []interface{} + +type PriceTicker struct { + Symbol string `json:"symbol"` + Price string `json:"price"` +} + +type Ticker24hr struct { + Symbol string `json:"symbol"` + PriceChange string `json:"priceChange"` + PriceChangePercent string `json:"priceChangePercent"` + Volume string `json:"volume"` + QuoteVolume string `json:"quoteVolume"` +} + +// 特征数据结构 +type SymbolFeatures struct { + Symbol string `json:"symbol"` + Timestamp time.Time `json:"timestamp"` + Price float64 `json:"price"` + PriceChange15Min float64 `json:"price_change_15min"` + PriceChange1H float64 `json:"price_change_1h"` + PriceChange4H float64 `json:"price_change_4h"` + Volume float64 `json:"volume"` + VolumeRatio5 float64 `json:"volume_ratio_5"` + VolumeRatio20 float64 `json:"volume_ratio_20"` + VolumeTrend float64 `json:"volume_trend"` + RSI14 float64 `json:"rsi_14"` + SMA5 float64 `json:"sma_5"` + SMA10 float64 `json:"sma_10"` + SMA20 float64 `json:"sma_20"` + HighLowRatio float64 `json:"high_low_ratio"` + Volatility20 float64 `json:"volatility_20"` + PositionInRange float64 `json:"position_in_range"` +} + +// 警报数据结构 +type Alert struct { + Type string `json:"type"` + Symbol string `json:"symbol"` + Value float64 `json:"value"` + Threshold float64 `json:"threshold"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` +} + +type Config struct { + AlertThresholds AlertThresholds `json:"alert_thresholds"` + UpdateInterval int `json:"update_interval"` // seconds + CleanupConfig CleanupConfig `json:"cleanup_config"` +} + +type AlertThresholds struct { + VolumeSpike float64 `json:"volume_spike"` + PriceChange15Min float64 `json:"price_change_15min"` + VolumeTrend float64 `json:"volume_trend"` + RSIOverbought float64 `json:"rsi_overbought"` + RSIOversold float64 `json:"rsi_oversold"` +} +type CleanupConfig struct { + InactiveTimeout time.Duration `json:"inactive_timeout"` // 不活跃超时时间 + MinScoreThreshold float64 `json:"min_score_threshold"` // 最低评分阈值 + NoAlertTimeout time.Duration `json:"no_alert_timeout"` // 无警报超时时间 + CheckInterval time.Duration `json:"check_interval"` // 检查间隔 +} + +var config = Config{ + AlertThresholds: AlertThresholds{ + VolumeSpike: 3.0, + PriceChange15Min: 0.05, + VolumeTrend: 2.0, + RSIOverbought: 70, + RSIOversold: 30, + }, + CleanupConfig: CleanupConfig{ + InactiveTimeout: 30 * time.Minute, + MinScoreThreshold: 15.0, + NoAlertTimeout: 20 * time.Minute, + CheckInterval: 5 * time.Minute, + }, + UpdateInterval: 60, // 1 minute +} diff --git a/market/websocket_client.go b/market/websocket_client.go new file mode 100644 index 00000000..ce151691 --- /dev/null +++ b/market/websocket_client.go @@ -0,0 +1,231 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type WSClient struct { + conn *websocket.Conn + mu sync.RWMutex + subscribers map[string]chan []byte + reconnect bool + done chan struct{} +} + +type WSMessage struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` +} + +type KlineWSData struct { + EventType string `json:"e"` + EventTime int64 `json:"E"` + Symbol string `json:"s"` + Kline struct { + StartTime int64 `json:"t"` + CloseTime int64 `json:"T"` + Symbol string `json:"s"` + Interval string `json:"i"` + FirstTradeID int64 `json:"f"` + LastTradeID int64 `json:"L"` + OpenPrice string `json:"o"` + ClosePrice string `json:"c"` + HighPrice string `json:"h"` + LowPrice string `json:"l"` + Volume string `json:"v"` + NumberOfTrades int `json:"n"` + IsFinal bool `json:"x"` + QuoteVolume string `json:"q"` + TakerBuyBaseVolume string `json:"V"` + TakerBuyQuoteVolume string `json:"Q"` + } `json:"k"` +} + +type TickerWSData struct { + EventType string `json:"e"` + EventTime int64 `json:"E"` + Symbol string `json:"s"` + PriceChange string `json:"p"` + PriceChangePercent string `json:"P"` + WeightedAvgPrice string `json:"w"` + LastPrice string `json:"c"` + LastQty string `json:"Q"` + OpenPrice string `json:"o"` + HighPrice string `json:"h"` + LowPrice string `json:"l"` + Volume string `json:"v"` + QuoteVolume string `json:"q"` + OpenTime int64 `json:"O"` + CloseTime int64 `json:"C"` + FirstID int64 `json:"F"` + LastID int64 `json:"L"` + Count int `json:"n"` +} + +func NewWSClient() *WSClient { + return &WSClient{ + subscribers: make(map[string]chan []byte), + reconnect: true, + done: make(chan struct{}), + } +} + +func (w *WSClient) Connect() error { + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + conn, _, err := dialer.Dial("wss://ws-fapi.binance.com/ws-fapi/v1", nil) + if err != nil { + return fmt.Errorf("WebSocket连接失败: %v", err) + } + + w.mu.Lock() + w.conn = conn + w.mu.Unlock() + + log.Println("WebSocket连接成功") + + // 启动消息读取循环 + go w.readMessages() + + return nil +} + +func (w *WSClient) SubscribeKline(symbol, interval string) error { + stream := fmt.Sprintf("%s@kline_%s", symbol, interval) + return w.subscribe(stream) +} + +func (w *WSClient) SubscribeTicker(symbol string) error { + stream := fmt.Sprintf("%s@ticker", symbol) + return w.subscribe(stream) +} + +func (w *WSClient) SubscribeMiniTicker(symbol string) error { + stream := fmt.Sprintf("%s@miniTicker", symbol) + return w.subscribe(stream) +} + +func (w *WSClient) subscribe(stream string) error { + subscribeMsg := map[string]interface{}{ + "method": "SUBSCRIBE", + "params": []string{stream}, + "id": time.Now().Unix(), + } + + w.mu.RLock() + defer w.mu.RUnlock() + + if w.conn == nil { + return fmt.Errorf("WebSocket未连接") + } + + err := w.conn.WriteJSON(subscribeMsg) + if err != nil { + return err + } + + log.Printf("订阅流: %s", stream) + return nil +} + +func (w *WSClient) readMessages() { + for { + select { + case <-w.done: + return + default: + w.mu.RLock() + conn := w.conn + w.mu.RUnlock() + + if conn == nil { + time.Sleep(1 * time.Second) + continue + } + + _, message, err := conn.ReadMessage() + if err != nil { + log.Printf("读取WebSocket消息失败: %v", err) + w.handleReconnect() + return + } + + w.handleMessage(message) + } + } +} + +func (w *WSClient) handleMessage(message []byte) { + var wsMsg WSMessage + if err := json.Unmarshal(message, &wsMsg); err != nil { + // 可能是其他格式的消息 + return + } + + w.mu.RLock() + ch, exists := w.subscribers[wsMsg.Stream] + w.mu.RUnlock() + + if exists { + select { + case ch <- wsMsg.Data: + default: + log.Printf("订阅者通道已满: %s", wsMsg.Stream) + } + } +} + +func (w *WSClient) handleReconnect() { + if !w.reconnect { + return + } + + log.Println("尝试重新连接...") + time.Sleep(3 * time.Second) + + if err := w.Connect(); err != nil { + log.Printf("重新连接失败: %v", err) + go w.handleReconnect() + } +} + +func (w *WSClient) AddSubscriber(stream string, bufferSize int) <-chan []byte { + ch := make(chan []byte, bufferSize) + w.mu.Lock() + w.subscribers[stream] = ch + w.mu.Unlock() + return ch +} + +func (w *WSClient) RemoveSubscriber(stream string) { + w.mu.Lock() + delete(w.subscribers, stream) + w.mu.Unlock() +} + +func (w *WSClient) Close() { + w.reconnect = false + close(w.done) + + w.mu.Lock() + defer w.mu.Unlock() + + if w.conn != nil { + w.conn.Close() + w.conn = nil + } + + // 关闭所有订阅者通道 + for stream, ch := range w.subscribers { + close(ch) + delete(w.subscribers, stream) + } +} From 7302f96e8e33fbcff83691ebd66e178d857fe420 Mon Sep 17 00:00:00 2001 From: yuanshi2016 <103150111@qq.com> Date: Sun, 2 Nov 2025 14:03:13 +0800 Subject: [PATCH 2/3] =?UTF-8?q?K=E7=BA=BF=E8=8E=B7=E5=8F=96=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E6=94=B9=E4=B8=BAwebsocket=E7=BB=84=E5=90=88=E6=B5=81?= =?UTF-8?q?.=20=E5=B8=A6=E9=87=8D=E6=8B=A8=E6=9C=BA=E5=88=B6=20=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E4=B8=BA=E4=B8=8B=EF=BC=9A=201.=20=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E6=97=B6=E4=BD=BF=E7=94=A8=E6=89=80=E6=9C=89=E4=BA=A4=E6=98=93?= =?UTF-8?q?=E5=91=98=E8=AE=BE=E7=BD=AE=E7=9A=84=E5=B8=81=E7=A7=8D(?= =?UTF-8?q?=E5=8E=BB=E9=87=8D)=20=E5=A6=82=E6=9E=9C=E4=BA=A4=E6=98=93?= =?UTF-8?q?=E5=91=98=E6=9C=AA=E9=85=8D=E7=BD=AE=EF=BC=8C=E5=88=99=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=B3=BB=E7=BB=9F=E9=BB=98=E8=AE=A4=202.=20=E5=9C=A8?= =?UTF-8?q?=E5=86=B3=E7=AD=96=E8=8E=B7=E5=8F=96K=E7=BA=BF=E6=97=B6=20?= =?UTF-8?q?=E5=A6=82=E6=9E=9C=E6=B2=A1=E6=9C=89=E7=BC=93=E5=AD=98=20?= =?UTF-8?q?=E5=88=99=E5=85=88=E5=AE=9E=E6=97=B6=E8=8E=B7=E5=8F=96=E5=90=8E?= =?UTF-8?q?=E5=86=8D=E6=B7=BB=E5=8A=A0=E8=AE=A2=E9=98=85.=20ps:=20?= =?UTF-8?q?=E9=80=82=E7=94=A8=E4=BA=8EApi=E6=96=B9=E5=BC=8F=E7=9A=84?= =?UTF-8?q?=E5=B8=81=E7=A7=8D=E5=88=97=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 2 +- config/database.go | 215 +++++++++++++++++++++++++-------------------- main.go | 29 +----- market/monitor.go | 112 +++++++++++------------ 4 files changed, 179 insertions(+), 179 deletions(-) diff --git a/config/config.go b/config/config.go index 3b736d0e..430e428c 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,7 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 - InsideCoins bool `json:"inside_coins"` // 是否使用内置AI评分币种列表 + UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/config/database.go b/config/database.go index c5eef755..1102c6fb 100644 --- a/config/database.go +++ b/config/database.go @@ -4,8 +4,11 @@ import ( "crypto/rand" "database/sql" "encoding/base32" + "encoding/json" "fmt" "log" + "nofx/market" + "slices" "strings" "time" @@ -177,17 +180,18 @@ func (d *Database) createTables() error { `ALTER TABLE exchanges ADD COLUMN aster_private_key TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN custom_prompt TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN override_base_prompt BOOLEAN DEFAULT 0`, - `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 - `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 - `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) - `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 - `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 - `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 - `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 - `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 + `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 + `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 + `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) + `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 + `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 + `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 + `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 + `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 + `ALTER TABLE traders ADD COLUMN use_inside_coins BOOLEAN DEFAULT 0`, // 是否使用内置AI评分信号源 `ALTER TABLE traders ADD COLUMN system_prompt_template TEXT DEFAULT 'default'`, // 系统提示词模板名称 - `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 - `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 + `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 + `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 } for _, query := range alterQueries { @@ -245,16 +249,16 @@ func (d *Database) initDefaultData() error { // 初始化系统配置 - 创建所有字段,设置默认值,后续由config.json同步更新 systemConfigs := map[string]string{ - "admin_mode": "true", // 默认开启管理员模式,便于首次使用 - "api_server_port": "8080", // 默认API端口 - "use_default_coins": "true", // 默认使用内置币种列表 - "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式) - "max_daily_loss": "10.0", // 最大日损失百分比 - "max_drawdown": "20.0", // 最大回撤百分比 - "stop_trading_minutes": "60", // 停止交易时间(分钟) - "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 - "altcoin_leverage": "5", // 山寨币杠杆倍数 - "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 + "admin_mode": "true", // 默认开启管理员模式,便于首次使用 + "api_server_port": "8080", // 默认API端口 + "use_default_coins": "true", // 默认使用内置币种列表 + "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式) + "max_daily_loss": "10.0", // 最大日损失百分比 + "max_drawdown": "20.0", // 最大回撤百分比 + "stop_trading_minutes": "60", // 停止交易时间(分钟) + "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 + "altcoin_leverage": "5", // 山寨币杠杆倍数 + "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 } for key, value := range systemConfigs { @@ -281,14 +285,14 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return err } - + // 如果已经迁移过,直接返回 if count > 0 { return nil } - + log.Printf("🔄 开始迁移exchanges表...") - + // 创建新的exchanges表,使用复合主键 _, err = d.db.Exec(` CREATE TABLE exchanges_new ( @@ -313,7 +317,7 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建新exchanges表失败: %w", err) } - + // 复制数据到新表 _, err = d.db.Exec(` INSERT INTO exchanges_new @@ -322,19 +326,19 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("复制数据失败: %w", err) } - + // 删除旧表 _, err = d.db.Exec(`DROP TABLE exchanges`) if err != nil { return fmt.Errorf("删除旧表失败: %w", err) } - + // 重命名新表 _, err = d.db.Exec(`ALTER TABLE exchanges_new RENAME TO exchanges`) if err != nil { return fmt.Errorf("重命名表失败: %w", err) } - + // 重新创建触发器 _, err = d.db.Exec(` CREATE TRIGGER IF NOT EXISTS update_exchanges_updated_at @@ -347,20 +351,20 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建触发器失败: %w", err) } - + log.Printf("✅ exchanges表迁移完成") return nil } // User 用户配置 type User struct { - ID string `json:"id"` - Email string `json:"email"` - PasswordHash string `json:"-"` // 不返回到前端 - OTPSecret string `json:"-"` // 不返回到前端 - OTPVerified bool `json:"otp_verified"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Email string `json:"email"` + PasswordHash string `json:"-"` // 不返回到前端 + OTPSecret string `json:"-"` // 不返回到前端 + OTPVerified bool `json:"otp_verified"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // AIModelConfig AI模型配置 @@ -379,39 +383,40 @@ type AIModelConfig struct { // ExchangeConfig 交易所配置 type ExchangeConfig struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - Type string `json:"type"` - Enabled bool `json:"enabled"` - APIKey string `json:"apiKey"` - SecretKey string `json:"secretKey"` - Testnet bool `json:"testnet"` + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + Type string `json:"type"` + Enabled bool `json:"enabled"` + APIKey string `json:"apiKey"` + SecretKey string `json:"secretKey"` + Testnet bool `json:"testnet"` // Hyperliquid 特定字段 HyperliquidWalletAddr string `json:"hyperliquidWalletAddr"` // Aster 特定字段 - AsterUser string `json:"asterUser"` - AsterSigner string `json:"asterSigner"` - AsterPrivateKey string `json:"asterPrivateKey"` + AsterUser string `json:"asterUser"` + AsterSigner string `json:"asterSigner"` + AsterPrivateKey string `json:"asterPrivateKey"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // TraderRecord 交易员配置(数据库实体) type TraderRecord struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - AIModelID string `json:"ai_model_id"` - ExchangeID string `json:"exchange_id"` - InitialBalance float64 `json:"initial_balance"` - ScanIntervalMinutes int `json:"scan_interval_minutes"` - IsRunning bool `json:"is_running"` - BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 - AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + AIModelID string `json:"ai_model_id"` + ExchangeID string `json:"exchange_id"` + InitialBalance float64 `json:"initial_balance"` + ScanIntervalMinutes int `json:"scan_interval_minutes"` + IsRunning bool `json:"is_running"` + BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 + AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 TradingSymbols string `json:"trading_symbols"` // 交易币种,逗号分隔 UseCoinPool bool `json:"use_coin_pool"` // 是否使用COIN POOL信号源 UseOITop bool `json:"use_oi_top"` // 是否使用OI TOP信号源 + UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置评分信号源 CustomPrompt string `json:"custom_prompt"` // 自定义交易策略prompt OverrideBasePrompt bool `json:"override_base_prompt"` // 是否覆盖基础prompt SystemPromptTemplate string `json:"system_prompt_template"` // 系统提示词模板名称 @@ -422,12 +427,12 @@ type TraderRecord struct { // UserSignalSource 用户信号源配置 type UserSignalSource struct { - ID int `json:"id"` - UserID string `json:"user_id"` - CoinPoolURL string `json:"coin_pool_url"` - OITopURL string `json:"oi_top_url"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID int `json:"id"` + UserID string `json:"user_id"` + CoinPoolURL string `json:"coin_pool_url"` + OITopURL string `json:"oi_top_url"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // GenerateOTPSecret 生成OTP密钥 @@ -457,12 +462,12 @@ func (d *Database) EnsureAdminUser() error { if err != nil { return err } - + // 如果已存在,直接返回 if count > 0 { return nil } - + // 创建admin用户(密码为空,因为管理员模式下不需要密码) adminUser := &User{ ID: "admin", @@ -471,7 +476,7 @@ func (d *Database) EnsureAdminUser() error { OTPSecret: "", OTPVerified: true, } - + return d.CreateUser(adminUser) } @@ -482,7 +487,7 @@ func (d *Database) GetUserByEmail(email string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE email = ? `, email).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -498,7 +503,7 @@ func (d *Database) GetUserByID(userID string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE id = ? `, userID).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -668,7 +673,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { err := rows.Scan( &exchange.ID, &exchange.UserID, &exchange.Name, &exchange.Type, &exchange.Enabled, &exchange.APIKey, &exchange.SecretKey, &exchange.Testnet, - &exchange.HyperliquidWalletAddr, &exchange.AsterUser, + &exchange.HyperliquidWalletAddr, &exchange.AsterUser, &exchange.AsterSigner, &exchange.AsterPrivateKey, &exchange.CreatedAt, &exchange.UpdatedAt, ) @@ -684,7 +689,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { // UpdateExchange 更新交易所配置,如果不存在则创建用户特定配置 func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secretKey string, testnet bool, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey string) error { log.Printf("🔧 UpdateExchange: userID=%s, id=%s, enabled=%v", userID, id, enabled) - + // 首先尝试更新现有的用户配置 result, err := d.db.Exec(` UPDATE exchanges SET enabled = ?, api_key = ?, secret_key = ?, testnet = ?, @@ -695,20 +700,20 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre log.Printf("❌ UpdateExchange: 更新失败: %v", err) return err } - + // 检查是否有行被更新 rowsAffected, err := result.RowsAffected() if err != nil { log.Printf("❌ UpdateExchange: 获取影响行数失败: %v", err) return err } - + log.Printf("📊 UpdateExchange: 影响行数 = %d", rowsAffected) - + // 如果没有行被更新,说明用户没有这个交易所的配置,需要创建 if rowsAffected == 0 { log.Printf("💡 UpdateExchange: 没有现有记录,创建新记录") - + // 根据交易所ID确定基本信息 var name, typ string if id == "binance" { @@ -724,16 +729,16 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre name = id + " Exchange" typ = "cex" } - + log.Printf("🆕 UpdateExchange: 创建新记录 ID=%s, name=%s, type=%s", id, name, typ) - + // 创建用户特定的配置,使用原始的交易所ID _, err = d.db.Exec(` INSERT INTO exchanges (id, user_id, name, type, enabled, api_key, secret_key, testnet, hyperliquid_wallet_addr, aster_user, aster_signer, aster_private_key, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) `, id, userID, name, typ, enabled, apiKey, secretKey, testnet, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey) - + if err != nil { log.Printf("❌ UpdateExchange: 创建记录失败: %v", err) } else { @@ -741,7 +746,7 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre } return err } - + log.Printf("✅ UpdateExchange: 更新现有记录成功") return nil } @@ -767,9 +772,9 @@ func (d *Database) CreateExchange(userID, id, name, typ string, enabled bool, ap // CreateTrader 创建交易员 func (d *Database) CreateTrader(trader *TraderRecord) error { _, err := d.db.Exec(` - INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin) + INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, use_inside_coins, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?) + `, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.UseInsideCoins, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin) return err } @@ -779,7 +784,7 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) { SELECT id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, COALESCE(btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(altcoin_leverage, 5) as altcoin_leverage, COALESCE(trading_symbols, '') as trading_symbols, - COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top, + COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top,COALESCE(use_inside_coins, 0) as use_inside_coins, COALESCE(custom_prompt, '') as custom_prompt, COALESCE(override_base_prompt, 0) as override_base_prompt, COALESCE(system_prompt_template, 'default') as system_prompt_template, COALESCE(is_cross_margin, 1) as is_cross_margin, created_at, updated_at @@ -790,14 +795,14 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) { } defer rows.Close() - var traders []*TraderRecord + var traders []*TraderRecord for rows.Next() { - var trader TraderRecord + var trader TraderRecord err := rows.Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, - &trader.UseCoinPool, &trader.UseOITop, + &trader.UseCoinPool, &trader.UseOITop, &trader.UseInsideCoins, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.SystemPromptTemplate, &trader.IsCrossMargin, &trader.CreatedAt, &trader.UpdatedAt, @@ -847,18 +852,13 @@ func (d *Database) DeleteTrader(userID, id string) error { // GetTraderConfig 获取交易员完整配置(包含AI模型和交易所信息) func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIModelConfig, *ExchangeConfig, error) { - var trader TraderRecord + var trader TraderRecord var aiModel AIModelConfig var exchange ExchangeConfig err := d.db.QueryRow(` SELECT - t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, - COALESCE(t.btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(t.altcoin_leverage, 5) as altcoin_leverage, - COALESCE(t.trading_symbols, '') as trading_symbols, COALESCE(t.use_coin_pool, 0) as use_coin_pool, - COALESCE(t.use_oi_top, 0) as use_oi_top, COALESCE(t.custom_prompt, '') as custom_prompt, - COALESCE(t.override_base_prompt, 0) as override_base_prompt, COALESCE(t.is_cross_margin, 1) as is_cross_margin, - t.created_at, t.updated_at, + t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, t.created_at, t.updated_at, a.id, a.user_id, a.name, a.provider, a.enabled, a.api_key, a.created_at, a.updated_at, e.id, e.user_id, e.name, e.type, e.enabled, e.api_key, e.secret_key, e.testnet, COALESCE(e.hyperliquid_wallet_addr, '') as hyperliquid_wallet_addr, @@ -873,8 +873,6 @@ func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIM `, traderID, userID).Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, - &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, &trader.UseCoinPool, - &trader.UseOITop, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.IsCrossMargin, &trader.CreatedAt, &trader.UpdatedAt, &aiModel.ID, &aiModel.UserID, &aiModel.Name, &aiModel.Provider, &aiModel.Enabled, &aiModel.APIKey, &aiModel.CreatedAt, &aiModel.UpdatedAt, @@ -940,7 +938,36 @@ func (d *Database) UpdateUserSignalSource(userID, coinPoolURL, oiTopURL string) return err } +// GetCustomCoins 获取所有交易员自定义币种 / Get all trader-customized currencies +func (d *Database) GetCustomCoins() []string { + var symbol string + var symbols []string + _ = d.db.QueryRow(` + SELECT GROUP_CONCAT(custom_coins , ',') as symbol + FROM main.traders where custom_coins != '' + `).Scan(&symbol) + // 检测用户是否未配置币种 - 兼容性 + if symbol == "" { + symbolJSON, _ := d.GetSystemConfig("default_coins") + if err := json.Unmarshal([]byte(symbolJSON), &symbols); err != nil { + log.Printf("⚠️ 解析default_coins配置失败: %v,使用硬编码默认值", err) + symbols = []string{"BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT"} + } + } + // filter Symbol + for _, s := range strings.Split(symbol, ",") { + if s == "" { + continue + } + coin := market.Normalize(s) + if !slices.Contains(symbols, coin) { + symbols = append(symbols, coin) + } + } + return symbols +} + // Close 关闭数据库连接 func (d *Database) Close() error { return d.db.Close() -} \ No newline at end of file +} diff --git a/main.go b/main.go index 7929072b..1ffc562e 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,6 @@ import ( "strconv" "strings" "syscall" - "time" ) // LeverageConfig 杠杆配置 @@ -30,9 +29,9 @@ type ConfigFile struct { APIServerPort int `json:"api_server_port"` UseDefaultCoins bool `json:"use_default_coins"` DefaultCoins []string `json:"default_coins"` - InsideCoins bool `json:"inside_coins"` CoinPoolAPIURL string `json:"coin_pool_api_url"` OITopAPIURL string `json:"oi_top_api_url"` + InsideCoins bool `json:"inside_coins"` MaxDailyLoss float64 `json:"max_daily_loss"` MaxDrawdown float64 `json:"max_drawdown"` StopTradingMinutes int `json:"stop_trading_minutes"` @@ -68,9 +67,9 @@ func syncConfigToDatabase(database *config.Database) error { "admin_mode": fmt.Sprintf("%t", configFile.AdminMode), "api_server_port": strconv.Itoa(configFile.APIServerPort), "use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins), - "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), "coin_pool_api_url": configFile.CoinPoolAPIURL, "oi_top_api_url": configFile.OITopAPIURL, + "inside_coins": fmt.Sprintf("%t", configFile.InsideCoins), "max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss), "max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown), "stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes), @@ -137,8 +136,6 @@ func main() { // 获取系统配置 useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins") useDefaultCoins := useDefaultCoinsStr == "true" - InsideCoinsStr, _ := database.GetSystemConfig("inside_coins") - insideCoins := InsideCoinsStr == "true" apiPortStr, _ := database.GetSystemConfig("api_server_port") // 获取管理员模式配置 @@ -186,26 +183,6 @@ func main() { } pool.SetDefaultCoins(defaultCoins) - - //内置AI评分 - if insideCoins { - log.Printf("✓ 启用内置AI评分币种列表") - monitor := market.NewWSMonitor(150) - go func() { - monitor.Start() - // 定时器设置默认的币种列表 - 覆蓋defaultCoins设置 - for { - if len(monitor.FilterSymbol) > 0 { - for _, coin := range defaultCoins { - monitor.FilterSymbol = append(monitor.FilterSymbol, coin) - } - pool.SetDefaultCoins(monitor.FilterSymbol) - monitor.FilterSymbol = nil - } - time.Sleep(1 * time.Minute) - } - }() - } // 设置是否使用默认主流币种 pool.SetUseDefaultCoins(useDefaultCoins) if useDefaultCoins { @@ -286,6 +263,8 @@ func main() { } }() + // 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认 + go market.NewWSMonitor(150).Start(database.GetCustomCoins()) // 设置优雅退出 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/market/monitor.go b/market/monitor.go index 9837623e..b751dfe8 100644 --- a/market/monitor.go +++ b/market/monitor.go @@ -35,6 +35,7 @@ type SymbolStats struct { } var WSMonitorCli *WSMonitor +var subKlineTime = []string{"3m", "4h"} // 管理订阅流的K线周期 func NewWSMonitor(batchSize int) *WSMonitor { WSMonitorCli = &WSMonitor{ @@ -47,23 +48,27 @@ func NewWSMonitor(batchSize int) *WSMonitor { return WSMonitorCli } -func (m *WSMonitor) Initialize() error { +func (m *WSMonitor) Initialize(coins []string) error { log.Println("初始化WebSocket监控器...") - // 获取交易对信息 apiClient := NewAPIClient() - exchangeInfo, err := apiClient.GetExchangeInfo() - if err != nil { - return err + // 如果不指定交易对,则使用market市场的所有交易对币种 + if len(coins) == 0 { + exchangeInfo, err := apiClient.GetExchangeInfo() + if err != nil { + return err + } + // 筛选永续合约交易对 --仅测试时使用 + //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] + for _, symbol := range exchangeInfo.Symbols { + if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" { + m.symbols = append(m.symbols, symbol.Symbol) + } + } + } else { + m.symbols = coins } - // 筛选永续合约交易对 --仅测试时使用 - //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] - for _, symbol := range exchangeInfo.Symbols { - if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" { - m.symbols = append(m.symbols, Normalize(symbol.Symbol)) - } - } log.Printf("找到 %d 个交易对", len(m.symbols)) // 初始化历史数据 if err := m.initializeHistoricalData(); err != nil { @@ -114,10 +119,10 @@ func (m *WSMonitor) initializeHistoricalData() error { return nil } -func (m *WSMonitor) Start() { +func (m *WSMonitor) Start(coins []string) { log.Printf("启动WebSocket实时监控...") // 初始化交易对 - err := m.Initialize() + err := m.Initialize(coins) if err != nil { log.Fatalf("❌ 初始化币种: %v", err) return @@ -129,42 +134,43 @@ func (m *WSMonitor) Start() { return } // 启动警报处理器 - go m.handleAlerts() + //go m.handleAlerts() // 启动定期清理任务 - go m.cleanupInactiveSymbols() + //go m.cleanupInactiveSymbols() // 输出监控统计 - 评分前十名 - go m.printFilterStats(50) + //go m.printFilterStats(20) // 订阅所有交易对 err = m.subscribeAll() - if err != nil { log.Fatalf("❌ 订阅币种交易对: %v", err) return } } +// subscribeSymbol 注册监听 +func (m *WSMonitor) subscribeSymbol(symbol, st string) []string { + var streams []string + stream := fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), st) + ch := m.combinedClient.AddSubscriber(stream, 100) + streams = append(streams, stream) + go m.handleKlineData(symbol, ch, st) + + return streams +} func (m *WSMonitor) subscribeAll() error { // 执行批量订阅 log.Println("开始订阅所有交易对...") for _, symbol := range m.symbols { - stream3m := fmt.Sprintf("%s@kline_3m", strings.ToLower(symbol)) - ch3m := m.combinedClient.AddSubscriber(stream3m, 100) - go m.handleKlineData(symbol, ch3m, "3m") - - stream4h := fmt.Sprintf("%s@kline_4h", strings.ToLower(symbol)) - ch4h := m.combinedClient.AddSubscriber(stream4h, 100) - go m.handleKlineData(symbol, ch4h, "4h") + for _, st := range subKlineTime { + m.subscribeSymbol(symbol, st) + } } - - err := m.combinedClient.BatchSubscribeKlines(m.symbols, "3m") - if err != nil { - log.Fatalf("❌ 订阅3m K线: %v", err) - return err - } - err = m.combinedClient.BatchSubscribeKlines(m.symbols, "4h") - if err != nil { - log.Fatalf("❌ 订阅4h K线: %v", err) - return err + for _, st := range subKlineTime { + err := m.combinedClient.BatchSubscribeKlines(m.symbols, st) + if err != nil { + log.Fatalf("❌ 订阅3m K线: %v", err) + return err + } } log.Println("所有交易对订阅完成") return nil @@ -181,34 +187,14 @@ func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time strin } } -func (m *WSMonitor) handleTickerData(symbol string, ch <-chan []byte) { - for data := range ch { - var tickerData TickerWSData - if err := json.Unmarshal(data, &tickerData); err != nil { - log.Printf("解析Ticker数据失败: %v", err) - continue - } - - m.processTickerUpdate(symbol, tickerData) - } -} -func (m *WSMonitor) handleTickerDatas(ch <-chan []byte) { - for data := range ch { - var tickerData []TickerWSData - if err := json.Unmarshal(data, &tickerData); err != nil { - log.Printf("解析Ticker数据失败: %v", err) - continue - } - log.Fatalln(tickerData) - //m.processTickerUpdate(symbol, tickerData) - } -} func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map { var klineDataMap *sync.Map if _time == "3m" { klineDataMap = &m.klineDataMap3m - } else { + } else if _time == "4h" { klineDataMap = &m.klineDataMap4h + } else { + klineDataMap = &sync.Map{} } return klineDataMap } @@ -310,11 +296,19 @@ func (m *WSMonitor) handleAlerts() { } func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { + // 对每一个进来的symbol检测是否存在内类 是否的话就订阅它 value, exists := m.getKlineDataMap(_time).Load(symbol) if !exists { // 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行) apiClient := NewAPIClient() - klines, err := apiClient.GetKlines(symbol, _time, 40) + klines, err := apiClient.GetKlines(symbol, _time, 100) + m.getKlineDataMap(_time).Store(strings.ToUpper(symbol), klines) //动态缓存进缓存 + subStr := m.subscribeSymbol(symbol, _time) + subErr := m.combinedClient.subscribeStreams(subStr) + log.Printf("动态订阅流: %v", subStr) + if subErr != nil { + return nil, fmt.Errorf("动态订阅%v分钟K线失败: %v", _time, subErr) + } if err != nil { return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err) } From d8582475d320e92170913b992344a5e6b876a95e Mon Sep 17 00:00:00 2001 From: yuanshi2016 <103150111@qq.com> Date: Sun, 2 Nov 2025 17:59:19 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9Kline=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E4=B8=BAWebsocket=E7=BC=93=E5=AD=98=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.json.example | 1 - config/config.go | 1 - docs/architecture/README.md | 7 +- docs/architecture/README.zh-CN.md | 5 + go.mod | 10 +- main.go | 1 + market/feature_engine.go | 229 -------------------------- market/monitor.go | 262 +----------------------------- 8 files changed, 21 insertions(+), 495 deletions(-) delete mode 100644 market/feature_engine.go diff --git a/config.json.example b/config.json.example index 87b01edd..ac9d5ac6 100644 --- a/config.json.example +++ b/config.json.example @@ -5,7 +5,6 @@ "altcoin_leverage": 5 }, "use_default_coins": true, - "inside_coins": true, "default_coins": [ "BTCUSDT", "ETHUSDT", diff --git a/config/config.go b/config/config.go index 430e428c..37a537db 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,6 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 - UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 2c1a2f6f..fb233a31 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -51,7 +51,12 @@ nofx/ │ ├── market/ # Market data fetching │ └── data.go # Market data & technical indicators (TA-Lib) -│ +│ └── api_client.go # Market data acquisition API +│ └── websocket_client.go # Market data acquisition WebSocket interface +│ └── combined_streams.go # Market data acquisition: Combined streaming (single link to subscribe to multiple cryptocurrencies) +│ └── monitor.go # Market data cache +│ └── types.go # market structure + ├── pool/ # Coin pool management │ └── coin_pool.go # AI500 + OI Top merged pool │ diff --git a/docs/architecture/README.zh-CN.md b/docs/architecture/README.zh-CN.md index 36732b09..4acc0f90 100644 --- a/docs/architecture/README.zh-CN.md +++ b/docs/architecture/README.zh-CN.md @@ -51,6 +51,11 @@ nofx/ │ ├── market/ # 市场数据获取 │ └── data.go # 市场数据与技术指标(TA-Lib) +│ └── api_client.go # 行情获取 Api接口 +│ └── websocket_client.go # 行情获取 Websocket接口 +│ └── combined_streams.go # 行情获取 组合流式(单链接订阅多个币种) +│ └── monitor.go # 行情数据缓存 +│ └── types.go # market结构体 │ ├── pool/ # 币种池管理 │ └── coin_pool.go # AI500 + OI Top 合并池 diff --git a/go.mod b/go.mod index 0c6dcfde..067172fd 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,8 @@ require ( github.com/gin-gonic/gin v1.11.0 github.com/golang-jwt/jwt/v5 v5.2.0 github.com/google/uuid v1.6.0 - github.com/mattn/go-sqlite3 v1.14.32 + github.com/gorilla/websocket v1.5.3 + github.com/mattn/go-sqlite3 v1.14.16 github.com/pquerna/otp v1.4.0 github.com/sonirico/go-hyperliquid v0.17.0 golang.org/x/crypto v0.42.0 @@ -26,6 +27,7 @@ require ( github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/go-sysinfo v1.15.4 // indirect github.com/elastic/go-windows v1.0.2 // indirect github.com/ethereum/c-kzg-4844/v2 v2.1.5 // indirect @@ -37,7 +39,6 @@ require ( github.com/go-playground/validator/v10 v10.27.0 // indirect github.com/goccy/go-json v0.10.4 // indirect github.com/goccy/go-yaml v1.18.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect github.com/holiman/uint256 v1.3.2 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -55,6 +56,7 @@ require ( github.com/prometheus/procfs v0.17.0 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rs/zerolog v1.34.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/sonirico/vago v0.9.0 // indirect @@ -78,4 +80,8 @@ require ( golang.org/x/tools v0.36.0 // indirect google.golang.org/protobuf v1.36.9 // indirect howett.net/plist v1.0.1 // indirect + modernc.org/libc v1.37.6 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.28.0 // indirect ) diff --git a/main.go b/main.go index 1ffc562e..36537b50 100644 --- a/main.go +++ b/main.go @@ -265,6 +265,7 @@ func main() { // 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认 go market.NewWSMonitor(150).Start(database.GetCustomCoins()) + //go market.NewWSMonitor(150).Start([]string{}) //这里是一个使用方式 传入空的话 则使用market市场的所有币种 // 设置优雅退出 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/market/feature_engine.go b/market/feature_engine.go deleted file mode 100644 index 91540a29..00000000 --- a/market/feature_engine.go +++ /dev/null @@ -1,229 +0,0 @@ -package market - -import ( - "fmt" - "math" - "time" -) - -type FeatureEngine struct { - alertThresholds AlertThresholds -} - -func NewFeatureEngine(thresholds AlertThresholds) *FeatureEngine { - return &FeatureEngine{ - alertThresholds: thresholds, - } -} - -func (e *FeatureEngine) CalculateFeatures(symbol string, klines []Kline) *SymbolFeatures { - if len(klines) < 20 { - return nil - } - - features := &SymbolFeatures{ - Symbol: symbol, - Timestamp: time.Now(), - } - - // 提取价格和交易量数据 - closes := make([]float64, len(klines)) - volumes := make([]float64, len(klines)) - highs := make([]float64, len(klines)) - lows := make([]float64, len(klines)) - - for i, k := range klines { - closes[i] = k.Close - volumes[i] = k.Volume - highs[i] = k.High - lows[i] = k.Low - } - - // 价格特征 - features.Price = closes[len(closes)-1] - features.PriceChange15Min = (closes[len(closes)-1] - closes[len(closes)-2]) / closes[len(closes)-2] - - if len(closes) >= 5 { - features.PriceChange1H = (closes[len(closes)-1] - closes[len(closes)-5]) / closes[len(closes)-5] - } - if len(closes) >= 17 { - features.PriceChange4H = (closes[len(closes)-1] - closes[len(closes)-17]) / closes[len(closes)-17] - } - - // 交易量特征 - currentVolume := volumes[len(volumes)-1] - features.Volume = currentVolume - - // 5周期平均交易量 - if len(volumes) >= 6 { - avgVolume5 := e.calculateAverage(volumes[len(volumes)-6 : len(volumes)-1]) - features.VolumeRatio5 = currentVolume / avgVolume5 - } - - // 20周期平均交易量 - if len(volumes) >= 21 { - avgVolume20 := e.calculateAverage(volumes[len(volumes)-21 : len(volumes)-1]) - features.VolumeRatio20 = currentVolume / avgVolume20 - } - - // 交易量趋势 - if features.VolumeRatio20 > 0 { - features.VolumeTrend = features.VolumeRatio5 / features.VolumeRatio20 - } - - // 技术指标 - features.RSI14 = e.calculateRSI(closes, 14) - features.SMA5 = e.calculateSMA(closes, 5) - features.SMA10 = e.calculateSMA(closes, 10) - features.SMA20 = e.calculateSMA(closes, 20) - - // 波动特征 - currentHigh := highs[len(highs)-1] - currentLow := lows[len(lows)-1] - features.HighLowRatio = (currentHigh - currentLow) / features.Price - features.Volatility20 = e.calculateVolatility(closes, 20) - - // 价格在区间中的位置 - if currentHigh != currentLow { - features.PositionInRange = (features.Price - currentLow) / (currentHigh - currentLow) - } else { - features.PositionInRange = 0.5 - } - - return features -} - -func (e *FeatureEngine) calculateAverage(values []float64) float64 { - sum := 0.0 - for _, v := range values { - sum += v - } - return sum / float64(len(values)) -} - -func (e *FeatureEngine) calculateSMA(prices []float64, period int) float64 { - if len(prices) < period { - return 0 - } - return e.calculateAverage(prices[len(prices)-period:]) -} - -func (e *FeatureEngine) calculateRSI(prices []float64, period int) float64 { - if len(prices) <= period { - return 50 - } - - gains := make([]float64, 0) - losses := make([]float64, 0) - - for i := 1; i < len(prices); i++ { - change := prices[i] - prices[i-1] - if change > 0 { - gains = append(gains, change) - losses = append(losses, 0) - } else { - gains = append(gains, 0) - losses = append(losses, -change) - } - } - - // 只取最近period个数据点 - if len(gains) > period { - gains = gains[len(gains)-period:] - losses = losses[len(losses)-period:] - } - - avgGain := e.calculateAverage(gains) - avgLoss := e.calculateAverage(losses) - - if avgLoss == 0 { - return 100 - } - - rs := avgGain / avgLoss - return 100 - (100 / (1 + rs)) -} - -func (e *FeatureEngine) calculateVolatility(prices []float64, period int) float64 { - if len(prices) < period { - return 0 - } - - periodPrices := prices[len(prices)-period:] - mean := e.calculateAverage(periodPrices) - - variance := 0.0 - for _, price := range periodPrices { - variance += math.Pow(price-mean, 2) - } - variance /= float64(len(periodPrices)) - - return math.Sqrt(variance) / mean -} - -func (e *FeatureEngine) DetectAlerts(features *SymbolFeatures) []Alert { - var alerts []Alert - - // 交易量放大检测 - if features.VolumeRatio5 > e.alertThresholds.VolumeSpike { - alerts = append(alerts, Alert{ - Type: "VOLUME_SPIKE", - Symbol: features.Symbol, - Value: features.VolumeRatio5, - Threshold: e.alertThresholds.VolumeSpike, - Message: fmt.Sprintf("%s 交易量放大 %.2f 倍", features.Symbol, features.VolumeRatio5), - Timestamp: time.Now(), - }) - } - - // 15分钟价格异动 - if math.Abs(features.PriceChange15Min) > e.alertThresholds.PriceChange15Min { - direction := "上涨" - if features.PriceChange15Min < 0 { - direction = "下跌" - } - alerts = append(alerts, Alert{ - Type: "PRICE_CHANGE_15MIN", - Symbol: features.Symbol, - Value: features.PriceChange15Min, - Threshold: e.alertThresholds.PriceChange15Min, - Message: fmt.Sprintf("%s 15分钟%s %.2f%%", features.Symbol, direction, features.PriceChange15Min*100), - Timestamp: time.Now(), - }) - } - - // 交易量趋势 - if features.VolumeTrend > e.alertThresholds.VolumeTrend { - alerts = append(alerts, Alert{ - Type: "VOLUME_TREND", - Symbol: features.Symbol, - Value: features.VolumeTrend, - Threshold: e.alertThresholds.VolumeTrend, - Message: fmt.Sprintf("%s 交易量趋势增强 %.2f 倍", features.Symbol, features.VolumeTrend), - Timestamp: time.Now(), - }) - } - - // RSI超买超卖 - if features.RSI14 > e.alertThresholds.RSIOverbought { - alerts = append(alerts, Alert{ - Type: "RSI_OVERBOUGHT", - Symbol: features.Symbol, - Value: features.RSI14, - Threshold: e.alertThresholds.RSIOverbought, - Message: fmt.Sprintf("%s RSI超买: %.2f", features.Symbol, features.RSI14), - Timestamp: time.Now(), - }) - } else if features.RSI14 < e.alertThresholds.RSIOversold { - alerts = append(alerts, Alert{ - Type: "RSI_OVERSOLD", - Symbol: features.Symbol, - Value: features.RSI14, - Threshold: e.alertThresholds.RSIOversold, - Message: fmt.Sprintf("%s RSI超卖: %.2f", features.Symbol, features.RSI14), - Timestamp: time.Now(), - }) - } - - return alerts -} diff --git a/market/monitor.go b/market/monitor.go index b751dfe8..337640d8 100644 --- a/market/monitor.go +++ b/market/monitor.go @@ -4,8 +4,6 @@ import ( "encoding/json" "fmt" "log" - "math" - "sort" "strings" "sync" "time" @@ -14,7 +12,6 @@ import ( type WSMonitor struct { wsClient *WSClient combinedClient *CombinedStreamsClient - featureEngine *FeatureEngine symbols []string featuresMap sync.Map alertsChan chan Alert @@ -41,7 +38,6 @@ func NewWSMonitor(batchSize int) *WSMonitor { WSMonitorCli = &WSMonitor{ wsClient: NewWSClient(), combinedClient: NewCombinedStreamsClient(batchSize), - featureEngine: NewFeatureEngine(config.AlertThresholds), alertsChan: make(chan Alert, 1000), batchSize: batchSize, } @@ -63,6 +59,7 @@ func (m *WSMonitor) Initialize(coins []string) error { for _, symbol := range exchangeInfo.Symbols { if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" { m.symbols = append(m.symbols, symbol.Symbol) + m.filterSymbols.Store(symbol.Symbol, true) } } } else { @@ -133,12 +130,6 @@ func (m *WSMonitor) Start(coins []string) { log.Fatalf("❌ 批量订阅流: %v", err) return } - // 启动警报处理器 - //go m.handleAlerts() - // 启动定期清理任务 - //go m.cleanupInactiveSymbols() - // 输出监控统计 - 评分前十名 - //go m.printFilterStats(20) // 订阅所有交易对 err = m.subscribeAll() if err != nil { @@ -239,60 +230,6 @@ func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time } klineDataMap.Store(symbol, klines) - // 计算特征并检测警报 - if len(klines) >= 20 { - features := m.featureEngine.CalculateFeatures(symbol, klines) - if features != nil { - m.featuresMap.Store(symbol, features) - - alerts := m.featureEngine.DetectAlerts(features) - hasAlert := len(alerts) > 0 - - // 更新统计信息 - m.updateSymbolStats(symbol, features, hasAlert) - - for _, alert := range alerts { - m.alertsChan <- alert - } - - // 实时日志输出重要特征 - if len(alerts) > 0 || features.VolumeRatio5 > 2.0 || math.Abs(features.PriceChange15Min) > 0.02 { - //log.Printf("📊 %s - 价格: %.4f, 15分钟变动: %.2f%%, 交易量倍数: %.2f, RSI: %.1f", - // symbol, features.Price, features.PriceChange15Min*100, - // features.VolumeRatio5, features.RSI14) - } - } - } -} - -func (m *WSMonitor) processTickerUpdate(symbol string, tickerData TickerWSData) { - // 存储ticker数据 - m.tickerDataMap.Store(symbol, tickerData) -} - -func (m *WSMonitor) handleAlerts() { - alertCounts := make(map[string]int) - lastReset := time.Now() - - for alert := range m.alertsChan { - // 重置计数器(每小时) - if time.Since(lastReset) > time.Hour { - alertCounts = make(map[string]int) - lastReset = time.Now() - } - - // 警报去重和频率控制 - alertKey := fmt.Sprintf("%s_%s", alert.Symbol, alert.Type) - alertCounts[alertKey]++ - m.filterSymbols.Store(alert.Symbol, true) - - //log.Printf("✅ 自动添加监控: %s (因警报: %s)", alert.Symbol, alert.Message) - if alertCounts[alertKey] <= 3 { // 每小时最多3次相同警报 - //log.Printf("🚨 实时警报: %s", alert.Message) - - // 这里可以添加其他警报处理逻辑 - } - } } func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { @@ -317,204 +254,7 @@ func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, erro return value.([]Kline), nil } -func (m *WSMonitor) GetCurrentFeatures(symbol string) (*SymbolFeatures, bool) { - value, exists := m.featuresMap.Load(symbol) - if !exists { - return nil, false - } - return value.(*SymbolFeatures), true -} - -func (m *WSMonitor) GetAllFeatures() map[string]*SymbolFeatures { - features := make(map[string]*SymbolFeatures) - m.featuresMap.Range(func(key, value interface{}) bool { - features[key.(string)] = value.(*SymbolFeatures) - return true - }) - return features -} - func (m *WSMonitor) Close() { m.wsClient.Close() close(m.alertsChan) } -func (m *WSMonitor) printFilterStats(nember int) { - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - - for range ticker.C { - var monitoredSymbols []string - m.filterSymbols.Range(func(key, value interface{}) bool { - monitoredSymbols = append(monitoredSymbols, key.(string)) - return true - }) - - log.Printf("🎯 监控统计 - 总数: %d, 币种: %v", - len(monitoredSymbols), monitoredSymbols) - - // 打印前5个评分最高的币种 - type symbolScore struct { - symbol string - score float64 - } - var topScores []symbolScore - - m.symbolStats.Range(func(key, value interface{}) bool { - symbol := key.(string) - stats := value.(*SymbolStats) - topScores = append(topScores, symbolScore{symbol, stats.Score}) - return true - }) - - // 按评分排序 - sort.Slice(topScores, func(i, j int) bool { - return topScores[i].score > topScores[j].score - }) - m.FilterSymbol = nil - if len(topScores) > 0 { - log.Printf("🏆 评分TOP%v:", nember) - for i := 0; i < len(topScores) && i < nember; i++ { - m.FilterSymbol = append(m.FilterSymbol, topScores[i].symbol) - log.Printf(" %d. %s: %.1f分", i+1, topScores[i].symbol, topScores[i].score) - } - } - } -} - -// evaluateSymbolScore 评估币种得分,决定是否保留 -func (m *WSMonitor) evaluateSymbolScore(symbol string, features *SymbolFeatures) float64 { - score := 0.0 - - // 交易量活跃度评分 (权重: 40%) - if features.VolumeRatio5 > 1.5 { - score += 40 * math.Min(features.VolumeRatio5/5.0, 1.0) - } - - // 价格波动评分 (权重: 30%) - volatilityScore := math.Abs(features.PriceChange15Min) * 1000 // 放大系数 - score += 30 * math.Min(volatilityScore/10.0, 1.0) // 最大10%波动得满分 - - // RSI活跃度评分 (权重: 20%) - if features.RSI14 < 30 || features.RSI14 > 70 { - score += 20 // RSI在极端区域 - } else if features.RSI14 < 40 || features.RSI14 > 60 { - score += 10 // RSI在活跃区域 - } - - // 交易量趋势评分 (权重: 10%) - if features.VolumeTrend > 1.2 { - score += 10 * math.Min(features.VolumeTrend/3.0, 1.0) - } - - return score -} - -// shouldRemoveFromFilter 判断是否应该从FilterSymbols中移除 -func (m *WSMonitor) shouldRemoveFromFilter(symbol string) bool { - value, exists := m.symbolStats.Load(symbol) - if !exists { - return true // 没有统计信息,移除 - } - - stats := value.(*SymbolStats) - - // 规则1: 超过30分钟没有活跃迹象 - if time.Since(stats.LastActiveTime) > 30*time.Minute { - log.Printf("🔻 %s 因长时间不活跃被移除", symbol) - return true - } - - // 规则2: 评分持续低于阈值 (最近5次评分平均) - if stats.Score < 15 { // 调整这个阈值 - log.Printf("🔻 %s 因评分过低(%.1f)被移除", symbol, stats.Score) - return true - } - - // 规则3: 超过2小时没有产生警报 - if time.Since(stats.LastAlertTime) > 2*time.Hour && stats.AlertCount > 0 { - log.Printf("🔻 %s 因长时间无新警报被移除", symbol) - return true - } - - return false -} - -// updateSymbolStats 更新币种统计信息 -func (m *WSMonitor) updateSymbolStats(symbol string, features *SymbolFeatures, hasAlert bool) { - now := time.Now() - - value, exists := m.symbolStats.Load(symbol) - var stats *SymbolStats - - if !exists { - stats = &SymbolStats{ - LastActiveTime: now, - Score: m.evaluateSymbolScore(symbol, features), - } - } else { - stats = value.(*SymbolStats) - stats.LastActiveTime = now - - // 平滑更新评分 (指数移动平均) - newScore := m.evaluateSymbolScore(symbol, features) - stats.Score = 0.7*stats.Score + 0.3*newScore - } - - if hasAlert { - stats.AlertCount++ - stats.LastAlertTime = now - } - - if features.VolumeRatio5 > 2.0 { - stats.VolumeSpikeCount++ - } - - m.symbolStats.Store(symbol, stats) -} - -// removeFromFilter 从FilterSymbols中移除币种 -func (m *WSMonitor) removeFromFilter(symbol string) { - - // 从filterSymbols中移除 - m.filterSymbols.Delete(symbol) - m.symbolStats.Delete(symbol) - - log.Printf("🗑️ 已移除币种监控: %s", symbol) -} - -// cleanupInactiveSymbols 定期清理不活跃的币种 -func (m *WSMonitor) cleanupInactiveSymbols() { - ticker := time.NewTicker(5 * time.Minute) // 每5分钟检查一次 - defer ticker.Stop() - - for range ticker.C { - var symbolsToRemove []string - - // 收集需要移除的币种 - m.filterSymbols.Range(func(key, value interface{}) bool { - symbol := key.(string) - if m.shouldRemoveFromFilter(symbol) { - symbolsToRemove = append(symbolsToRemove, symbol) - } - return true - }) - - // 执行移除操作 - for _, symbol := range symbolsToRemove { - m.removeFromFilter(symbol) - } - - if len(symbolsToRemove) > 0 { - log.Printf("🧹 清理完成,移除了 %d 个不活跃币种", len(symbolsToRemove)) - } - } -} - -// getSymbolScore 获取币种当前评分 -func (m *WSMonitor) getSymbolScore(symbol string) float64 { - value, exists := m.symbolStats.Load(symbol) - if !exists { - return 0 - } - return value.(*SymbolStats).Score -}