mirror of
https://github.com/laoxong/nofx.git
synced 2026-06-04 09:58:22 +08:00
093d2a329d
Gate.io Integration: - Add Gate trader with full Trader interface implementation - Add order_sync.go for background trade synchronization - Fix quantity display (convert contracts to actual tokens via quanto_multiplier) - Fix fill price return in OpenLong/OpenShort/CloseLong/CloseShort - Add Gate-specific CoinAnk K-line data source support - Add Gate to supported exchanges in frontend and backend - Add Gate/KuCoin logo SVG icons Trader Package Refactoring: - Move exchange-specific code into subdirectories (binance/, bybit/, okx/, bitget/, hyperliquid/, aster/, lighter/, gate/) - Create types/ package for shared types to avoid circular dependencies - Move TraderTestSuite to trader/testutil package to avoid import cycles - Update market.GetWithExchange to support exchange-specific data
372 lines
12 KiB
Go
372 lines
12 KiB
Go
package binance
|
|
|
|
import (
|
|
"fmt"
|
|
"nofx/logger"
|
|
"nofx/market"
|
|
"nofx/store"
|
|
"nofx/trader/types"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// syncState stores the last sync time (Unix ms) for incremental sync
|
|
var (
|
|
binanceSyncState = make(map[string]int64) // exchangeID -> lastSyncTimeMs (Unix ms)
|
|
binanceSyncStateMutex sync.RWMutex
|
|
)
|
|
|
|
// SyncOrdersFromBinance syncs Binance Futures trade history to local database
|
|
// Uses COMMISSION detection + fromId for efficient incremental sync
|
|
// Also creates/updates position records to ensure orders/fills/positions data consistency
|
|
func (t *FuturesTrader) SyncOrdersFromBinance(traderID string, exchangeID string, exchangeType string, st *store.Store) error {
|
|
if st == nil {
|
|
return fmt.Errorf("store is nil")
|
|
}
|
|
|
|
orderStore := st.Order()
|
|
|
|
// Get last sync time (Unix ms) - first try memory, then database, then default
|
|
binanceSyncStateMutex.RLock()
|
|
lastSyncTimeMs, exists := binanceSyncState[exchangeID]
|
|
binanceSyncStateMutex.RUnlock()
|
|
|
|
nowMs := time.Now().UTC().UnixMilli()
|
|
if !exists {
|
|
// Try to get last fill time from database (persist across restarts)
|
|
lastFillTimeMs, err := orderStore.GetLastFillTimeByExchange(exchangeID)
|
|
if err == nil && lastFillTimeMs > 0 {
|
|
// If recovered time is in the future, it's clearly wrong - use default
|
|
if lastFillTimeMs > nowMs {
|
|
logger.Infof("⚠️ DB sync time %d is in the future (now: %d), using default",
|
|
lastFillTimeMs, nowMs)
|
|
lastSyncTimeMs = nowMs - 24*60*60*1000 // 24 hours ago
|
|
} else {
|
|
// Add 1 second buffer to avoid re-fetching the same fill
|
|
lastSyncTimeMs = lastFillTimeMs + 1000
|
|
logger.Infof("📅 Recovered last sync time from DB: %s (UTC)",
|
|
time.UnixMilli(lastSyncTimeMs).UTC().Format("2006-01-02 15:04:05"))
|
|
}
|
|
} else {
|
|
// First sync: go back 24 hours
|
|
lastSyncTimeMs = nowMs - 24*60*60*1000
|
|
logger.Infof("📅 First sync, starting from 24 hours ago: %s (UTC)",
|
|
time.UnixMilli(lastSyncTimeMs).UTC().Format("2006-01-02 15:04:05"))
|
|
}
|
|
}
|
|
|
|
logger.Infof("🔄 Syncing Binance trades from: %s (UTC) [ms: %d, now: %d]",
|
|
time.UnixMilli(lastSyncTimeMs).UTC().Format("2006-01-02 15:04:05"), lastSyncTimeMs, nowMs)
|
|
|
|
// Step 1: Get max trade IDs from local DB for incremental sync
|
|
maxTradeIDs, err := orderStore.GetMaxTradeIDsByExchange(exchangeID)
|
|
if err != nil {
|
|
logger.Infof(" ⚠️ Failed to get max trade IDs: %v, will use time-based query", err)
|
|
maxTradeIDs = make(map[string]int64)
|
|
}
|
|
|
|
// Step 2: Detect symbols to sync using multiple methods
|
|
// COMMISSION detection may miss trades (VIP users, BNB discount, 0-fee trades)
|
|
symbolMap := make(map[string]bool)
|
|
lastSyncTime := time.UnixMilli(lastSyncTimeMs) // Convert to time.Time for API calls
|
|
|
|
// Method 1: COMMISSION income detection
|
|
commissionSymbols, err := t.GetCommissionSymbols(lastSyncTime)
|
|
if err != nil {
|
|
logger.Infof(" ⚠️ Failed to get commission symbols: %v", err)
|
|
} else {
|
|
logger.Infof(" 📋 COMMISSION symbols found: %d - %v", len(commissionSymbols), commissionSymbols)
|
|
for _, s := range commissionSymbols {
|
|
symbolMap[s] = true
|
|
}
|
|
}
|
|
|
|
// Method 2: Always include active positions (catches trades that COMMISSION missed)
|
|
positionSymbols := t.getPositionSymbols()
|
|
logger.Infof(" 📋 Position symbols found: %d - %v", len(positionSymbols), positionSymbols)
|
|
for _, s := range positionSymbols {
|
|
symbolMap[s] = true
|
|
}
|
|
|
|
// Method 3: Include symbols from recent fills in DB (in case some were partially synced)
|
|
recentSymbols, _ := orderStore.GetRecentFillSymbolsByExchange(exchangeID, lastSyncTimeMs)
|
|
logger.Infof(" 📋 Recent fill symbols found: %d - %v", len(recentSymbols), recentSymbols)
|
|
for _, s := range recentSymbols {
|
|
symbolMap[s] = true
|
|
}
|
|
|
|
// Method 4: ALWAYS query REALIZED_PNL income to find symbols with closed trades
|
|
// This catches trades that COMMISSION missed (VIP users, BNB fee discount)
|
|
// IMPORTANT: Must run always, not just when symbolMap is empty,
|
|
// because a position might be fully closed (no active position) but have PnL
|
|
pnlSymbols, err := t.GetPnLSymbols(lastSyncTime)
|
|
if err != nil {
|
|
logger.Infof(" ⚠️ Failed to get PnL symbols: %v", err)
|
|
} else {
|
|
logger.Infof(" 📋 REALIZED_PNL symbols found: %d - %v", len(pnlSymbols), pnlSymbols)
|
|
for _, s := range pnlSymbols {
|
|
symbolMap[s] = true
|
|
}
|
|
}
|
|
|
|
var changedSymbols []string
|
|
for s := range symbolMap {
|
|
changedSymbols = append(changedSymbols, s)
|
|
}
|
|
|
|
if len(changedSymbols) == 0 {
|
|
logger.Infof("📭 No symbols with new trades to sync")
|
|
// DON'T update lastSyncTime to current time here!
|
|
// Keep using the last actual trade time from DB to avoid creating gaps
|
|
// The lastSyncTimeMs from DB already has +1000ms buffer added
|
|
return nil
|
|
}
|
|
|
|
logger.Infof("📊 Found %d symbols with new trades: %v", len(changedSymbols), changedSymbols)
|
|
|
|
// Step 3: Query trades for changed symbols using fromId (incremental) or time-based (new symbols)
|
|
var allTrades []types.TradeRecord
|
|
var failedSymbols []string
|
|
apiCalls := 0
|
|
for _, symbol := range changedSymbols {
|
|
var trades []types.TradeRecord
|
|
var queryErr error
|
|
|
|
if lastID, ok := maxTradeIDs[symbol]; ok && lastID > 0 {
|
|
// Incremental sync: query from last known trade ID
|
|
trades, queryErr = t.GetTradesForSymbolFromID(symbol, lastID+1, 500)
|
|
} else {
|
|
// New symbol or first sync: query by time
|
|
trades, queryErr = t.GetTradesForSymbol(symbol, lastSyncTime, 500)
|
|
}
|
|
apiCalls++
|
|
|
|
if queryErr != nil {
|
|
logger.Infof(" ⚠️ Failed to get trades for %s: %v", symbol, queryErr)
|
|
failedSymbols = append(failedSymbols, symbol)
|
|
continue
|
|
}
|
|
allTrades = append(allTrades, trades...)
|
|
}
|
|
|
|
logger.Infof("📥 Received %d trades from Binance (%d API calls)", len(allTrades), apiCalls)
|
|
|
|
if len(allTrades) == 0 {
|
|
// No trades returned, but symbols were detected - might be false positive from COMMISSION/PnL detection
|
|
// Don't update lastSyncTime, keep using DB value
|
|
if len(failedSymbols) > 0 {
|
|
logger.Infof(" ⚠️ %d symbols failed: %v", len(failedSymbols), failedSymbols)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Sort trades by time ASC (oldest first) for proper position building
|
|
sort.Slice(allTrades, func(i, j int) bool {
|
|
return allTrades[i].Time.UnixMilli() < allTrades[j].Time.UnixMilli()
|
|
})
|
|
|
|
// Process trades one by one
|
|
positionStore := st.Position()
|
|
posBuilder := store.NewPositionBuilder(positionStore)
|
|
syncedCount := 0
|
|
|
|
skippedCount := 0
|
|
for _, trade := range allTrades {
|
|
// Check if trade already exists
|
|
existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.TradeID)
|
|
if err == nil && existing != nil {
|
|
skippedCount++
|
|
continue // Trade already exists, skip
|
|
}
|
|
|
|
// Normalize symbol
|
|
symbol := market.Normalize(trade.Symbol)
|
|
|
|
// Determine order action based on side and position side
|
|
orderAction := t.determineOrderAction(trade.Side, trade.PositionSide, trade.RealizedPnL)
|
|
|
|
// Determine position side for position builder
|
|
positionSide := trade.PositionSide
|
|
if positionSide == "" || positionSide == "BOTH" {
|
|
// Infer from order action
|
|
if strings.Contains(orderAction, "long") {
|
|
positionSide = "LONG"
|
|
} else {
|
|
positionSide = "SHORT"
|
|
}
|
|
}
|
|
|
|
// Normalize side
|
|
side := strings.ToUpper(trade.Side)
|
|
|
|
// Create order record - use Unix milliseconds UTC
|
|
tradeTimeMs := trade.Time.UTC().UnixMilli()
|
|
orderRecord := &store.TraderOrder{
|
|
TraderID: traderID,
|
|
ExchangeID: exchangeID,
|
|
ExchangeType: exchangeType,
|
|
ExchangeOrderID: trade.TradeID,
|
|
Symbol: symbol,
|
|
Side: side,
|
|
PositionSide: positionSide,
|
|
Type: "MARKET",
|
|
OrderAction: orderAction,
|
|
Quantity: trade.Quantity,
|
|
Price: trade.Price,
|
|
Status: "FILLED",
|
|
FilledQuantity: trade.Quantity,
|
|
AvgFillPrice: trade.Price,
|
|
Commission: trade.Fee,
|
|
FilledAt: tradeTimeMs,
|
|
CreatedAt: tradeTimeMs,
|
|
UpdatedAt: tradeTimeMs,
|
|
}
|
|
|
|
// Insert order record
|
|
if err := orderStore.CreateOrder(orderRecord); err != nil {
|
|
logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.TradeID, err)
|
|
continue
|
|
}
|
|
|
|
// Create fill record - use Unix milliseconds UTC
|
|
fillRecord := &store.TraderFill{
|
|
TraderID: traderID,
|
|
ExchangeID: exchangeID,
|
|
ExchangeType: exchangeType,
|
|
OrderID: orderRecord.ID,
|
|
ExchangeOrderID: trade.TradeID,
|
|
ExchangeTradeID: trade.TradeID,
|
|
Symbol: symbol,
|
|
Side: side,
|
|
Price: trade.Price,
|
|
Quantity: trade.Quantity,
|
|
QuoteQuantity: trade.Price * trade.Quantity,
|
|
Commission: trade.Fee,
|
|
CommissionAsset: "USDT",
|
|
RealizedPnL: trade.RealizedPnL,
|
|
IsMaker: false,
|
|
CreatedAt: tradeTimeMs,
|
|
}
|
|
|
|
if err := orderStore.CreateFill(fillRecord); err != nil {
|
|
logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.TradeID, err)
|
|
}
|
|
|
|
// Create/update position record using PositionBuilder
|
|
if err := posBuilder.ProcessTrade(
|
|
traderID, exchangeID, exchangeType,
|
|
symbol, positionSide, orderAction,
|
|
trade.Quantity, trade.Price, trade.Fee, trade.RealizedPnL,
|
|
tradeTimeMs, trade.TradeID,
|
|
); err != nil {
|
|
logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.TradeID, err)
|
|
} else {
|
|
logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.TradeID, orderAction, trade.Quantity)
|
|
}
|
|
|
|
syncedCount++
|
|
logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s time=%s(UTC)",
|
|
trade.TradeID, symbol, side, trade.Quantity, trade.Price, trade.RealizedPnL, trade.Fee, orderAction,
|
|
trade.Time.UTC().Format("01-02 15:04:05"))
|
|
}
|
|
|
|
// Update lastSyncTime to the LATEST trade time (not current time!)
|
|
// This ensures next sync starts from where we left off, not from "now"
|
|
// allTrades is already sorted by time ASC, so last element is the latest
|
|
if len(allTrades) > 0 && len(failedSymbols) == 0 {
|
|
latestTradeTimeMs := allTrades[len(allTrades)-1].Time.UTC().UnixMilli()
|
|
binanceSyncStateMutex.Lock()
|
|
binanceSyncState[exchangeID] = latestTradeTimeMs
|
|
binanceSyncStateMutex.Unlock()
|
|
logger.Infof("📅 Updated lastSyncTime to latest trade: %s (UTC)",
|
|
time.UnixMilli(latestTradeTimeMs).UTC().Format("2006-01-02 15:04:05"))
|
|
} else if len(failedSymbols) > 0 {
|
|
logger.Infof(" ⚠️ %d symbols failed, not updating lastSyncTime to retry next time: %v", len(failedSymbols), failedSymbols)
|
|
}
|
|
|
|
logger.Infof("✅ Binance order sync completed: %d new trades synced, %d skipped (already exist)", syncedCount, skippedCount)
|
|
return nil
|
|
}
|
|
|
|
// getPositionSymbols returns list of symbols that have active positions
|
|
// Used as fallback when COMMISSION detection fails
|
|
func (t *FuturesTrader) getPositionSymbols() []string {
|
|
positions, err := t.GetPositions()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
var symbols []string
|
|
for _, pos := range positions {
|
|
if symbol, ok := pos["symbol"].(string); ok && symbol != "" {
|
|
symbols = append(symbols, symbol)
|
|
}
|
|
}
|
|
return symbols
|
|
}
|
|
|
|
// determineOrderAction determines the order action based on trade data
|
|
func (t *FuturesTrader) determineOrderAction(side, positionSide string, realizedPnL float64) string {
|
|
side = strings.ToUpper(side)
|
|
positionSide = strings.ToUpper(positionSide)
|
|
|
|
// If there's realized PnL, it's likely a close trade
|
|
isClose := realizedPnL != 0
|
|
|
|
if positionSide == "LONG" || positionSide == "" {
|
|
if side == "BUY" {
|
|
if isClose {
|
|
return "close_short" // Buying to close short
|
|
}
|
|
return "open_long"
|
|
} else {
|
|
if isClose {
|
|
return "close_long" // Selling to close long
|
|
}
|
|
return "open_short"
|
|
}
|
|
} else if positionSide == "SHORT" {
|
|
if side == "SELL" {
|
|
if isClose {
|
|
return "close_long"
|
|
}
|
|
return "open_short"
|
|
} else {
|
|
if isClose {
|
|
return "close_short"
|
|
}
|
|
return "open_long"
|
|
}
|
|
}
|
|
|
|
// Default fallback
|
|
if side == "BUY" {
|
|
return "open_long"
|
|
}
|
|
return "open_short"
|
|
}
|
|
|
|
// StartOrderSync starts background order sync task for Binance
|
|
func (t *FuturesTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) {
|
|
// Run first sync immediately
|
|
go func() {
|
|
logger.Infof("🔄 Running initial Binance order sync...")
|
|
if err := t.SyncOrdersFromBinance(traderID, exchangeID, exchangeType, st); err != nil {
|
|
logger.Infof("⚠️ Initial Binance order sync failed: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Then run periodically
|
|
ticker := time.NewTicker(interval)
|
|
go func() {
|
|
for range ticker.C {
|
|
if err := t.SyncOrdersFromBinance(traderID, exchangeID, exchangeType, st); err != nil {
|
|
logger.Infof("⚠️ Binance order sync failed: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
logger.Infof("🔄 Binance order sync started (interval: %v)", interval)
|
|
}
|