Files
nofx/store/order.go
T
tinkle-community 2d272bb7b8 feat: migrate store layer to GORM with PostgreSQL support
- Migrate all store packages from raw database/sql to GORM ORM
- Add PostgreSQL support alongside SQLite
- Move EncryptedString type to crypto package for cleaner architecture
- Add automatic encryption/decryption for sensitive fields (API keys, secrets)
- Fix PostgreSQL AutoMigrate conflicts by skipping existing tables
- Fix duplicate /klines route registration
- Update tests to use GORM database connections
- Add database configuration support in config package
2026-01-01 19:32:49 +08:00

339 lines
13 KiB
Go

package store
import (
"fmt"
"strings"
"time"
"gorm.io/gorm"
)
// TraderOrder order record
type TraderOrder struct {
ID int64 `gorm:"primaryKey;autoIncrement" json:"id"`
TraderID string `gorm:"column:trader_id;not null;index:idx_orders_trader_id" json:"trader_id"`
ExchangeID string `gorm:"column:exchange_id;not null;default:''" json:"exchange_id"`
ExchangeType string `gorm:"column:exchange_type;not null;default:''" json:"exchange_type"`
ExchangeOrderID string `gorm:"column:exchange_order_id;not null;uniqueIndex:idx_orders_exchange_unique,priority:2" json:"exchange_order_id"`
ClientOrderID string `gorm:"column:client_order_id;default:''" json:"client_order_id"`
Symbol string `gorm:"column:symbol;not null;index:idx_orders_symbol" json:"symbol"`
Side string `gorm:"column:side;not null" json:"side"`
PositionSide string `gorm:"column:position_side;default:''" json:"position_side"`
Type string `gorm:"column:type;not null" json:"type"`
TimeInForce string `gorm:"column:time_in_force;default:GTC" json:"time_in_force"`
Quantity float64 `gorm:"column:quantity;not null" json:"quantity"`
Price float64 `gorm:"column:price;default:0" json:"price"`
StopPrice float64 `gorm:"column:stop_price;default:0" json:"stop_price"`
Status string `gorm:"column:status;not null;default:NEW;index:idx_orders_status" json:"status"`
FilledQuantity float64 `gorm:"column:filled_quantity;default:0" json:"filled_quantity"`
AvgFillPrice float64 `gorm:"column:avg_fill_price;default:0" json:"avg_fill_price"`
Commission float64 `gorm:"column:commission;default:0" json:"commission"`
CommissionAsset string `gorm:"column:commission_asset;default:USDT" json:"commission_asset"`
Leverage int `gorm:"column:leverage;default:1" json:"leverage"`
ReduceOnly bool `gorm:"column:reduce_only;default:false" json:"reduce_only"`
ClosePosition bool `gorm:"column:close_position;default:false" json:"close_position"`
WorkingType string `gorm:"column:working_type;default:CONTRACT_PRICE" json:"working_type"`
PriceProtect bool `gorm:"column:price_protect;default:false" json:"price_protect"`
OrderAction string `gorm:"column:order_action;default:''" json:"order_action"`
RelatedPositionID int64 `gorm:"column:related_position_id;default:0" json:"related_position_id"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"created_at"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime" json:"updated_at"`
FilledAt time.Time `gorm:"column:filled_at" json:"filled_at"`
}
// TableName returns the table name for TraderOrder
func (TraderOrder) TableName() string {
return "trader_orders"
}
// TraderFill trade record
type TraderFill struct {
ID int64 `gorm:"primaryKey;autoIncrement" json:"id"`
TraderID string `gorm:"column:trader_id;not null;index:idx_fills_trader_id" json:"trader_id"`
ExchangeID string `gorm:"column:exchange_id;not null;default:''" json:"exchange_id"`
ExchangeType string `gorm:"column:exchange_type;not null;default:''" json:"exchange_type"`
OrderID int64 `gorm:"column:order_id;not null;index:idx_fills_order_id" json:"order_id"`
ExchangeOrderID string `gorm:"column:exchange_order_id;not null" json:"exchange_order_id"`
ExchangeTradeID string `gorm:"column:exchange_trade_id;not null;uniqueIndex:idx_fills_exchange_unique,priority:2" json:"exchange_trade_id"`
Symbol string `gorm:"column:symbol;not null" json:"symbol"`
Side string `gorm:"column:side;not null" json:"side"`
Price float64 `gorm:"column:price;not null" json:"price"`
Quantity float64 `gorm:"column:quantity;not null" json:"quantity"`
QuoteQuantity float64 `gorm:"column:quote_quantity;not null" json:"quote_quantity"`
Commission float64 `gorm:"column:commission;not null" json:"commission"`
CommissionAsset string `gorm:"column:commission_asset;not null" json:"commission_asset"`
RealizedPnL float64 `gorm:"column:realized_pnl;default:0" json:"realized_pnl"`
IsMaker bool `gorm:"column:is_maker;default:false" json:"is_maker"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"created_at"`
}
// TableName returns the table name for TraderFill
func (TraderFill) TableName() string {
return "trader_fills"
}
// OrderStore order storage
type OrderStore struct {
db *gorm.DB
}
// NewOrderStore creates order storage instance
func NewOrderStore(db *gorm.DB) *OrderStore {
return &OrderStore{db: db}
}
// InitTables initializes order tables
func (s *OrderStore) InitTables() error {
// For PostgreSQL, check if tables exist to avoid AutoMigrate index conflicts
if s.db.Dialector.Name() == "postgres" {
var ordersExist, fillsExist int64
s.db.Raw(`SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'trader_orders'`).Scan(&ordersExist)
s.db.Raw(`SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'trader_fills'`).Scan(&fillsExist)
if ordersExist > 0 && fillsExist > 0 {
// Tables exist - just ensure indexes exist, skip AutoMigrate
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_orders_exchange_unique ON trader_orders(exchange_id, exchange_order_id)`)
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_fills_exchange_unique ON trader_fills(exchange_id, exchange_trade_id)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_orders_trader_id ON trader_orders(trader_id)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_orders_symbol ON trader_orders(symbol)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_orders_status ON trader_orders(status)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_fills_trader_id ON trader_fills(trader_id)`)
s.db.Exec(`CREATE INDEX IF NOT EXISTS idx_fills_order_id ON trader_fills(order_id)`)
return nil
}
}
if err := s.db.AutoMigrate(&TraderOrder{}, &TraderFill{}); err != nil {
return fmt.Errorf("failed to migrate order tables: %w", err)
}
// Create unique composite index for exchange_id + exchange_order_id
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_orders_exchange_unique ON trader_orders(exchange_id, exchange_order_id)`)
// Create unique composite index for exchange_id + exchange_trade_id
s.db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_fills_exchange_unique ON trader_fills(exchange_id, exchange_trade_id)`)
return nil
}
// CreateOrder creates order record
func (s *OrderStore) CreateOrder(order *TraderOrder) error {
// Check if order already exists
existing, err := s.GetOrderByExchangeID(order.ExchangeID, order.ExchangeOrderID)
if err != nil {
return fmt.Errorf("failed to check existing order: %w", err)
}
if existing != nil {
order.ID = existing.ID
order.CreatedAt = existing.CreatedAt
order.UpdatedAt = existing.UpdatedAt
return nil
}
return s.db.Create(order).Error
}
// UpdateOrderStatus updates order status
func (s *OrderStore) UpdateOrderStatus(id int64, status string, filledQty, avgPrice, commission float64) error {
updates := map[string]interface{}{
"status": status,
"filled_quantity": filledQty,
"avg_fill_price": avgPrice,
"commission": commission,
}
if status == "FILLED" {
updates["filled_at"] = time.Now()
}
return s.db.Model(&TraderOrder{}).Where("id = ?", id).Updates(updates).Error
}
// CreateFill creates fill record
func (s *OrderStore) CreateFill(fill *TraderFill) error {
// Check if fill already exists
existing, err := s.GetFillByExchangeTradeID(fill.ExchangeID, fill.ExchangeTradeID)
if err != nil {
return fmt.Errorf("failed to check existing fill: %w", err)
}
if existing != nil {
fill.ID = existing.ID
fill.CreatedAt = existing.CreatedAt
return nil
}
return s.db.Create(fill).Error
}
// GetFillByExchangeTradeID gets fill by exchange trade ID
func (s *OrderStore) GetFillByExchangeTradeID(exchangeID, exchangeTradeID string) (*TraderFill, error) {
var fill TraderFill
err := s.db.Where("exchange_id = ? AND exchange_trade_id = ?", exchangeID, exchangeTradeID).First(&fill).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, fmt.Errorf("failed to get fill: %w", err)
}
return &fill, nil
}
// GetOrderByExchangeID gets order by exchange order ID
func (s *OrderStore) GetOrderByExchangeID(exchangeID, exchangeOrderID string) (*TraderOrder, error) {
var order TraderOrder
err := s.db.Where("exchange_id = ? AND exchange_order_id = ?", exchangeID, exchangeOrderID).First(&order).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, fmt.Errorf("failed to get order: %w", err)
}
return &order, nil
}
// GetTraderOrders gets trader's order list
func (s *OrderStore) GetTraderOrders(traderID string, limit int) ([]*TraderOrder, error) {
var orders []*TraderOrder
err := s.db.Where("trader_id = ?", traderID).
Order("created_at DESC").
Limit(limit).
Find(&orders).Error
if err != nil {
return nil, fmt.Errorf("failed to query orders: %w", err)
}
return orders, nil
}
// GetOrderFills gets order's fill records
func (s *OrderStore) GetOrderFills(orderID int64) ([]*TraderFill, error) {
var fills []*TraderFill
err := s.db.Where("order_id = ?", orderID).
Order("created_at ASC").
Find(&fills).Error
if err != nil {
return nil, fmt.Errorf("failed to query fills: %w", err)
}
return fills, nil
}
// GetTraderOrderStats gets trader's order statistics
func (s *OrderStore) GetTraderOrderStats(traderID string) (map[string]interface{}, error) {
type result struct {
TotalOrders int
FilledOrders int
CanceledOrders int
TotalCommission float64
TotalVolume float64
}
var r result
err := s.db.Model(&TraderOrder{}).
Select(`COUNT(*) as total_orders,
SUM(CASE WHEN status = 'FILLED' THEN 1 ELSE 0 END) as filled_orders,
SUM(CASE WHEN status = 'CANCELED' THEN 1 ELSE 0 END) as canceled_orders,
SUM(commission) as total_commission,
SUM(filled_quantity * avg_fill_price) as total_volume`).
Where("trader_id = ?", traderID).
Scan(&r).Error
if err != nil {
return nil, fmt.Errorf("failed to get order stats: %w", err)
}
return map[string]interface{}{
"total_orders": r.TotalOrders,
"filled_orders": r.FilledOrders,
"canceled_orders": r.CanceledOrders,
"total_commission": r.TotalCommission,
"total_volume": r.TotalVolume,
}, nil
}
// CleanupDuplicateOrders cleans up duplicate order records
func (s *OrderStore) CleanupDuplicateOrders() (int, error) {
result := s.db.Exec(`
DELETE FROM trader_orders
WHERE id NOT IN (
SELECT MIN(id)
FROM trader_orders
GROUP BY exchange_id, exchange_order_id
)
`)
if result.Error != nil {
return 0, fmt.Errorf("failed to cleanup duplicate orders: %w", result.Error)
}
return int(result.RowsAffected), nil
}
// CleanupDuplicateFills cleans up duplicate fill records
func (s *OrderStore) CleanupDuplicateFills() (int, error) {
result := s.db.Exec(`
DELETE FROM trader_fills
WHERE id NOT IN (
SELECT MIN(id)
FROM trader_fills
GROUP BY exchange_id, exchange_trade_id
)
`)
if result.Error != nil {
return 0, fmt.Errorf("failed to cleanup duplicate fills: %w", result.Error)
}
return int(result.RowsAffected), nil
}
// GetDuplicateOrdersCount gets duplicate orders count
func (s *OrderStore) GetDuplicateOrdersCount() (int, error) {
var total, distinct int64
s.db.Model(&TraderOrder{}).Count(&total)
// Count distinct combinations
var distinctResult struct{ Count int64 }
s.db.Model(&TraderOrder{}).
Select("COUNT(DISTINCT exchange_id || ',' || exchange_order_id) as count").
Scan(&distinctResult)
distinct = distinctResult.Count
return int(total - distinct), nil
}
// GetDuplicateFillsCount gets duplicate fills count
func (s *OrderStore) GetDuplicateFillsCount() (int, error) {
var total, distinct int64
s.db.Model(&TraderFill{}).Count(&total)
var distinctResult struct{ Count int64 }
s.db.Model(&TraderFill{}).
Select("COUNT(DISTINCT exchange_id || ',' || exchange_trade_id) as count").
Scan(&distinctResult)
distinct = distinctResult.Count
return int(total - distinct), nil
}
// GetMaxTradeIDsByExchange returns max trade ID for each symbol for a given exchange
func (s *OrderStore) GetMaxTradeIDsByExchange(exchangeID string) (map[string]int64, error) {
type symbolMaxID struct {
Symbol string
MaxTradeID int64
}
var results []symbolMaxID
err := s.db.Model(&TraderFill{}).
Select("symbol, MAX(CAST(exchange_trade_id AS INTEGER)) as max_trade_id").
Where("exchange_id = ? AND exchange_trade_id != ''", exchangeID).
Group("symbol").
Find(&results).Error
if err != nil {
// If CAST fails (non-numeric trade IDs), fallback to string comparison
if strings.Contains(err.Error(), "CAST") || strings.Contains(err.Error(), "invalid") {
return make(map[string]int64), nil
}
return nil, fmt.Errorf("failed to query max trade IDs: %w", err)
}
result := make(map[string]int64)
for _, r := range results {
result[r.Symbol] = r.MaxTradeID
}
return result, nil
}