Files
nofx/store/order.go
T
tinkle-community 5e65ae7077 fix: chart order markers not displaying due to timestamp format mismatch
- Fix milliseconds to seconds conversion in parseCustomTime (AdvancedChart & ChartWithOrders)
- Add GetTraderOrdersFiltered to filter orders at database level by symbol/status
- Increase order limit from 50 to 200 for more historical orders
- Group multiple orders at same candle time and show count (B3, S5, etc.)
- Buy markers shown below bar (green), sell markers above bar (red)
2026-01-06 21:08:42 +08:00

424 lines
16 KiB
Go

package store
import (
"fmt"
"strconv"
"time"
"gorm.io/gorm"
)
// TraderOrder order record
// All time fields use int64 millisecond timestamps (UTC) to avoid timezone issues
type TraderOrder struct {
ID int64 `gorm:"primaryKey;autoIncrement" json:"id"`
TraderID string `gorm:"column:trader_id;not null;index:idx_orders_trader_id" json:"trader_id"`
ExchangeID string `gorm:"column:exchange_id;not null;default:''" json:"exchange_id"`
ExchangeType string `gorm:"column:exchange_type;not null;default:''" json:"exchange_type"`
ExchangeOrderID string `gorm:"column:exchange_order_id;not null;uniqueIndex:idx_orders_exchange_unique,priority:2" json:"exchange_order_id"`
ClientOrderID string `gorm:"column:client_order_id;default:''" json:"client_order_id"`
Symbol string `gorm:"column:symbol;not null;index:idx_orders_symbol" json:"symbol"`
Side string `gorm:"column:side;not null" json:"side"`
PositionSide string `gorm:"column:position_side;default:''" json:"position_side"`
Type string `gorm:"column:type;not null" json:"type"`
TimeInForce string `gorm:"column:time_in_force;default:GTC" json:"time_in_force"`
Quantity float64 `gorm:"column:quantity;not null" json:"quantity"`
Price float64 `gorm:"column:price;default:0" json:"price"`
StopPrice float64 `gorm:"column:stop_price;default:0" json:"stop_price"`
Status string `gorm:"column:status;not null;default:NEW;index:idx_orders_status" json:"status"`
FilledQuantity float64 `gorm:"column:filled_quantity;default:0" json:"filled_quantity"`
AvgFillPrice float64 `gorm:"column:avg_fill_price;default:0" json:"avg_fill_price"`
Commission float64 `gorm:"column:commission;default:0" json:"commission"`
CommissionAsset string `gorm:"column:commission_asset;default:USDT" json:"commission_asset"`
Leverage int `gorm:"column:leverage;default:1" json:"leverage"`
ReduceOnly bool `gorm:"column:reduce_only;default:false" json:"reduce_only"`
ClosePosition bool `gorm:"column:close_position;default:false" json:"close_position"`
WorkingType string `gorm:"column:working_type;default:CONTRACT_PRICE" json:"working_type"`
PriceProtect bool `gorm:"column:price_protect;default:false" json:"price_protect"`
OrderAction string `gorm:"column:order_action;default:''" json:"order_action"`
RelatedPositionID int64 `gorm:"column:related_position_id;default:0" json:"related_position_id"`
CreatedAt int64 `gorm:"column:created_at" json:"created_at"` // Unix milliseconds UTC
UpdatedAt int64 `gorm:"column:updated_at" json:"updated_at"` // Unix milliseconds UTC
FilledAt int64 `gorm:"column:filled_at" json:"filled_at"` // Unix milliseconds UTC
}
// TableName returns the table name for TraderOrder
func (TraderOrder) TableName() string {
return "trader_orders"
}
// TraderFill trade record
// All time fields use int64 millisecond timestamps (UTC) to avoid timezone issues
type TraderFill struct {
ID int64 `gorm:"primaryKey;autoIncrement" json:"id"`
TraderID string `gorm:"column:trader_id;not null;index:idx_fills_trader_id" json:"trader_id"`
ExchangeID string `gorm:"column:exchange_id;not null;default:''" json:"exchange_id"`
ExchangeType string `gorm:"column:exchange_type;not null;default:''" json:"exchange_type"`
OrderID int64 `gorm:"column:order_id;not null;index:idx_fills_order_id" json:"order_id"`
ExchangeOrderID string `gorm:"column:exchange_order_id;not null" json:"exchange_order_id"`
ExchangeTradeID string `gorm:"column:exchange_trade_id;not null;uniqueIndex:idx_fills_exchange_unique,priority:2" json:"exchange_trade_id"`
Symbol string `gorm:"column:symbol;not null" json:"symbol"`
Side string `gorm:"column:side;not null" json:"side"`
Price float64 `gorm:"column:price;not null" json:"price"`
Quantity float64 `gorm:"column:quantity;not null" json:"quantity"`
QuoteQuantity float64 `gorm:"column:quote_quantity;not null" json:"quote_quantity"`
Commission float64 `gorm:"column:commission;not null" json:"commission"`
CommissionAsset string `gorm:"column:commission_asset;not null" json:"commission_asset"`
RealizedPnL float64 `gorm:"column:realized_pnl;default:0" json:"realized_pnl"`
IsMaker bool `gorm:"column:is_maker;default:false" json:"is_maker"`
CreatedAt int64 `gorm:"column:created_at" json:"created_at"` // Unix milliseconds UTC
}
// TableName returns the table name for TraderFill
func (TraderFill) TableName() string {
return "trader_fills"
}
// OrderStore order storage
type OrderStore struct {
db *gorm.DB
}
// NewOrderStore creates order storage instance
func NewOrderStore(db *gorm.DB) *OrderStore {
return &OrderStore{db: db}
}
// InitTables initializes order tables
func (s *OrderStore) InitTables() error {
// For PostgreSQL, check if tables exist to avoid AutoMigrate index conflicts
if s.db.Dialector.Name() == "postgres" {
var ordersExist, fillsExist int64
s.db.Raw(`SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'trader_orders'`).Scan(&ordersExist)
s.db.Raw(`SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'trader_fills'`).Scan(&fillsExist)
if ordersExist > 0 && fillsExist > 0 {
// Tables exist - fix INTEGER columns to BOOLEAN (from earlier migrations)
// Need to: drop default -> change type -> set new default
boolColumns := []struct{ table, col string }{
{"trader_orders", "reduce_only"},
{"trader_orders", "close_position"},
{"trader_orders", "price_protect"},
{"trader_fills", "is_maker"},
}
for _, c := range boolColumns {
s.db.Exec(fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s DROP DEFAULT", c.table, c.col))
s.db.Exec(fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE BOOLEAN USING %s::int::boolean", c.table, c.col, c.col))
s.db.Exec(fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET DEFAULT false", c.table, c.col))
}
// Migrate timestamp columns to bigint (Unix milliseconds UTC)
// Check if column is still timestamp type before migrating
timestampColumns := []struct{ table, col string }{
{"trader_orders", "created_at"},
{"trader_orders", "updated_at"},
{"trader_orders", "filled_at"},
{"trader_fills", "created_at"},
}
for _, c := range timestampColumns {
var dataType string
s.db.Raw(`SELECT data_type FROM information_schema.columns WHERE table_name = ? AND column_name = ?`, c.table, c.col).Scan(&dataType)
if dataType == "timestamp with time zone" || dataType == "timestamp without time zone" {
// Convert timestamp to Unix milliseconds (bigint)
s.db.Exec(fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s TYPE BIGINT USING EXTRACT(EPOCH FROM %s) * 1000`, c.table, c.col, c.col))
}
}
// Ensure indexes exist
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_orders_exchange_unique ON trader_orders(exchange_id, exchange_order_id)`)
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_fills_exchange_unique ON trader_fills(exchange_id, exchange_trade_id)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_orders_trader_id ON trader_orders(trader_id)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_orders_symbol ON trader_orders(symbol)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_orders_status ON trader_orders(status)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_fills_trader_id ON trader_fills(trader_id)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_fills_order_id ON trader_fills(order_id)`)
return nil
}
}
if err := s.db.AutoMigrate(&TraderOrder{}, &TraderFill{}); err != nil {
return fmt.Errorf("failed to migrate order tables: %w", err)
}
// Create unique composite index for exchange_id + exchange_order_id
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_orders_exchange_unique ON trader_orders(exchange_id, exchange_order_id)`)
// Create unique composite index for exchange_id + exchange_trade_id
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_fills_exchange_unique ON trader_fills(exchange_id, exchange_trade_id)`)
return nil
}
// CreateOrder creates order record
func (s *OrderStore) CreateOrder(order *TraderOrder) error {
// Check if order already exists
existing, err := s.GetOrderByExchangeID(order.ExchangeID, order.ExchangeOrderID)
if err != nil {
return fmt.Errorf("failed to check existing order: %w", err)
}
if existing != nil {
order.ID = existing.ID
order.CreatedAt = existing.CreatedAt
order.UpdatedAt = existing.UpdatedAt
return nil
}
return s.db.Create(order).Error
}
// UpdateOrderStatus updates order status
func (s *OrderStore) UpdateOrderStatus(id int64, status string, filledQty, avgPrice, commission float64) error {
updates := map[string]interface{}{
"status": status,
"filled_quantity": filledQty,
"avg_fill_price": avgPrice,
"commission": commission,
"updated_at": time.Now().UTC().UnixMilli(),
}
if status == "FILLED" {
updates["filled_at"] = time.Now().UTC().UnixMilli()
}
return s.db.Model(&TraderOrder{}).Where("id = ?", id).Updates(updates).Error
}
// CreateFill creates fill record
func (s *OrderStore) CreateFill(fill *TraderFill) error {
// Check if fill already exists
existing, err := s.GetFillByExchangeTradeID(fill.ExchangeID, fill.ExchangeTradeID)
if err != nil {
return fmt.Errorf("failed to check existing fill: %w", err)
}
if existing != nil {
fill.ID = existing.ID
fill.CreatedAt = existing.CreatedAt
return nil
}
return s.db.Create(fill).Error
}
// GetFillByExchangeTradeID gets fill by exchange trade ID
func (s *OrderStore) GetFillByExchangeTradeID(exchangeID, exchangeTradeID string) (*TraderFill, error) {
var fill TraderFill
err := s.db.Where("exchange_id = ? AND exchange_trade_id = ?", exchangeID, exchangeTradeID).First(&fill).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, fmt.Errorf("failed to get fill: %w", err)
}
return &fill, nil
}
// GetOrderByExchangeID gets order by exchange order ID
func (s *OrderStore) GetOrderByExchangeID(exchangeID, exchangeOrderID string) (*TraderOrder, error) {
var order TraderOrder
err := s.db.Where("exchange_id = ? AND exchange_order_id = ?", exchangeID, exchangeOrderID).First(&order).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, fmt.Errorf("failed to get order: %w", err)
}
return &order, nil
}
// GetTraderOrders gets trader's order list
func (s *OrderStore) GetTraderOrders(traderID string, limit int) ([]*TraderOrder, error) {
var orders []*TraderOrder
err := s.db.Where("trader_id = ?", traderID).
Order("created_at DESC").
Limit(limit).
Find(&orders).Error
if err != nil {
return nil, fmt.Errorf("failed to query orders: %w", err)
}
return orders, nil
}
// GetTraderOrdersFiltered gets trader's order list with optional symbol and status filters
func (s *OrderStore) GetTraderOrdersFiltered(traderID string, symbol string, status string, limit int) ([]*TraderOrder, error) {
var orders []*TraderOrder
query := s.db.Where("trader_id = ?", traderID)
if symbol != "" {
query = query.Where("symbol = ?", symbol)
}
if status != "" {
query = query.Where("status = ?", status)
}
err := query.Order("created_at DESC").
Limit(limit).
Find(&orders).Error
if err != nil {
return nil, fmt.Errorf("failed to query orders: %w", err)
}
return orders, nil
}
// GetOrderFills gets order's fill records
func (s *OrderStore) GetOrderFills(orderID int64) ([]*TraderFill, error) {
var fills []*TraderFill
err := s.db.Where("order_id = ?", orderID).
Order("created_at ASC").
Find(&fills).Error
if err != nil {
return nil, fmt.Errorf("failed to query fills: %w", err)
}
return fills, nil
}
// GetTraderOrderStats gets trader's order statistics
func (s *OrderStore) GetTraderOrderStats(traderID string) (map[string]interface{}, error) {
type result struct {
TotalOrders int
FilledOrders int
CanceledOrders int
TotalCommission float64
TotalVolume float64
}
var r result
err := s.db.Model(&TraderOrder{}).
Select(`COUNT(*) as total_orders,
SUM(CASE WHEN status = 'FILLED' THEN 1 ELSE 0 END) as filled_orders,
SUM(CASE WHEN status = 'CANCELED' THEN 1 ELSE 0 END) as canceled_orders,
SUM(commission) as total_commission,
SUM(filled_quantity * avg_fill_price) as total_volume`).
Where("trader_id = ?", traderID).
Scan(&r).Error
if err != nil {
return nil, fmt.Errorf("failed to get order stats: %w", err)
}
return map[string]interface{}{
"total_orders": r.TotalOrders,
"filled_orders": r.FilledOrders,
"canceled_orders": r.CanceledOrders,
"total_commission": r.TotalCommission,
"total_volume": r.TotalVolume,
}, nil
}
// CleanupDuplicateOrders cleans up duplicate order records
func (s *OrderStore) CleanupDuplicateOrders() (int, error) {
result := s.db.Exec(`
DELETE FROM trader_orders
WHERE id NOT IN (
SELECT MIN(id)
FROM trader_orders
GROUP BY exchange_id, exchange_order_id
)
`)
if result.Error != nil {
return 0, fmt.Errorf("failed to cleanup duplicate orders: %w", result.Error)
}
return int(result.RowsAffected), nil
}
// CleanupDuplicateFills cleans up duplicate fill records
func (s *OrderStore) CleanupDuplicateFills() (int, error) {
result := s.db.Exec(`
DELETE FROM trader_fills
WHERE id NOT IN (
SELECT MIN(id)
FROM trader_fills
GROUP BY exchange_id, exchange_trade_id
)
`)
if result.Error != nil {
return 0, fmt.Errorf("failed to cleanup duplicate fills: %w", result.Error)
}
return int(result.RowsAffected), nil
}
// GetDuplicateOrdersCount gets duplicate orders count
func (s *OrderStore) GetDuplicateOrdersCount() (int, error) {
var total, distinct int64
s.db.Model(&TraderOrder{}).Count(&total)
// Count distinct combinations
var distinctResult struct{ Count int64 }
s.db.Model(&TraderOrder{}).
Select("COUNT(DISTINCT exchange_id || ',' || exchange_order_id) as count").
Scan(&distinctResult)
distinct = distinctResult.Count
return int(total - distinct), nil
}
// GetDuplicateFillsCount gets duplicate fills count
func (s *OrderStore) GetDuplicateFillsCount() (int, error) {
var total, distinct int64
s.db.Model(&TraderFill{}).Count(&total)
var distinctResult struct{ Count int64 }
s.db.Model(&TraderFill{}).
Select("COUNT(DISTINCT exchange_id || ',' || exchange_trade_id) as count").
Scan(&distinctResult)
distinct = distinctResult.Count
return int(total - distinct), nil
}
// GetMaxTradeIDsByExchange returns max trade ID for each symbol for a given exchange
func (s *OrderStore) GetMaxTradeIDsByExchange(exchangeID string) (map[string]int64, error) {
type symbolTradeID struct {
Symbol string
ExchangeTradeID string
}
var results []symbolTradeID
// Query all trade IDs grouped by symbol, find max in Go to avoid database-specific CAST issues
// (PostgreSQL INTEGER is 32-bit, can't handle Binance trade IDs > 2.1B)
err := s.db.Model(&TraderFill{}).
Select("symbol, exchange_trade_id").
Where("exchange_id = ? AND exchange_trade_id != ''", exchangeID).
Find(&results).Error
if err != nil {
return nil, fmt.Errorf("failed to query trade IDs: %w", err)
}
// Find max trade ID per symbol in Go (handles 64-bit integers properly)
result := make(map[string]int64)
for _, r := range results {
tradeID, err := strconv.ParseInt(r.ExchangeTradeID, 10, 64)
if err != nil {
continue // Skip non-numeric trade IDs
}
if tradeID > result[r.Symbol] {
result[r.Symbol] = tradeID
}
}
return result, nil
}
// GetLastFillTimeByExchange returns the most recent fill time (Unix ms) for a given exchange
// Used to recover sync state after service restart
func (s *OrderStore) GetLastFillTimeByExchange(exchangeID string) (int64, error) {
var fill TraderFill
err := s.db.Where("exchange_id = ?", exchangeID).
Order("created_at DESC").
First(&fill).Error
if err != nil {
return 0, err
}
return fill.CreatedAt, nil
}
// GetRecentFillSymbolsByExchange returns distinct symbols with fills since given time (Unix ms)
func (s *OrderStore) GetRecentFillSymbolsByExchange(exchangeID string, sinceMs int64) ([]string, error) {
var symbols []string
err := s.db.Model(&TraderFill{}).
Select("DISTINCT symbol").
Where("exchange_id = ? AND created_at >= ?", exchangeID, sinceMs).
Pluck("symbol", &symbols).Error
if err != nil {
return nil, err
}
return symbols, nil
}