Files
nofx/backtest/runner.go
T
tinkle-community cb31782be4 refactor: split large files and clean up project structure
- Rename experience/ to telemetry/ for clarity
- Split 15+ large Go files (800-2200 lines) into focused modules:
  kernel/engine.go, backtest/runner.go, market/data.go, store/position.go,
  api/handler_trader.go, trader/auto_trader_grid.go, and 9 exchange traders
- Split frontend monoliths: types.ts, api.ts, AITradersPage.tsx, BacktestPage.tsx
  into domain-specific modules with barrel re-exports
- Remove stale files: screenshots, .yml.old, pyproject.toml
- Remove unused scripts/ and cmd/ directories
- Remove broken/outdated test files (network-dependent, stale expectations)
2026-03-12 12:53:57 +08:00

342 lines
7.7 KiB
Go

package backtest
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"nofx/kernel"
"nofx/logger"
"nofx/mcp"
)
// Sentinel errors for terminal run conditions.
// NOTE(review): neither value is referenced in this part of the file;
// presumably the run loop returns or compares against them — confirm.
var (
// errBacktestCompleted carries the message "backtest completed".
errBacktestCompleted = errors.New("backtest completed")
// errLiquidated carries the message "account liquidated".
errLiquidated = errors.New("account liquidated")
)
// Tuning constants for the run loop.
// NOTE(review): neither constant is referenced in this part of the file; the
// descriptions below are inferred from the names — confirm against the users.
const (
// metricsWriteInterval is five seconds; presumably the minimum spacing
// between metrics writes (see Runner.lastMetricsWrite).
metricsWriteInterval = 5 * time.Second
// aiDecisionMaxRetries is 3; presumably the retry cap for an AI decision call.
aiDecisionMaxRetries = 3
)
// Runner encapsulates the lifecycle of a single backtest run.
//
// A Runner is built by NewRunner, launched with Start, and steered through
// Pause, Resume and Stop; Wait blocks until doneCh yields. The struct holds
// mutexes and a sync.Once, so it must be used through a pointer.
type Runner struct {
// cfg is the configuration the runner was constructed with.
cfg BacktestConfig
// feed is the data feed created from cfg; StatusPayload queries it for mark prices.
feed *DataFeed
// account is the simulated account seeded with cfg.InitialBalance, cfg.FeeBps and cfg.SlippageBps.
account *BacktestAccount
// strategyEngine is built from cfg.ToStrategyConfig() in NewRunner.
strategyEngine *kernel.StrategyEngine
// decisionLogDir is the per-run decision log directory created by NewRunner.
decisionLogDir string
// mcpClient is the AI client returned by configureMCPClient.
mcpClient mcp.AIClient
// statusMu guards status; Wait also reads err while holding it.
statusMu sync.RWMutex
status RunState
// stateMu guards state; snapshotState copies state under its read lock.
stateMu sync.RWMutex
state *BacktestState
// pauseCh, resumeCh and stopCh receive the non-blocking signals sent by
// Pause, Resume and Stop. NewRunner allocates each with a buffer of one.
pauseCh chan struct{}
resumeCh chan struct{}
stopCh chan struct{}
// doneCh is what Wait blocks on. NewRunner allocates it unbuffered.
// NOTE(review): presumably closed by the run loop when it exits — confirm.
doneCh chan struct{}
// err is the value returned by Wait.
err error
// errMu guards lastError, which stores the text of the most recent error
// (see setLastError and lastErrorString).
errMu sync.RWMutex
lastError string
// NOTE(review): lastCheckpoint and lastMetricsWrite are not referenced in
// this part of the file; presumably maintained by the run loop — confirm.
lastCheckpoint time.Time
// createdAt is the UTC construction time recorded by NewRunner.
createdAt time.Time
lastMetricsWrite time.Time
// aiCache and cachePath are populated by NewRunner only when cfg.CacheAI,
// cfg.ReplayOnly or cfg.SharedAICachePath is set; otherwise nil and "".
aiCache *AICache
cachePath string
// lockInfo is the run lock acquired by initLock and cleared by releaseLock.
lockInfo *RunLockInfo
// lockStop is closed by releaseLock to stop lockHeartbeatLoop.
lockStop chan struct{}
lockStopOnce sync.Once // Ensures lockStop is closed only once
}
// NewRunner constructs a backtest runner.
//
// In order, it ensures the run directory for cfg.RunID exists, configures the
// AI client, opens the data feed, creates the decision log directory, seeds the
// in-memory state from cfg.InitialBalance, optionally loads the AI cache, builds
// the strategy engine, and finally acquires the run lock (which also starts the
// lock heartbeat goroutine). The first failing step aborts construction and
// returns a nil runner together with the error.
//
// The AI cache is loaded when cfg.CacheAI or cfg.ReplayOnly is set, or when
// cfg.SharedAICachePath is non-empty. Without a shared path the cache file is
// "ai_cache.json" inside the run directory.
func NewRunner(cfg BacktestConfig, mcpClient mcp.AIClient) (*Runner, error) {
	if err := ensureRunDir(cfg.RunID); err != nil {
		return nil, err
	}
	client, err := configureMCPClient(cfg, mcpClient)
	if err != nil {
		return nil, err
	}
	feed, err := NewDataFeed(cfg)
	if err != nil {
		return nil, err
	}

	// Resolve the decision log directory once and reuse it for both the mkdir
	// and the runner field.
	dLogDir := decisionLogDir(cfg.RunID)
	if err := os.MkdirAll(dLogDir, 0o755); err != nil {
		return nil, fmt.Errorf("create decision log dir: %w", err)
	}

	account := NewBacktestAccount(cfg.InitialBalance, cfg.FeeBps, cfg.SlippageBps)
	createdAt := time.Now().UTC()
	state := &BacktestState{
		Positions:      make(map[string]PositionSnapshot),
		Cash:           account.Cash(),
		Equity:         cfg.InitialBalance,
		UnrealizedPnL:  0,
		RealizedPnL:    0,
		MaxEquity:      cfg.InitialBalance,
		MinEquity:      cfg.InitialBalance,
		MaxDrawdownPct: 0,
		LastUpdate:     createdAt,
	}

	var (
		aiCache   *AICache
		cachePath string
	)
	if cfg.CacheAI || cfg.ReplayOnly || cfg.SharedAICachePath != "" {
		cachePath = cfg.SharedAICachePath
		if cachePath == "" {
			cachePath = filepath.Join(runDir(cfg.RunID), "ai_cache.json")
		}
		cache, err := LoadAICache(cachePath)
		if err != nil {
			return nil, fmt.Errorf("load ai cache: %w", err)
		}
		aiCache = cache
	}

	// Create strategy engine from backtest config for unified prompt generation.
	strategyConfig := cfg.ToStrategyConfig()
	strategyEngine := kernel.NewStrategyEngine(strategyConfig)

	r := &Runner{
		cfg:            cfg,
		feed:           feed,
		account:        account,
		strategyEngine: strategyEngine,
		decisionLogDir: dLogDir,
		mcpClient:      client,
		status:         RunStateCreated,
		state:          state,
		pauseCh:        make(chan struct{}, 1),
		resumeCh:       make(chan struct{}, 1),
		stopCh:         make(chan struct{}, 1),
		doneCh:         make(chan struct{}),
		createdAt:      createdAt,
		aiCache:        aiCache,
		cachePath:      cachePath,
	}
	// The lock is taken last, so a runner that fails earlier never holds it.
	if err := r.initLock(); err != nil {
		return nil, err
	}
	return r, nil
}
// initLock acquires the run lock for r.cfg.RunID and starts the goroutine that
// keeps its heartbeat fresh. It fails when the run ID is empty or when
// acquireRunLock returns an error; in both cases no goroutine is started.
func (r *Runner) initLock() error {
	runID := r.cfg.RunID
	if runID == "" {
		return fmt.Errorf("run_id required for lock")
	}

	lock, err := acquireRunLock(runID)
	if err != nil {
		return err
	}

	// Both fields must be set before the heartbeat goroutine starts reading them.
	r.lockInfo, r.lockStop = lock, make(chan struct{})
	go r.lockHeartbeatLoop()
	return nil
}
// lockHeartbeatLoop refreshes the run lock's heartbeat every
// lockHeartbeatInterval until lockStop is closed. It is started as a goroutine
// by initLock. Heartbeat failures are logged and do not end the loop.
func (r *Runner) lockHeartbeatLoop() {
	// Snapshot the fields once. releaseLock sets r.lockInfo to nil after
	// signalling stop, so re-reading the field on every tick would race with
	// that write and could hand a nil lock to the heartbeat update.
	info, stop := r.lockInfo, r.lockStop

	ticker := time.NewTicker(lockHeartbeatInterval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			// select chooses at random when both cases are ready, so make
			// sure a pending stop always wins over writing one more heartbeat.
			select {
			case <-stop:
				return
			default:
			}
			if err := updateRunLockHeartbeat(info); err != nil {
				logger.Infof("failed to update lock heartbeat for %s: %v", r.cfg.RunID, err)
			}
		}
	}
}
// releaseLock stops the heartbeat goroutine, deletes the run lock for
// r.cfg.RunID and clears r.lockInfo. A failed delete is logged, not returned.
// Closing lockStop happens at most once, but the delete is attempted on every
// call.
func (r *Runner) releaseLock() {
	// sync.Once guards the close so a repeated release cannot panic on an
	// already-closed channel.
	stopHeartbeat := func() {
		if r.lockStop == nil {
			return
		}
		close(r.lockStop)
	}
	r.lockStopOnce.Do(stopHeartbeat)

	err := deleteRunLock(r.cfg.RunID)
	if err != nil {
		logger.Infof("failed to release lock for %s: %v", r.cfg.RunID, err)
	}
	r.lockInfo = nil
}
// Start launches the backtest loop.
//
// The runner must be in RunStateCreated or RunStatePaused; any other state is
// rejected with an error naming that state. On success the status becomes
// RunStateRunning and the loop runs in a new goroutine with ctx, so Start
// returns immediately.
func (r *Runner) Start(ctx context.Context) error {
	r.statusMu.Lock()
	// Capture the status while the lock is held so the error message below
	// never reads r.status unsynchronized.
	if current := r.status; current != RunStateCreated && current != RunStatePaused {
		r.statusMu.Unlock()
		return fmt.Errorf("cannot start runner in state %s", current)
	}
	r.status = RunStateRunning
	r.statusMu.Unlock()

	go r.loop(ctx)
	return nil
}
// PersistMetadata writes the current snapshot to run.json.
//
// It is the exported entry point and only delegates to the unexported
// persistMetadata, which is defined outside this part of the file.
// NOTE(review): the run.json target comes from the original comment and is not
// visible here — confirm against persistMetadata.
func (r *Runner) PersistMetadata() {
r.persistMetadata()
}
// setLastError stores err's message as the runner's most recent error text.
// A nil err clears the stored text. The write is guarded by errMu.
func (r *Runner) setLastError(err error) {
	msg := ""
	if err != nil {
		msg = err.Error()
	}

	r.errMu.Lock()
	r.lastError = msg
	r.errMu.Unlock()
}
// lastErrorString returns the text stored by setLastError, or "" when no error
// is recorded. The read is guarded by errMu.
func (r *Runner) lastErrorString() string {
	r.errMu.RLock()
	msg := r.lastError
	r.errMu.RUnlock()
	return msg
}
// CurrentMetadata returns the metadata corresponding to the current in-memory
// state. It builds the metadata from a state snapshot and the current status,
// then stamps CreatedAt with the runner's creation time and UpdatedAt with the
// snapshot's LastUpdate.
func (r *Runner) CurrentMetadata() *RunMetadata {
	snap := r.snapshotState()
	md := r.buildMetadata(snap, r.Status())
	md.CreatedAt, md.UpdatedAt = r.createdAt, snap.LastUpdate
	return md
}
// Pause posts a pause signal on pauseCh without blocking. If the channel
// cannot accept the signal right now (for example its buffer is already full),
// the call is a no-op.
func (r *Runner) Pause() {
	var signal struct{}
	select {
	case r.pauseCh <- signal:
	default:
		// Channel not ready to receive another signal; drop this one.
	}
}
// Resume posts a resume signal on resumeCh without blocking. If the channel
// cannot accept the signal right now (for example its buffer is already full),
// the call is a no-op.
func (r *Runner) Resume() {
	var signal struct{}
	select {
	case r.resumeCh <- signal:
	default:
		// Channel not ready to receive another signal; drop this one.
	}
}
// Stop posts a stop signal on stopCh without blocking. If the channel cannot
// accept the signal right now (for example its buffer is already full), the
// call is a no-op. Stop does not wait for the run to finish; use Wait for that.
func (r *Runner) Stop() {
	var signal struct{}
	select {
	case r.stopCh <- signal:
	default:
		// Channel not ready to receive another signal; drop this one.
	}
}
// Wait blocks until a receive on doneCh succeeds, then returns the runner's
// recorded error (nil when none was recorded). The error is read under
// statusMu.
// NOTE(review): doneCh is presumably closed by the run loop on exit — confirm.
func (r *Runner) Wait() error {
	<-r.doneCh

	r.statusMu.RLock()
	runErr := r.err
	r.statusMu.RUnlock()
	return runErr
}
// Status returns the current run state, read under statusMu.
func (r *Runner) Status() RunState {
	r.statusMu.RLock()
	current := r.status
	r.statusMu.RUnlock()
	return current
}
// StatusPayload builds the status response for the API.
//
// It works on a snapshot of the runner state. Every position with a positive
// quantity is reported with its unrealized P&L. The mark price comes from the
// data feed at the snapshot's bar timestamp and falls back to the entry price
// when the feed is nil, the timestamp is not positive, the feed returns an
// error, or the feed has no entry for the symbol. The P&L percentage is
// relative to the position's margin and is 0 when no margin is recorded.
//
// The feed is queried at most once per call, and not at all when there is no
// open position.
func (r *Runner) StatusPayload() StatusPayload {
	snapshot := r.snapshotState()
	progress := progressPercent(snapshot, r.cfg)

	// Collect the open positions first so the feed is only queried if needed.
	open := make([]PositionSnapshot, 0, len(snapshot.Positions))
	for _, pos := range snapshot.Positions {
		if pos.Quantity <= 0 {
			continue
		}
		open = append(open, pos)
	}

	// Resolve mark prices with a single feed query; the market data for one
	// timestamp is the same for every position, so there is no reason to
	// rebuild it per position.
	marks := make(map[string]float64, len(open))
	if len(open) > 0 && r.feed != nil && snapshot.BarTimestamp > 0 {
		if md, _, err := r.feed.BuildMarketData(snapshot.BarTimestamp); err == nil {
			for _, pos := range open {
				if data, ok := md[pos.Symbol]; ok {
					marks[pos.Symbol] = data.CurrentPrice
				}
			}
		}
	}

	// Build position statuses with unrealized P&L.
	positions := make([]PositionStatus, 0, len(open))
	for _, pos := range open {
		markPrice := pos.AvgPrice // fallback to entry price
		if price, ok := marks[pos.Symbol]; ok {
			markPrice = price
		}

		// Any side other than "long" is treated as a short.
		var unrealizedPnL float64
		if pos.Side == "long" {
			unrealizedPnL = (markPrice - pos.AvgPrice) * pos.Quantity
		} else {
			unrealizedPnL = (pos.AvgPrice - markPrice) * pos.Quantity
		}

		// P&L percentage is relative to margin; guard against division by zero.
		pnlPct := 0.0
		if pos.MarginUsed > 0 {
			pnlPct = (unrealizedPnL / pos.MarginUsed) * 100
		}

		positions = append(positions, PositionStatus{
			Symbol:           pos.Symbol,
			Side:             pos.Side,
			Quantity:         pos.Quantity,
			EntryPrice:       pos.AvgPrice,
			MarkPrice:        markPrice,
			Leverage:         pos.Leverage,
			UnrealizedPnL:    unrealizedPnL,
			UnrealizedPnLPct: pnlPct,
			MarginUsed:       pos.MarginUsed,
		})
	}

	payload := StatusPayload{
		RunID:          r.cfg.RunID,
		State:          r.Status(),
		ProgressPct:    progress,
		ProcessedBars:  snapshot.BarIndex,
		CurrentTime:    snapshot.BarTimestamp,
		DecisionCycle:  snapshot.DecisionCycle,
		Equity:         snapshot.Equity,
		UnrealizedPnL:  snapshot.UnrealizedPnL,
		RealizedPnL:    snapshot.RealizedPnL,
		Positions:      positions,
		Note:           snapshot.LiquidationNote,
		LastError:      r.lastErrorString(),
		LastUpdatedIso: snapshot.LastUpdate.UTC().Format(time.RFC3339),
	}
	return payload
}
// snapshotState returns a copy of the runner's state taken under the stateMu
// read lock. The Positions map is rebuilt entry by entry so the caller can
// modify it without touching the live map; it is always non-nil in the copy.
// Every other field is copied by plain struct assignment.
func (r *Runner) snapshotState() BacktestState {
	r.stateMu.RLock()
	defer r.stateMu.RUnlock()

	live := r.state
	positions := make(map[string]PositionSnapshot, len(live.Positions))
	for key, pos := range live.Positions {
		positions[key] = pos
	}

	snap := *live
	snap.Positions = positions
	return snap
}