Files
nofx/trader/binance_order_sync.go
T
tinkle-community 5c9e134e99 fix: ensure all timestamps use UTC timezone
- Add NowFunc to GORM config for UTC auto-generated timestamps
- Add .UTC() to all time.UnixMilli() calls in trader files
- Add .UTC() to all time.Now() calls in store and api files
- Fix TypeScript unused imports in frontend
2026-01-04 20:03:56 +08:00

292 lines
8.9 KiB
Go

package trader
import (
"fmt"
"nofx/logger"
"nofx/market"
"nofx/store"
"sort"
"strings"
"sync"
"time"
)
// syncState stores the last sync time for incremental sync
var (
binanceSyncState = make(map[string]time.Time) // exchangeID -> lastSyncTime
binanceSyncStateMutex sync.RWMutex
)
// SyncOrdersFromBinance syncs Binance Futures trade history to local database
// Uses COMMISSION detection + fromId for efficient incremental sync
// Also creates/updates position records to ensure orders/fills/positions data consistency
func (t *FuturesTrader) SyncOrdersFromBinance(traderID string, exchangeID string, exchangeType string, st *store.Store) error {
if st == nil {
return fmt.Errorf("store is nil")
}
// Get last sync time (default to 24 hours ago for first sync)
binanceSyncStateMutex.RLock()
lastSyncTime, exists := binanceSyncState[exchangeID]
binanceSyncStateMutex.RUnlock()
if !exists {
lastSyncTime = time.Now().Add(-24 * time.Hour)
}
// Record current time BEFORE querying, to avoid missing trades during sync
// This prevents race condition where trades happen between query and lastSyncTime update
syncStartTime := time.Now()
logger.Infof("🔄 Syncing Binance trades from: %s", lastSyncTime.Format(time.RFC3339))
// Step 1: Get max trade IDs from local DB for incremental sync
orderStore := st.Order()
maxTradeIDs, err := orderStore.GetMaxTradeIDsByExchange(exchangeID)
if err != nil {
logger.Infof(" ⚠️ Failed to get max trade IDs: %v, will use time-based query", err)
maxTradeIDs = make(map[string]int64)
}
// Step 2: Use COMMISSION to detect which symbols have new trades (1 API call)
changedSymbols, err := t.GetCommissionSymbols(lastSyncTime)
if err != nil {
logger.Infof(" ⚠️ Failed to get commission symbols: %v, falling back to positions", err)
// Fallback: only sync symbols with active positions
changedSymbols = t.getPositionSymbols()
}
if len(changedSymbols) == 0 {
logger.Infof("📭 No symbols with new trades to sync")
// Update last sync time even if no changes
binanceSyncStateMutex.Lock()
binanceSyncState[exchangeID] = syncStartTime
binanceSyncStateMutex.Unlock()
return nil
}
logger.Infof("📊 Found %d symbols with new trades: %v", len(changedSymbols), changedSymbols)
// Step 3: Query trades for changed symbols using fromId (incremental) or time-based (new symbols)
var allTrades []TradeRecord
var failedSymbols []string
apiCalls := 0
for _, symbol := range changedSymbols {
var trades []TradeRecord
var queryErr error
if lastID, ok := maxTradeIDs[symbol]; ok && lastID > 0 {
// Incremental sync: query from last known trade ID
trades, queryErr = t.GetTradesForSymbolFromID(symbol, lastID+1, 500)
} else {
// New symbol or first sync: query by time
trades, queryErr = t.GetTradesForSymbol(symbol, lastSyncTime, 500)
}
apiCalls++
if queryErr != nil {
logger.Infof(" ⚠️ Failed to get trades for %s: %v", symbol, queryErr)
failedSymbols = append(failedSymbols, symbol)
continue
}
allTrades = append(allTrades, trades...)
}
logger.Infof("📥 Received %d trades from Binance (%d API calls)", len(allTrades), apiCalls)
// Only update last sync time if ALL symbols were successfully queried
// This prevents data loss when some symbols fail due to rate limit or network issues
if len(failedSymbols) == 0 {
binanceSyncStateMutex.Lock()
binanceSyncState[exchangeID] = syncStartTime
binanceSyncStateMutex.Unlock()
} else {
logger.Infof(" ⚠️ %d symbols failed, not updating lastSyncTime to retry next time: %v", len(failedSymbols), failedSymbols)
}
if len(allTrades) == 0 {
return nil
}
// Sort trades by time ASC (oldest first) for proper position building
sort.Slice(allTrades, func(i, j int) bool {
return allTrades[i].Time.Before(allTrades[j].Time)
})
// Process trades one by one
positionStore := st.Position()
posBuilder := store.NewPositionBuilder(positionStore)
syncedCount := 0
for _, trade := range allTrades {
// Check if trade already exists
existing, err := orderStore.GetOrderByExchangeID(exchangeID, trade.TradeID)
if err == nil && existing != nil {
continue // Trade already exists, skip
}
// Normalize symbol
symbol := market.Normalize(trade.Symbol)
// Determine order action based on side and position side
orderAction := t.determineOrderAction(trade.Side, trade.PositionSide, trade.RealizedPnL)
// Determine position side for position builder
positionSide := trade.PositionSide
if positionSide == "" || positionSide == "BOTH" {
// Infer from order action
if strings.Contains(orderAction, "long") {
positionSide = "LONG"
} else {
positionSide = "SHORT"
}
}
// Normalize side
side := strings.ToUpper(trade.Side)
// Create order record - use UTC time to avoid timezone issues
tradeTimeUTC := trade.Time.UTC()
orderRecord := &store.TraderOrder{
TraderID: traderID,
ExchangeID: exchangeID,
ExchangeType: exchangeType,
ExchangeOrderID: trade.TradeID,
Symbol: symbol,
Side: side,
PositionSide: positionSide,
Type: "MARKET",
OrderAction: orderAction,
Quantity: trade.Quantity,
Price: trade.Price,
Status: "FILLED",
FilledQuantity: trade.Quantity,
AvgFillPrice: trade.Price,
Commission: trade.Fee,
FilledAt: tradeTimeUTC,
CreatedAt: tradeTimeUTC,
UpdatedAt: tradeTimeUTC,
}
// Insert order record
if err := orderStore.CreateOrder(orderRecord); err != nil {
logger.Infof(" ⚠️ Failed to sync trade %s: %v", trade.TradeID, err)
continue
}
// Create fill record - use UTC time
fillRecord := &store.TraderFill{
TraderID: traderID,
ExchangeID: exchangeID,
ExchangeType: exchangeType,
OrderID: orderRecord.ID,
ExchangeOrderID: trade.TradeID,
ExchangeTradeID: trade.TradeID,
Symbol: symbol,
Side: side,
Price: trade.Price,
Quantity: trade.Quantity,
QuoteQuantity: trade.Price * trade.Quantity,
Commission: trade.Fee,
CommissionAsset: "USDT",
RealizedPnL: trade.RealizedPnL,
IsMaker: false,
CreatedAt: tradeTimeUTC,
}
if err := orderStore.CreateFill(fillRecord); err != nil {
logger.Infof(" ⚠️ Failed to sync fill for trade %s: %v", trade.TradeID, err)
}
// Create/update position record using PositionBuilder
if err := posBuilder.ProcessTrade(
traderID, exchangeID, exchangeType,
symbol, positionSide, orderAction,
trade.Quantity, trade.Price, trade.Fee, trade.RealizedPnL,
trade.Time, trade.TradeID,
); err != nil {
logger.Infof(" ⚠️ Failed to sync position for trade %s: %v", trade.TradeID, err)
} else {
logger.Infof(" 📍 Position updated for trade: %s (action: %s, qty: %.6f)", trade.TradeID, orderAction, trade.Quantity)
}
syncedCount++
logger.Infof(" ✅ Synced trade: %s %s %s qty=%.6f price=%.6f pnl=%.2f fee=%.6f action=%s",
trade.TradeID, symbol, side, trade.Quantity, trade.Price, trade.RealizedPnL, trade.Fee, orderAction)
}
logger.Infof("✅ Binance order sync completed: %d new trades synced", syncedCount)
return nil
}
// getPositionSymbols returns list of symbols that have active positions
// Used as fallback when COMMISSION detection fails
func (t *FuturesTrader) getPositionSymbols() []string {
positions, err := t.GetPositions()
if err != nil {
return nil
}
var symbols []string
for _, pos := range positions {
if symbol, ok := pos["symbol"].(string); ok && symbol != "" {
symbols = append(symbols, symbol)
}
}
return symbols
}
// determineOrderAction determines the order action based on trade data
func (t *FuturesTrader) determineOrderAction(side, positionSide string, realizedPnL float64) string {
side = strings.ToUpper(side)
positionSide = strings.ToUpper(positionSide)
// If there's realized PnL, it's likely a close trade
isClose := realizedPnL != 0
if positionSide == "LONG" || positionSide == "" {
if side == "BUY" {
if isClose {
return "close_short" // Buying to close short
}
return "open_long"
} else {
if isClose {
return "close_long" // Selling to close long
}
return "open_short"
}
} else if positionSide == "SHORT" {
if side == "SELL" {
if isClose {
return "close_long"
}
return "open_short"
} else {
if isClose {
return "close_short"
}
return "open_long"
}
}
// Default fallback
if side == "BUY" {
return "open_long"
}
return "open_short"
}
// StartOrderSync starts background order sync task for Binance
func (t *FuturesTrader) StartOrderSync(traderID string, exchangeID string, exchangeType string, st *store.Store, interval time.Duration) {
ticker := time.NewTicker(interval)
go func() {
for range ticker.C {
if err := t.SyncOrdersFromBinance(traderID, exchangeID, exchangeType, st); err != nil {
logger.Infof("⚠️ Binance order sync failed: %v", err)
}
}
}()
logger.Infof("🔄 Binance order sync started (interval: %v)", interval)
}