diff --git a/.github/workflows/pr-checks-advisory.yml b/.github/workflows/pr-checks-advisory.yml index 1c352233..9cab882d 100644 --- a/.github/workflows/pr-checks-advisory.yml +++ b/.github/workflows/pr-checks-advisory.yml @@ -8,12 +8,16 @@ on: # These checks are advisory only - they won't block PR merging # Results will be posted as comments to help contributors improve their PRs +permissions: + contents: read + pull-requests: write + checks: write + issues: write + jobs: pr-info: name: PR Information runs-on: ubuntu-latest - permissions: - pull-requests: write steps: - name: Check PR title format id: check-title @@ -98,8 +102,6 @@ jobs: backend-checks: name: Backend Checks (Advisory) runs-on: ubuntu-latest - permissions: - pull-requests: write continue-on-error: true steps: - uses: actions/checkout@v4 @@ -208,8 +210,6 @@ jobs: frontend-checks: name: Frontend Checks (Advisory) runs-on: ubuntu-latest - permissions: - pull-requests: write continue-on-error: true steps: - uses: actions/checkout@v4 diff --git a/README.md b/README.md index 0fa6210e..b2f9cb83 100644 --- a/README.md +++ b/README.md @@ -124,11 +124,12 @@ A Binance-compatible decentralized perpetual futures exchange! - 🌐 **Multi-chain support** - trade on your preferred EVM chain **Quick Start:** -1. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Connect your main wallet and create an API wallet -3. Copy the API Signer address and Private Key -4. ~~Set `"exchange": "aster"` in config.json~~ *Configure through web interface* -5. Add `"aster_user"`, `"aster_signer"`, and `"aster_private_key"` +1. Register via [Aster Referral Link](https://www.asterdex.com/en/referral/fdfc0e) (get fee discounts!) +2. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Connect your main wallet and create an API wallet +4. Copy the API Signer address and Private Key +5. Set `"exchange": "aster"` in config.json +6. Add `"aster_user"`, `"aster_signer"`, and `"aster_private_key"` --- @@ -406,7 +407,7 @@ Before configuring the system, you need to obtain AI API keys. Choose one of the **How to get Qwen API Key:** -1. **Visit**: [https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **Visit**: [https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **Register**: Sign up with Alibaba Cloud account 3. **Enable Service**: Activate DashScope service 4. **Create API Key**: @@ -543,12 +544,13 @@ Open your browser and visit: **🌐 http://localhost:3000** - 🌐 Multi-chain support (ETH, BSC, Polygon) - 🌍 No KYC required -**Step 1**: Create Aster API Wallet +**Step 1**: Register and Create Aster API Wallet -1. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Connect your main wallet (MetaMask, WalletConnect, etc.) -3. Click "Create API Wallet" -4. **Save these 3 items immediately:** +1. Register via [Aster Referral Link](https://www.asterdex.com/en/referral/fdfc0e) (get fee discounts!) +2. Visit [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Connect your main wallet (MetaMask, WalletConnect, etc.) +4. Click "Create API Wallet" +5. **Save these 3 items immediately:** - Main Wallet address (User) - API Wallet address (Signer) - API Wallet Private Key (⚠️ shown only once!) @@ -1271,7 +1273,7 @@ We welcome contributions from the community! See our comprehensive guides: - [Binance API](https://binance-docs.github.io/apidocs/futures/en/) - Binance Futures API - [DeepSeek](https://platform.deepseek.com/) - DeepSeek AI API -- [Qwen](https://dashscope.aliyuncs.com/) - Alibaba Cloud Qwen +- [Qwen](https://dashscope.console.aliyun.com/) - Alibaba Cloud Qwen - [TA-Lib](https://ta-lib.org/) - Technical indicator library - [Recharts](https://recharts.org/) - React chart library diff --git a/config/config.go b/config/config.go index 97fcc84d..37a537db 100644 --- a/config/config.go +++ b/config/config.go @@ -11,7 +11,7 @@ import ( type TraderConfig struct { ID string `json:"id"` Name string `json:"name"` - Enabled bool `json:"enabled"` // 是否启用该trader + Enabled bool `json:"enabled"` // 是否启用该trader AIModel string `json:"ai_model"` // "qwen" or "deepseek" // 交易平台选择(二选一) diff --git a/config/database.go b/config/database.go index 686f9346..719fd07f 100644 --- a/config/database.go +++ b/config/database.go @@ -4,9 +4,12 @@ import ( "crypto/rand" "database/sql" "encoding/base32" + "encoding/json" "fmt" "log" + "nofx/market" "os" + "slices" "strings" "time" @@ -187,17 +190,17 @@ func (d *Database) createTables() error { `ALTER TABLE exchanges ADD COLUMN aster_private_key TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN custom_prompt TEXT DEFAULT ''`, `ALTER TABLE traders ADD COLUMN override_base_prompt BOOLEAN DEFAULT 0`, - `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 - `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 - `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) - `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 - `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 - `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 - `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 - `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 + `ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式 + `ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种 + `ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式) + `ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数 + `ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数 + `ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔 + `ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源 + `ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源 `ALTER TABLE traders ADD COLUMN system_prompt_template TEXT DEFAULT 'default'`, // 系统提示词模板名称 - `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 - `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 + `ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址 + `ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称 } for _, query := range alterQueries { @@ -255,17 +258,17 @@ func (d *Database) initDefaultData() error { // 初始化系统配置 - 创建所有字段,设置默认值,后续由config.json同步更新 systemConfigs := map[string]string{ - "admin_mode": "true", // 默认开启管理员模式,便于首次使用 - "beta_mode": "false", // 默认关闭内测模式 - "api_server_port": "8080", // 默认API端口 - "use_default_coins": "true", // 默认使用内置币种列表 + "admin_mode": "true", // 默认开启管理员模式,便于首次使用 + "beta_mode": "false", // 默认关闭内测模式 + "api_server_port": "8080", // 默认API端口 + "use_default_coins": "true", // 默认使用内置币种列表 "default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式) - "max_daily_loss": "10.0", // 最大日损失百分比 - "max_drawdown": "20.0", // 最大回撤百分比 - "stop_trading_minutes": "60", // 停止交易时间(分钟) - "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 - "altcoin_leverage": "5", // 山寨币杠杆倍数 - "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 + "max_daily_loss": "10.0", // 最大日损失百分比 + "max_drawdown": "20.0", // 最大回撤百分比 + "stop_trading_minutes": "60", // 停止交易时间(分钟) + "btc_eth_leverage": "5", // BTC/ETH杠杆倍数 + "altcoin_leverage": "5", // 山寨币杠杆倍数 + "jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成 } for key, value := range systemConfigs { @@ -292,14 +295,14 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return err } - + // 如果已经迁移过,直接返回 if count > 0 { return nil } - + log.Printf("🔄 开始迁移exchanges表...") - + // 创建新的exchanges表,使用复合主键 _, err = d.db.Exec(` CREATE TABLE exchanges_new ( @@ -324,7 +327,7 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建新exchanges表失败: %w", err) } - + // 复制数据到新表 _, err = d.db.Exec(` INSERT INTO exchanges_new @@ -333,19 +336,19 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("复制数据失败: %w", err) } - + // 删除旧表 _, err = d.db.Exec(`DROP TABLE exchanges`) if err != nil { return fmt.Errorf("删除旧表失败: %w", err) } - + // 重命名新表 _, err = d.db.Exec(`ALTER TABLE exchanges_new RENAME TO exchanges`) if err != nil { return fmt.Errorf("重命名表失败: %w", err) } - + // 重新创建触发器 _, err = d.db.Exec(` CREATE TRIGGER IF NOT EXISTS update_exchanges_updated_at @@ -358,20 +361,20 @@ func (d *Database) migrateExchangesTable() error { if err != nil { return fmt.Errorf("创建触发器失败: %w", err) } - + log.Printf("✅ exchanges表迁移完成") return nil } // User 用户配置 type User struct { - ID string `json:"id"` - Email string `json:"email"` - PasswordHash string `json:"-"` // 不返回到前端 - OTPSecret string `json:"-"` // 不返回到前端 - OTPVerified bool `json:"otp_verified"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Email string `json:"email"` + PasswordHash string `json:"-"` // 不返回到前端 + OTPSecret string `json:"-"` // 不返回到前端 + OTPVerified bool `json:"otp_verified"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // AIModelConfig AI模型配置 @@ -390,36 +393,36 @@ type AIModelConfig struct { // ExchangeConfig 交易所配置 type ExchangeConfig struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - Type string `json:"type"` - Enabled bool `json:"enabled"` - APIKey string `json:"apiKey"` - SecretKey string `json:"secretKey"` - Testnet bool `json:"testnet"` + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + Type string `json:"type"` + Enabled bool `json:"enabled"` + APIKey string `json:"apiKey"` + SecretKey string `json:"secretKey"` + Testnet bool `json:"testnet"` // Hyperliquid 特定字段 HyperliquidWalletAddr string `json:"hyperliquidWalletAddr"` // Aster 特定字段 - AsterUser string `json:"asterUser"` - AsterSigner string `json:"asterSigner"` - AsterPrivateKey string `json:"asterPrivateKey"` + AsterUser string `json:"asterUser"` + AsterSigner string `json:"asterSigner"` + AsterPrivateKey string `json:"asterPrivateKey"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // TraderRecord 交易员配置(数据库实体) type TraderRecord struct { - ID string `json:"id"` - UserID string `json:"user_id"` - Name string `json:"name"` - AIModelID string `json:"ai_model_id"` - ExchangeID string `json:"exchange_id"` - InitialBalance float64 `json:"initial_balance"` - ScanIntervalMinutes int `json:"scan_interval_minutes"` - IsRunning bool `json:"is_running"` - BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 - AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 + ID string `json:"id"` + UserID string `json:"user_id"` + Name string `json:"name"` + AIModelID string `json:"ai_model_id"` + ExchangeID string `json:"exchange_id"` + InitialBalance float64 `json:"initial_balance"` + ScanIntervalMinutes int `json:"scan_interval_minutes"` + IsRunning bool `json:"is_running"` + BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数 + AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数 TradingSymbols string `json:"trading_symbols"` // 交易币种,逗号分隔 UseCoinPool bool `json:"use_coin_pool"` // 是否使用COIN POOL信号源 UseOITop bool `json:"use_oi_top"` // 是否使用OI TOP信号源 @@ -433,12 +436,12 @@ type TraderRecord struct { // UserSignalSource 用户信号源配置 type UserSignalSource struct { - ID int `json:"id"` - UserID string `json:"user_id"` - CoinPoolURL string `json:"coin_pool_url"` - OITopURL string `json:"oi_top_url"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID int `json:"id"` + UserID string `json:"user_id"` + CoinPoolURL string `json:"coin_pool_url"` + OITopURL string `json:"oi_top_url"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // GenerateOTPSecret 生成OTP密钥 @@ -468,12 +471,12 @@ func (d *Database) EnsureAdminUser() error { if err != nil { return err } - + // 如果已存在,直接返回 if count > 0 { return nil } - + // 创建admin用户(密码为空,因为管理员模式下不需要密码) adminUser := &User{ ID: "admin", @@ -482,7 +485,7 @@ func (d *Database) EnsureAdminUser() error { OTPSecret: "", OTPVerified: true, } - + return d.CreateUser(adminUser) } @@ -493,7 +496,7 @@ func (d *Database) GetUserByEmail(email string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE email = ? `, email).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -509,7 +512,7 @@ func (d *Database) GetUserByID(userID string) (*User, error) { SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at FROM users WHERE id = ? `, userID).Scan( - &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, + &user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret, &user.OTPVerified, &user.CreatedAt, &user.UpdatedAt, ) if err != nil { @@ -679,7 +682,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { err := rows.Scan( &exchange.ID, &exchange.UserID, &exchange.Name, &exchange.Type, &exchange.Enabled, &exchange.APIKey, &exchange.SecretKey, &exchange.Testnet, - &exchange.HyperliquidWalletAddr, &exchange.AsterUser, + &exchange.HyperliquidWalletAddr, &exchange.AsterUser, &exchange.AsterSigner, &exchange.AsterPrivateKey, &exchange.CreatedAt, &exchange.UpdatedAt, ) @@ -695,7 +698,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) { // UpdateExchange 更新交易所配置,如果不存在则创建用户特定配置 func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secretKey string, testnet bool, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey string) error { log.Printf("🔧 UpdateExchange: userID=%s, id=%s, enabled=%v", userID, id, enabled) - + // 首先尝试更新现有的用户配置 result, err := d.db.Exec(` UPDATE exchanges SET enabled = ?, api_key = ?, secret_key = ?, testnet = ?, @@ -706,20 +709,20 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre log.Printf("❌ UpdateExchange: 更新失败: %v", err) return err } - + // 检查是否有行被更新 rowsAffected, err := result.RowsAffected() if err != nil { log.Printf("❌ UpdateExchange: 获取影响行数失败: %v", err) return err } - + log.Printf("📊 UpdateExchange: 影响行数 = %d", rowsAffected) - + // 如果没有行被更新,说明用户没有这个交易所的配置,需要创建 if rowsAffected == 0 { log.Printf("💡 UpdateExchange: 没有现有记录,创建新记录") - + // 根据交易所ID确定基本信息 var name, typ string if id == "binance" { @@ -735,16 +738,16 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre name = id + " Exchange" typ = "cex" } - + log.Printf("🆕 UpdateExchange: 创建新记录 ID=%s, name=%s, type=%s", id, name, typ) - + // 创建用户特定的配置,使用原始的交易所ID _, err = d.db.Exec(` INSERT INTO exchanges (id, user_id, name, type, enabled, api_key, secret_key, testnet, hyperliquid_wallet_addr, aster_user, aster_signer, aster_private_key, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) `, id, userID, name, typ, enabled, apiKey, secretKey, testnet, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey) - + if err != nil { log.Printf("❌ UpdateExchange: 创建记录失败: %v", err) } else { @@ -752,7 +755,7 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre } return err } - + log.Printf("✅ UpdateExchange: 更新现有记录成功") return nil } @@ -801,9 +804,9 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) { } defer rows.Close() - var traders []*TraderRecord + var traders []*TraderRecord for rows.Next() { - var trader TraderRecord + var trader TraderRecord err := rows.Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, @@ -858,18 +861,13 @@ func (d *Database) DeleteTrader(userID, id string) error { // GetTraderConfig 获取交易员完整配置(包含AI模型和交易所信息) func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIModelConfig, *ExchangeConfig, error) { - var trader TraderRecord + var trader TraderRecord var aiModel AIModelConfig var exchange ExchangeConfig err := d.db.QueryRow(` SELECT - t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, - COALESCE(t.btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(t.altcoin_leverage, 5) as altcoin_leverage, - COALESCE(t.trading_symbols, '') as trading_symbols, COALESCE(t.use_coin_pool, 0) as use_coin_pool, - COALESCE(t.use_oi_top, 0) as use_oi_top, COALESCE(t.custom_prompt, '') as custom_prompt, - COALESCE(t.override_base_prompt, 0) as override_base_prompt, COALESCE(t.is_cross_margin, 1) as is_cross_margin, - t.created_at, t.updated_at, + t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, t.created_at, t.updated_at, a.id, a.user_id, a.name, a.provider, a.enabled, a.api_key, a.created_at, a.updated_at, e.id, e.user_id, e.name, e.type, e.enabled, e.api_key, e.secret_key, e.testnet, COALESCE(e.hyperliquid_wallet_addr, '') as hyperliquid_wallet_addr, @@ -884,8 +882,6 @@ func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIM `, traderID, userID).Scan( &trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID, &trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning, - &trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, &trader.UseCoinPool, - &trader.UseOITop, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.IsCrossMargin, &trader.CreatedAt, &trader.UpdatedAt, &aiModel.ID, &aiModel.UserID, &aiModel.Name, &aiModel.Provider, &aiModel.Enabled, &aiModel.APIKey, &aiModel.CreatedAt, &aiModel.UpdatedAt, @@ -951,6 +947,35 @@ func (d *Database) UpdateUserSignalSource(userID, coinPoolURL, oiTopURL string) return err } +// GetCustomCoins 获取所有交易员自定义币种 / Get all trader-customized currencies +func (d *Database) GetCustomCoins() []string { + var symbol string + var symbols []string + _ = d.db.QueryRow(` + SELECT GROUP_CONCAT(custom_coins , ',') as symbol + FROM main.traders where custom_coins != '' + `).Scan(&symbol) + // 检测用户是否未配置币种 - 兼容性 + if symbol == "" { + symbolJSON, _ := d.GetSystemConfig("default_coins") + if err := json.Unmarshal([]byte(symbolJSON), &symbols); err != nil { + log.Printf("⚠️ 解析default_coins配置失败: %v,使用硬编码默认值", err) + symbols = []string{"BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT"} + } + } + // filter Symbol + for _, s := range strings.Split(symbol, ",") { + if s == "" { + continue + } + coin := market.Normalize(s) + if !slices.Contains(symbols, coin) { + symbols = append(symbols, coin) + } + } + return symbols +} + // Close 关闭数据库连接 func (d *Database) Close() error { return d.db.Close() @@ -1056,4 +1081,4 @@ func (d *Database) GetBetaCodeStats() (total, used int, err error) { } return total, used, nil -} \ No newline at end of file +} diff --git a/decision/engine.go b/decision/engine.go index 174ff7d1..df48d534 100644 --- a/decision/engine.go +++ b/decision/engine.go @@ -115,7 +115,7 @@ func GetFullDecisionWithCustomPrompt(ctx *Context, mcpClient *mcp.Client, custom // 4. 解析AI响应 decision, err := parseFullDecisionResponse(aiResponse, ctx.Account.TotalEquity, ctx.BTCETHLeverage, ctx.AltcoinLeverage) if err != nil { - return nil, fmt.Errorf("解析AI响应失败: %w", err) + return decision, fmt.Errorf("解析AI响应失败: %w", err) } decision.Timestamp = time.Now() @@ -397,7 +397,7 @@ func parseFullDecisionResponse(aiResponse string, accountEquity float64, btcEthL return &FullDecision{ CoTTrace: cotTrace, Decisions: []Decision{}, - }, fmt.Errorf("提取决策失败: %w\n\n=== AI思维链分析 ===\n%s", err, cotTrace) + }, fmt.Errorf("提取决策失败: %w", err) } // 3. 验证决策 @@ -405,7 +405,7 @@ func parseFullDecisionResponse(aiResponse string, accountEquity float64, btcEthL return &FullDecision{ CoTTrace: cotTrace, Decisions: decisions, - }, fmt.Errorf("决策验证失败: %w\n\n=== AI思维链分析 ===\n%s", err, cotTrace) + }, fmt.Errorf("决策验证失败: %w", err) } return &FullDecision{ diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 2c1a2f6f..fb233a31 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -51,7 +51,12 @@ nofx/ │ ├── market/ # Market data fetching │ └── data.go # Market data & technical indicators (TA-Lib) -│ +│ └── api_client.go # Market data acquisition API +│ └── websocket_client.go # Market data acquisition WebSocket interface +│ └── combined_streams.go # Market data acquisition: Combined streaming (single link to subscribe to multiple cryptocurrencies) +│ └── monitor.go # Market data cache +│ └── types.go # market structure + ├── pool/ # Coin pool management │ └── coin_pool.go # AI500 + OI Top merged pool │ diff --git a/docs/architecture/README.zh-CN.md b/docs/architecture/README.zh-CN.md index 36732b09..4acc0f90 100644 --- a/docs/architecture/README.zh-CN.md +++ b/docs/architecture/README.zh-CN.md @@ -51,6 +51,11 @@ nofx/ │ ├── market/ # 市场数据获取 │ └── data.go # 市场数据与技术指标(TA-Lib) +│ └── api_client.go # 行情获取 Api接口 +│ └── websocket_client.go # 行情获取 Websocket接口 +│ └── combined_streams.go # 行情获取 组合流式(单链接订阅多个币种) +│ └── monitor.go # 行情数据缓存 +│ └── types.go # market结构体 │ ├── pool/ # 币种池管理 │ └── coin_pool.go # AI500 + OI Top 合并池 diff --git a/docs/i18n/ru/README.md b/docs/i18n/ru/README.md index 2feaa824..0554d0b7 100644 --- a/docs/i18n/ru/README.md +++ b/docs/i18n/ru/README.md @@ -117,11 +117,12 @@ NOFX теперь поддерживает **три основные биржи* - 🌐 **Поддержка нескольких цепей** - торгуйте на вашей любимой EVM цепи **Быстрый старт:** -1. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Подключите основной кошелек и создайте API кошелек -3. Скопируйте адрес API Signer и приватный ключ -4. Установите `"exchange": "aster"` в config.json -5. Добавьте `"aster_user"`, `"aster_signer"` и `"aster_private_key"` +1. Зарегистрируйтесь по [реферальной ссылке Aster](https://www.asterdex.com/en/referral/fdfc0e) (получите скидку на комиссии!) +2. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Подключите основной кошелек и создайте API кошелек +4. Скопируйте адрес API Signer и приватный ключ +5. Установите `"exchange": "aster"` в config.json +6. Добавьте `"aster_user"`, `"aster_signer"` и `"aster_private_key"` --- @@ -399,7 +400,7 @@ cd .. **Как получить Qwen API ключ:** -1. **Посетите**: [https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **Посетите**: [https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **Зарегистрируйтесь**: Используя аккаунт Alibaba Cloud 3. **Активируйте сервис**: Активируйте DashScope сервис 4. **Создайте API ключ**: @@ -534,12 +535,13 @@ cp config.example.jsonc config.json - 🌐 Поддержка нескольких цепей (ETH, BSC, Polygon) - 🌍 Не нужна KYC -**Шаг 1**: Создайте Aster API кошелек +**Шаг 1**: Зарегистрируйтесь и создайте Aster API кошелек -1. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Подключите основной кошелек (MetaMask, WalletConnect и т.д.) -3. Нажмите "Создать API кошелек" -4. **Сохраните эти 3 элемента немедленно:** +1. Зарегистрируйтесь по [реферальной ссылке Aster](https://www.asterdex.com/en/referral/fdfc0e) (получите скидку на комиссии!) +2. Посетите [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Подключите основной кошелек (MetaMask, WalletConnect и т.д.) +4. Нажмите "Создать API кошелек" +5. **Сохраните эти 3 элемента немедленно:** - Адрес основного кошелька (User) - Адрес API кошелька (Signer) - Приватный ключ API кошелька (⚠️ показывается только один раз!) @@ -1094,7 +1096,7 @@ sudo apt-get install libta-lib0-dev - [Binance API](https://binance-docs.github.io/apidocs/futures/en/) - Binance Futures API - [DeepSeek](https://platform.deepseek.com/) - DeepSeek AI API -- [Qwen](https://dashscope.aliyuncs.com/) - Alibaba Cloud Qwen +- [Qwen](https://dashscope.console.aliyun.com/) - Alibaba Cloud Qwen - [TA-Lib](https://ta-lib.org/) - Библиотека технических индикаторов - [Recharts](https://recharts.org/) - Библиотека графиков React diff --git a/docs/i18n/uk/README.md b/docs/i18n/uk/README.md index 19d506ef..d663c4e0 100644 --- a/docs/i18n/uk/README.md +++ b/docs/i18n/uk/README.md @@ -118,11 +118,12 @@ NOFX тепер підтримує **три основні біржі**: Binance - 🌐 **Підтримка кількох ланцюгів** - торгуйте на вашому улюбленому EVM ланцюзі **Швидкий старт:** -1. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Підключіть основний гаманець і створіть API гаманець -3. Скопіюйте адресу API Signer та приватний ключ -4. Встановіть `"exchange": "aster"` в config.json -5. Додайте `"aster_user"`, `"aster_signer"` та `"aster_private_key"` +1. Зареєструйтеся за [реферальним посиланням Aster](https://www.asterdex.com/en/referral/fdfc0e) (отримайте знижку на комісії!) +2. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Підключіть основний гаманець і створіть API гаманець +4. Скопіюйте адресу API Signer та приватний ключ +5. Встановіть `"exchange": "aster"` в config.json +6. Додайте `"aster_user"`, `"aster_signer"` та `"aster_private_key"` --- @@ -402,7 +403,7 @@ cd .. **Як отримати Qwen API ключ:** -1. **Відвідайте**: [https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **Відвідайте**: [https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **Зареєструйтеся**: Використовуючи акаунт Alibaba Cloud 3. **Активуйте сервіс**: Активуйте DashScope сервіс 4. **Створіть API ключ**: @@ -537,12 +538,13 @@ cp config.example.jsonc config.json - 🌐 Підтримка кількох ланцюгів (ETH, BSC, Polygon) - 🌍 Не потрібна KYC -**Крок 1**: Створіть Aster API гаманець +**Крок 1**: Зареєструйтеся та створіть Aster API гаманець -1. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) -2. Підключіть основний гаманець (MetaMask, WalletConnect тощо) -3. Натисніть "Створити API гаманець" -4. **Збережіть ці 3 елементи негайно:** +1. Зареєструйтеся за [реферальним посиланням Aster](https://www.asterdex.com/en/referral/fdfc0e) (отримайте знижку на комісії!) +2. Відвідайте [Aster API Wallet](https://www.asterdex.com/en/api-wallet) +3. Підключіть основний гаманець (MetaMask, WalletConnect тощо) +4. Натисніть "Створити API гаманець" +5. **Збережіть ці 3 елементи негайно:** - Адреса основного гаманця (User) - Адреса API гаманця (Signer) - Приватний ключ API гаманця (⚠️ показується лише один раз!) diff --git a/docs/i18n/zh-CN/README.md b/docs/i18n/zh-CN/README.md index 29d69c8e..472dc56b 100644 --- a/docs/i18n/zh-CN/README.md +++ b/docs/i18n/zh-CN/README.md @@ -126,11 +126,12 @@ NOFX现已支持**三大交易所**:Binance、Hyperliquid和Aster DEX! - 🌐 **多链支持** - 在你喜欢的EVM链上交易 **快速开始:** -1. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) -2. 连接你的主钱包并创建API钱包 -3. 复制API Signer地址和私钥 -4. ~~在config.json中设置`"exchange": "aster"`~~ *通过Web界面配置* -5. 添加`"aster_user"`、`"aster_signer"`和`"aster_private_key"` +1. 通过[推荐链接注册Aster](https://www.asterdex.com/en/referral/fdfc0e)(享手续费优惠) +2. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) +3. 连接你的主钱包并创建API钱包 +4. 复制API Signer地址和私钥 +5. 在config.json中设置`"exchange": "aster"` +6. 添加`"aster_user"`、`"aster_signer"`和`"aster_private_key"` --- @@ -398,7 +399,7 @@ cd .. **如何获取Qwen API密钥:** -1. **访问**:[https://dashscope.aliyuncs.com](https://dashscope.aliyuncs.com) +1. **访问**:[https://dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) 2. **注册**:使用阿里云账户注册 3. **开通服务**:激活DashScope服务 4. **创建API密钥**: @@ -535,12 +536,13 @@ cp config.example.jsonc config.json - 🌐 多链支持(ETH、BSC、Polygon) - 🌍 无需KYC -**步骤1**:创建Aster API钱包 +**步骤1**:注册并创建Aster API钱包 -1. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) -2. 连接你的主钱包(MetaMask、WalletConnect等) -3. 点击"创建API钱包" -4. **立即保存这3项:** +1. 通过[推荐链接注册Aster](https://www.asterdex.com/en/referral/fdfc0e)(享手续费优惠) +2. 访问[Aster API钱包](https://www.asterdex.com/en/api-wallet) +3. 连接你的主钱包(MetaMask、WalletConnect等) +4. 点击"创建API钱包" +5. **立即保存这3项:** - 主钱包地址(User) - API钱包地址(Signer) - API钱包私钥(⚠️ 仅显示一次!) @@ -1290,7 +1292,7 @@ MIT License - 详见 [LICENSE](LICENSE) 文件 - [Binance API](https://binance-docs.github.io/apidocs/futures/cn/) - 币安合约API - [DeepSeek](https://platform.deepseek.com/) - DeepSeek AI API -- [Qwen](https://dashscope.aliyuncs.com/) - 阿里云通义千问 +- [Qwen](https://dashscope.console.aliyun.com/) - 阿里云通义千问 - [TA-Lib](https://ta-lib.org/) - 技术指标库 - [Recharts](https://recharts.org/) - React图表库 diff --git a/go.mod b/go.mod index 0c6dcfde..72291ee0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,8 @@ require ( github.com/gin-gonic/gin v1.11.0 github.com/golang-jwt/jwt/v5 v5.2.0 github.com/google/uuid v1.6.0 - github.com/mattn/go-sqlite3 v1.14.32 + github.com/gorilla/websocket v1.5.3 + github.com/mattn/go-sqlite3 v1.14.16 github.com/pquerna/otp v1.4.0 github.com/sonirico/go-hyperliquid v0.17.0 golang.org/x/crypto v0.42.0 @@ -37,7 +38,6 @@ require ( github.com/go-playground/validator/v10 v10.27.0 // indirect github.com/goccy/go-json v0.10.4 // indirect github.com/goccy/go-yaml v1.18.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect github.com/holiman/uint256 v1.3.2 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/go.sum b/go.sum index d0d7d69a..655fcf92 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= -github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= -github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= diff --git a/main.go b/main.go index 69fa8064..8aa83dde 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "nofx/auth" "nofx/config" "nofx/manager" + "nofx/market" "nofx/pool" "os" "os/signal" @@ -36,6 +37,7 @@ type ConfigFile struct { StopTradingMinutes int `json:"stop_trading_minutes"` Leverage LeverageConfig `json:"leverage"` JWTSecret string `json:"jwt_secret"` + DataKLineTime string `json:"data_k_line_time"` } // syncConfigToDatabase 从config.json读取配置并同步到数据库 @@ -175,11 +177,11 @@ func main() { useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins") useDefaultCoins := useDefaultCoinsStr == "true" apiPortStr, _ := database.GetSystemConfig("api_server_port") - + // 获取管理员模式配置 adminModeStr, _ := database.GetSystemConfig("admin_mode") adminMode := adminModeStr != "false" // 默认为true - + // 设置JWT密钥 jwtSecret, _ := database.GetSystemConfig("jwt_secret") if jwtSecret == "" { @@ -187,7 +189,7 @@ func main() { log.Printf("⚠️ 使用默认JWT密钥,建议在生产环境中配置") } auth.SetJWTSecret(jwtSecret) - + // 在管理员模式下,确保admin用户存在 if adminMode { err := database.EnsureAdminUser() @@ -198,7 +200,7 @@ func main() { } auth.SetAdminMode(true) } - + log.Printf("✓ 配置数据库初始化成功") fmt.Println() @@ -221,7 +223,6 @@ func main() { } pool.SetDefaultCoins(defaultCoins) - // 设置是否使用默认主流币种 pool.SetUseDefaultCoins(useDefaultCoins) if useDefaultCoins { @@ -234,7 +235,7 @@ func main() { pool.SetCoinPoolAPI(coinPoolAPIURL) log.Printf("✓ 已配置AI500币种池API") } - + oiTopAPIURL, _ := database.GetSystemConfig("oi_top_api_url") if oiTopAPIURL != "" { pool.SetOITopAPI(oiTopAPIURL) @@ -250,37 +251,26 @@ func main() { log.Fatalf("❌ 加载交易员失败: %v", err) } - // 获取所有用户的交易员配置(用于显示) - userIDs, err := database.GetAllUsers() + // 获取数据库中的所有交易员配置(用于显示,使用default用户) + traders, err := database.GetTraders("default") if err != nil { - log.Printf("⚠️ 获取用户列表失败: %v", err) - userIDs = []string{"default"} // 回退到default用户 - } - - var allTraders []*config.TraderRecord - for _, userID := range userIDs { - traders, err := database.GetTraders(userID) - if err != nil { - log.Printf("⚠️ 获取用户 %s 的交易员失败: %v", userID, err) - continue - } - allTraders = append(allTraders, traders...) + log.Fatalf("❌ 获取交易员列表失败: %v", err) } // 显示加载的交易员信息 fmt.Println() fmt.Println("🤖 数据库中的AI交易员配置:") - if len(allTraders) == 0 { + if len(traders) == 0 { fmt.Println(" • 暂无配置的交易员,请通过Web界面创建") } else { - for _, trader := range allTraders { + for _, trader := range traders { status := "停止" if trader.IsRunning { status = "运行中" } - fmt.Printf(" • %s (%s + %s) - 用户: %s - 初始资金: %.0f USDT [%s]\n", - trader.Name, strings.ToUpper(trader.AIModelID), strings.ToUpper(trader.ExchangeID), - trader.UserID, trader.InitialBalance, status) + fmt.Printf(" • %s (%s + %s) - 初始资金: %.0f USDT [%s]\n", + trader.Name, strings.ToUpper(trader.AIModelID), strings.ToUpper(trader.ExchangeID), + trader.InitialBalance, status) } } @@ -298,7 +288,7 @@ func main() { fmt.Println() // 获取API服务器端口 - apiPort := 8080 // 默认端口 + apiPort := 8080 // 默认端口 if apiPortStr != "" { if port, err := strconv.Atoi(apiPortStr); err == nil { apiPort = port @@ -313,6 +303,9 @@ func main() { } }() + // 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认 + go market.NewWSMonitor(150).Start(database.GetCustomCoins()) + //go market.NewWSMonitor(150).Start([]string{}) //这里是一个使用方式 传入空的话 则使用market市场的所有币种 // 设置优雅退出 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/market/api_client.go b/market/api_client.go new file mode 100644 index 00000000..70bb1150 --- /dev/null +++ b/market/api_client.go @@ -0,0 +1,150 @@ +package market + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strconv" + "time" +) + +const ( + baseURL = "https://fapi.binance.com" +) + +type APIClient struct { + client *http.Client +} + +func NewAPIClient() *APIClient { + return &APIClient{ + client: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +func (c *APIClient) GetExchangeInfo() (*ExchangeInfo, error) { + url := fmt.Sprintf("%s/fapi/v1/exchangeInfo", baseURL) + resp, err := c.client.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var exchangeInfo ExchangeInfo + err = json.Unmarshal(body, &exchangeInfo) + if err != nil { + return nil, err + } + + return &exchangeInfo, nil +} + +func (c *APIClient) GetKlines(symbol, interval string, limit int) ([]Kline, error) { + url := fmt.Sprintf("%s/fapi/v1/klines", baseURL) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + q := req.URL.Query() + q.Add("symbol", symbol) + q.Add("interval", interval) + q.Add("limit", strconv.Itoa(limit)) + req.URL.RawQuery = q.Encode() + + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var klineResponses []KlineResponse + err = json.Unmarshal(body, &klineResponses) + if err != nil { + return nil, err + } + + var klines []Kline + for _, kr := range klineResponses { + kline, err := parseKline(kr) + if err != nil { + log.Printf("解析K线数据失败: %v", err) + continue + } + klines = append(klines, kline) + } + + return klines, nil +} + +func parseKline(kr KlineResponse) (Kline, error) { + var kline Kline + + if len(kr) < 11 { + return kline, fmt.Errorf("invalid kline data") + } + + // 解析各个字段 + kline.OpenTime = int64(kr[0].(float64)) + kline.Open, _ = strconv.ParseFloat(kr[1].(string), 64) + kline.High, _ = strconv.ParseFloat(kr[2].(string), 64) + kline.Low, _ = strconv.ParseFloat(kr[3].(string), 64) + kline.Close, _ = strconv.ParseFloat(kr[4].(string), 64) + kline.Volume, _ = strconv.ParseFloat(kr[5].(string), 64) + kline.CloseTime = int64(kr[6].(float64)) + kline.QuoteVolume, _ = strconv.ParseFloat(kr[7].(string), 64) + kline.Trades = int(kr[8].(float64)) + kline.TakerBuyBaseVolume, _ = strconv.ParseFloat(kr[9].(string), 64) + kline.TakerBuyQuoteVolume, _ = strconv.ParseFloat(kr[10].(string), 64) + + return kline, nil +} + +func (c *APIClient) GetCurrentPrice(symbol string) (float64, error) { + url := fmt.Sprintf("%s/fapi/v1/ticker/price", baseURL) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, err + } + + q := req.URL.Query() + q.Add("symbol", symbol) + req.URL.RawQuery = q.Encode() + + resp, err := c.client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + var ticker PriceTicker + err = json.Unmarshal(body, &ticker) + if err != nil { + return 0, err + } + + price, err := strconv.ParseFloat(ticker.Price, 64) + if err != nil { + return 0, err + } + + return price, nil +} diff --git a/market/combined_streams.go b/market/combined_streams.go new file mode 100644 index 00000000..801d423e --- /dev/null +++ b/market/combined_streams.go @@ -0,0 +1,202 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type CombinedStreamsClient struct { + conn *websocket.Conn + mu sync.RWMutex + subscribers map[string]chan []byte + reconnect bool + done chan struct{} + batchSize int // 每批订阅的流数量 +} + +func NewCombinedStreamsClient(batchSize int) *CombinedStreamsClient { + return &CombinedStreamsClient{ + subscribers: make(map[string]chan []byte), + reconnect: true, + done: make(chan struct{}), + batchSize: batchSize, + } +} + +func (c *CombinedStreamsClient) Connect() error { + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + // 组合流使用不同的端点 + conn, _, err := dialer.Dial("wss://fstream.binance.com/stream", nil) + if err != nil { + return fmt.Errorf("组合流WebSocket连接失败: %v", err) + } + + c.mu.Lock() + c.conn = conn + c.mu.Unlock() + + log.Println("组合流WebSocket连接成功") + go c.readMessages() + + return nil +} + +// BatchSubscribeKlines 批量订阅K线 +func (c *CombinedStreamsClient) BatchSubscribeKlines(symbols []string, interval string) error { + // 将symbols分批处理 + batches := c.splitIntoBatches(symbols, c.batchSize) + + for i, batch := range batches { + log.Printf("订阅第 %d 批, 数量: %d", i+1, len(batch)) + + streams := make([]string, len(batch)) + for j, symbol := range batch { + streams[j] = fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), interval) + } + + if err := c.subscribeStreams(streams); err != nil { + return fmt.Errorf("第 %d 批订阅失败: %v", i+1, err) + } + + // 批次间延迟,避免被限制 + if i < len(batches)-1 { + time.Sleep(100 * time.Millisecond) + } + } + + return nil +} + +// splitIntoBatches 将切片分成指定大小的批次 +func (c *CombinedStreamsClient) splitIntoBatches(symbols []string, batchSize int) [][]string { + var batches [][]string + + for i := 0; i < len(symbols); i += batchSize { + end := i + batchSize + if end > len(symbols) { + end = len(symbols) + } + batches = append(batches, symbols[i:end]) + } + + return batches +} + +// subscribeStreams 订阅多个流 +func (c *CombinedStreamsClient) subscribeStreams(streams []string) error { + subscribeMsg := map[string]interface{}{ + "method": "SUBSCRIBE", + "params": streams, + "id": time.Now().UnixNano(), + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if c.conn == nil { + return fmt.Errorf("WebSocket未连接") + } + + log.Printf("订阅流: %v", streams) + return c.conn.WriteJSON(subscribeMsg) +} + +func (c *CombinedStreamsClient) readMessages() { + for { + select { + case <-c.done: + return + default: + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn == nil { + time.Sleep(1 * time.Second) + continue + } + + _, message, err := conn.ReadMessage() + if err != nil { + log.Printf("读取组合流消息失败: %v", err) + c.handleReconnect() + return + } + + c.handleCombinedMessage(message) + } + } +} + +func (c *CombinedStreamsClient) handleCombinedMessage(message []byte) { + var combinedMsg struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` + } + + if err := json.Unmarshal(message, &combinedMsg); err != nil { + log.Printf("解析组合消息失败: %v", err) + return + } + + c.mu.RLock() + ch, exists := c.subscribers[combinedMsg.Stream] + c.mu.RUnlock() + + if exists { + select { + case ch <- combinedMsg.Data: + default: + log.Printf("订阅者通道已满: %s", combinedMsg.Stream) + } + } +} + +func (c *CombinedStreamsClient) AddSubscriber(stream string, bufferSize int) <-chan []byte { + ch := make(chan []byte, bufferSize) + c.mu.Lock() + c.subscribers[stream] = ch + c.mu.Unlock() + return ch +} + +func (c *CombinedStreamsClient) handleReconnect() { + if !c.reconnect { + return + } + + log.Println("组合流尝试重新连接...") + time.Sleep(3 * time.Second) + + if err := c.Connect(); err != nil { + log.Printf("组合流重新连接失败: %v", err) + go c.handleReconnect() + } +} + +func (c *CombinedStreamsClient) Close() { + c.reconnect = false + close(c.done) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + for stream, ch := range c.subscribers { + close(ch) + delete(c.subscribers, stream) + } +} diff --git a/market/data.go b/market/data.go index 97812e64..cd40be75 100644 --- a/market/data.go +++ b/market/data.go @@ -10,72 +10,20 @@ import ( "strings" ) -// Data 市场数据结构 -type Data struct { - Symbol string - CurrentPrice float64 - PriceChange1h float64 // 1小时价格变化百分比 - PriceChange4h float64 // 4小时价格变化百分比 - CurrentEMA20 float64 - CurrentMACD float64 - CurrentRSI7 float64 - OpenInterest *OIData - FundingRate float64 - IntradaySeries *IntradayData - LongerTermContext *LongerTermData -} - -// OIData Open Interest数据 -type OIData struct { - Latest float64 - Average float64 -} - -// IntradayData 日内数据(3分钟间隔) -type IntradayData struct { - MidPrices []float64 - EMA20Values []float64 - MACDValues []float64 - RSI7Values []float64 - RSI14Values []float64 -} - -// LongerTermData 长期数据(4小时时间框架) -type LongerTermData struct { - EMA20 float64 - EMA50 float64 - ATR3 float64 - ATR14 float64 - CurrentVolume float64 - AverageVolume float64 - MACDValues []float64 - RSI14Values []float64 -} - -// Kline K线数据 -type Kline struct { - OpenTime int64 - Open float64 - High float64 - Low float64 - Close float64 - Volume float64 - CloseTime int64 -} - // Get 获取指定代币的市场数据 func Get(symbol string) (*Data, error) { + var klines3m, klines4h []Kline + var err error // 标准化symbol symbol = Normalize(symbol) - // 获取3分钟K线数据 (最近10个) - klines3m, err := getKlines(symbol, "3m", 40) // 多获取一些用于计算 + klines3m, err = WSMonitorCli.GetCurrentKlines(symbol, "3m") // 多获取一些用于计算 if err != nil { return nil, fmt.Errorf("获取3分钟K线失败: %v", err) } // 获取4小时K线数据 (最近10个) - klines4h, err := getKlines(symbol, "4h", 60) // 多获取用于计算指标 + klines4h, err = WSMonitorCli.GetCurrentKlines(symbol, "4h") // 多获取用于计算指标 if err != nil { return nil, fmt.Errorf("获取4小时K线失败: %v", err) } @@ -136,51 +84,6 @@ func Get(symbol string) (*Data, error) { }, nil } -// getKlines 从Binance获取K线数据 -func getKlines(symbol, interval string, limit int) ([]Kline, error) { - url := fmt.Sprintf("https://fapi.binance.com/fapi/v1/klines?symbol=%s&interval=%s&limit=%d", - symbol, interval, limit) - - resp, err := http.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - var rawData [][]interface{} - if err := json.Unmarshal(body, &rawData); err != nil { - return nil, err - } - - klines := make([]Kline, len(rawData)) - for i, item := range rawData { - openTime := int64(item[0].(float64)) - open, _ := parseFloat(item[1]) - high, _ := parseFloat(item[2]) - low, _ := parseFloat(item[3]) - close, _ := parseFloat(item[4]) - volume, _ := parseFloat(item[5]) - closeTime := int64(item[6].(float64)) - - klines[i] = Kline{ - OpenTime: openTime, - Open: open, - High: high, - Low: low, - Close: close, - Volume: volume, - CloseTime: closeTime, - } - } - - return klines, nil -} - // calculateEMA 计算EMA func calculateEMA(klines []Kline, period int) float64 { if len(klines) < period { diff --git a/market/monitor.go b/market/monitor.go new file mode 100644 index 00000000..337640d8 --- /dev/null +++ b/market/monitor.go @@ -0,0 +1,260 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "strings" + "sync" + "time" +) + +type WSMonitor struct { + wsClient *WSClient + combinedClient *CombinedStreamsClient + symbols []string + featuresMap sync.Map + alertsChan chan Alert + klineDataMap3m sync.Map // 存储每个交易对的K线历史数据 + klineDataMap4h sync.Map // 存储每个交易对的K线历史数据 + tickerDataMap sync.Map // 存储每个交易对的ticker数据 + batchSize int + filterSymbols sync.Map // 使用sync.Map来存储需要监控的币种和其状态 + symbolStats sync.Map // 存储币种统计信息 + FilterSymbol []string //经过筛选的币种 +} +type SymbolStats struct { + LastActiveTime time.Time + AlertCount int + VolumeSpikeCount int + LastAlertTime time.Time + Score float64 // 综合评分 +} + +var WSMonitorCli *WSMonitor +var subKlineTime = []string{"3m", "4h"} // 管理订阅流的K线周期 + +func NewWSMonitor(batchSize int) *WSMonitor { + WSMonitorCli = &WSMonitor{ + wsClient: NewWSClient(), + combinedClient: NewCombinedStreamsClient(batchSize), + alertsChan: make(chan Alert, 1000), + batchSize: batchSize, + } + return WSMonitorCli +} + +func (m *WSMonitor) Initialize(coins []string) error { + log.Println("初始化WebSocket监控器...") + // 获取交易对信息 + apiClient := NewAPIClient() + // 如果不指定交易对,则使用market市场的所有交易对币种 + if len(coins) == 0 { + exchangeInfo, err := apiClient.GetExchangeInfo() + if err != nil { + return err + } + // 筛选永续合约交易对 --仅测试时使用 + //exchangeInfo.Symbols = exchangeInfo.Symbols[0:2] + for _, symbol := range exchangeInfo.Symbols { + if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" { + m.symbols = append(m.symbols, symbol.Symbol) + m.filterSymbols.Store(symbol.Symbol, true) + } + } + } else { + m.symbols = coins + } + + log.Printf("找到 %d 个交易对", len(m.symbols)) + // 初始化历史数据 + if err := m.initializeHistoricalData(); err != nil { + log.Printf("初始化历史数据失败: %v", err) + } + + return nil +} + +func (m *WSMonitor) initializeHistoricalData() error { + apiClient := NewAPIClient() + + var wg sync.WaitGroup + semaphore := make(chan struct{}, 5) // 限制并发数 + + for _, symbol := range m.symbols { + wg.Add(1) + semaphore <- struct{}{} + + go func(s string) { + defer wg.Done() + defer func() { <-semaphore }() + + // 获取历史K线数据 + klines, err := apiClient.GetKlines(s, "3m", 100) + if err != nil { + log.Printf("获取 %s 历史数据失败: %v", s, err) + return + } + if len(klines) > 0 { + m.klineDataMap3m.Store(s, klines) + log.Printf("已加载 %s 的历史K线数据-3m: %d 条", s, len(klines)) + } + // 获取历史K线数据 + klines4h, err := apiClient.GetKlines(s, "4h", 100) + if err != nil { + log.Printf("获取 %s 历史数据失败: %v", s, err) + return + } + if len(klines4h) > 0 { + m.klineDataMap4h.Store(s, klines) + log.Printf("已加载 %s 的历史K线数据-4h: %d 条", s, len(klines)) + } + }(symbol) + } + + wg.Wait() + return nil +} + +func (m *WSMonitor) Start(coins []string) { + log.Printf("启动WebSocket实时监控...") + // 初始化交易对 + err := m.Initialize(coins) + if err != nil { + log.Fatalf("❌ 初始化币种: %v", err) + return + } + + err = m.combinedClient.Connect() + if err != nil { + log.Fatalf("❌ 批量订阅流: %v", err) + return + } + // 订阅所有交易对 + err = m.subscribeAll() + if err != nil { + log.Fatalf("❌ 订阅币种交易对: %v", err) + return + } +} + +// subscribeSymbol 注册监听 +func (m *WSMonitor) subscribeSymbol(symbol, st string) []string { + var streams []string + stream := fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), st) + ch := m.combinedClient.AddSubscriber(stream, 100) + streams = append(streams, stream) + go m.handleKlineData(symbol, ch, st) + + return streams +} +func (m *WSMonitor) subscribeAll() error { + // 执行批量订阅 + log.Println("开始订阅所有交易对...") + for _, symbol := range m.symbols { + for _, st := range subKlineTime { + m.subscribeSymbol(symbol, st) + } + } + for _, st := range subKlineTime { + err := m.combinedClient.BatchSubscribeKlines(m.symbols, st) + if err != nil { + log.Fatalf("❌ 订阅3m K线: %v", err) + return err + } + } + log.Println("所有交易对订阅完成") + return nil +} + +func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time string) { + for data := range ch { + var klineData KlineWSData + if err := json.Unmarshal(data, &klineData); err != nil { + log.Printf("解析Kline数据失败: %v", err) + continue + } + m.processKlineUpdate(symbol, klineData, _time) + } +} + +func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map { + var klineDataMap *sync.Map + if _time == "3m" { + klineDataMap = &m.klineDataMap3m + } else if _time == "4h" { + klineDataMap = &m.klineDataMap4h + } else { + klineDataMap = &sync.Map{} + } + return klineDataMap +} +func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time string) { + // 转换WebSocket数据为Kline结构 + kline := Kline{ + OpenTime: wsData.Kline.StartTime, + CloseTime: wsData.Kline.CloseTime, + Trades: wsData.Kline.NumberOfTrades, + } + kline.Open, _ = parseFloat(wsData.Kline.OpenPrice) + kline.High, _ = parseFloat(wsData.Kline.HighPrice) + kline.Low, _ = parseFloat(wsData.Kline.LowPrice) + kline.Close, _ = parseFloat(wsData.Kline.ClosePrice) + kline.Volume, _ = parseFloat(wsData.Kline.Volume) + kline.High, _ = parseFloat(wsData.Kline.HighPrice) + kline.QuoteVolume, _ = parseFloat(wsData.Kline.QuoteVolume) + kline.TakerBuyBaseVolume, _ = parseFloat(wsData.Kline.TakerBuyBaseVolume) + kline.TakerBuyQuoteVolume, _ = parseFloat(wsData.Kline.TakerBuyQuoteVolume) + // 更新K线数据 + var klineDataMap = m.getKlineDataMap(_time) + value, exists := klineDataMap.Load(symbol) + var klines []Kline + if exists { + klines = value.([]Kline) + + // 检查是否是新的K线 + if len(klines) > 0 && klines[len(klines)-1].OpenTime == kline.OpenTime { + // 更新当前K线 + klines[len(klines)-1] = kline + } else { + // 添加新K线 + klines = append(klines, kline) + + // 保持数据长度 + if len(klines) > 100 { + klines = klines[1:] + } + } + } else { + klines = []Kline{kline} + } + + klineDataMap.Store(symbol, klines) +} + +func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { + // 对每一个进来的symbol检测是否存在内类 是否的话就订阅它 + value, exists := m.getKlineDataMap(_time).Load(symbol) + if !exists { + // 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行) + apiClient := NewAPIClient() + klines, err := apiClient.GetKlines(symbol, _time, 100) + m.getKlineDataMap(_time).Store(strings.ToUpper(symbol), klines) //动态缓存进缓存 + subStr := m.subscribeSymbol(symbol, _time) + subErr := m.combinedClient.subscribeStreams(subStr) + log.Printf("动态订阅流: %v", subStr) + if subErr != nil { + return nil, fmt.Errorf("动态订阅%v分钟K线失败: %v", _time, subErr) + } + if err != nil { + return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err) + } + return klines, fmt.Errorf("symbol不存在") + } + return value.([]Kline), nil +} + +func (m *WSMonitor) Close() { + m.wsClient.Close() + close(m.alertsChan) +} diff --git a/market/types.go b/market/types.go new file mode 100644 index 00000000..82f44415 --- /dev/null +++ b/market/types.go @@ -0,0 +1,157 @@ +package market + +import "time" + +// Data 市场数据结构 +type Data struct { + Symbol string + CurrentPrice float64 + PriceChange1h float64 // 1小时价格变化百分比 + PriceChange4h float64 // 4小时价格变化百分比 + CurrentEMA20 float64 + CurrentMACD float64 + CurrentRSI7 float64 + OpenInterest *OIData + FundingRate float64 + IntradaySeries *IntradayData + LongerTermContext *LongerTermData +} + +// OIData Open Interest数据 +type OIData struct { + Latest float64 + Average float64 +} + +// IntradayData 日内数据(3分钟间隔) +type IntradayData struct { + MidPrices []float64 + EMA20Values []float64 + MACDValues []float64 + RSI7Values []float64 + RSI14Values []float64 +} + +// LongerTermData 长期数据(4小时时间框架) +type LongerTermData struct { + EMA20 float64 + EMA50 float64 + ATR3 float64 + ATR14 float64 + CurrentVolume float64 + AverageVolume float64 + MACDValues []float64 + RSI14Values []float64 +} + +// Binance API 响应结构 +type ExchangeInfo struct { + Symbols []SymbolInfo `json:"symbols"` +} + +type SymbolInfo struct { + Symbol string `json:"symbol"` + Status string `json:"status"` + BaseAsset string `json:"baseAsset"` + QuoteAsset string `json:"quoteAsset"` + ContractType string `json:"contractType"` + PricePrecision int `json:"pricePrecision"` + QuantityPrecision int `json:"quantityPrecision"` +} + +type Kline struct { + OpenTime int64 `json:"openTime"` + Open float64 `json:"open"` + High float64 `json:"high"` + Low float64 `json:"low"` + Close float64 `json:"close"` + Volume float64 `json:"volume"` + CloseTime int64 `json:"closeTime"` + QuoteVolume float64 `json:"quoteVolume"` + Trades int `json:"trades"` + TakerBuyBaseVolume float64 `json:"takerBuyBaseVolume"` + TakerBuyQuoteVolume float64 `json:"takerBuyQuoteVolume"` +} + +type KlineResponse []interface{} + +type PriceTicker struct { + Symbol string `json:"symbol"` + Price string `json:"price"` +} + +type Ticker24hr struct { + Symbol string `json:"symbol"` + PriceChange string `json:"priceChange"` + PriceChangePercent string `json:"priceChangePercent"` + Volume string `json:"volume"` + QuoteVolume string `json:"quoteVolume"` +} + +// 特征数据结构 +type SymbolFeatures struct { + Symbol string `json:"symbol"` + Timestamp time.Time `json:"timestamp"` + Price float64 `json:"price"` + PriceChange15Min float64 `json:"price_change_15min"` + PriceChange1H float64 `json:"price_change_1h"` + PriceChange4H float64 `json:"price_change_4h"` + Volume float64 `json:"volume"` + VolumeRatio5 float64 `json:"volume_ratio_5"` + VolumeRatio20 float64 `json:"volume_ratio_20"` + VolumeTrend float64 `json:"volume_trend"` + RSI14 float64 `json:"rsi_14"` + SMA5 float64 `json:"sma_5"` + SMA10 float64 `json:"sma_10"` + SMA20 float64 `json:"sma_20"` + HighLowRatio float64 `json:"high_low_ratio"` + Volatility20 float64 `json:"volatility_20"` + PositionInRange float64 `json:"position_in_range"` +} + +// 警报数据结构 +type Alert struct { + Type string `json:"type"` + Symbol string `json:"symbol"` + Value float64 `json:"value"` + Threshold float64 `json:"threshold"` + Message string `json:"message"` + Timestamp time.Time `json:"timestamp"` +} + +type Config struct { + AlertThresholds AlertThresholds `json:"alert_thresholds"` + UpdateInterval int `json:"update_interval"` // seconds + CleanupConfig CleanupConfig `json:"cleanup_config"` +} + +type AlertThresholds struct { + VolumeSpike float64 `json:"volume_spike"` + PriceChange15Min float64 `json:"price_change_15min"` + VolumeTrend float64 `json:"volume_trend"` + RSIOverbought float64 `json:"rsi_overbought"` + RSIOversold float64 `json:"rsi_oversold"` +} +type CleanupConfig struct { + InactiveTimeout time.Duration `json:"inactive_timeout"` // 不活跃超时时间 + MinScoreThreshold float64 `json:"min_score_threshold"` // 最低评分阈值 + NoAlertTimeout time.Duration `json:"no_alert_timeout"` // 无警报超时时间 + CheckInterval time.Duration `json:"check_interval"` // 检查间隔 +} + +var config = Config{ + AlertThresholds: AlertThresholds{ + VolumeSpike: 3.0, + PriceChange15Min: 0.05, + VolumeTrend: 2.0, + RSIOverbought: 70, + RSIOversold: 30, + }, + CleanupConfig: CleanupConfig{ + InactiveTimeout: 30 * time.Minute, + MinScoreThreshold: 15.0, + NoAlertTimeout: 20 * time.Minute, + CheckInterval: 5 * time.Minute, + }, + UpdateInterval: 60, // 1 minute +} diff --git a/market/websocket_client.go b/market/websocket_client.go new file mode 100644 index 00000000..ce151691 --- /dev/null +++ b/market/websocket_client.go @@ -0,0 +1,231 @@ +package market + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type WSClient struct { + conn *websocket.Conn + mu sync.RWMutex + subscribers map[string]chan []byte + reconnect bool + done chan struct{} +} + +type WSMessage struct { + Stream string `json:"stream"` + Data json.RawMessage `json:"data"` +} + +type KlineWSData struct { + EventType string `json:"e"` + EventTime int64 `json:"E"` + Symbol string `json:"s"` + Kline struct { + StartTime int64 `json:"t"` + CloseTime int64 `json:"T"` + Symbol string `json:"s"` + Interval string `json:"i"` + FirstTradeID int64 `json:"f"` + LastTradeID int64 `json:"L"` + OpenPrice string `json:"o"` + ClosePrice string `json:"c"` + HighPrice string `json:"h"` + LowPrice string `json:"l"` + Volume string `json:"v"` + NumberOfTrades int `json:"n"` + IsFinal bool `json:"x"` + QuoteVolume string `json:"q"` + TakerBuyBaseVolume string `json:"V"` + TakerBuyQuoteVolume string `json:"Q"` + } `json:"k"` +} + +type TickerWSData struct { + EventType string `json:"e"` + EventTime int64 `json:"E"` + Symbol string `json:"s"` + PriceChange string `json:"p"` + PriceChangePercent string `json:"P"` + WeightedAvgPrice string `json:"w"` + LastPrice string `json:"c"` + LastQty string `json:"Q"` + OpenPrice string `json:"o"` + HighPrice string `json:"h"` + LowPrice string `json:"l"` + Volume string `json:"v"` + QuoteVolume string `json:"q"` + OpenTime int64 `json:"O"` + CloseTime int64 `json:"C"` + FirstID int64 `json:"F"` + LastID int64 `json:"L"` + Count int `json:"n"` +} + +func NewWSClient() *WSClient { + return &WSClient{ + subscribers: make(map[string]chan []byte), + reconnect: true, + done: make(chan struct{}), + } +} + +func (w *WSClient) Connect() error { + dialer := websocket.Dialer{ + HandshakeTimeout: 10 * time.Second, + } + + conn, _, err := dialer.Dial("wss://ws-fapi.binance.com/ws-fapi/v1", nil) + if err != nil { + return fmt.Errorf("WebSocket连接失败: %v", err) + } + + w.mu.Lock() + w.conn = conn + w.mu.Unlock() + + log.Println("WebSocket连接成功") + + // 启动消息读取循环 + go w.readMessages() + + return nil +} + +func (w *WSClient) SubscribeKline(symbol, interval string) error { + stream := fmt.Sprintf("%s@kline_%s", symbol, interval) + return w.subscribe(stream) +} + +func (w *WSClient) SubscribeTicker(symbol string) error { + stream := fmt.Sprintf("%s@ticker", symbol) + return w.subscribe(stream) +} + +func (w *WSClient) SubscribeMiniTicker(symbol string) error { + stream := fmt.Sprintf("%s@miniTicker", symbol) + return w.subscribe(stream) +} + +func (w *WSClient) subscribe(stream string) error { + subscribeMsg := map[string]interface{}{ + "method": "SUBSCRIBE", + "params": []string{stream}, + "id": time.Now().Unix(), + } + + w.mu.RLock() + defer w.mu.RUnlock() + + if w.conn == nil { + return fmt.Errorf("WebSocket未连接") + } + + err := w.conn.WriteJSON(subscribeMsg) + if err != nil { + return err + } + + log.Printf("订阅流: %s", stream) + return nil +} + +func (w *WSClient) readMessages() { + for { + select { + case <-w.done: + return + default: + w.mu.RLock() + conn := w.conn + w.mu.RUnlock() + + if conn == nil { + time.Sleep(1 * time.Second) + continue + } + + _, message, err := conn.ReadMessage() + if err != nil { + log.Printf("读取WebSocket消息失败: %v", err) + w.handleReconnect() + return + } + + w.handleMessage(message) + } + } +} + +func (w *WSClient) handleMessage(message []byte) { + var wsMsg WSMessage + if err := json.Unmarshal(message, &wsMsg); err != nil { + // 可能是其他格式的消息 + return + } + + w.mu.RLock() + ch, exists := w.subscribers[wsMsg.Stream] + w.mu.RUnlock() + + if exists { + select { + case ch <- wsMsg.Data: + default: + log.Printf("订阅者通道已满: %s", wsMsg.Stream) + } + } +} + +func (w *WSClient) handleReconnect() { + if !w.reconnect { + return + } + + log.Println("尝试重新连接...") + time.Sleep(3 * time.Second) + + if err := w.Connect(); err != nil { + log.Printf("重新连接失败: %v", err) + go w.handleReconnect() + } +} + +func (w *WSClient) AddSubscriber(stream string, bufferSize int) <-chan []byte { + ch := make(chan []byte, bufferSize) + w.mu.Lock() + w.subscribers[stream] = ch + w.mu.Unlock() + return ch +} + +func (w *WSClient) RemoveSubscriber(stream string) { + w.mu.Lock() + delete(w.subscribers, stream) + w.mu.Unlock() +} + +func (w *WSClient) Close() { + w.reconnect = false + close(w.done) + + w.mu.Lock() + defer w.mu.Unlock() + + if w.conn != nil { + w.conn.Close() + w.conn = nil + } + + // 关闭所有订阅者通道 + for stream, ch := range w.subscribers { + close(ch) + delete(w.subscribers, stream) + } +} diff --git a/web/index.html b/web/index.html index badfe608..574bc83a 100644 --- a/web/index.html +++ b/web/index.html @@ -2,11 +2,22 @@
+ + +