K线获取方式改为websocket组合流. 带重拨机制

流程为下:
1. 启动时使用所有交易员设置的币种(去重) 如果交易员未配置,则使用系统默认
2. 在决策获取K线时 如果没有缓存 则先实时获取后再添加订阅. ps: 适用于Api方式的币种列表
This commit is contained in:
yuanshi2016
2025-11-02 14:03:13 +08:00
parent 0347705df5
commit 7302f96e8e
4 changed files with 179 additions and 179 deletions
+1 -1
View File
@@ -54,7 +54,7 @@ type LeverageConfig struct {
type Config struct {
Traders []TraderConfig `json:"traders"`
UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表
InsideCoins bool `json:"inside_coins"` // 是否使用内置AI评分币种列表
UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表
DefaultCoins []string `json:"default_coins"` // 默认主流币种池
APIServerPort int `json:"api_server_port"`
MaxDailyLoss float64 `json:"max_daily_loss"`
+121 -94
View File
@@ -4,8 +4,11 @@ import (
"crypto/rand"
"database/sql"
"encoding/base32"
"encoding/json"
"fmt"
"log"
"nofx/market"
"slices"
"strings"
"time"
@@ -177,17 +180,18 @@ func (d *Database) createTables() error {
`ALTER TABLE exchanges ADD COLUMN aster_private_key TEXT DEFAULT ''`,
`ALTER TABLE traders ADD COLUMN custom_prompt TEXT DEFAULT ''`,
`ALTER TABLE traders ADD COLUMN override_base_prompt BOOLEAN DEFAULT 0`,
`ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式
`ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种
`ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式)
`ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数
`ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数
`ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔
`ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源
`ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源
`ALTER TABLE traders ADD COLUMN is_cross_margin BOOLEAN DEFAULT 1`, // 默认为全仓模式
`ALTER TABLE traders ADD COLUMN use_default_coins BOOLEAN DEFAULT 1`, // 默认使用默认币种
`ALTER TABLE traders ADD COLUMN custom_coins TEXT DEFAULT ''`, // 自定义币种列表(JSON格式)
`ALTER TABLE traders ADD COLUMN btc_eth_leverage INTEGER DEFAULT 5`, // BTC/ETH杠杆倍数
`ALTER TABLE traders ADD COLUMN altcoin_leverage INTEGER DEFAULT 5`, // 山寨币杠杆倍数
`ALTER TABLE traders ADD COLUMN trading_symbols TEXT DEFAULT ''`, // 交易币种,逗号分隔
`ALTER TABLE traders ADD COLUMN use_coin_pool BOOLEAN DEFAULT 0`, // 是否使用COIN POOL信号源
`ALTER TABLE traders ADD COLUMN use_oi_top BOOLEAN DEFAULT 0`, // 是否使用OI TOP信号源
`ALTER TABLE traders ADD COLUMN use_inside_coins BOOLEAN DEFAULT 0`, // 是否使用内置AI评分信号源
`ALTER TABLE traders ADD COLUMN system_prompt_template TEXT DEFAULT 'default'`, // 系统提示词模板名称
`ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址
`ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称
`ALTER TABLE ai_models ADD COLUMN custom_api_url TEXT DEFAULT ''`, // 自定义API地址
`ALTER TABLE ai_models ADD COLUMN custom_model_name TEXT DEFAULT ''`, // 自定义模型名称
}
for _, query := range alterQueries {
@@ -245,16 +249,16 @@ func (d *Database) initDefaultData() error {
// 初始化系统配置 - 创建所有字段,设置默认值,后续由config.json同步更新
systemConfigs := map[string]string{
"admin_mode": "true", // 默认开启管理员模式,便于首次使用
"api_server_port": "8080", // 默认API端口
"use_default_coins": "true", // 默认使用内置币种列表
"default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式)
"max_daily_loss": "10.0", // 最大日损失百分比
"max_drawdown": "20.0", // 最大回撤百分比
"stop_trading_minutes": "60", // 停止交易时间(分钟)
"btc_eth_leverage": "5", // BTC/ETH杠杆倍数
"altcoin_leverage": "5", // 山寨币杠杆倍数
"jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成
"admin_mode": "true", // 默认开启管理员模式,便于首次使用
"api_server_port": "8080", // 默认API端口
"use_default_coins": "true", // 默认使用内置币种列表
"default_coins": `["BTCUSDT","ETHUSDT","SOLUSDT","BNBUSDT","XRPUSDT","DOGEUSDT","ADAUSDT","HYPEUSDT"]`, // 默认币种列表(JSON格式)
"max_daily_loss": "10.0", // 最大日损失百分比
"max_drawdown": "20.0", // 最大回撤百分比
"stop_trading_minutes": "60", // 停止交易时间(分钟)
"btc_eth_leverage": "5", // BTC/ETH杠杆倍数
"altcoin_leverage": "5", // 山寨币杠杆倍数
"jwt_secret": "", // JWT密钥,默认为空,由config.json或系统生成
}
for key, value := range systemConfigs {
@@ -281,14 +285,14 @@ func (d *Database) migrateExchangesTable() error {
if err != nil {
return err
}
// 如果已经迁移过,直接返回
if count > 0 {
return nil
}
log.Printf("🔄 开始迁移exchanges表...")
// 创建新的exchanges表,使用复合主键
_, err = d.db.Exec(`
CREATE TABLE exchanges_new (
@@ -313,7 +317,7 @@ func (d *Database) migrateExchangesTable() error {
if err != nil {
return fmt.Errorf("创建新exchanges表失败: %w", err)
}
// 复制数据到新表
_, err = d.db.Exec(`
INSERT INTO exchanges_new
@@ -322,19 +326,19 @@ func (d *Database) migrateExchangesTable() error {
if err != nil {
return fmt.Errorf("复制数据失败: %w", err)
}
// 删除旧表
_, err = d.db.Exec(`DROP TABLE exchanges`)
if err != nil {
return fmt.Errorf("删除旧表失败: %w", err)
}
// 重命名新表
_, err = d.db.Exec(`ALTER TABLE exchanges_new RENAME TO exchanges`)
if err != nil {
return fmt.Errorf("重命名表失败: %w", err)
}
// 重新创建触发器
_, err = d.db.Exec(`
CREATE TRIGGER IF NOT EXISTS update_exchanges_updated_at
@@ -347,20 +351,20 @@ func (d *Database) migrateExchangesTable() error {
if err != nil {
return fmt.Errorf("创建触发器失败: %w", err)
}
log.Printf("✅ exchanges表迁移完成")
return nil
}
// User 用户配置
type User struct {
ID string `json:"id"`
Email string `json:"email"`
PasswordHash string `json:"-"` // 不返回到前端
OTPSecret string `json:"-"` // 不返回到前端
OTPVerified bool `json:"otp_verified"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID string `json:"id"`
Email string `json:"email"`
PasswordHash string `json:"-"` // 不返回到前端
OTPSecret string `json:"-"` // 不返回到前端
OTPVerified bool `json:"otp_verified"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// AIModelConfig AI模型配置
@@ -379,39 +383,40 @@ type AIModelConfig struct {
// ExchangeConfig 交易所配置
type ExchangeConfig struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Name string `json:"name"`
Type string `json:"type"`
Enabled bool `json:"enabled"`
APIKey string `json:"apiKey"`
SecretKey string `json:"secretKey"`
Testnet bool `json:"testnet"`
ID string `json:"id"`
UserID string `json:"user_id"`
Name string `json:"name"`
Type string `json:"type"`
Enabled bool `json:"enabled"`
APIKey string `json:"apiKey"`
SecretKey string `json:"secretKey"`
Testnet bool `json:"testnet"`
// Hyperliquid 特定字段
HyperliquidWalletAddr string `json:"hyperliquidWalletAddr"`
// Aster 特定字段
AsterUser string `json:"asterUser"`
AsterSigner string `json:"asterSigner"`
AsterPrivateKey string `json:"asterPrivateKey"`
AsterUser string `json:"asterUser"`
AsterSigner string `json:"asterSigner"`
AsterPrivateKey string `json:"asterPrivateKey"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// TraderRecord 交易员配置(数据库实体)
type TraderRecord struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Name string `json:"name"`
AIModelID string `json:"ai_model_id"`
ExchangeID string `json:"exchange_id"`
InitialBalance float64 `json:"initial_balance"`
ScanIntervalMinutes int `json:"scan_interval_minutes"`
IsRunning bool `json:"is_running"`
BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数
AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数
ID string `json:"id"`
UserID string `json:"user_id"`
Name string `json:"name"`
AIModelID string `json:"ai_model_id"`
ExchangeID string `json:"exchange_id"`
InitialBalance float64 `json:"initial_balance"`
ScanIntervalMinutes int `json:"scan_interval_minutes"`
IsRunning bool `json:"is_running"`
BTCETHLeverage int `json:"btc_eth_leverage"` // BTC/ETH杠杆倍数
AltcoinLeverage int `json:"altcoin_leverage"` // 山寨币杠杆倍数
TradingSymbols string `json:"trading_symbols"` // 交易币种,逗号分隔
UseCoinPool bool `json:"use_coin_pool"` // 是否使用COIN POOL信号源
UseOITop bool `json:"use_oi_top"` // 是否使用OI TOP信号源
UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置评分信号源
CustomPrompt string `json:"custom_prompt"` // 自定义交易策略prompt
OverrideBasePrompt bool `json:"override_base_prompt"` // 是否覆盖基础prompt
SystemPromptTemplate string `json:"system_prompt_template"` // 系统提示词模板名称
@@ -422,12 +427,12 @@ type TraderRecord struct {
// UserSignalSource 用户信号源配置
type UserSignalSource struct {
ID int `json:"id"`
UserID string `json:"user_id"`
CoinPoolURL string `json:"coin_pool_url"`
OITopURL string `json:"oi_top_url"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID int `json:"id"`
UserID string `json:"user_id"`
CoinPoolURL string `json:"coin_pool_url"`
OITopURL string `json:"oi_top_url"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// GenerateOTPSecret 生成OTP密钥
@@ -457,12 +462,12 @@ func (d *Database) EnsureAdminUser() error {
if err != nil {
return err
}
// 如果已存在,直接返回
if count > 0 {
return nil
}
// 创建admin用户(密码为空,因为管理员模式下不需要密码)
adminUser := &User{
ID: "admin",
@@ -471,7 +476,7 @@ func (d *Database) EnsureAdminUser() error {
OTPSecret: "",
OTPVerified: true,
}
return d.CreateUser(adminUser)
}
@@ -482,7 +487,7 @@ func (d *Database) GetUserByEmail(email string) (*User, error) {
SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at
FROM users WHERE email = ?
`, email).Scan(
&user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret,
&user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret,
&user.OTPVerified, &user.CreatedAt, &user.UpdatedAt,
)
if err != nil {
@@ -498,7 +503,7 @@ func (d *Database) GetUserByID(userID string) (*User, error) {
SELECT id, email, password_hash, otp_secret, otp_verified, created_at, updated_at
FROM users WHERE id = ?
`, userID).Scan(
&user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret,
&user.ID, &user.Email, &user.PasswordHash, &user.OTPSecret,
&user.OTPVerified, &user.CreatedAt, &user.UpdatedAt,
)
if err != nil {
@@ -668,7 +673,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) {
err := rows.Scan(
&exchange.ID, &exchange.UserID, &exchange.Name, &exchange.Type,
&exchange.Enabled, &exchange.APIKey, &exchange.SecretKey, &exchange.Testnet,
&exchange.HyperliquidWalletAddr, &exchange.AsterUser,
&exchange.HyperliquidWalletAddr, &exchange.AsterUser,
&exchange.AsterSigner, &exchange.AsterPrivateKey,
&exchange.CreatedAt, &exchange.UpdatedAt,
)
@@ -684,7 +689,7 @@ func (d *Database) GetExchanges(userID string) ([]*ExchangeConfig, error) {
// UpdateExchange 更新交易所配置,如果不存在则创建用户特定配置
func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secretKey string, testnet bool, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey string) error {
log.Printf("🔧 UpdateExchange: userID=%s, id=%s, enabled=%v", userID, id, enabled)
// 首先尝试更新现有的用户配置
result, err := d.db.Exec(`
UPDATE exchanges SET enabled = ?, api_key = ?, secret_key = ?, testnet = ?,
@@ -695,20 +700,20 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre
log.Printf("❌ UpdateExchange: 更新失败: %v", err)
return err
}
// 检查是否有行被更新
rowsAffected, err := result.RowsAffected()
if err != nil {
log.Printf("❌ UpdateExchange: 获取影响行数失败: %v", err)
return err
}
log.Printf("📊 UpdateExchange: 影响行数 = %d", rowsAffected)
// 如果没有行被更新,说明用户没有这个交易所的配置,需要创建
if rowsAffected == 0 {
log.Printf("💡 UpdateExchange: 没有现有记录,创建新记录")
// 根据交易所ID确定基本信息
var name, typ string
if id == "binance" {
@@ -724,16 +729,16 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre
name = id + " Exchange"
typ = "cex"
}
log.Printf("🆕 UpdateExchange: 创建新记录 ID=%s, name=%s, type=%s", id, name, typ)
// 创建用户特定的配置,使用原始的交易所ID
_, err = d.db.Exec(`
INSERT INTO exchanges (id, user_id, name, type, enabled, api_key, secret_key, testnet,
hyperliquid_wallet_addr, aster_user, aster_signer, aster_private_key, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
`, id, userID, name, typ, enabled, apiKey, secretKey, testnet, hyperliquidWalletAddr, asterUser, asterSigner, asterPrivateKey)
if err != nil {
log.Printf("❌ UpdateExchange: 创建记录失败: %v", err)
} else {
@@ -741,7 +746,7 @@ func (d *Database) UpdateExchange(userID, id string, enabled bool, apiKey, secre
}
return err
}
log.Printf("✅ UpdateExchange: 更新现有记录成功")
return nil
}
@@ -767,9 +772,9 @@ func (d *Database) CreateExchange(userID, id, name, typ string, enabled bool, ap
// CreateTrader 创建交易员
func (d *Database) CreateTrader(trader *TraderRecord) error {
_, err := d.db.Exec(`
INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin)
INSERT INTO traders (id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running, btc_eth_leverage, altcoin_leverage, trading_symbols, use_coin_pool, use_oi_top, use_inside_coins, custom_prompt, override_base_prompt, system_prompt_template, is_cross_margin)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?)
`, trader.ID, trader.UserID, trader.Name, trader.AIModelID, trader.ExchangeID, trader.InitialBalance, trader.ScanIntervalMinutes, trader.IsRunning, trader.BTCETHLeverage, trader.AltcoinLeverage, trader.TradingSymbols, trader.UseCoinPool, trader.UseOITop, trader.UseInsideCoins, trader.CustomPrompt, trader.OverrideBasePrompt, trader.SystemPromptTemplate, trader.IsCrossMargin)
return err
}
@@ -779,7 +784,7 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) {
SELECT id, user_id, name, ai_model_id, exchange_id, initial_balance, scan_interval_minutes, is_running,
COALESCE(btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(altcoin_leverage, 5) as altcoin_leverage,
COALESCE(trading_symbols, '') as trading_symbols,
COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top,
COALESCE(use_coin_pool, 0) as use_coin_pool, COALESCE(use_oi_top, 0) as use_oi_top,COALESCE(use_inside_coins, 0) as use_inside_coins,
COALESCE(custom_prompt, '') as custom_prompt, COALESCE(override_base_prompt, 0) as override_base_prompt,
COALESCE(system_prompt_template, 'default') as system_prompt_template,
COALESCE(is_cross_margin, 1) as is_cross_margin, created_at, updated_at
@@ -790,14 +795,14 @@ func (d *Database) GetTraders(userID string) ([]*TraderRecord, error) {
}
defer rows.Close()
var traders []*TraderRecord
var traders []*TraderRecord
for rows.Next() {
var trader TraderRecord
var trader TraderRecord
err := rows.Scan(
&trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID,
&trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning,
&trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols,
&trader.UseCoinPool, &trader.UseOITop,
&trader.UseCoinPool, &trader.UseOITop, &trader.UseInsideCoins,
&trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.SystemPromptTemplate,
&trader.IsCrossMargin,
&trader.CreatedAt, &trader.UpdatedAt,
@@ -847,18 +852,13 @@ func (d *Database) DeleteTrader(userID, id string) error {
// GetTraderConfig 获取交易员完整配置(包含AI模型和交易所信息)
func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIModelConfig, *ExchangeConfig, error) {
var trader TraderRecord
var trader TraderRecord
var aiModel AIModelConfig
var exchange ExchangeConfig
err := d.db.QueryRow(`
SELECT
t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running,
COALESCE(t.btc_eth_leverage, 5) as btc_eth_leverage, COALESCE(t.altcoin_leverage, 5) as altcoin_leverage,
COALESCE(t.trading_symbols, '') as trading_symbols, COALESCE(t.use_coin_pool, 0) as use_coin_pool,
COALESCE(t.use_oi_top, 0) as use_oi_top, COALESCE(t.custom_prompt, '') as custom_prompt,
COALESCE(t.override_base_prompt, 0) as override_base_prompt, COALESCE(t.is_cross_margin, 1) as is_cross_margin,
t.created_at, t.updated_at,
t.id, t.user_id, t.name, t.ai_model_id, t.exchange_id, t.initial_balance, t.scan_interval_minutes, t.is_running, t.created_at, t.updated_at,
a.id, a.user_id, a.name, a.provider, a.enabled, a.api_key, a.created_at, a.updated_at,
e.id, e.user_id, e.name, e.type, e.enabled, e.api_key, e.secret_key, e.testnet,
COALESCE(e.hyperliquid_wallet_addr, '') as hyperliquid_wallet_addr,
@@ -873,8 +873,6 @@ func (d *Database) GetTraderConfig(userID, traderID string) (*TraderRecord, *AIM
`, traderID, userID).Scan(
&trader.ID, &trader.UserID, &trader.Name, &trader.AIModelID, &trader.ExchangeID,
&trader.InitialBalance, &trader.ScanIntervalMinutes, &trader.IsRunning,
&trader.BTCETHLeverage, &trader.AltcoinLeverage, &trader.TradingSymbols, &trader.UseCoinPool,
&trader.UseOITop, &trader.CustomPrompt, &trader.OverrideBasePrompt, &trader.IsCrossMargin,
&trader.CreatedAt, &trader.UpdatedAt,
&aiModel.ID, &aiModel.UserID, &aiModel.Name, &aiModel.Provider, &aiModel.Enabled, &aiModel.APIKey,
&aiModel.CreatedAt, &aiModel.UpdatedAt,
@@ -940,7 +938,36 @@ func (d *Database) UpdateUserSignalSource(userID, coinPoolURL, oiTopURL string)
return err
}
// GetCustomCoins 获取所有交易员自定义币种 / Get all trader-customized currencies
func (d *Database) GetCustomCoins() []string {
var symbol string
var symbols []string
_ = d.db.QueryRow(`
SELECT GROUP_CONCAT(custom_coins , ',') as symbol
FROM main.traders where custom_coins != ''
`).Scan(&symbol)
// 检测用户是否未配置币种 - 兼容性
if symbol == "" {
symbolJSON, _ := d.GetSystemConfig("default_coins")
if err := json.Unmarshal([]byte(symbolJSON), &symbols); err != nil {
log.Printf("⚠️ 解析default_coins配置失败: %v,使用硬编码默认值", err)
symbols = []string{"BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT"}
}
}
// filter Symbol
for _, s := range strings.Split(symbol, ",") {
if s == "" {
continue
}
coin := market.Normalize(s)
if !slices.Contains(symbols, coin) {
symbols = append(symbols, coin)
}
}
return symbols
}
// Close 关闭数据库连接
func (d *Database) Close() error {
return d.db.Close()
}
}
+4 -25
View File
@@ -15,7 +15,6 @@ import (
"strconv"
"strings"
"syscall"
"time"
)
// LeverageConfig 杠杆配置
@@ -30,9 +29,9 @@ type ConfigFile struct {
APIServerPort int `json:"api_server_port"`
UseDefaultCoins bool `json:"use_default_coins"`
DefaultCoins []string `json:"default_coins"`
InsideCoins bool `json:"inside_coins"`
CoinPoolAPIURL string `json:"coin_pool_api_url"`
OITopAPIURL string `json:"oi_top_api_url"`
InsideCoins bool `json:"inside_coins"`
MaxDailyLoss float64 `json:"max_daily_loss"`
MaxDrawdown float64 `json:"max_drawdown"`
StopTradingMinutes int `json:"stop_trading_minutes"`
@@ -68,9 +67,9 @@ func syncConfigToDatabase(database *config.Database) error {
"admin_mode": fmt.Sprintf("%t", configFile.AdminMode),
"api_server_port": strconv.Itoa(configFile.APIServerPort),
"use_default_coins": fmt.Sprintf("%t", configFile.UseDefaultCoins),
"inside_coins": fmt.Sprintf("%t", configFile.InsideCoins),
"coin_pool_api_url": configFile.CoinPoolAPIURL,
"oi_top_api_url": configFile.OITopAPIURL,
"inside_coins": fmt.Sprintf("%t", configFile.InsideCoins),
"max_daily_loss": fmt.Sprintf("%.1f", configFile.MaxDailyLoss),
"max_drawdown": fmt.Sprintf("%.1f", configFile.MaxDrawdown),
"stop_trading_minutes": strconv.Itoa(configFile.StopTradingMinutes),
@@ -137,8 +136,6 @@ func main() {
// 获取系统配置
useDefaultCoinsStr, _ := database.GetSystemConfig("use_default_coins")
useDefaultCoins := useDefaultCoinsStr == "true"
InsideCoinsStr, _ := database.GetSystemConfig("inside_coins")
insideCoins := InsideCoinsStr == "true"
apiPortStr, _ := database.GetSystemConfig("api_server_port")
// 获取管理员模式配置
@@ -186,26 +183,6 @@ func main() {
}
pool.SetDefaultCoins(defaultCoins)
//内置AI评分
if insideCoins {
log.Printf("✓ 启用内置AI评分币种列表")
monitor := market.NewWSMonitor(150)
go func() {
monitor.Start()
// 定时器设置默认的币种列表 - 覆蓋defaultCoins设置
for {
if len(monitor.FilterSymbol) > 0 {
for _, coin := range defaultCoins {
monitor.FilterSymbol = append(monitor.FilterSymbol, coin)
}
pool.SetDefaultCoins(monitor.FilterSymbol)
monitor.FilterSymbol = nil
}
time.Sleep(1 * time.Minute)
}
}()
}
// 设置是否使用默认主流币种
pool.SetUseDefaultCoins(useDefaultCoins)
if useDefaultCoins {
@@ -286,6 +263,8 @@ func main() {
}
}()
// 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认
go market.NewWSMonitor(150).Start(database.GetCustomCoins())
// 设置优雅退出
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+53 -59
View File
@@ -35,6 +35,7 @@ type SymbolStats struct {
}
var WSMonitorCli *WSMonitor
var subKlineTime = []string{"3m", "4h"} // 管理订阅流的K线周期
func NewWSMonitor(batchSize int) *WSMonitor {
WSMonitorCli = &WSMonitor{
@@ -47,23 +48,27 @@ func NewWSMonitor(batchSize int) *WSMonitor {
return WSMonitorCli
}
func (m *WSMonitor) Initialize() error {
func (m *WSMonitor) Initialize(coins []string) error {
log.Println("初始化WebSocket监控器...")
// 获取交易对信息
apiClient := NewAPIClient()
exchangeInfo, err := apiClient.GetExchangeInfo()
if err != nil {
return err
// 如果不指定交易对,则使用market市场的所有交易对币种
if len(coins) == 0 {
exchangeInfo, err := apiClient.GetExchangeInfo()
if err != nil {
return err
}
// 筛选永续合约交易对 --仅测试时使用
//exchangeInfo.Symbols = exchangeInfo.Symbols[0:2]
for _, symbol := range exchangeInfo.Symbols {
if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" {
m.symbols = append(m.symbols, symbol.Symbol)
}
}
} else {
m.symbols = coins
}
// 筛选永续合约交易对 --仅测试时使用
//exchangeInfo.Symbols = exchangeInfo.Symbols[0:2]
for _, symbol := range exchangeInfo.Symbols {
if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" {
m.symbols = append(m.symbols, Normalize(symbol.Symbol))
}
}
log.Printf("找到 %d 个交易对", len(m.symbols))
// 初始化历史数据
if err := m.initializeHistoricalData(); err != nil {
@@ -114,10 +119,10 @@ func (m *WSMonitor) initializeHistoricalData() error {
return nil
}
func (m *WSMonitor) Start() {
func (m *WSMonitor) Start(coins []string) {
log.Printf("启动WebSocket实时监控...")
// 初始化交易对
err := m.Initialize()
err := m.Initialize(coins)
if err != nil {
log.Fatalf("❌ 初始化币种: %v", err)
return
@@ -129,42 +134,43 @@ func (m *WSMonitor) Start() {
return
}
// 启动警报处理器
go m.handleAlerts()
//go m.handleAlerts()
// 启动定期清理任务
go m.cleanupInactiveSymbols()
//go m.cleanupInactiveSymbols()
// 输出监控统计 - 评分前十名
go m.printFilterStats(50)
//go m.printFilterStats(20)
// 订阅所有交易对
err = m.subscribeAll()
if err != nil {
log.Fatalf("❌ 订阅币种交易对: %v", err)
return
}
}
// subscribeSymbol 注册监听
func (m *WSMonitor) subscribeSymbol(symbol, st string) []string {
var streams []string
stream := fmt.Sprintf("%s@kline_%s", strings.ToLower(symbol), st)
ch := m.combinedClient.AddSubscriber(stream, 100)
streams = append(streams, stream)
go m.handleKlineData(symbol, ch, st)
return streams
}
func (m *WSMonitor) subscribeAll() error {
// 执行批量订阅
log.Println("开始订阅所有交易对...")
for _, symbol := range m.symbols {
stream3m := fmt.Sprintf("%s@kline_3m", strings.ToLower(symbol))
ch3m := m.combinedClient.AddSubscriber(stream3m, 100)
go m.handleKlineData(symbol, ch3m, "3m")
stream4h := fmt.Sprintf("%s@kline_4h", strings.ToLower(symbol))
ch4h := m.combinedClient.AddSubscriber(stream4h, 100)
go m.handleKlineData(symbol, ch4h, "4h")
for _, st := range subKlineTime {
m.subscribeSymbol(symbol, st)
}
}
err := m.combinedClient.BatchSubscribeKlines(m.symbols, "3m")
if err != nil {
log.Fatalf("❌ 订阅3m K线: %v", err)
return err
}
err = m.combinedClient.BatchSubscribeKlines(m.symbols, "4h")
if err != nil {
log.Fatalf("❌ 订阅4h K线: %v", err)
return err
for _, st := range subKlineTime {
err := m.combinedClient.BatchSubscribeKlines(m.symbols, st)
if err != nil {
log.Fatalf("❌ 订阅3m K线: %v", err)
return err
}
}
log.Println("所有交易对订阅完成")
return nil
@@ -181,34 +187,14 @@ func (m *WSMonitor) handleKlineData(symbol string, ch <-chan []byte, _time strin
}
}
func (m *WSMonitor) handleTickerData(symbol string, ch <-chan []byte) {
for data := range ch {
var tickerData TickerWSData
if err := json.Unmarshal(data, &tickerData); err != nil {
log.Printf("解析Ticker数据失败: %v", err)
continue
}
m.processTickerUpdate(symbol, tickerData)
}
}
func (m *WSMonitor) handleTickerDatas(ch <-chan []byte) {
for data := range ch {
var tickerData []TickerWSData
if err := json.Unmarshal(data, &tickerData); err != nil {
log.Printf("解析Ticker数据失败: %v", err)
continue
}
log.Fatalln(tickerData)
//m.processTickerUpdate(symbol, tickerData)
}
}
func (m *WSMonitor) getKlineDataMap(_time string) *sync.Map {
var klineDataMap *sync.Map
if _time == "3m" {
klineDataMap = &m.klineDataMap3m
} else {
} else if _time == "4h" {
klineDataMap = &m.klineDataMap4h
} else {
klineDataMap = &sync.Map{}
}
return klineDataMap
}
@@ -310,11 +296,19 @@ func (m *WSMonitor) handleAlerts() {
}
func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) {
// 对每一个进来的symbol检测是否存在内类 是否的话就订阅它
value, exists := m.getKlineDataMap(_time).Load(symbol)
if !exists {
// 如果Ws数据未初始化完成时,单独使用api获取 - 兼容性代码 (防止在未初始化完成是,已经有交易员运行)
apiClient := NewAPIClient()
klines, err := apiClient.GetKlines(symbol, _time, 40)
klines, err := apiClient.GetKlines(symbol, _time, 100)
m.getKlineDataMap(_time).Store(strings.ToUpper(symbol), klines) //动态缓存进缓存
subStr := m.subscribeSymbol(symbol, _time)
subErr := m.combinedClient.subscribeStreams(subStr)
log.Printf("动态订阅流: %v", subStr)
if subErr != nil {
return nil, fmt.Errorf("动态订阅%v分钟K线失败: %v", _time, subErr)
}
if err != nil {
return nil, fmt.Errorf("获取%v分钟K线失败: %v", _time, err)
}