Files
nofx/trader/position_sync.go
T
2025-12-08 01:43:22 +08:00

323 lines
8.2 KiB
Go

package trader
import (
"fmt"
"nofx/logger"
"nofx/store"
"sync"
"time"
)
// PositionSyncManager periodically reconciles the locally stored open
// positions with the positions reported by each trader's exchange, and marks
// local positions as closed when the exchange no longer reports them (for
// example after a manual close on the exchange).
type PositionSyncManager struct {
// store is used to read open positions and trader configs and to record closures.
store *store.Store
// interval is the period between two synchronization passes.
interval time.Duration
// stopCh is closed by Stop to make the run loop exit.
stopCh chan struct{}
// wg tracks the goroutine started by Start so that Stop can wait for it.
wg sync.WaitGroup
traderCache map[string]Trader // trader_id -> cached Trader instance
configCache map[string]*store.TraderFullConfig // trader_id -> cached full config
// cacheMutex guards traderCache and configCache.
cacheMutex sync.RWMutex
}
// NewPositionSyncManager creates a position synchronization manager backed by st.
//
// interval is the delay between two synchronization passes. A zero or negative
// value falls back to the 10-second default: the interval is later handed to
// time.NewTicker, which panics on a non-positive duration.
//
// The returned manager is idle until Start is called.
func NewPositionSyncManager(st *store.Store, interval time.Duration) *PositionSyncManager {
	if interval <= 0 {
		interval = 10 * time.Second
	}
	return &PositionSyncManager{
		store:       st,
		interval:    interval,
		stopCh:      make(chan struct{}),
		traderCache: make(map[string]Trader),
		configCache: make(map[string]*store.TraderFullConfig),
	}
}
// Start launches the synchronization loop in its own goroutine and returns
// immediately. The goroutine is registered on the manager's WaitGroup before
// it is spawned so that Stop can wait for it to finish.
func (m *PositionSyncManager) Start() {
m.wg.Add(1)
go m.run()
logger.Info("📊 Position sync manager started")
}
// Stop signals the synchronization loop to exit, waits for it to finish and
// then drops every cached trader instance and trader config.
//
// Calling Stop again after it has returned is a no-op apart from the wait and
// cache reset: the stop channel is only closed if it is still open, because
// closing an already-closed channel panics. This guard covers repeated
// sequential calls; concurrent Stop calls are not synchronized with each other.
func (m *PositionSyncManager) Stop() {
	select {
	case <-m.stopCh:
		// Already closed by an earlier Stop call.
	default:
		close(m.stopCh)
	}
	m.wg.Wait()

	// Clear cache
	m.cacheMutex.Lock()
	m.traderCache = make(map[string]Trader)
	m.configCache = make(map[string]*store.TraderFullConfig)
	m.cacheMutex.Unlock()

	logger.Info("📊 Position sync manager stopped")
}
// run is the body of the background goroutine: it performs one
// synchronization pass right away, then one per ticker interval, and returns
// once the stop channel is closed. It marks the WaitGroup done on exit.
func (m *PositionSyncManager) run() {
	defer m.wg.Done()

	// Initial pass so callers do not have to wait a full interval after startup.
	m.syncPositions()

	tick := time.NewTicker(m.interval)
	defer tick.Stop()

	for {
		select {
		case <-m.stopCh:
			return
		case <-tick.C:
		}
		// Only reached when the ticker fired.
		m.syncPositions()
	}
}
// syncPositions runs one synchronization pass: it loads every locally open
// position from the store, groups them by trader ID and reconciles each group
// against that trader's exchange. A store error is logged and the pass is
// abandoned; an empty result ends the pass silently.
func (m *PositionSyncManager) syncPositions() {
	open, err := m.store.Position().GetAllOpenPositions()
	switch {
	case err != nil:
		logger.Infof("⚠️ Failed to get local positions: %v", err)
		return
	case len(open) == 0:
		return
	}

	// Bucket the open positions by the trader that owns them.
	byTrader := make(map[string][]*store.TraderPosition)
	for _, p := range open {
		byTrader[p.TraderID] = append(byTrader[p.TraderID], p)
	}

	// Reconcile one trader at a time.
	for id, group := range byTrader {
		m.syncTraderPositions(id, group)
	}
}
// syncTraderPositions reconciles the given local open positions of one trader
// with the positions its exchange currently reports.
//
// A local position is marked closed with reason "manual" when the exchange
// has no entry for the same symbol/side pair, or when the reported
// "positionAmt" is (in absolute value) below 0.0000001. Failures to obtain the
// trader instance or the exchange positions are logged and skip the trader.
//
// NOTE(review): exchange entries without a non-empty "symbol" and
// "positionSide" string are skipped, and the lookup requires "positionSide" to
// equal the local Side value exactly; a local position whose entry is skipped
// or mismatched is treated as closed. Confirm that every Trader
// implementation populates these keys in that form.
func (m *PositionSyncManager) syncTraderPositions(traderID string, localPositions []*store.TraderPosition) {
	tr, err := m.getOrCreateTrader(traderID)
	if err != nil {
		logger.Infof("⚠️ Failed to get trader instance (ID: %s): %v", traderID, err)
		return
	}

	remote, err := tr.GetPositions()
	if err != nil {
		logger.Infof("⚠️ Failed to get exchange positions (ID: %s): %v", traderID, err)
		return
	}

	// Index the exchange positions by "<symbol>_<side>".
	index := make(map[string]map[string]interface{})
	for _, ep := range remote {
		symbol, _ := ep["symbol"].(string)
		side, _ := ep["positionSide"].(string)
		if symbol != "" && side != "" {
			index[fmt.Sprintf("%s_%s", symbol, side)] = ep
		}
	}

	// Amounts below this threshold count as a flat (closed) position.
	const dust = 0.0000001

	for _, lp := range localPositions {
		ep, found := index[fmt.Sprintf("%s_%s", lp.Symbol, lp.Side)]

		gone := !found
		if found {
			amt := getFloatFromMap(ep, "positionAmt")
			if amt < 0 {
				amt = -amt // short positions are reported with a negative amount
			}
			gone = amt < dust
		}

		if gone {
			m.closeLocalPosition(lp, tr, "manual")
		}
	}
}
// closeLocalPosition marks pos as closed in the store.
//
// The exit price is the current market price reported by trader for the
// position's symbol; if that lookup fails or returns a non-positive price, the
// entry price is used instead (which yields a PnL of zero). Realized PnL is
// (exit - entry) * quantity for "LONG" positions and (entry - exit) * quantity
// for every other side. The close is recorded without an order ID and with a
// zero fee, since neither is known for a close detected by synchronization.
//
// reason is stored with the closure and included in the log line. The outcome
// is only logged; no error is returned to the caller.
func (m *PositionSyncManager) closeLocalPosition(pos *store.TraderPosition, trader Trader, reason string) {
	// Default to the entry price, then prefer the latest exchange price.
	exitPrice := pos.EntryPrice
	if price, err := trader.GetMarketPrice(pos.Symbol); err == nil && price > 0 {
		exitPrice = price
	}

	// Calculate PnL
	var realizedPnL float64
	if pos.Side == "LONG" {
		realizedPnL = (exitPrice - pos.EntryPrice) * pos.Quantity
	} else {
		realizedPnL = (pos.EntryPrice - exitPrice) * pos.Quantity
	}

	// Update database
	err := m.store.Position().ClosePosition(
		pos.ID,
		exitPrice,
		"", // Manual close has no order ID
		realizedPnL,
		0, // Manual close cannot get fee
		reason,
	)
	if err != nil {
		logger.Infof("⚠️ Failed to update position status: %v", err)
		return
	}

	// Log at most the first 8 bytes of the trader ID. Slicing unconditionally
	// would panic for shorter IDs, after the store has already been updated.
	shortID := pos.TraderID
	if len(shortID) > 8 {
		shortID = shortID[:8]
	}
	logger.Infof("📊 Position closed [%s] %s %s @ %.4f → %.4f, PnL: %.2f (%s)",
		shortID, pos.Symbol, pos.Side, pos.EntryPrice, exitPrice, realizedPnL, reason)
}
// getOrCreateTrader returns the cached Trader for traderID, or builds one from
// the trader's configuration and caches it when no non-nil instance is cached.
// Errors from loading the configuration or constructing the trader are
// returned wrapped; nothing is cached in that case.
func (m *PositionSyncManager) getOrCreateTrader(traderID string) (Trader, error) {
	// Fast path: read-locked cache lookup.
	m.cacheMutex.RLock()
	cached, ok := m.traderCache[traderID]
	m.cacheMutex.RUnlock()
	if ok && cached != nil {
		return cached, nil
	}

	// Slow path: load the configuration and build a fresh instance.
	cfg, err := m.getTraderConfig(traderID)
	if err != nil {
		return nil, fmt.Errorf("failed to get trader config: %w", err)
	}

	created, err := m.createTrader(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create trader instance: %w", err)
	}

	m.cacheMutex.Lock()
	m.traderCache[traderID] = created
	m.cacheMutex.Unlock()

	return created, nil
}
// getTraderConfig returns the full configuration for traderID, served from the
// config cache when present. On a cache miss it lists all traders from the
// store to find the owning user ID, loads the full config for that
// user/trader pair and caches the result. It returns an error when the trader
// list cannot be read, the trader is not in the list (or has an empty user
// ID), or the full config cannot be loaded.
func (m *PositionSyncManager) getTraderConfig(traderID string) (*store.TraderFullConfig, error) {
	m.cacheMutex.RLock()
	cfg, hit := m.configCache[traderID]
	m.cacheMutex.RUnlock()
	if hit {
		return cfg, nil
	}

	// Cache miss: resolve the owning user from the trader list.
	all, err := m.store.Trader().ListAll()
	if err != nil {
		return nil, fmt.Errorf("failed to get trader list: %w", err)
	}

	ownerID := ""
	for _, candidate := range all {
		if candidate.ID != traderID {
			continue
		}
		ownerID = candidate.UserID
		break
	}
	if ownerID == "" {
		return nil, fmt.Errorf("trader not found: %s", traderID)
	}

	cfg, err = m.store.Trader().GetFullConfig(ownerID, traderID)
	if err != nil {
		return nil, err
	}

	m.cacheMutex.Lock()
	m.configCache[traderID] = cfg
	m.cacheMutex.Unlock()

	return cfg, nil
}
// createTrader builds the Trader implementation matching the exchange ID in
// config, passing that exchange's credentials to the corresponding
// constructor. The exchange ID (not the exchange type, cex/dex) selects the
// implementation. For "lighter", the V2 constructor is used when an API key
// private key is configured, and the original constructor otherwise. An
// unknown exchange ID yields an error.
func (m *PositionSyncManager) createTrader(config *store.TraderFullConfig) (Trader, error) {
	ex := config.Exchange

	switch ex.ID {
	case "binance":
		return NewFuturesTrader(ex.APIKey, ex.SecretKey, config.Trader.UserID), nil

	case "bybit":
		return NewBybitTrader(ex.APIKey, ex.SecretKey), nil

	case "okx":
		return NewOKXTrader(ex.APIKey, ex.SecretKey, ex.Passphrase), nil

	case "hyperliquid":
		return NewHyperliquidTrader(ex.SecretKey, ex.HyperliquidWalletAddr, ex.Testnet)

	case "aster":
		return NewAsterTrader(ex.AsterUser, ex.AsterSigner, ex.AsterPrivateKey)

	case "lighter":
		if ex.LighterAPIKeyPrivateKey == "" {
			return NewLighterTrader(ex.LighterPrivateKey, ex.LighterWalletAddr, ex.Testnet)
		}
		return NewLighterTraderV2(
			ex.LighterPrivateKey,
			ex.LighterWalletAddr,
			ex.LighterAPIKeyPrivateKey,
			ex.Testnet,
		)
	}

	return nil, fmt.Errorf("unsupported exchange: %s", ex.ID)
}
// InvalidateCache removes the cached trader instance and the cached
// configuration for traderID, so both are rebuilt on the next lookup.
// Unknown IDs are ignored.
func (m *PositionSyncManager) InvalidateCache(traderID string) {
	m.cacheMutex.Lock()
	delete(m.configCache, traderID)
	delete(m.traderCache, traderID)
	m.cacheMutex.Unlock()
}
// getFloatFromMap reads m[key] and converts it to float64.
//
// Supported value types are the float and integer kinds listed in the switch
// below, plus strings, which are parsed with fmt.Sscanf("%f"). The function
// returns 0 when the key is absent (including a nil map), when the value has
// an unsupported type, or when a string cannot be parsed. Callers therefore
// cannot tell a genuine zero apart from a missing or malformed value.
func getFloatFromMap(m map[string]interface{}, key string) float64 {
	v, ok := m[key]
	if !ok {
		return 0
	}

	switch val := v.(type) {
	case float64:
		return val
	case float32:
		return float64(val)
	case int:
		return float64(val)
	case int32:
		return float64(val)
	case int64:
		return float64(val)
	case uint:
		return float64(val)
	case uint32:
		return float64(val)
	case uint64:
		return float64(val)
	case string:
		var f float64
		if _, err := fmt.Sscanf(val, "%f", &f); err != nil {
			// Unparsable text is reported as 0, like any other unusable value.
			return 0
		}
		return f
	}
	return 0
}