From 2172b252a5bfeb2811ca46cff6868421e4b7391c Mon Sep 17 00:00:00 2001 From: tinkle-community Date: Sat, 27 Dec 2025 02:08:22 +0800 Subject: [PATCH] fix: position accumulation for split orders with same timestamp - Fix CreateOpenPosition to accumulate into existing position when same exchange_position_id exists, instead of silently skipping - Add GetOpenPositionByExchangePositionID method for lookup by exchange ID - Update UpdatePositionQuantityAndPrice to also update entry_quantity - This fixes the issue where split orders (same millisecond) only recorded the first order's quantity instead of the total position size --- store/position.go | 227 ++++++++++++++++++++++++++++++++------ store/position_builder.go | 142 ++++++++++++++++++++++++ 2 files changed, 338 insertions(+), 31 deletions(-) create mode 100644 store/position_builder.go diff --git a/store/position.go b/store/position.go index 63ba6694..7af7e9ea 100644 --- a/store/position.go +++ b/store/position.go @@ -31,20 +31,21 @@ type TraderPosition struct { ExchangeType string `json:"exchange_type"` // Exchange type: binance/bybit/okx/hyperliquid/aster/lighter ExchangePositionID string `json:"exchange_position_id"` // Exchange-specific unique position ID for deduplication Symbol string `json:"symbol"` - Side string `json:"side"` // LONG/SHORT - Quantity float64 `json:"quantity"` // Opening quantity - EntryPrice float64 `json:"entry_price"` // Entry price - EntryOrderID string `json:"entry_order_id"` // Entry order ID - EntryTime time.Time `json:"entry_time"` // Entry time - ExitPrice float64 `json:"exit_price"` // Exit price - ExitOrderID string `json:"exit_order_id"` // Exit order ID - ExitTime *time.Time `json:"exit_time"` // Exit time - RealizedPnL float64 `json:"realized_pnl"` // Realized profit and loss - Fee float64 `json:"fee"` // Fee - Leverage int `json:"leverage"` // Leverage multiplier - Status string `json:"status"` // OPEN/CLOSED - CloseReason string `json:"close_reason"` // Close reason: ai_decision/manual/stop_loss/take_profit - Source string `json:"source"` // Source: system/manual/sync + Side string `json:"side"` // LONG/SHORT + EntryQuantity float64 `json:"entry_quantity"` // Original entry quantity (never modified) + Quantity float64 `json:"quantity"` // Remaining quantity (reduced on partial close) + EntryPrice float64 `json:"entry_price"` // Entry price + EntryOrderID string `json:"entry_order_id"` // Entry order ID + EntryTime time.Time `json:"entry_time"` // Entry time + ExitPrice float64 `json:"exit_price"` // Exit price + ExitOrderID string `json:"exit_order_id"` // Exit order ID + ExitTime *time.Time `json:"exit_time"` // Exit time + RealizedPnL float64 `json:"realized_pnl"` // Realized profit and loss + Fee float64 `json:"fee"` // Fee + Leverage int `json:"leverage"` // Leverage multiplier + Status string `json:"status"` // OPEN/CLOSED + CloseReason string `json:"close_reason"` // Close reason: ai_decision/manual/stop_loss/take_profit + Source string `json:"source"` // Source: system/manual/sync CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } @@ -66,6 +67,7 @@ func (s *PositionStore) InitTables() error { id INTEGER PRIMARY KEY AUTOINCREMENT, trader_id TEXT NOT NULL, exchange_id TEXT NOT NULL DEFAULT '', + exchange_type TEXT NOT NULL DEFAULT '', exchange_position_id TEXT NOT NULL DEFAULT '', symbol TEXT NOT NULL, side TEXT NOT NULL, @@ -99,6 +101,10 @@ func (s *PositionStore) InitTables() error { s.db.Exec(`ALTER TABLE trader_positions ADD COLUMN exchange_position_id TEXT NOT NULL DEFAULT ''`) // Migration: add source field (system/manual/sync) s.db.Exec(`ALTER TABLE trader_positions ADD COLUMN source TEXT DEFAULT 'system'`) + // Migration: add entry_quantity field (original quantity, never modified on partial close) + s.db.Exec(`ALTER TABLE trader_positions ADD COLUMN entry_quantity REAL DEFAULT 0`) + // Backfill: set entry_quantity = quantity for existing records where entry_quantity is 0 + s.db.Exec(`UPDATE trader_positions SET entry_quantity = quantity WHERE entry_quantity = 0 OR entry_quantity IS NULL`) // Create indexes (after migration) indices := []string{ @@ -130,14 +136,18 @@ func (s *PositionStore) Create(pos *TraderPosition) error { pos.CreatedAt = now pos.UpdatedAt = now pos.Status = "OPEN" + // Set EntryQuantity to same as Quantity if not already set + if pos.EntryQuantity == 0 { + pos.EntryQuantity = pos.Quantity + } result, err := s.db.Exec(` INSERT INTO trader_positions ( - trader_id, exchange_id, exchange_type, symbol, side, quantity, entry_price, entry_order_id, + trader_id, exchange_id, exchange_type, symbol, side, quantity, entry_quantity, entry_price, entry_order_id, entry_time, leverage, status, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, - pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.Symbol, pos.Side, pos.Quantity, pos.EntryPrice, + pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.Symbol, pos.Side, pos.Quantity, pos.EntryQuantity, pos.EntryPrice, pos.EntryOrderID, pos.EntryTime.Format(time.RFC3339), pos.Leverage, pos.Status, now.Format(time.RFC3339), now.Format(time.RFC3339), ) @@ -169,10 +179,104 @@ func (s *PositionStore) ClosePosition(id int64, exitPrice float64, exitOrderID s return nil } +// UpdatePositionQuantityAndPrice updates position quantity and recalculates entry price (weighted average) when adding to position +// Both quantity and entry_quantity are updated to reflect the new total position size +func (s *PositionStore) UpdatePositionQuantityAndPrice(id int64, addQty float64, addPrice float64, addFee float64) error { + // First, get current position data + var currentQty, currentEntryQty, currentEntryPrice, currentFee float64 + err := s.db.QueryRow(` + SELECT quantity, COALESCE(entry_quantity, quantity), entry_price, fee FROM trader_positions WHERE id = ? + `, id).Scan(¤tQty, ¤tEntryQty, ¤tEntryPrice, ¤tFee) + if err != nil { + return fmt.Errorf("failed to get current position: %w", err) + } + + // Calculate weighted average entry price + newQty := currentQty + addQty + newEntryQty := currentEntryQty + addQty + newEntryPrice := (currentEntryPrice*currentQty + addPrice*addQty) / newQty + + // Accumulate fees + newFee := currentFee + addFee + + // Update position (both quantity and entry_quantity) + now := time.Now() + _, err = s.db.Exec(` + UPDATE trader_positions SET + quantity = ?, entry_quantity = ?, entry_price = ?, fee = ?, updated_at = ? + WHERE id = ? + `, newQty, newEntryQty, newEntryPrice, newFee, now.Format(time.RFC3339), id) + if err != nil { + return fmt.Errorf("failed to update position quantity and price: %w", err) + } + return nil +} + +// ReducePositionQuantity reduces position quantity for partial close (keeps status as OPEN) +func (s *PositionStore) ReducePositionQuantity(id int64, reduceQty float64, addFee float64) error { + now := time.Now() + _, err := s.db.Exec(` + UPDATE trader_positions SET + quantity = quantity - ?, + fee = fee + ?, + updated_at = ? + WHERE id = ? + `, reduceQty, addFee, now.Format(time.RFC3339), id) + if err != nil { + return fmt.Errorf("failed to reduce position quantity: %w", err) + } + return nil +} + +// ClosePositionFully marks position as fully closed with exit time and accumulated PnL +func (s *PositionStore) ClosePositionFully( + id int64, + exitPrice float64, + exitOrderID string, + exitTime time.Time, + totalRealizedPnL float64, + totalFee float64, + closeReason string, +) error { + now := time.Now() + // When closing, restore quantity to entry_quantity so closed position shows original size + _, err := s.db.Exec(` + UPDATE trader_positions SET + quantity = CASE WHEN entry_quantity > 0 THEN entry_quantity ELSE quantity END, + exit_price = ?, + exit_order_id = ?, + exit_time = ?, + realized_pnl = ?, + fee = ?, + status = 'CLOSED', + close_reason = ?, + updated_at = ? + WHERE id = ? + `, + exitPrice, exitOrderID, exitTime.Format(time.RFC3339), + totalRealizedPnL, totalFee, closeReason, now.Format(time.RFC3339), id, + ) + if err != nil { + return fmt.Errorf("failed to close position: %w", err) + } + return nil +} + +// DeleteAllOpenPositions deletes all OPEN positions for a trader (used for snapshot reset) +func (s *PositionStore) DeleteAllOpenPositions(traderID string) error { + _, err := s.db.Exec(` + DELETE FROM trader_positions WHERE trader_id = ? AND status = 'OPEN' + `, traderID) + if err != nil { + return fmt.Errorf("failed to delete open positions: %w", err) + } + return nil +} + // GetOpenPositions gets all open positions func (s *PositionStore) GetOpenPositions(traderID string) ([]*TraderPosition, error) { rows, err := s.db.Query(` - SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id, + SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id, entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee, leverage, status, close_reason, created_at, updated_at FROM trader_positions @@ -193,14 +297,14 @@ func (s *PositionStore) GetOpenPositionBySymbol(traderID, symbol, side string) ( var entryTime, exitTime, createdAt, updatedAt sql.NullString err := s.db.QueryRow(` - SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id, + SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id, entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee, leverage, status, close_reason, created_at, updated_at FROM trader_positions WHERE trader_id = ? AND symbol = ? AND side = ? AND status = 'OPEN' ORDER BY entry_time DESC LIMIT 1 `, traderID, symbol, side).Scan( - &pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, + &pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, &pos.EntryQuantity, &pos.EntryPrice, &pos.EntryOrderID, &entryTime, &pos.ExitPrice, &pos.ExitOrderID, &exitTime, &pos.RealizedPnL, &pos.Fee, &pos.Leverage, &pos.Status, &pos.CloseReason, &createdAt, &updatedAt, @@ -219,7 +323,7 @@ func (s *PositionStore) GetOpenPositionBySymbol(traderID, symbol, side string) ( // GetClosedPositions gets closed positions (historical records) func (s *PositionStore) GetClosedPositions(traderID string, limit int) ([]*TraderPosition, error) { rows, err := s.db.Query(` - SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id, + SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id, entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee, leverage, status, close_reason, created_at, updated_at FROM trader_positions @@ -238,7 +342,7 @@ func (s *PositionStore) GetClosedPositions(traderID string, limit int) ([]*Trade // GetAllOpenPositions gets all traders' open positions (for global sync) func (s *PositionStore) GetAllOpenPositions() ([]*TraderPosition, error) { rows, err := s.db.Query(` - SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id, + SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id, entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee, leverage, status, close_reason, created_at, updated_at FROM trader_positions @@ -520,7 +624,7 @@ func (s *PositionStore) scanPositions(rows *sql.Rows) ([]*TraderPosition, error) var entryTime, exitTime, createdAt, updatedAt sql.NullString err := rows.Scan( - &pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, + &pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, &pos.EntryQuantity, &pos.EntryPrice, &pos.EntryOrderID, &entryTime, &pos.ExitPrice, &pos.ExitOrderID, &exitTime, &pos.RealizedPnL, &pos.Fee, &pos.Leverage, &pos.Status, &pos.CloseReason, &createdAt, &updatedAt, @@ -906,6 +1010,40 @@ func (s *PositionStore) ExistsWithExchangePositionID(exchangeID, exchangePositio return count > 0, nil } +// GetOpenPositionByExchangePositionID gets an OPEN position by exchange_position_id +// Used for accumulating into existing position when duplicate exchange_position_id is detected +func (s *PositionStore) GetOpenPositionByExchangePositionID(exchangeID, exchangePositionID string) (*TraderPosition, error) { + if exchangePositionID == "" { + return nil, nil + } + + var pos TraderPosition + var entryTime, exitTime, createdAt, updatedAt sql.NullString + + err := s.db.QueryRow(` + SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id, + entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee, + leverage, status, close_reason, created_at, updated_at + FROM trader_positions + WHERE exchange_id = ? AND exchange_position_id = ? AND status = 'OPEN' + LIMIT 1 + `, exchangeID, exchangePositionID).Scan( + &pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, &pos.EntryQuantity, + &pos.EntryPrice, &pos.EntryOrderID, &entryTime, &pos.ExitPrice, + &pos.ExitOrderID, &exitTime, &pos.RealizedPnL, &pos.Fee, + &pos.Leverage, &pos.Status, &pos.CloseReason, &createdAt, &updatedAt, + ) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + + s.parsePositionTimes(&pos, entryTime, exitTime, createdAt, updatedAt) + return &pos, nil +} + // CreateFromClosedPnL creates a closed position record from exchange closed PnL data // This is used for syncing historical positions from exchange // Returns true if created, false if already exists (deduped) or invalid data @@ -1052,15 +1190,29 @@ func (s *PositionStore) GetLastClosedPositionTime(traderID string) (time.Time, e } // CreateOpenPosition creates an open position record with exchange position ID +// NOTE: This function should only be called when GetOpenPositionBySymbol returns nil. +// If a position with the same exchange_position_id already exists (e.g., due to same millisecond trades), +// this function will accumulate into the existing position instead of silently skipping. func (s *PositionStore) CreateOpenPosition(pos *TraderPosition) error { - // Check if already exists by exchange position ID (based on exchange_id, not trader_id) + // Check if already exists by exchange position ID + // If exists, accumulate into that position instead of skipping if pos.ExchangePositionID != "" && pos.ExchangeID != "" { + existingPos, err := s.GetOpenPositionByExchangePositionID(pos.ExchangeID, pos.ExchangePositionID) + if err != nil { + return err + } + if existingPos != nil { + // Position with same exchange_position_id exists and is OPEN, accumulate into it + return s.UpdatePositionQuantityAndPrice(existingPos.ID, pos.Quantity, pos.EntryPrice, pos.Fee) + } + // Check if position exists but is CLOSED exists, err := s.ExistsWithExchangePositionID(pos.ExchangeID, pos.ExchangePositionID) if err != nil { return err } if exists { - return nil // Already exists, skip + // Position exists but is CLOSED, skip (this is a valid case for historical sync) + return nil } } @@ -1071,21 +1223,34 @@ func (s *PositionStore) CreateOpenPosition(pos *TraderPosition) error { if pos.Source == "" { pos.Source = "system" } + // Set EntryQuantity to same as Quantity if not already set + if pos.EntryQuantity == 0 { + pos.EntryQuantity = pos.Quantity + } result, err := s.db.Exec(` INSERT INTO trader_positions ( - trader_id, exchange_id, exchange_type, exchange_position_id, symbol, side, quantity, - entry_price, entry_order_id, entry_time, leverage, status, source, + trader_id, exchange_id, exchange_type, exchange_position_id, symbol, side, quantity, entry_quantity, + entry_price, entry_order_id, entry_time, leverage, status, source, fee, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, - pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.ExchangePositionID, pos.Symbol, pos.Side, pos.Quantity, + pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.ExchangePositionID, pos.Symbol, pos.Side, pos.Quantity, pos.EntryQuantity, pos.EntryPrice, pos.EntryOrderID, pos.EntryTime.Format(time.RFC3339), pos.Leverage, - pos.Status, pos.Source, now.Format(time.RFC3339), now.Format(time.RFC3339), + pos.Status, pos.Source, pos.Fee, now.Format(time.RFC3339), now.Format(time.RFC3339), ) if err != nil { if strings.Contains(err.Error(), "UNIQUE constraint failed") { - return nil // Already exists + // UNIQUE constraint failed, try to accumulate into existing position + existingPos, findErr := s.GetOpenPositionByExchangePositionID(pos.ExchangeID, pos.ExchangePositionID) + if findErr != nil { + return findErr + } + if existingPos != nil { + return s.UpdatePositionQuantityAndPrice(existingPos.ID, pos.Quantity, pos.EntryPrice, pos.Fee) + } + // Position is CLOSED, skip + return nil } return fmt.Errorf("failed to create open position: %w", err) } diff --git a/store/position_builder.go b/store/position_builder.go new file mode 100644 index 00000000..60300319 --- /dev/null +++ b/store/position_builder.go @@ -0,0 +1,142 @@ +package store + +import ( + "fmt" + "math" + "nofx/logger" + "strings" + "time" +) + +// PositionBuilder handles position creation and updates with support for: +// - Position averaging (merging multiple opens) +// - Partial closes (reducing quantity) +// - FIFO matching +// - Time-ordered processing +type PositionBuilder struct { + positionStore *PositionStore +} + +// NewPositionBuilder creates a new PositionBuilder +func NewPositionBuilder(positionStore *PositionStore) *PositionBuilder { + return &PositionBuilder{ + positionStore: positionStore, + } +} + +// ProcessTrade processes a single trade and updates position accordingly +func (pb *PositionBuilder) ProcessTrade( + traderID, exchangeID, exchangeType, symbol, side, action string, + quantity, price, fee, realizedPnL float64, + tradeTime time.Time, + orderID string, +) error { + if strings.HasPrefix(action, "open_") { + return pb.handleOpen(traderID, exchangeID, exchangeType, symbol, side, quantity, price, fee, tradeTime, orderID) + } else if strings.HasPrefix(action, "close_") { + return pb.handleClose(traderID, symbol, side, quantity, price, fee, realizedPnL, tradeTime, orderID) + } + return nil +} + +// handleOpen handles opening positions (create new or average into existing) +func (pb *PositionBuilder) handleOpen( + traderID, exchangeID, exchangeType, symbol, side string, + quantity, price, fee float64, + tradeTime time.Time, + orderID string, +) error { + // Get existing OPEN position for (symbol, side) + existing, err := pb.positionStore.GetOpenPositionBySymbol(traderID, symbol, side) + if err != nil { + return fmt.Errorf("failed to get open position: %w", err) + } + + if existing == nil { + // Create new position + position := &TraderPosition{ + TraderID: traderID, + ExchangeID: exchangeID, + ExchangeType: exchangeType, + ExchangePositionID: fmt.Sprintf("sync_%s_%s_%d", symbol, side, tradeTime.UnixMilli()), + Symbol: symbol, + Side: side, + Quantity: quantity, + EntryPrice: price, + EntryOrderID: orderID, + EntryTime: tradeTime, + Leverage: 1, + Status: "OPEN", + Source: "sync", + Fee: fee, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + return pb.positionStore.CreateOpenPosition(position) + } + + // Merge: Calculate weighted average entry price and update position + logger.Infof(" 📊 Averaging position: %s %s %.6f @ %.2f + %.6f @ %.2f", + symbol, side, existing.Quantity, existing.EntryPrice, quantity, price) + + return pb.positionStore.UpdatePositionQuantityAndPrice(existing.ID, quantity, price, fee) +} + +// handleClose handles closing positions (partial or full) +func (pb *PositionBuilder) handleClose( + traderID, symbol, side string, + quantity, price, fee, realizedPnL float64, + tradeTime time.Time, + orderID string, +) error { + // Get OPEN position + position, err := pb.positionStore.GetOpenPositionBySymbol(traderID, symbol, side) + if err != nil { + return fmt.Errorf("failed to get open position: %w", err) + } + + if position == nil { + // No open position, log warning and skip + logger.Infof(" ⚠️ No matching open position for %s %s (orderID: %s), skipping", symbol, side, orderID) + return nil + } + + const QUANTITY_TOLERANCE = 0.0001 + + if quantity < position.Quantity-QUANTITY_TOLERANCE { + // Partial close: reduce quantity + logger.Infof(" 📉 Partial close: %s %s %.6f → %.6f (closed %.6f @ %.2f)", + symbol, side, position.Quantity, position.Quantity-quantity, quantity, price) + return pb.positionStore.ReducePositionQuantity(position.ID, quantity, fee) + } else { + // Full close (or close with tolerance): mark as CLOSED + closeQty := quantity + if quantity > position.Quantity { + logger.Infof(" ⚠️ Over-close detected: %s %s trying to close %.6f but only %.6f open, closing full position", + symbol, side, quantity, position.Quantity) + closeQty = position.Quantity + } + + logger.Infof(" ✅ Full close: %s %s %.6f @ %.2f (entry: %.2f, PnL: %.2f)", + symbol, side, closeQty, price, position.EntryPrice, realizedPnL) + + // Calculate total fee (existing + new) + totalFee := position.Fee + fee + + return pb.positionStore.ClosePositionFully( + position.ID, + price, + orderID, + tradeTime, + realizedPnL, + totalFee, + "sync", + ) + } +} + +// quantitiesMatch checks if two quantities are close enough (within tolerance) +func quantitiesMatch(a, b float64) bool { + const QUANTITY_TOLERANCE = 0.0001 + return math.Abs(a-b) < QUANTITY_TOLERANCE +}