fix: position accumulation for split orders with same timestamp

- Fix CreateOpenPosition to accumulate into existing position when same
  exchange_position_id exists, instead of silently skipping
- Add GetOpenPositionByExchangePositionID method for lookup by exchange ID
- Update UpdatePositionQuantityAndPrice to also update entry_quantity
- This fixes the issue where split orders (same millisecond) only recorded
  the first order's quantity instead of the total position size
This commit is contained in:
tinkle-community
2025-12-27 02:08:22 +08:00
parent 24cd329f3d
commit 2172b252a5
2 changed files with 338 additions and 31 deletions
+196 -31
View File
@@ -31,20 +31,21 @@ type TraderPosition struct {
ExchangeType string `json:"exchange_type"` // Exchange type: binance/bybit/okx/hyperliquid/aster/lighter
ExchangePositionID string `json:"exchange_position_id"` // Exchange-specific unique position ID for deduplication
Symbol string `json:"symbol"`
Side string `json:"side"` // LONG/SHORT
Quantity float64 `json:"quantity"` // Opening quantity
EntryPrice float64 `json:"entry_price"` // Entry price
EntryOrderID string `json:"entry_order_id"` // Entry order ID
EntryTime time.Time `json:"entry_time"` // Entry time
ExitPrice float64 `json:"exit_price"` // Exit price
ExitOrderID string `json:"exit_order_id"` // Exit order ID
ExitTime *time.Time `json:"exit_time"` // Exit time
RealizedPnL float64 `json:"realized_pnl"` // Realized profit and loss
Fee float64 `json:"fee"` // Fee
Leverage int `json:"leverage"` // Leverage multiplier
Status string `json:"status"` // OPEN/CLOSED
CloseReason string `json:"close_reason"` // Close reason: ai_decision/manual/stop_loss/take_profit
Source string `json:"source"` // Source: system/manual/sync
Side string `json:"side"` // LONG/SHORT
EntryQuantity float64 `json:"entry_quantity"` // Original entry quantity (never modified)
Quantity float64 `json:"quantity"` // Remaining quantity (reduced on partial close)
EntryPrice float64 `json:"entry_price"` // Entry price
EntryOrderID string `json:"entry_order_id"` // Entry order ID
EntryTime time.Time `json:"entry_time"` // Entry time
ExitPrice float64 `json:"exit_price"` // Exit price
ExitOrderID string `json:"exit_order_id"` // Exit order ID
ExitTime *time.Time `json:"exit_time"` // Exit time
RealizedPnL float64 `json:"realized_pnl"` // Realized profit and loss
Fee float64 `json:"fee"` // Fee
Leverage int `json:"leverage"` // Leverage multiplier
Status string `json:"status"` // OPEN/CLOSED
CloseReason string `json:"close_reason"` // Close reason: ai_decision/manual/stop_loss/take_profit
Source string `json:"source"` // Source: system/manual/sync
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
@@ -66,6 +67,7 @@ func (s *PositionStore) InitTables() error {
id INTEGER PRIMARY KEY AUTOINCREMENT,
trader_id TEXT NOT NULL,
exchange_id TEXT NOT NULL DEFAULT '',
exchange_type TEXT NOT NULL DEFAULT '',
exchange_position_id TEXT NOT NULL DEFAULT '',
symbol TEXT NOT NULL,
side TEXT NOT NULL,
@@ -99,6 +101,10 @@ func (s *PositionStore) InitTables() error {
s.db.Exec(`ALTER TABLE trader_positions ADD COLUMN exchange_position_id TEXT NOT NULL DEFAULT ''`)
// Migration: add source field (system/manual/sync)
s.db.Exec(`ALTER TABLE trader_positions ADD COLUMN source TEXT DEFAULT 'system'`)
// Migration: add entry_quantity field (original quantity, never modified on partial close)
s.db.Exec(`ALTER TABLE trader_positions ADD COLUMN entry_quantity REAL DEFAULT 0`)
// Backfill: set entry_quantity = quantity for existing records where entry_quantity is 0
s.db.Exec(`UPDATE trader_positions SET entry_quantity = quantity WHERE entry_quantity = 0 OR entry_quantity IS NULL`)
// Create indexes (after migration)
indices := []string{
@@ -130,14 +136,18 @@ func (s *PositionStore) Create(pos *TraderPosition) error {
pos.CreatedAt = now
pos.UpdatedAt = now
pos.Status = "OPEN"
// Set EntryQuantity to same as Quantity if not already set
if pos.EntryQuantity == 0 {
pos.EntryQuantity = pos.Quantity
}
result, err := s.db.Exec(`
INSERT INTO trader_positions (
trader_id, exchange_id, exchange_type, symbol, side, quantity, entry_price, entry_order_id,
trader_id, exchange_id, exchange_type, symbol, side, quantity, entry_quantity, entry_price, entry_order_id,
entry_time, leverage, status, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.Symbol, pos.Side, pos.Quantity, pos.EntryPrice,
pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.Symbol, pos.Side, pos.Quantity, pos.EntryQuantity, pos.EntryPrice,
pos.EntryOrderID, pos.EntryTime.Format(time.RFC3339), pos.Leverage,
pos.Status, now.Format(time.RFC3339), now.Format(time.RFC3339),
)
@@ -169,10 +179,104 @@ func (s *PositionStore) ClosePosition(id int64, exitPrice float64, exitOrderID s
return nil
}
// UpdatePositionQuantityAndPrice updates position quantity and recalculates entry price (weighted average) when adding to position
// Both quantity and entry_quantity are updated to reflect the new total position size
func (s *PositionStore) UpdatePositionQuantityAndPrice(id int64, addQty float64, addPrice float64, addFee float64) error {
// First, get current position data
var currentQty, currentEntryQty, currentEntryPrice, currentFee float64
err := s.db.QueryRow(`
SELECT quantity, COALESCE(entry_quantity, quantity), entry_price, fee FROM trader_positions WHERE id = ?
`, id).Scan(&currentQty, &currentEntryQty, &currentEntryPrice, &currentFee)
if err != nil {
return fmt.Errorf("failed to get current position: %w", err)
}
// Calculate weighted average entry price
newQty := currentQty + addQty
newEntryQty := currentEntryQty + addQty
newEntryPrice := (currentEntryPrice*currentQty + addPrice*addQty) / newQty
// Accumulate fees
newFee := currentFee + addFee
// Update position (both quantity and entry_quantity)
now := time.Now()
_, err = s.db.Exec(`
UPDATE trader_positions SET
quantity = ?, entry_quantity = ?, entry_price = ?, fee = ?, updated_at = ?
WHERE id = ?
`, newQty, newEntryQty, newEntryPrice, newFee, now.Format(time.RFC3339), id)
if err != nil {
return fmt.Errorf("failed to update position quantity and price: %w", err)
}
return nil
}
// ReducePositionQuantity reduces position quantity for partial close (keeps status as OPEN)
func (s *PositionStore) ReducePositionQuantity(id int64, reduceQty float64, addFee float64) error {
now := time.Now()
_, err := s.db.Exec(`
UPDATE trader_positions SET
quantity = quantity - ?,
fee = fee + ?,
updated_at = ?
WHERE id = ?
`, reduceQty, addFee, now.Format(time.RFC3339), id)
if err != nil {
return fmt.Errorf("failed to reduce position quantity: %w", err)
}
return nil
}
// ClosePositionFully marks position as fully closed with exit time and accumulated PnL
func (s *PositionStore) ClosePositionFully(
id int64,
exitPrice float64,
exitOrderID string,
exitTime time.Time,
totalRealizedPnL float64,
totalFee float64,
closeReason string,
) error {
now := time.Now()
// When closing, restore quantity to entry_quantity so closed position shows original size
_, err := s.db.Exec(`
UPDATE trader_positions SET
quantity = CASE WHEN entry_quantity > 0 THEN entry_quantity ELSE quantity END,
exit_price = ?,
exit_order_id = ?,
exit_time = ?,
realized_pnl = ?,
fee = ?,
status = 'CLOSED',
close_reason = ?,
updated_at = ?
WHERE id = ?
`,
exitPrice, exitOrderID, exitTime.Format(time.RFC3339),
totalRealizedPnL, totalFee, closeReason, now.Format(time.RFC3339), id,
)
if err != nil {
return fmt.Errorf("failed to close position: %w", err)
}
return nil
}
// DeleteAllOpenPositions deletes all OPEN positions for a trader (used for snapshot reset)
func (s *PositionStore) DeleteAllOpenPositions(traderID string) error {
_, err := s.db.Exec(`
DELETE FROM trader_positions WHERE trader_id = ? AND status = 'OPEN'
`, traderID)
if err != nil {
return fmt.Errorf("failed to delete open positions: %w", err)
}
return nil
}
// GetOpenPositions gets all open positions
func (s *PositionStore) GetOpenPositions(traderID string) ([]*TraderPosition, error) {
rows, err := s.db.Query(`
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id,
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id,
entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee,
leverage, status, close_reason, created_at, updated_at
FROM trader_positions
@@ -193,14 +297,14 @@ func (s *PositionStore) GetOpenPositionBySymbol(traderID, symbol, side string) (
var entryTime, exitTime, createdAt, updatedAt sql.NullString
err := s.db.QueryRow(`
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id,
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id,
entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee,
leverage, status, close_reason, created_at, updated_at
FROM trader_positions
WHERE trader_id = ? AND symbol = ? AND side = ? AND status = 'OPEN'
ORDER BY entry_time DESC LIMIT 1
`, traderID, symbol, side).Scan(
&pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity,
&pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, &pos.EntryQuantity,
&pos.EntryPrice, &pos.EntryOrderID, &entryTime, &pos.ExitPrice,
&pos.ExitOrderID, &exitTime, &pos.RealizedPnL, &pos.Fee,
&pos.Leverage, &pos.Status, &pos.CloseReason, &createdAt, &updatedAt,
@@ -219,7 +323,7 @@ func (s *PositionStore) GetOpenPositionBySymbol(traderID, symbol, side string) (
// GetClosedPositions gets closed positions (historical records)
func (s *PositionStore) GetClosedPositions(traderID string, limit int) ([]*TraderPosition, error) {
rows, err := s.db.Query(`
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id,
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id,
entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee,
leverage, status, close_reason, created_at, updated_at
FROM trader_positions
@@ -238,7 +342,7 @@ func (s *PositionStore) GetClosedPositions(traderID string, limit int) ([]*Trade
// GetAllOpenPositions gets all traders' open positions (for global sync)
func (s *PositionStore) GetAllOpenPositions() ([]*TraderPosition, error) {
rows, err := s.db.Query(`
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, entry_price, entry_order_id,
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id,
entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee,
leverage, status, close_reason, created_at, updated_at
FROM trader_positions
@@ -520,7 +624,7 @@ func (s *PositionStore) scanPositions(rows *sql.Rows) ([]*TraderPosition, error)
var entryTime, exitTime, createdAt, updatedAt sql.NullString
err := rows.Scan(
&pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity,
&pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, &pos.EntryQuantity,
&pos.EntryPrice, &pos.EntryOrderID, &entryTime, &pos.ExitPrice,
&pos.ExitOrderID, &exitTime, &pos.RealizedPnL, &pos.Fee,
&pos.Leverage, &pos.Status, &pos.CloseReason, &createdAt, &updatedAt,
@@ -906,6 +1010,40 @@ func (s *PositionStore) ExistsWithExchangePositionID(exchangeID, exchangePositio
return count > 0, nil
}
// GetOpenPositionByExchangePositionID gets an OPEN position by exchange_position_id
// Used for accumulating into existing position when duplicate exchange_position_id is detected
func (s *PositionStore) GetOpenPositionByExchangePositionID(exchangeID, exchangePositionID string) (*TraderPosition, error) {
if exchangePositionID == "" {
return nil, nil
}
var pos TraderPosition
var entryTime, exitTime, createdAt, updatedAt sql.NullString
err := s.db.QueryRow(`
SELECT id, trader_id, exchange_id, COALESCE(exchange_type, '') as exchange_type, symbol, side, quantity, COALESCE(entry_quantity, quantity) as entry_quantity, entry_price, entry_order_id,
entry_time, exit_price, exit_order_id, exit_time, realized_pnl, fee,
leverage, status, close_reason, created_at, updated_at
FROM trader_positions
WHERE exchange_id = ? AND exchange_position_id = ? AND status = 'OPEN'
LIMIT 1
`, exchangeID, exchangePositionID).Scan(
&pos.ID, &pos.TraderID, &pos.ExchangeID, &pos.ExchangeType, &pos.Symbol, &pos.Side, &pos.Quantity, &pos.EntryQuantity,
&pos.EntryPrice, &pos.EntryOrderID, &entryTime, &pos.ExitPrice,
&pos.ExitOrderID, &exitTime, &pos.RealizedPnL, &pos.Fee,
&pos.Leverage, &pos.Status, &pos.CloseReason, &createdAt, &updatedAt,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
s.parsePositionTimes(&pos, entryTime, exitTime, createdAt, updatedAt)
return &pos, nil
}
// CreateFromClosedPnL creates a closed position record from exchange closed PnL data
// This is used for syncing historical positions from exchange
// Returns true if created, false if already exists (deduped) or invalid data
@@ -1052,15 +1190,29 @@ func (s *PositionStore) GetLastClosedPositionTime(traderID string) (time.Time, e
}
// CreateOpenPosition creates an open position record with exchange position ID
// NOTE: This function should only be called when GetOpenPositionBySymbol returns nil.
// If a position with the same exchange_position_id already exists (e.g., due to same millisecond trades),
// this function will accumulate into the existing position instead of silently skipping.
func (s *PositionStore) CreateOpenPosition(pos *TraderPosition) error {
// Check if already exists by exchange position ID (based on exchange_id, not trader_id)
// Check if already exists by exchange position ID
// If exists, accumulate into that position instead of skipping
if pos.ExchangePositionID != "" && pos.ExchangeID != "" {
existingPos, err := s.GetOpenPositionByExchangePositionID(pos.ExchangeID, pos.ExchangePositionID)
if err != nil {
return err
}
if existingPos != nil {
// Position with same exchange_position_id exists and is OPEN, accumulate into it
return s.UpdatePositionQuantityAndPrice(existingPos.ID, pos.Quantity, pos.EntryPrice, pos.Fee)
}
// Check if position exists but is CLOSED
exists, err := s.ExistsWithExchangePositionID(pos.ExchangeID, pos.ExchangePositionID)
if err != nil {
return err
}
if exists {
return nil // Already exists, skip
// Position exists but is CLOSED, skip (this is a valid case for historical sync)
return nil
}
}
@@ -1071,21 +1223,34 @@ func (s *PositionStore) CreateOpenPosition(pos *TraderPosition) error {
if pos.Source == "" {
pos.Source = "system"
}
// Set EntryQuantity to same as Quantity if not already set
if pos.EntryQuantity == 0 {
pos.EntryQuantity = pos.Quantity
}
result, err := s.db.Exec(`
INSERT INTO trader_positions (
trader_id, exchange_id, exchange_type, exchange_position_id, symbol, side, quantity,
entry_price, entry_order_id, entry_time, leverage, status, source,
trader_id, exchange_id, exchange_type, exchange_position_id, symbol, side, quantity, entry_quantity,
entry_price, entry_order_id, entry_time, leverage, status, source, fee,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.ExchangePositionID, pos.Symbol, pos.Side, pos.Quantity,
pos.TraderID, pos.ExchangeID, pos.ExchangeType, pos.ExchangePositionID, pos.Symbol, pos.Side, pos.Quantity, pos.EntryQuantity,
pos.EntryPrice, pos.EntryOrderID, pos.EntryTime.Format(time.RFC3339), pos.Leverage,
pos.Status, pos.Source, now.Format(time.RFC3339), now.Format(time.RFC3339),
pos.Status, pos.Source, pos.Fee, now.Format(time.RFC3339), now.Format(time.RFC3339),
)
if err != nil {
if strings.Contains(err.Error(), "UNIQUE constraint failed") {
return nil // Already exists
// UNIQUE constraint failed, try to accumulate into existing position
existingPos, findErr := s.GetOpenPositionByExchangePositionID(pos.ExchangeID, pos.ExchangePositionID)
if findErr != nil {
return findErr
}
if existingPos != nil {
return s.UpdatePositionQuantityAndPrice(existingPos.ID, pos.Quantity, pos.EntryPrice, pos.Fee)
}
// Position is CLOSED, skip
return nil
}
return fmt.Errorf("failed to create open position: %w", err)
}
+142
View File
@@ -0,0 +1,142 @@
package store
import (
"fmt"
"math"
"nofx/logger"
"strings"
"time"
)
// PositionBuilder handles position creation and updates with support for:
// - Position averaging (merging multiple opens)
// - Partial closes (reducing quantity)
// - FIFO matching
// - Time-ordered processing
type PositionBuilder struct {
positionStore *PositionStore
}
// NewPositionBuilder creates a new PositionBuilder
func NewPositionBuilder(positionStore *PositionStore) *PositionBuilder {
return &PositionBuilder{
positionStore: positionStore,
}
}
// ProcessTrade processes a single trade and updates position accordingly
func (pb *PositionBuilder) ProcessTrade(
traderID, exchangeID, exchangeType, symbol, side, action string,
quantity, price, fee, realizedPnL float64,
tradeTime time.Time,
orderID string,
) error {
if strings.HasPrefix(action, "open_") {
return pb.handleOpen(traderID, exchangeID, exchangeType, symbol, side, quantity, price, fee, tradeTime, orderID)
} else if strings.HasPrefix(action, "close_") {
return pb.handleClose(traderID, symbol, side, quantity, price, fee, realizedPnL, tradeTime, orderID)
}
return nil
}
// handleOpen handles opening positions (create new or average into existing)
func (pb *PositionBuilder) handleOpen(
traderID, exchangeID, exchangeType, symbol, side string,
quantity, price, fee float64,
tradeTime time.Time,
orderID string,
) error {
// Get existing OPEN position for (symbol, side)
existing, err := pb.positionStore.GetOpenPositionBySymbol(traderID, symbol, side)
if err != nil {
return fmt.Errorf("failed to get open position: %w", err)
}
if existing == nil {
// Create new position
position := &TraderPosition{
TraderID: traderID,
ExchangeID: exchangeID,
ExchangeType: exchangeType,
ExchangePositionID: fmt.Sprintf("sync_%s_%s_%d", symbol, side, tradeTime.UnixMilli()),
Symbol: symbol,
Side: side,
Quantity: quantity,
EntryPrice: price,
EntryOrderID: orderID,
EntryTime: tradeTime,
Leverage: 1,
Status: "OPEN",
Source: "sync",
Fee: fee,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
return pb.positionStore.CreateOpenPosition(position)
}
// Merge: Calculate weighted average entry price and update position
logger.Infof(" 📊 Averaging position: %s %s %.6f @ %.2f + %.6f @ %.2f",
symbol, side, existing.Quantity, existing.EntryPrice, quantity, price)
return pb.positionStore.UpdatePositionQuantityAndPrice(existing.ID, quantity, price, fee)
}
// handleClose handles closing positions (partial or full)
func (pb *PositionBuilder) handleClose(
traderID, symbol, side string,
quantity, price, fee, realizedPnL float64,
tradeTime time.Time,
orderID string,
) error {
// Get OPEN position
position, err := pb.positionStore.GetOpenPositionBySymbol(traderID, symbol, side)
if err != nil {
return fmt.Errorf("failed to get open position: %w", err)
}
if position == nil {
// No open position, log warning and skip
logger.Infof(" ⚠️ No matching open position for %s %s (orderID: %s), skipping", symbol, side, orderID)
return nil
}
const QUANTITY_TOLERANCE = 0.0001
if quantity < position.Quantity-QUANTITY_TOLERANCE {
// Partial close: reduce quantity
logger.Infof(" 📉 Partial close: %s %s %.6f → %.6f (closed %.6f @ %.2f)",
symbol, side, position.Quantity, position.Quantity-quantity, quantity, price)
return pb.positionStore.ReducePositionQuantity(position.ID, quantity, fee)
} else {
// Full close (or close with tolerance): mark as CLOSED
closeQty := quantity
if quantity > position.Quantity {
logger.Infof(" ⚠️ Over-close detected: %s %s trying to close %.6f but only %.6f open, closing full position",
symbol, side, quantity, position.Quantity)
closeQty = position.Quantity
}
logger.Infof(" ✅ Full close: %s %s %.6f @ %.2f (entry: %.2f, PnL: %.2f)",
symbol, side, closeQty, price, position.EntryPrice, realizedPnL)
// Calculate total fee (existing + new)
totalFee := position.Fee + fee
return pb.positionStore.ClosePositionFully(
position.ID,
price,
orderID,
tradeTime,
realizedPnL,
totalFee,
"sync",
)
}
}
// quantitiesMatch checks if two quantities are close enough (within tolerance)
func quantitiesMatch(a, b float64) bool {
const QUANTITY_TOLERANCE = 0.0001
return math.Abs(a-b) < QUANTITY_TOLERANCE
}