mirror of
https://github.com/laoxong/nofx.git
synced 2026-06-04 01:48:22 +08:00
4cadf6f442
- Fix Stop() race condition using sync.Once - Add ensureHistory() to prevent nil panic in planner/dispatcher - Add bounds check on trader ID slicing - Log saveExecutionState and clearSetupState errors instead of discarding - Remove always-true modelID condition in onboard setup - Add Chinese setup keywords and expand model name aliases - Strip max_tokens from claw402 requests to avoid thinking-model budget exhaustion - Hide Agent nav tab (Beta) pending merge to main - Sync tests with code changes Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2479 lines
81 KiB
Go
2479 lines
81 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"nofx/mcp"
|
|
)
|
|
|
|
const (
|
|
plannerMaxSteps = 8
|
|
plannerMaxIterations = 12
|
|
observationMaxLength = 400
|
|
)
|
|
|
|
var (
|
|
plannerCreateTimeout = 36 * time.Second
|
|
plannerReplanTimeout = 24 * time.Second
|
|
plannerReasonTimeout = 30 * time.Second
|
|
plannerFinalTimeout = 36 * time.Second
|
|
directReplyTimeout = 8 * time.Second
|
|
)
|
|
|
|
type replannerDecision struct {
|
|
Action string `json:"action"`
|
|
Goal string `json:"goal,omitempty"`
|
|
Steps []PlanStep `json:"steps,omitempty"`
|
|
Instruction string `json:"instruction,omitempty"`
|
|
Question string `json:"question,omitempty"`
|
|
}
|
|
|
|
type readFastPathRequest struct {
|
|
Kind string
|
|
ArgsJSON string
|
|
}
|
|
|
|
type directReplyDecision struct {
|
|
Action string `json:"action"`
|
|
Answer string `json:"answer,omitempty"`
|
|
}
|
|
|
|
func latestAskedQuestion(state ExecutionState) string {
|
|
if state.Waiting != nil && strings.TrimSpace(state.Waiting.Question) != "" {
|
|
return strings.TrimSpace(state.Waiting.Question)
|
|
}
|
|
for i := len(state.Steps) - 1; i >= 0; i-- {
|
|
step := state.Steps[i]
|
|
if step.Type == planStepTypeAskUser {
|
|
if q := strings.TrimSpace(step.Instruction); q != "" {
|
|
return q
|
|
}
|
|
if q := strings.TrimSpace(step.OutputSummary); q != "" {
|
|
return q
|
|
}
|
|
}
|
|
}
|
|
if state.Status == executionStatusWaitingUser {
|
|
return strings.TrimSpace(state.FinalAnswer)
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func buildWaitingState(state ExecutionState, step PlanStep, question string) *WaitingState {
|
|
waiting := &WaitingState{
|
|
Question: strings.TrimSpace(question),
|
|
Intent: inferWaitingIntent(state.Goal, step, question),
|
|
PendingFields: inferPendingFields(step, question),
|
|
ConfirmationTarget: inferConfirmationTarget(state.Goal, step, question),
|
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
}
|
|
return normalizeWaitingState(waiting)
|
|
}
|
|
|
|
func inferWaitingIntent(goal string, step PlanStep, question string) string {
|
|
lowerGoal := strings.ToLower(strings.TrimSpace(goal))
|
|
lowerQuestion := strings.ToLower(strings.TrimSpace(question))
|
|
switch {
|
|
case step.RequiresConfirmation || strings.Contains(lowerQuestion, "需要我") || strings.Contains(lowerQuestion, "confirm") || strings.Contains(lowerQuestion, "确认"):
|
|
return "confirm_action"
|
|
case strings.Contains(lowerGoal, "交易员") || strings.Contains(lowerGoal, "trader"):
|
|
return "complete_trader_setup"
|
|
case strings.Contains(lowerGoal, "交易所") || strings.Contains(lowerGoal, "exchange"):
|
|
return "complete_exchange_config"
|
|
case strings.Contains(lowerGoal, "模型") || strings.Contains(lowerGoal, "model"):
|
|
return "complete_model_config"
|
|
default:
|
|
return "provide_missing_information"
|
|
}
|
|
}
|
|
|
|
func inferPendingFields(step PlanStep, question string) []string {
|
|
source := strings.ToLower(strings.TrimSpace(question))
|
|
if source == "" {
|
|
sourceBytes, _ := json.Marshal(step.ToolArgs)
|
|
source = strings.ToLower(string(sourceBytes))
|
|
}
|
|
candidates := []struct {
|
|
key string
|
|
patterns []string
|
|
}{
|
|
{key: "ai_model_id", patterns: []string{"ai_model_id", "model id", "模型id", "模型 id"}},
|
|
{key: "exchange_id", patterns: []string{"exchange_id", "exchange id", "交易所id", "交易所 id"}},
|
|
{key: "strategy_id", patterns: []string{"strategy_id", "strategy id", "策略id", "策略 id"}},
|
|
{key: "name", patterns: []string{"trader name", "name", "名字", "名称"}},
|
|
{key: "api_key", patterns: []string{"api key", "apikey", "api_key"}},
|
|
{key: "secret_key", patterns: []string{"secret key", "secret_key", "密钥"}},
|
|
{key: "passphrase", patterns: []string{"passphrase", "密码短语"}},
|
|
}
|
|
fields := make([]string, 0, len(candidates))
|
|
for _, candidate := range candidates {
|
|
for _, pattern := range candidate.patterns {
|
|
if strings.Contains(source, pattern) {
|
|
fields = append(fields, candidate.key)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return cleanStringList(fields)
|
|
}
|
|
|
|
func inferConfirmationTarget(goal string, step PlanStep, question string) string {
|
|
if step.RequiresConfirmation {
|
|
if step.ToolName != "" {
|
|
return step.ToolName
|
|
}
|
|
}
|
|
lowerGoal := strings.ToLower(strings.TrimSpace(goal))
|
|
lowerQuestion := strings.ToLower(strings.TrimSpace(question))
|
|
switch {
|
|
case strings.Contains(lowerGoal, "交易员") || strings.Contains(lowerQuestion, "交易员") || strings.Contains(lowerGoal, "trader"):
|
|
return "trader"
|
|
case strings.Contains(lowerGoal, "交易所") || strings.Contains(lowerQuestion, "交易所") || strings.Contains(lowerGoal, "exchange"):
|
|
return "exchange_config"
|
|
case strings.Contains(lowerGoal, "模型") || strings.Contains(lowerQuestion, "模型") || strings.Contains(lowerGoal, "model"):
|
|
return "model_config"
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func isConfigOrTraderIntent(text string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return false
|
|
}
|
|
keywords := []string{
|
|
"交易员", "trader", "exchange", "交易所", "模型", "model", "api key", "apikey",
|
|
"绑定", "配置", "setup", "configure", "deepseek", "openai", "claude", "gemini",
|
|
"okx", "binance", "bybit", "gate", "kucoin", "hyperliquid", "aster", "lighter",
|
|
}
|
|
for _, kw := range keywords {
|
|
if strings.Contains(lower, kw) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func isStrategyIntent(text string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return false
|
|
}
|
|
keywords := []string{
|
|
"策略", "strategy", "template", "模板", "激进", "趋势跟踪", "网格策略",
|
|
"量化策略", "策略模板", "strategy studio",
|
|
}
|
|
for _, kw := range keywords {
|
|
if strings.Contains(lower, kw) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func isRealtimeAccountIntent(text string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return false
|
|
}
|
|
keywords := []string{
|
|
"余额", "balance", "equity", "净值", "available", "available balance",
|
|
"持仓", "position", "positions", "仓位", "unrealized pnl", "浮盈", "浮亏",
|
|
"交易历史", "trade history", "history", "closed trades", "recent trades",
|
|
"订单", "order", "orders", "成交", "pnl", "profit", "loss",
|
|
}
|
|
for _, kw := range keywords {
|
|
if strings.Contains(lower, kw) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func snapshotKindsForIntent(userText string) []string {
|
|
kinds := make([]string, 0, 6)
|
|
return uniqueStrings(kinds)
|
|
}
|
|
|
|
func uniqueStrings(values []string) []string {
|
|
if len(values) == 0 {
|
|
return nil
|
|
}
|
|
out := make([]string, 0, len(values))
|
|
seen := make(map[string]struct{}, len(values))
|
|
for _, value := range values {
|
|
if _, ok := seen[value]; ok {
|
|
continue
|
|
}
|
|
seen[value] = struct{}{}
|
|
out = append(out, value)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func withPlannerStageTimeout(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
|
|
if timeout <= 0 {
|
|
return context.WithCancel(ctx)
|
|
}
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
remaining := time.Until(deadline)
|
|
if remaining <= timeout {
|
|
return context.WithCancel(ctx)
|
|
}
|
|
}
|
|
return context.WithTimeout(ctx, timeout)
|
|
}
|
|
|
|
func isPlannerTimeoutError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
return errors.Is(err, context.DeadlineExceeded)
|
|
}
|
|
|
|
func plannerTimeoutMessage(lang string) string {
|
|
if lang == "zh" {
|
|
return "⏱️ 当前请求处理超时,请重试一次。若持续出现,请把问题拆小一点。"
|
|
}
|
|
return "⏱️ This request timed out. Please try again, or break it into a smaller request."
|
|
}
|
|
|
|
func shouldResetExecutionStateForNewAttempt(text string, state ExecutionState) bool {
|
|
if state.SessionID == "" {
|
|
return false
|
|
}
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return false
|
|
}
|
|
retrySignals := []string{
|
|
"再试", "重试", "重新", "继续", "继续创建", "我已经配置好了", "已经配置好了", "我配好了",
|
|
"我已经弄好了", "已经弄好了", "好了", "retry", "try again", "continue", "resume",
|
|
"i configured it", "i've configured it", "i already configured", "configured already",
|
|
}
|
|
for _, signal := range retrySignals {
|
|
if strings.Contains(lower, signal) {
|
|
return true
|
|
}
|
|
}
|
|
if isConfigOrTraderIntent(lower) && (state.Status == executionStatusFailed || state.Status == executionStatusCompleted) {
|
|
return true
|
|
}
|
|
if isConfigOrTraderIntent(lower) && state.Status == executionStatusWaitingUser {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func ensureCurrentReferences(state *ExecutionState) {
|
|
if state.CurrentReferences == nil {
|
|
state.CurrentReferences = &CurrentReferences{}
|
|
}
|
|
}
|
|
|
|
func preferReference(current **EntityReference, id, name string) {
|
|
id = strings.TrimSpace(id)
|
|
name = strings.TrimSpace(name)
|
|
if id == "" && name == "" {
|
|
return
|
|
}
|
|
if *current == nil {
|
|
*current = &EntityReference{}
|
|
}
|
|
if id != "" {
|
|
(*current).ID = id
|
|
}
|
|
if name != "" {
|
|
(*current).Name = name
|
|
}
|
|
}
|
|
|
|
func matchEntityReference(text string, candidates []EntityReference) *EntityReference {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return nil
|
|
}
|
|
var matched *EntityReference
|
|
for _, candidate := range candidates {
|
|
id := strings.ToLower(strings.TrimSpace(candidate.ID))
|
|
name := strings.ToLower(strings.TrimSpace(candidate.Name))
|
|
if id == "" && name == "" {
|
|
continue
|
|
}
|
|
if (id != "" && strings.Contains(lower, id)) || (name != "" && strings.Contains(lower, name)) {
|
|
if matched != nil {
|
|
return nil
|
|
}
|
|
copy := candidate
|
|
matched = ©
|
|
}
|
|
}
|
|
return matched
|
|
}
|
|
|
|
func (a *Agent) refreshCurrentReferencesForUserText(storeUserID, text string, state *ExecutionState) {
|
|
if a.store == nil || strings.TrimSpace(text) == "" {
|
|
return
|
|
}
|
|
ensureCurrentReferences(state)
|
|
|
|
if strategies, err := a.store.Strategy().List(storeUserID); err == nil {
|
|
candidates := make([]EntityReference, 0, len(strategies))
|
|
for _, strategy := range strategies {
|
|
candidates = append(candidates, EntityReference{ID: strategy.ID, Name: strategy.Name})
|
|
}
|
|
if ref := matchEntityReference(text, candidates); ref != nil {
|
|
preferReference(&state.CurrentReferences.Strategy, ref.ID, ref.Name)
|
|
}
|
|
}
|
|
if traders, err := a.store.Trader().List(storeUserID); err == nil {
|
|
candidates := make([]EntityReference, 0, len(traders))
|
|
for _, trader := range traders {
|
|
candidates = append(candidates, EntityReference{ID: trader.ID, Name: trader.Name})
|
|
}
|
|
if ref := matchEntityReference(text, candidates); ref != nil {
|
|
preferReference(&state.CurrentReferences.Trader, ref.ID, ref.Name)
|
|
}
|
|
}
|
|
if models, err := a.store.AIModel().List(storeUserID); err == nil {
|
|
candidates := make([]EntityReference, 0, len(models))
|
|
for _, model := range models {
|
|
name := model.Name
|
|
if name == "" {
|
|
name = model.CustomModelName
|
|
}
|
|
if name == "" {
|
|
name = model.Provider
|
|
}
|
|
candidates = append(candidates, EntityReference{ID: model.ID, Name: name})
|
|
}
|
|
if ref := matchEntityReference(text, candidates); ref != nil {
|
|
preferReference(&state.CurrentReferences.Model, ref.ID, ref.Name)
|
|
}
|
|
}
|
|
if exchanges, err := a.store.Exchange().List(storeUserID); err == nil {
|
|
candidates := make([]EntityReference, 0, len(exchanges))
|
|
for _, exchange := range exchanges {
|
|
name := exchange.AccountName
|
|
if name == "" {
|
|
name = exchange.ExchangeType
|
|
}
|
|
candidates = append(candidates, EntityReference{ID: exchange.ID, Name: name})
|
|
}
|
|
if ref := matchEntityReference(text, candidates); ref != nil {
|
|
preferReference(&state.CurrentReferences.Exchange, ref.ID, ref.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateCurrentReferencesFromToolResult(state *ExecutionState, toolName, raw string) bool {
|
|
if strings.TrimSpace(raw) == "" {
|
|
return false
|
|
}
|
|
var payload map[string]any
|
|
if err := json.Unmarshal([]byte(raw), &payload); err != nil {
|
|
return false
|
|
}
|
|
ensureCurrentReferences(state)
|
|
before, _ := json.Marshal(state.CurrentReferences)
|
|
|
|
switch toolName {
|
|
case "manage_strategy":
|
|
if item, ok := payload["strategy"].(map[string]any); ok {
|
|
preferReference(&state.CurrentReferences.Strategy, asString(item["id"]), asString(item["name"]))
|
|
}
|
|
case "manage_trader":
|
|
if item, ok := payload["trader"].(map[string]any); ok {
|
|
preferReference(&state.CurrentReferences.Trader, asString(item["id"]), asString(item["name"]))
|
|
preferReference(&state.CurrentReferences.Model, asString(item["ai_model_id"]), "")
|
|
preferReference(&state.CurrentReferences.Exchange, asString(item["exchange_id"]), "")
|
|
preferReference(&state.CurrentReferences.Strategy, asString(item["strategy_id"]), "")
|
|
}
|
|
case "manage_model_config":
|
|
if item, ok := payload["model"].(map[string]any); ok {
|
|
name := asString(item["name"])
|
|
if name == "" {
|
|
name = asString(item["provider"])
|
|
}
|
|
preferReference(&state.CurrentReferences.Model, asString(item["id"]), name)
|
|
}
|
|
case "manage_exchange_config":
|
|
if item, ok := payload["exchange"].(map[string]any); ok {
|
|
name := asString(item["account_name"])
|
|
if name == "" {
|
|
name = asString(item["exchange_type"])
|
|
}
|
|
preferReference(&state.CurrentReferences.Exchange, asString(item["id"]), name)
|
|
}
|
|
case "get_strategies":
|
|
if items, ok := payload["strategies"].([]any); ok && len(items) == 1 {
|
|
if item, ok := items[0].(map[string]any); ok {
|
|
preferReference(&state.CurrentReferences.Strategy, asString(item["id"]), asString(item["name"]))
|
|
}
|
|
}
|
|
}
|
|
state.CurrentReferences = normalizeCurrentReferences(state.CurrentReferences)
|
|
after, _ := json.Marshal(state.CurrentReferences)
|
|
return string(before) != string(after)
|
|
}
|
|
|
|
func asString(v any) string {
|
|
s, _ := v.(string)
|
|
return strings.TrimSpace(s)
|
|
}
|
|
|
|
func containsAnyKeyword(text string, keywords []string) bool {
|
|
for _, keyword := range keywords {
|
|
if strings.Contains(text, keyword) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func detectReadFastPath(text string) *readFastPathRequest {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return nil
|
|
}
|
|
|
|
switch lower {
|
|
case "/traders":
|
|
return &readFastPathRequest{Kind: "list_traders"}
|
|
case "/strategies":
|
|
return &readFastPathRequest{Kind: "get_strategies"}
|
|
case "/models":
|
|
return &readFastPathRequest{Kind: "get_model_configs"}
|
|
case "/exchanges":
|
|
return &readFastPathRequest{Kind: "get_exchange_configs"}
|
|
case "/balance":
|
|
return &readFastPathRequest{Kind: "get_balance"}
|
|
case "/positions":
|
|
return &readFastPathRequest{Kind: "get_positions"}
|
|
case "/history", "/trades":
|
|
return &readFastPathRequest{Kind: "get_trade_history", ArgsJSON: `{"limit":10}`}
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (a *Agent) tryReadFastPath(storeUserID string, userID int64, lang, text string) (string, bool) {
|
|
req := detectReadFastPath(text)
|
|
if req == nil {
|
|
return "", false
|
|
}
|
|
a.ensureHistory()
|
|
|
|
a.history.Add(userID, "user", text)
|
|
raw := a.executeReadFastPath(storeUserID, userID, req)
|
|
answer := formatReadFastPathResponse(lang, req.Kind, raw)
|
|
a.history.Add(userID, "assistant", answer)
|
|
if !isEphemeralReadFastPathKind(req.Kind) {
|
|
a.maybeUpdateTaskStateIncrementally(context.Background(), userID)
|
|
a.maybeCompressHistory(context.Background(), userID)
|
|
}
|
|
return answer, true
|
|
}
|
|
|
|
func isEphemeralReadFastPathKind(kind string) bool {
|
|
switch kind {
|
|
case "get_balance", "get_positions", "get_trade_history":
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (a *Agent) executeReadFastPath(storeUserID string, _ int64, req *readFastPathRequest) string {
|
|
switch req.Kind {
|
|
case "get_balance":
|
|
return a.toolGetBalance()
|
|
case "get_positions":
|
|
return a.toolGetPositions()
|
|
case "get_trade_history":
|
|
return a.toolGetTradeHistory(req.ArgsJSON)
|
|
case "get_strategies":
|
|
return a.toolGetStrategies(storeUserID)
|
|
case "list_traders":
|
|
return a.toolListTraders(storeUserID)
|
|
case "get_model_configs":
|
|
return a.toolGetModelConfigs(storeUserID)
|
|
case "get_exchange_configs":
|
|
return a.toolGetExchangeConfigs(storeUserID)
|
|
default:
|
|
return `{"error":"unsupported fast path"}`
|
|
}
|
|
}
|
|
|
|
func formatReadFastPathResponse(lang, kind, raw string) string {
|
|
var payload map[string]any
|
|
if err := json.Unmarshal([]byte(raw), &payload); err != nil {
|
|
return summarizeObservation(raw)
|
|
}
|
|
if errMsg, _ := payload["error"].(string); strings.TrimSpace(errMsg) != "" {
|
|
return summarizeObservation(raw)
|
|
}
|
|
|
|
switch kind {
|
|
case "get_strategies":
|
|
items, _ := payload["strategies"].([]any)
|
|
if len(items) == 0 {
|
|
if lang == "zh" {
|
|
return "当前还没有策略。"
|
|
}
|
|
return "There are no strategies yet."
|
|
}
|
|
lines := []string{"Current strategies:"}
|
|
if lang == "zh" {
|
|
lines[0] = "当前策略:"
|
|
}
|
|
for _, item := range items {
|
|
entry, ok := item.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
name := asString(entry["name"])
|
|
if name == "" {
|
|
name = asString(entry["id"])
|
|
}
|
|
meta := make([]string, 0, 2)
|
|
if active, _ := entry["is_active"].(bool); active {
|
|
meta = append(meta, "active")
|
|
}
|
|
if isDefault, _ := entry["is_default"].(bool); isDefault {
|
|
meta = append(meta, "default")
|
|
}
|
|
if len(meta) > 0 {
|
|
lines = append(lines, fmt.Sprintf("- %s (%s)", name, strings.Join(meta, ", ")))
|
|
} else {
|
|
lines = append(lines, fmt.Sprintf("- %s", name))
|
|
}
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
case "list_traders":
|
|
items, _ := payload["traders"].([]any)
|
|
if len(items) == 0 {
|
|
if lang == "zh" {
|
|
return "当前还没有交易员。"
|
|
}
|
|
return "There are no traders yet."
|
|
}
|
|
lines := []string{"Current traders:"}
|
|
if lang == "zh" {
|
|
lines[0] = "当前交易员:"
|
|
}
|
|
for _, item := range items {
|
|
entry, ok := item.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
name := asString(entry["name"])
|
|
line := fmt.Sprintf("- %s", name)
|
|
meta := cleanStringList([]string{asString(entry["exchange_type"]), asString(entry["ai_model_id"])})
|
|
if len(meta) > 0 {
|
|
line += fmt.Sprintf(" (%s)", strings.Join(meta, ", "))
|
|
}
|
|
lines = append(lines, line)
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
case "get_model_configs":
|
|
items, _ := payload["model_configs"].([]any)
|
|
if len(items) == 0 {
|
|
if lang == "zh" {
|
|
return "当前还没有模型配置。"
|
|
}
|
|
return "There are no model configs yet."
|
|
}
|
|
lines := []string{"Current model configs:"}
|
|
if lang == "zh" {
|
|
lines[0] = "当前模型配置:"
|
|
}
|
|
for _, item := range items {
|
|
entry, ok := item.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
name := asString(entry["name"])
|
|
if name == "" {
|
|
name = asString(entry["provider"])
|
|
}
|
|
meta := make([]string, 0, 2)
|
|
if enabled, _ := entry["enabled"].(bool); enabled {
|
|
meta = append(meta, "enabled")
|
|
}
|
|
if model := asString(entry["custom_model_name"]); model != "" {
|
|
meta = append(meta, model)
|
|
}
|
|
if len(meta) > 0 {
|
|
lines = append(lines, fmt.Sprintf("- %s (%s)", name, strings.Join(meta, ", ")))
|
|
} else {
|
|
lines = append(lines, fmt.Sprintf("- %s", name))
|
|
}
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
case "get_exchange_configs":
|
|
items, _ := payload["exchange_configs"].([]any)
|
|
if len(items) == 0 {
|
|
if lang == "zh" {
|
|
return "当前还没有交易所配置。"
|
|
}
|
|
return "There are no exchange configs yet."
|
|
}
|
|
lines := []string{"Current exchange configs:"}
|
|
if lang == "zh" {
|
|
lines[0] = "当前交易所配置:"
|
|
}
|
|
for _, item := range items {
|
|
entry, ok := item.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
name := asString(entry["account_name"])
|
|
if name == "" {
|
|
name = asString(entry["exchange_type"])
|
|
}
|
|
meta := cleanStringList([]string{asString(entry["exchange_type"])})
|
|
if enabled, _ := entry["enabled"].(bool); enabled {
|
|
meta = append(meta, "enabled")
|
|
}
|
|
if len(meta) > 0 {
|
|
lines = append(lines, fmt.Sprintf("- %s (%s)", name, strings.Join(meta, ", ")))
|
|
} else {
|
|
lines = append(lines, fmt.Sprintf("- %s", name))
|
|
}
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
case "get_balance":
|
|
items, _ := payload["balances"].([]any)
|
|
if len(items) == 0 {
|
|
if lang == "zh" {
|
|
return "当前没有可用的余额数据。"
|
|
}
|
|
return "No balance data is available right now."
|
|
}
|
|
lines := []string{"Current balance overview:"}
|
|
if lang == "zh" {
|
|
lines[0] = "当前余额概览:"
|
|
}
|
|
var totalEquity float64
|
|
var totalAvailable float64
|
|
for _, item := range items {
|
|
entry, ok := item.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
equity := toFloat(entry["total_equity"])
|
|
available := toFloat(entry["available"])
|
|
totalEquity += equity
|
|
totalAvailable += available
|
|
lines = append(lines, fmt.Sprintf("- %s (%s): equity %.4f, available %.4f",
|
|
asString(entry["name"]), asString(entry["exchange"]),
|
|
equity, available))
|
|
}
|
|
if len(items) > 1 {
|
|
if lang == "zh" {
|
|
lines = append(lines, fmt.Sprintf("汇总:equity %.4f, available %.4f", totalEquity, totalAvailable))
|
|
} else {
|
|
lines = append(lines, fmt.Sprintf("Total: equity %.4f, available %.4f", totalEquity, totalAvailable))
|
|
}
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
case "get_positions":
|
|
items, _ := payload["positions"].([]any)
|
|
if len(items) == 0 {
|
|
if lang == "zh" {
|
|
return "当前没有持仓。"
|
|
}
|
|
return "There are no open positions right now."
|
|
}
|
|
lines := []string{"Current positions:"}
|
|
if lang == "zh" {
|
|
lines[0] = "当前持仓:"
|
|
}
|
|
for _, item := range items {
|
|
entry, ok := item.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
lines = append(lines, fmt.Sprintf("- %s %s size %.4f, entry %.4f, pnl %.4f",
|
|
asString(entry["symbol"]), asString(entry["side"]),
|
|
toFloat(entry["size"]), toFloat(entry["entry_price"]), toFloat(entry["unrealized_pnl"])))
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
case "get_trade_history":
|
|
items, _ := payload["trades"].([]any)
|
|
if len(items) == 0 {
|
|
if lang == "zh" {
|
|
return "当前没有已平仓交易历史。"
|
|
}
|
|
return "There is no closed trade history yet."
|
|
}
|
|
summary, _ := payload["summary"].(map[string]any)
|
|
head := fmt.Sprintf("Recent trades: %.0f total, win rate %s, total PnL %.4f",
|
|
toFloat(summary["total_trades"]), asString(summary["win_rate"]), toFloat(summary["total_pnl"]))
|
|
if lang == "zh" {
|
|
head = fmt.Sprintf("最近交易:共 %.0f 笔,胜率 %s,总 PnL %.4f",
|
|
toFloat(summary["total_trades"]), asString(summary["win_rate"]), toFloat(summary["total_pnl"]))
|
|
}
|
|
lines := []string{head}
|
|
for idx, item := range items {
|
|
if idx >= 5 {
|
|
break
|
|
}
|
|
entry, ok := item.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
lines = append(lines, fmt.Sprintf("- %s %s pnl %.4f (%s -> %s)",
|
|
asString(entry["symbol"]), asString(entry["side"]), toFloat(entry["pnl"]),
|
|
asString(entry["entry_time"]), asString(entry["exit_time"])))
|
|
}
|
|
return strings.Join(lines, "\n")
|
|
default:
|
|
return summarizeObservation(raw)
|
|
}
|
|
}
|
|
|
|
func (a *Agent) thinkAndAct(ctx context.Context, storeUserID string, userID int64, lang, text string) (string, error) {
|
|
if answer, ok, err := a.tryStatePriorityPath(ctx, storeUserID, userID, lang, text, nil); ok || err != nil {
|
|
return answer, err
|
|
}
|
|
if answer, ok := tryInstantDirectReply(lang, text); ok {
|
|
return answer, nil
|
|
}
|
|
if answer, ok := a.tryReadFastPath(storeUserID, userID, lang, text); ok {
|
|
return answer, nil
|
|
}
|
|
if answer, ok, err := a.tryWorkflowIntent(ctx, storeUserID, userID, lang, text, nil); ok || err != nil {
|
|
return answer, err
|
|
}
|
|
if answer, ok := a.tryHardSkill(ctx, storeUserID, userID, lang, text, nil); ok {
|
|
return answer, nil
|
|
}
|
|
// Check setup flow before falling back to noAI — handles "开始配置", "setup", etc.
|
|
if reply, handled := a.handleSetupFlowForStoreUser(storeUserID, userID, text, lang); handled {
|
|
return reply, nil
|
|
}
|
|
if a.aiClient == nil {
|
|
return a.noAIFallback(lang, text)
|
|
}
|
|
return a.runPlannedAgent(ctx, storeUserID, userID, lang, text, nil)
|
|
}
|
|
|
|
func (a *Agent) thinkAndActStream(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, error) {
|
|
if answer, ok, err := a.tryStatePriorityPath(ctx, storeUserID, userID, lang, text, onEvent); ok || err != nil {
|
|
return answer, err
|
|
}
|
|
if answer, ok := tryInstantDirectReply(lang, text); ok {
|
|
if onEvent != nil {
|
|
onEvent(StreamEventDelta, answer)
|
|
}
|
|
return answer, nil
|
|
}
|
|
if answer, ok := a.tryReadFastPath(storeUserID, userID, lang, text); ok {
|
|
if onEvent != nil {
|
|
onEvent(StreamEventTool, "read_fast_path")
|
|
onEvent(StreamEventDelta, answer)
|
|
}
|
|
return answer, nil
|
|
}
|
|
if answer, ok, err := a.tryWorkflowIntent(ctx, storeUserID, userID, lang, text, onEvent); ok || err != nil {
|
|
return answer, err
|
|
}
|
|
if answer, ok := a.tryHardSkill(ctx, storeUserID, userID, lang, text, onEvent); ok {
|
|
return answer, nil
|
|
}
|
|
// Check setup flow before falling back to noAI — handles "开始配置", "setup", etc.
|
|
if reply, handled := a.handleSetupFlowForStoreUser(storeUserID, userID, text, lang); handled {
|
|
if onEvent != nil {
|
|
onEvent(StreamEventDelta, reply)
|
|
}
|
|
return reply, nil
|
|
}
|
|
if a.aiClient == nil {
|
|
return a.noAIFallback(lang, text)
|
|
}
|
|
return a.runPlannedAgent(ctx, storeUserID, userID, lang, text, onEvent)
|
|
}
|
|
|
|
func tryInstantDirectReply(lang, text string) (string, bool) {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return "", false
|
|
}
|
|
|
|
zhReplies := map[string]string{
|
|
"hi": "在,有什么我帮你看的?",
|
|
"hello": "在,有什么我帮你看的?",
|
|
"hey": "在,有什么我帮你看的?",
|
|
"你好": "在,有什么我帮你看的?",
|
|
"嗨": "在,有什么我帮你看的?",
|
|
"在吗": "在,有什么我帮你看的?",
|
|
"谢谢": "不客气。",
|
|
"多谢": "不客气。",
|
|
"谢了": "不客气。",
|
|
"ok": "好。",
|
|
"好的": "好。",
|
|
"收到": "好。",
|
|
}
|
|
enReplies := map[string]string{
|
|
"hi": "I'm here. What should we look at?",
|
|
"hello": "I'm here. What should we look at?",
|
|
"hey": "I'm here. What should we look at?",
|
|
"thanks": "You're welcome.",
|
|
"thank you": "You're welcome.",
|
|
"ok": "Okay.",
|
|
"okay": "Okay.",
|
|
"got it": "Got it.",
|
|
}
|
|
|
|
if lang == "zh" {
|
|
if reply, ok := zhReplies[lower]; ok {
|
|
return reply, true
|
|
}
|
|
if reply, ok := enReplies[lower]; ok {
|
|
return reply, true
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
if reply, ok := enReplies[lower]; ok {
|
|
return reply, true
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
func (a *Agent) hasActiveSkillSession(userID int64) bool {
|
|
session := a.getSkillSession(userID)
|
|
return strings.TrimSpace(session.Name) != ""
|
|
}
|
|
|
|
func hasActiveExecutionState(state ExecutionState) bool {
|
|
if strings.TrimSpace(state.SessionID) == "" {
|
|
return false
|
|
}
|
|
switch strings.TrimSpace(state.Status) {
|
|
case executionStatusPlanning, executionStatusRunning, executionStatusWaitingUser:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (a *Agent) tryStatePriorityPath(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, bool, error) {
|
|
if workflow := a.getWorkflowSession(userID); hasActiveWorkflowSession(workflow) {
|
|
answer, handled, err := a.handleWorkflowSession(ctx, storeUserID, userID, lang, text, workflow, onEvent)
|
|
if handled || err != nil {
|
|
return answer, true, err
|
|
}
|
|
}
|
|
if session := a.getSkillSession(userID); strings.TrimSpace(session.Name) != "" {
|
|
switch a.classifySkillSessionInput(ctx, userID, lang, session, text) {
|
|
case "cancel":
|
|
a.clearSkillSession(userID)
|
|
a.clearWorkflowSession(userID)
|
|
if lang == "zh" {
|
|
return "已取消当前流程。", true, nil
|
|
}
|
|
return "Cancelled the current flow.", true, nil
|
|
case "interrupt":
|
|
a.clearSkillSession(userID)
|
|
default:
|
|
if answer, ok := a.tryHardSkill(ctx, storeUserID, userID, lang, text, onEvent); ok {
|
|
return answer, true, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
state := a.getExecutionState(userID)
|
|
if hasActiveExecutionState(state) {
|
|
switch classifyExecutionStateInput(state, text) {
|
|
case "cancel":
|
|
a.clearExecutionState(userID)
|
|
if lang == "zh" {
|
|
return "已取消当前流程。", true, nil
|
|
}
|
|
return "Cancelled the current flow.", true, nil
|
|
case "interrupt":
|
|
a.clearExecutionState(userID)
|
|
default:
|
|
answer, err := a.runPlannedAgent(ctx, storeUserID, userID, lang, text, onEvent)
|
|
return answer, true, err
|
|
}
|
|
}
|
|
|
|
return "", false, nil
|
|
}
|
|
|
|
func (a *Agent) classifySkillSessionInput(ctx context.Context, userID int64, lang string, session skillSession, text string) string {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return "continue"
|
|
}
|
|
if isYesReply(text) || isNoReply(text) {
|
|
return "continue"
|
|
}
|
|
if isExplicitFlowAbort(text) {
|
|
return "cancel"
|
|
}
|
|
if shouldContinueSkillSessionByExpectedSlot(session, text) {
|
|
return "continue"
|
|
}
|
|
if decision := a.classifySkillSessionIntentWithLLM(ctx, userID, lang, session, text); decision != "" {
|
|
return decision
|
|
}
|
|
if isNewSkillRootIntent(session, text) {
|
|
return "interrupt"
|
|
}
|
|
if isSkillFlowDeflection(session, text) {
|
|
return "interrupt"
|
|
}
|
|
if belongsToSkillDomain(session.Name, text) || !looksLikeNewTopLevelIntent(text) {
|
|
return "continue"
|
|
}
|
|
return "interrupt"
|
|
}
|
|
|
|
type skillSessionIntentDecision struct {
|
|
Decision string `json:"decision"`
|
|
}
|
|
|
|
func shouldUseLLMSkillSessionClassifier(session skillSession, text string) bool {
|
|
if strings.TrimSpace(text) == "" {
|
|
return false
|
|
}
|
|
if isExplicitFlowAbort(text) || isYesReply(text) || isNoReply(text) {
|
|
return false
|
|
}
|
|
if shouldContinueSkillSessionByExpectedSlot(session, text) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func shouldContinueSkillSessionByExpectedSlot(session skillSession, text string) bool {
|
|
text = strings.TrimSpace(text)
|
|
if text == "" {
|
|
return false
|
|
}
|
|
currentStep, ok := currentSkillDAGStep(session)
|
|
if !ok {
|
|
return false
|
|
}
|
|
switch currentStep.ID {
|
|
case "await_start_confirmation", "await_confirmation":
|
|
return isYesReply(text) || isNoReply(text)
|
|
case "resolve_config_value":
|
|
if fieldValue(session, "config_field") == "selected_timeframes" {
|
|
return timeframeTokenRE.MatchString(strings.ToLower(text))
|
|
}
|
|
return firstIntegerPattern.MatchString(text)
|
|
case "collect_enabled":
|
|
_, ok := parseEnabledValue(text)
|
|
return ok
|
|
case "collect_custom_api_url":
|
|
return extractURL(text) != ""
|
|
case "resolve_exchange_type":
|
|
return exchangeTypeFromText(text) != ""
|
|
case "resolve_provider":
|
|
return providerFromText(text) != ""
|
|
case "resolve_name", "collect_name", "collect_prompt", "collect_account_name", "collect_custom_model_name":
|
|
return !looksLikeNewTopLevelIntent(text)
|
|
}
|
|
for _, field := range currentStep.RequiredFields {
|
|
switch field {
|
|
case "config_value":
|
|
return firstIntegerPattern.MatchString(text)
|
|
case "enabled":
|
|
_, ok := parseEnabledValue(text)
|
|
return ok
|
|
case "custom_api_url":
|
|
return extractURL(text) != ""
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (a *Agent) classifySkillSessionIntentWithLLM(ctx context.Context, userID int64, lang string, session skillSession, text string) string {
|
|
if a == nil || a.aiClient == nil {
|
|
return ""
|
|
}
|
|
if !shouldUseLLMSkillSessionClassifier(session, text) {
|
|
return ""
|
|
}
|
|
currentStep, _ := currentSkillDAGStep(session)
|
|
recentConversationCtx := a.buildRecentConversationContext(userID, text)
|
|
systemPrompt := `You classify one user message while a NOFXi structured management flow is active.
|
|
Return JSON only. No markdown.
|
|
|
|
Possible decisions:
|
|
- "continue": the user is still answering the current flow
|
|
- "cancel": the user wants to stop the current flow
|
|
- "interrupt": the user changed topic, wants diagnosis/query/new task, or should leave the current flow
|
|
|
|
Be conservative:
|
|
- Prefer "continue" only when the message clearly answers the current slot/question.
|
|
- Use "cancel" for explicit abandonment like "算了", "不改了", "换话题", "别弄了".
|
|
- Use "interrupt" for diagnosis, query, new requests, or topic shifts.`
|
|
userPrompt := fmt.Sprintf(
|
|
"Language: %s\nActive skill: %s\nAction: %s\nCurrent DAG step: %s\nExpected required fields: %s\nUser message: %s\n\nRecent conversation:\n%s",
|
|
lang,
|
|
session.Name,
|
|
session.Action,
|
|
currentStep.ID,
|
|
strings.Join(currentStep.RequiredFields, ", "),
|
|
text,
|
|
recentConversationCtx,
|
|
)
|
|
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
|
|
defer cancel()
|
|
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
|
|
Messages: []mcp.Message{
|
|
mcp.NewSystemMessage(systemPrompt),
|
|
mcp.NewUserMessage(userPrompt),
|
|
},
|
|
Ctx: stageCtx,
|
|
})
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
raw = strings.TrimSpace(raw)
|
|
raw = strings.TrimPrefix(raw, "```json")
|
|
raw = strings.TrimPrefix(raw, "```")
|
|
raw = strings.TrimSuffix(raw, "```")
|
|
raw = strings.TrimSpace(raw)
|
|
var decision skillSessionIntentDecision
|
|
if err := json.Unmarshal([]byte(raw), &decision); err != nil {
|
|
start := strings.Index(raw, "{")
|
|
end := strings.LastIndex(raw, "}")
|
|
if start < 0 || end <= start || json.Unmarshal([]byte(raw[start:end+1]), &decision) != nil {
|
|
return ""
|
|
}
|
|
}
|
|
switch strings.TrimSpace(decision.Decision) {
|
|
case "continue", "cancel", "interrupt":
|
|
return decision.Decision
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func isSkillFlowDeflection(session skillSession, text string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return false
|
|
}
|
|
if containsAny(lower, []string{
|
|
"看下报错", "看看报错", "帮我看下报错", "帮我看看报错", "报错怎么回事", "错误怎么回事",
|
|
"换话题", "聊别的", "不是这个", "先说别的", "不聊这个",
|
|
}) {
|
|
return true
|
|
}
|
|
switch strings.TrimSpace(session.Name) {
|
|
case "exchange_management":
|
|
return detectModelDiagnosisSkill(text) || detectTraderDiagnosisSkill(text) || detectStrategyDiagnosisSkill(text)
|
|
case "model_management":
|
|
return detectExchangeDiagnosisSkill(text) || detectTraderDiagnosisSkill(text) || detectStrategyDiagnosisSkill(text)
|
|
case "strategy_management":
|
|
return detectExchangeDiagnosisSkill(text) || detectTraderDiagnosisSkill(text) || detectModelDiagnosisSkill(text)
|
|
case "trader_management":
|
|
return detectExchangeDiagnosisSkill(text) || detectModelDiagnosisSkill(text) || detectStrategyDiagnosisSkill(text)
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func isNewSkillRootIntent(session skillSession, text string) bool {
|
|
currentSkill := strings.TrimSpace(session.Name)
|
|
currentAction := strings.TrimSpace(session.Action)
|
|
if currentSkill == "" {
|
|
return false
|
|
}
|
|
switch currentSkill {
|
|
case "trader_management":
|
|
if detectCreateTraderSkill(text) && currentAction != "create" {
|
|
return true
|
|
}
|
|
if action := normalizeAtomicSkillAction("trader_management", detectManagementAction(text, "trader")); action == "create" && currentAction != "create" {
|
|
return true
|
|
}
|
|
case "strategy_management":
|
|
if action := normalizeAtomicSkillAction("strategy_management", detectManagementAction(text, "strategy")); action == "create" && currentAction != "create" {
|
|
return true
|
|
}
|
|
case "model_management":
|
|
if action := normalizeAtomicSkillAction("model_management", detectManagementAction(text, "model")); action == "create" && currentAction != "create" {
|
|
return true
|
|
}
|
|
case "exchange_management":
|
|
if action := normalizeAtomicSkillAction("exchange_management", detectManagementAction(text, "exchange")); action == "create" && currentAction != "create" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func classifyExecutionStateInput(state ExecutionState, text string) string {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return "continue"
|
|
}
|
|
if isExplicitFlowAbort(text) {
|
|
return "cancel"
|
|
}
|
|
if isYesReply(text) || isNoReply(text) || shouldResetExecutionStateForNewAttempt(text, state) {
|
|
return "continue"
|
|
}
|
|
if state.Waiting != nil && !looksLikeNewTopLevelIntent(text) {
|
|
return "continue"
|
|
}
|
|
if looksLikeNewTopLevelIntent(text) {
|
|
return "interrupt"
|
|
}
|
|
return "continue"
|
|
}
|
|
|
|
func isExplicitFlowAbort(text string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return false
|
|
}
|
|
if isCancelSkillReply(text) {
|
|
return true
|
|
}
|
|
return containsAny(lower, []string{
|
|
"算了", "先不", "不配了", "别弄了", "不搞了", "先停", "换个话题", "换话题", "聊点别的", "聊别的",
|
|
"stop this", "drop it", "never mind", "forget it", "skip this",
|
|
})
|
|
}
|
|
|
|
func belongsToSkillDomain(skillName, text string) bool {
|
|
switch strings.TrimSpace(skillName) {
|
|
case "trader_management":
|
|
return detectCreateTraderSkill(text) || detectTraderManagementIntent(text) || detectTraderDiagnosisSkill(text)
|
|
case "strategy_management":
|
|
return detectStrategyManagementIntent(text) || detectStrategyDiagnosisSkill(text)
|
|
case "model_management":
|
|
return detectModelManagementIntent(text) || detectModelDiagnosisSkill(text)
|
|
case "exchange_management":
|
|
return detectExchangeManagementIntent(text) || detectExchangeDiagnosisSkill(text)
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func looksLikeNewTopLevelIntent(text string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(text))
|
|
if lower == "" {
|
|
return false
|
|
}
|
|
if strings.HasPrefix(lower, "/") {
|
|
return true
|
|
}
|
|
if detectCreateTraderSkill(text) ||
|
|
detectTraderManagementIntent(text) ||
|
|
detectExchangeManagementIntent(text) ||
|
|
detectModelManagementIntent(text) ||
|
|
detectStrategyManagementIntent(text) ||
|
|
detectTraderDiagnosisSkill(text) ||
|
|
detectExchangeDiagnosisSkill(text) ||
|
|
detectModelDiagnosisSkill(text) ||
|
|
detectStrategyDiagnosisSkill(text) {
|
|
return true
|
|
}
|
|
if detectReadFastPath(text) != nil {
|
|
return true
|
|
}
|
|
return containsAny(lower, []string{
|
|
"btc", "eth", "sol", "市场", "行情", "余额", "仓位", "持仓", "订单", "账户",
|
|
"price", "market", "balance", "position", "portfolio", "account",
|
|
})
|
|
}
|
|
|
|
func (a *Agent) tryDirectAnswer(ctx context.Context, userID int64, lang, text string, onEvent func(event, data string)) (string, bool) {
|
|
if a.aiClient == nil {
|
|
return "", false
|
|
}
|
|
|
|
text = strings.TrimSpace(text)
|
|
if text == "" {
|
|
return "", false
|
|
}
|
|
|
|
recentConversationCtx := a.buildRecentConversationContext(userID, text)
|
|
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
|
|
executionState := normalizeExecutionState(a.getExecutionState(userID))
|
|
executionJSON, _ := json.Marshal(executionState)
|
|
systemPrompt := `You are the first-pass router for NOFXi.
|
|
Decide whether the assistant can answer the user's message directly without using skills, tools, or planning.
|
|
Return JSON only. Do not return markdown.
|
|
|
|
Use "direct_answer" only when a concise, self-contained answer is sufficient.
|
|
Examples that often fit direct_answer:
|
|
- greetings, thanks, small talk
|
|
- concept explanations
|
|
- open-ended advice that does not require current system state
|
|
- trading education or opinion questions that can be answered from general reasoning
|
|
|
|
Use "defer" when the message likely needs:
|
|
- a management or diagnosis skill
|
|
- tool reads
|
|
- multi-step planning
|
|
- continuation of an active execution flow that needs stateful follow-up
|
|
|
|
Rules:
|
|
- Consider Recent conversation, Task state, and Execution state JSON before deciding.
|
|
- Default to direct_answer for greetings, thanks, identity questions, and other lightweight conversational turns unless there is a clearly unfinished operational flow that the user is continuing.
|
|
- If the user is clearly continuing an unfinished operational flow, choose defer.
|
|
- If you choose direct_answer, provide the final user-facing answer in the same language as the user.
|
|
- Prefer defer when uncertain.
|
|
|
|
Return JSON with this exact shape:
|
|
{"action":"direct_answer|defer","answer":""}`
|
|
userPrompt := fmt.Sprintf("Language: %s\nUser message: %s\n\nRecent conversation:\n%s\n\nTask state:\n%s\n\nExecution state JSON:\n%s", lang, text, recentConversationCtx, taskStateCtx, string(executionJSON))
|
|
|
|
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
|
|
defer cancel()
|
|
|
|
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
|
|
Messages: []mcp.Message{
|
|
mcp.NewSystemMessage(systemPrompt),
|
|
mcp.NewUserMessage(userPrompt),
|
|
},
|
|
Ctx: stageCtx,
|
|
})
|
|
if err != nil {
|
|
return "", false
|
|
}
|
|
|
|
decision, err := parseDirectReplyDecision(raw)
|
|
if err != nil {
|
|
return "", false
|
|
}
|
|
if decision.Action != "direct_answer" {
|
|
return "", false
|
|
}
|
|
|
|
answer := strings.TrimSpace(decision.Answer)
|
|
if answer == "" {
|
|
return "", false
|
|
}
|
|
|
|
a.ensureHistory()
|
|
a.history.Add(userID, "user", text)
|
|
a.history.Add(userID, "assistant", answer)
|
|
a.maybeUpdateTaskStateIncrementally(ctx, userID)
|
|
a.maybeCompressHistory(ctx, userID)
|
|
if onEvent != nil {
|
|
onEvent(StreamEventDelta, answer)
|
|
}
|
|
return answer, true
|
|
}
|
|
|
|
func parseDirectReplyDecision(raw string) (directReplyDecision, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
raw = strings.TrimPrefix(raw, "```json")
|
|
raw = strings.TrimPrefix(raw, "```")
|
|
raw = strings.TrimSuffix(raw, "```")
|
|
raw = strings.TrimSpace(raw)
|
|
|
|
var decision directReplyDecision
|
|
if err := json.Unmarshal([]byte(raw), &decision); err == nil {
|
|
return normalizeDirectReplyDecision(decision), nil
|
|
}
|
|
start := strings.Index(raw, "{")
|
|
end := strings.LastIndex(raw, "}")
|
|
if start >= 0 && end > start {
|
|
if err := json.Unmarshal([]byte(raw[start:end+1]), &decision); err == nil {
|
|
return normalizeDirectReplyDecision(decision), nil
|
|
}
|
|
}
|
|
return directReplyDecision{}, fmt.Errorf("invalid direct reply decision json")
|
|
}
|
|
|
|
func normalizeDirectReplyDecision(decision directReplyDecision) directReplyDecision {
|
|
decision.Action = strings.TrimSpace(strings.ToLower(decision.Action))
|
|
decision.Answer = strings.TrimSpace(decision.Answer)
|
|
return decision
|
|
}
|
|
|
|
func (a *Agent) runPlannedAgent(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, error) {
|
|
a.ensureHistory()
|
|
a.history.Add(userID, "user", text)
|
|
if onEvent != nil {
|
|
onEvent(StreamEventPlanning, a.planningStatusText(lang))
|
|
}
|
|
|
|
requestStartedAt := time.Now()
|
|
state, err := a.prepareExecutionState(ctx, storeUserID, userID, lang, text)
|
|
if err != nil {
|
|
a.logPlannerTiming("", userID, "prepare_execution_state", requestStartedAt, err)
|
|
if isPlannerTimeoutError(err) {
|
|
msg := plannerTimeoutMessage(lang)
|
|
if onEvent != nil {
|
|
onEvent(StreamEventError, msg)
|
|
onEvent(StreamEventDelta, msg)
|
|
}
|
|
return msg, nil
|
|
}
|
|
a.logger.Warn("planner failed, falling back to legacy loop", "error", err, "user_id", userID)
|
|
return a.thinkAndActLegacy(ctx, userID, lang, text, onEvent)
|
|
}
|
|
a.logPlannerTiming(state.SessionID, userID, "prepare_execution_state", requestStartedAt, nil)
|
|
|
|
executionStartedAt := time.Now()
|
|
answer, err := a.executePlan(ctx, storeUserID, userID, lang, &state, onEvent)
|
|
a.logPlannerTiming(state.SessionID, userID, "execute_plan", executionStartedAt, err)
|
|
if err != nil {
|
|
if isPlannerTimeoutError(err) {
|
|
msg := plannerTimeoutMessage(lang)
|
|
if onEvent != nil {
|
|
onEvent(StreamEventError, msg)
|
|
onEvent(StreamEventDelta, msg)
|
|
}
|
|
return msg, nil
|
|
}
|
|
a.logger.Warn("plan execution failed, falling back to legacy loop", "error", err, "user_id", userID)
|
|
return a.thinkAndActLegacy(ctx, userID, lang, text, onEvent)
|
|
}
|
|
|
|
a.history.Add(userID, "assistant", answer)
|
|
a.maybeUpdateTaskStateIncrementally(ctx, userID)
|
|
a.maybeCompressHistory(ctx, userID)
|
|
a.logPlannerTiming(state.SessionID, userID, "run_planned_agent_total", requestStartedAt, nil)
|
|
return answer, nil
|
|
}
|
|
|
|
func (a *Agent) prepareExecutionState(ctx context.Context, storeUserID string, userID int64, lang, text string) (ExecutionState, error) {
|
|
existing := a.getExecutionState(userID)
|
|
if shouldResetExecutionStateForNewAttempt(text, existing) {
|
|
a.clearExecutionState(userID)
|
|
existing = ExecutionState{}
|
|
}
|
|
if existing.Status == executionStatusWaitingUser && existing.SessionID != "" {
|
|
a.refreshCurrentReferencesForUserText(storeUserID, text, &existing)
|
|
askedQuestion := latestAskedQuestion(existing)
|
|
replySummary := strings.TrimSpace(text)
|
|
if askedQuestion != "" {
|
|
replySummary = fmt.Sprintf("Answer to previous question [%s]: %s", askedQuestion, replySummary)
|
|
}
|
|
appendExecutionLog(&existing, Observation{
|
|
Kind: "user_reply",
|
|
Summary: replySummary,
|
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
existing.Status = executionStatusPlanning
|
|
existing.Waiting = nil
|
|
existing.FinalAnswer = ""
|
|
existing.LastError = ""
|
|
existing = a.refreshStateForDynamicRequests(storeUserID, text, existing)
|
|
existing.Steps = completedSteps(existing.Steps)
|
|
existing.CurrentStepID = ""
|
|
existing.Status = executionStatusRunning
|
|
existing.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if err := a.saveExecutionState(existing); err != nil {
|
|
return ExecutionState{}, err
|
|
}
|
|
return existing, nil
|
|
}
|
|
|
|
state := newExecutionState(userID, text)
|
|
a.refreshCurrentReferencesForUserText(storeUserID, text, &state)
|
|
state = a.refreshStateForDynamicRequests(storeUserID, text, state)
|
|
state.Status = executionStatusRunning
|
|
if err := a.saveExecutionState(state); err != nil {
|
|
return ExecutionState{}, err
|
|
}
|
|
return state, nil
|
|
}
|
|
|
|
type nextStepDecision struct {
|
|
Goal string `json:"goal"`
|
|
Steps []PlanStep `json:"steps,omitempty"`
|
|
Step PlanStep `json:"step"`
|
|
}
|
|
|
|
func (a *Agent) decideNextStep(ctx context.Context, userID int64, lang string, state ExecutionState) (nextStepDecision, error) {
|
|
toolDefs, _ := json.Marshal(agentTools())
|
|
stateJSON, _ := json.Marshal(normalizeExecutionState(state))
|
|
obsJSON, _ := json.Marshal(buildObservationContext(state))
|
|
recentlyFetchedJSON, _ := json.Marshal(buildRecentlyFetchedData(state, time.Now().UTC()))
|
|
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
|
|
recentConversationCtx := a.buildRecentConversationContext(userID, state.Goal)
|
|
|
|
systemPrompt := `You are the step selector for NOFXi.
|
|
Return JSON only. Do not return markdown.
|
|
|
|
You are operating in ReAct mode: Thought -> Action -> Observation.
|
|
Choose the immediate next action batch. Do not generate a long multi-step execution plan.
|
|
|
|
Allowed step types:
|
|
- tool
|
|
- reason
|
|
- ask_user
|
|
- respond
|
|
|
|
Rules:
|
|
- Use all available memory layers: Execution state JSON, Observations JSON, Recent conversation, and Task state.
|
|
- Use Recently fetched data JSON as the deduplication source of truth for fresh tool results.
|
|
- Prefer the freshest evidence in this order: execution state, observations, recent conversation, then task state.
|
|
- If fresh external or system data is needed, choose a tool step.
|
|
- If the user is blocked on a missing parameter, choose ask_user.
|
|
- If there is enough information to answer now, choose respond.
|
|
- Use reason only when a short intermediate synthesis is necessary before the next action.
|
|
- Prefer tool or respond over reason whenever possible.
|
|
- Never emit the same reason step twice in a row.
|
|
- After a reason step, the next batch should usually be tool, ask_user, or respond. Do not stay in analysis loops.
|
|
- Never invent tools.
|
|
- If the task needs multiple independent tool reads, emit ALL of them together in one response.
|
|
- Parallelism rule: when multiple tool reads are mutually independent, do not split them across turns. Return them together in steps.
|
|
- Never mix ask_user/respond with additional steps in the same batch.
|
|
- Only emit multiple steps when every emitted step is a tool step.
|
|
- Avoid repeated tool calls. If a matching tool call already exists in Recently fetched data and age_seconds <= 60, do not call it again unless the user explicitly asks to refresh.
|
|
- For tool steps, set tool_name exactly to one available tool and provide tool_args as a JSON object.
|
|
- For ask_user or respond steps, put the user-facing question/response instruction in instruction.
|
|
- If the latest observation already answers the goal, prefer respond over another tool call.
|
|
- Never place a trade unless the user intent is explicit.
|
|
|
|
Return JSON with this exact shape:
|
|
{"goal":"","steps":[{"id":"step_1","type":"tool|reason|ask_user|respond","title":"","tool_name":"","tool_args":{},"instruction":"","requires_confirmation":false}]}`
|
|
|
|
userPrompt := fmt.Sprintf("Language: %s\nGoal: %s\n\nRecent conversation:\n%s\n\nAvailable tools JSON:\n%s\n\nPersistent preferences:\n%s\n\nTask state:\n%s\n\nExecution state JSON:\n%s\n\nObservations JSON:\n%s\n\nRecently fetched data JSON:\n%s", lang, state.Goal, recentConversationCtx, string(toolDefs), a.buildPersistentPreferencesContext(userID), taskStateCtx, string(stateJSON), string(obsJSON), string(recentlyFetchedJSON))
|
|
|
|
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerCreateTimeout)
|
|
defer cancel()
|
|
|
|
startedAt := time.Now()
|
|
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
|
|
Messages: []mcp.Message{
|
|
mcp.NewSystemMessage(systemPrompt),
|
|
mcp.NewUserMessage(userPrompt),
|
|
},
|
|
Ctx: stageCtx,
|
|
})
|
|
a.logPlannerTiming(state.SessionID, userID, "decide_next_step_llm", startedAt, err)
|
|
if err != nil {
|
|
return nextStepDecision{}, err
|
|
}
|
|
return parseNextStepDecisionJSON(raw)
|
|
}
|
|
|
|
func parseNextStepDecisionJSON(raw string) (nextStepDecision, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
raw = strings.TrimPrefix(raw, "```json")
|
|
raw = strings.TrimPrefix(raw, "```")
|
|
raw = strings.TrimSuffix(raw, "```")
|
|
raw = strings.TrimSpace(raw)
|
|
|
|
var decision nextStepDecision
|
|
if err := json.Unmarshal([]byte(raw), &decision); err == nil {
|
|
return normalizeNextStepDecision(decision), nil
|
|
}
|
|
start := strings.Index(raw, "{")
|
|
end := strings.LastIndex(raw, "}")
|
|
if start >= 0 && end > start {
|
|
if err := json.Unmarshal([]byte(raw[start:end+1]), &decision); err == nil {
|
|
return normalizeNextStepDecision(decision), nil
|
|
}
|
|
}
|
|
return nextStepDecision{}, fmt.Errorf("invalid next step decision json")
|
|
}
|
|
|
|
func normalizeNextStepDecision(decision nextStepDecision) nextStepDecision {
|
|
decision.Goal = strings.TrimSpace(decision.Goal)
|
|
steps := decision.Steps
|
|
if len(steps) == 0 && decision.Step.Type != "" {
|
|
steps = []PlanStep{decision.Step}
|
|
}
|
|
if len(steps) > 0 {
|
|
steps = normalizeExecutionState(ExecutionState{Steps: steps}).Steps
|
|
}
|
|
decision.Steps = steps
|
|
if len(steps) > 0 {
|
|
decision.Step = steps[0]
|
|
}
|
|
return decision
|
|
}
|
|
|
|
func (a *Agent) refreshStateForDynamicRequests(storeUserID, userText string, state ExecutionState) ExecutionState {
|
|
kinds := snapshotKindsForIntent(userText)
|
|
if len(kinds) == 0 {
|
|
return state
|
|
}
|
|
kindsToRefresh := make(map[string]struct{}, len(kinds))
|
|
for _, kind := range kinds {
|
|
kindsToRefresh[kind] = struct{}{}
|
|
}
|
|
|
|
fresh := make([]Observation, 0, len(state.DynamicSnapshots)+3)
|
|
for _, obs := range state.DynamicSnapshots {
|
|
if _, ok := kindsToRefresh[obs.Kind]; ok {
|
|
continue
|
|
}
|
|
fresh = append(fresh, obs)
|
|
}
|
|
|
|
appendSnapshot := func(kind, raw string) {
|
|
raw = strings.TrimSpace(raw)
|
|
if raw == "" {
|
|
return
|
|
}
|
|
fresh = append(fresh, Observation{
|
|
Kind: kind,
|
|
Summary: summarizeObservation(raw),
|
|
RawJSON: raw,
|
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
|
|
for _, kind := range kinds {
|
|
switch kind {
|
|
case "current_model_configs":
|
|
appendSnapshot(kind, a.toolGetModelConfigs(storeUserID))
|
|
case "current_exchange_configs":
|
|
appendSnapshot(kind, a.toolGetExchangeConfigs(storeUserID))
|
|
case "current_traders":
|
|
appendSnapshot(kind, a.toolListTraders(storeUserID))
|
|
case "current_strategies":
|
|
appendSnapshot(kind, a.toolGetStrategies(storeUserID))
|
|
case "current_balances":
|
|
appendSnapshot(kind, a.toolGetBalance())
|
|
case "current_positions":
|
|
appendSnapshot(kind, a.toolGetPositions())
|
|
case "recent_trade_history":
|
|
appendSnapshot(kind, a.toolGetTradeHistory(`{"limit":10}`))
|
|
}
|
|
}
|
|
state.DynamicSnapshots = fresh
|
|
return state
|
|
}
|
|
|
|
func (a *Agent) buildRecentConversationContext(userID int64, currentUserText string) string {
|
|
if a.history == nil {
|
|
return ""
|
|
}
|
|
|
|
msgs := a.history.Get(userID)
|
|
if len(msgs) == 0 {
|
|
return ""
|
|
}
|
|
|
|
currentUserText = strings.TrimSpace(currentUserText)
|
|
if currentUserText != "" {
|
|
last := msgs[len(msgs)-1]
|
|
if last.Role == "user" && strings.TrimSpace(last.Content) == currentUserText {
|
|
msgs = msgs[:len(msgs)-1]
|
|
}
|
|
}
|
|
|
|
if len(msgs) == 0 {
|
|
return ""
|
|
}
|
|
if len(msgs) > recentConversationMessages {
|
|
msgs = msgs[len(msgs)-recentConversationMessages:]
|
|
}
|
|
|
|
transcript := formatChatMessagesForSummary(msgs)
|
|
if transcript == "" {
|
|
return ""
|
|
}
|
|
return transcript
|
|
}
|
|
|
|
func (a *Agent) createExecutionPlan(ctx context.Context, userID int64, lang, userText string, state ExecutionState) (executionPlan, error) {
|
|
toolDefs, _ := json.Marshal(agentTools())
|
|
stateJSON, _ := json.Marshal(normalizeExecutionState(state))
|
|
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
|
|
recentConversationCtx := a.buildRecentConversationContext(userID, userText)
|
|
if isConfigOrTraderIntent(userText) {
|
|
// Configuration and trader setup requests are especially sensitive to stale
|
|
// summaries like "this capability does not exist". Prefer fresh tool checks.
|
|
taskStateCtx = ""
|
|
}
|
|
|
|
systemPrompt := `You are the planning module for NOFXi.
|
|
Return JSON only. Do not return markdown.
|
|
|
|
Create a minimal safe execution plan using these step types only:
|
|
- tool
|
|
- reason
|
|
- ask_user
|
|
- respond
|
|
|
|
Rules:
|
|
- Use all available memory layers when planning: Execution state JSON, Recent conversation, and Task state.
|
|
- Memory priority order:
|
|
1. Execution state JSON = current operational truth for the active task.
|
|
2. Recent conversation = the best source for what was said in the last few turns.
|
|
3. Task state = compressed durable background only.
|
|
- If these memory layers conflict, prefer execution state first, then recent conversation. Do not let task state override fresher evidence.
|
|
- Do not ask the user to repeat a fact that is already explicit in execution state or recent conversation unless the inputs are contradictory.
|
|
- Use tool steps whenever fresh external data is required.
|
|
- Use ask_user if required parameters are missing.
|
|
- Never place a trade unless the user intent is explicit.
|
|
- For exchange binding or exchange credential requests, prefer get_exchange_configs/manage_exchange_config.
|
|
- For AI model binding or model credential requests, prefer get_model_configs/manage_model_config.
|
|
- For strategy template creation or editing requests, prefer get_strategies/manage_strategy.
|
|
- For trader creation or trader lifecycle requests, prefer manage_trader.
|
|
- A strategy template is independent and does not require exchange/model bindings unless the user explicitly asks to run or deploy it through a trader.
|
|
- If these tools exist, never answer that the system lacks exchange/model/trader management capability.
|
|
- When configuration, strategy, or trader creation is requested, gather missing required fields via ask_user, then call the appropriate tool.
|
|
- Before concluding that exchange/model/trader/strategy setup is impossible or missing, first inspect current state with the relevant tools.
|
|
- For high-volatility state such as balances, positions, recent trade history, or current config availability, prefer fresh tool reads over old observations.
|
|
- Keep the plan short and practical.
|
|
- End with either ask_user or respond.
|
|
- At most 8 steps.
|
|
- For tool steps, set tool_name exactly to one of the available tool names and provide tool_args as JSON object.
|
|
- For reason steps, put the reasoning task in instruction.
|
|
- For ask_user steps, put the exact follow-up question in instruction.
|
|
- For respond steps, put either a short instruction or leave instruction empty.
|
|
- If resuming after a waiting_user state, incorporate the new user reply and return a fresh full plan.
|
|
- Never invent tools.`
|
|
|
|
resumeContext := ""
|
|
if state.SessionID != "" {
|
|
if askedQuestion := latestAskedQuestion(state); askedQuestion != "" {
|
|
resumeContext = fmt.Sprintf("\n\nResume context:\n- The assistant was waiting for the user's answer to this exact question: %s\n- Interpret the new user message as the answer to that question unless the message clearly starts a new topic.", askedQuestion)
|
|
if state.Waiting != nil {
|
|
waitingJSON, _ := json.Marshal(state.Waiting)
|
|
resumeContext += fmt.Sprintf("\n- Structured waiting state JSON: %s", string(waitingJSON))
|
|
}
|
|
}
|
|
}
|
|
|
|
userPrompt := fmt.Sprintf("Language: %s\nUser request: %s%s\n\nRecent conversation:\n%s\n\nAvailable tools JSON:\n%s\n\nPersistent preferences:\n%s\n\nTask state:\n%s\n\nExecution state JSON:\n%s\n\nReturn JSON with this exact shape:\n{\"goal\":\"\",\"steps\":[{\"id\":\"step_1\",\"type\":\"tool|reason|ask_user|respond\",\"title\":\"\",\"tool_name\":\"\",\"tool_args\":{},\"instruction\":\"\",\"requires_confirmation\":false}]}", lang, userText, resumeContext, recentConversationCtx, string(toolDefs), a.buildPersistentPreferencesContext(userID), taskStateCtx, string(stateJSON))
|
|
|
|
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerCreateTimeout)
|
|
defer cancel()
|
|
|
|
startedAt := time.Now()
|
|
resp, err := a.aiClient.CallWithRequest(&mcp.Request{
|
|
Messages: []mcp.Message{
|
|
mcp.NewSystemMessage(systemPrompt),
|
|
mcp.NewUserMessage(userPrompt),
|
|
},
|
|
Ctx: stageCtx,
|
|
})
|
|
a.logPlannerTiming(state.SessionID, userID, "create_execution_plan_llm", startedAt, err)
|
|
if err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
|
|
plan, err := parseExecutionPlanJSON(resp)
|
|
if err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
if len(plan.Steps) == 0 {
|
|
return executionPlan{}, fmt.Errorf("empty execution plan")
|
|
}
|
|
if len(plan.Steps) > plannerMaxSteps {
|
|
plan.Steps = plan.Steps[:plannerMaxSteps]
|
|
}
|
|
for i := range plan.Steps {
|
|
if plan.Steps[i].ID == "" {
|
|
plan.Steps[i].ID = fmt.Sprintf("step_%d", i+1)
|
|
}
|
|
if plan.Steps[i].Status == "" {
|
|
plan.Steps[i].Status = planStepStatusPending
|
|
}
|
|
if plan.Steps[i].Title == "" {
|
|
plan.Steps[i].Title = strings.ReplaceAll(plan.Steps[i].ID, "_", " ")
|
|
}
|
|
}
|
|
if strings.TrimSpace(plan.Goal) == "" {
|
|
plan.Goal = strings.TrimSpace(userText)
|
|
}
|
|
return plan, nil
|
|
}
|
|
|
|
func parseExecutionPlanJSON(raw string) (executionPlan, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
raw = strings.TrimPrefix(raw, "```json")
|
|
raw = strings.TrimPrefix(raw, "```")
|
|
raw = strings.TrimSuffix(raw, "```")
|
|
raw = strings.TrimSpace(raw)
|
|
|
|
var plan executionPlan
|
|
if err := json.Unmarshal([]byte(raw), &plan); err == nil {
|
|
return plan, nil
|
|
}
|
|
start := strings.Index(raw, "{")
|
|
end := strings.LastIndex(raw, "}")
|
|
if start >= 0 && end > start {
|
|
if err := json.Unmarshal([]byte(raw[start:end+1]), &plan); err == nil {
|
|
return plan, nil
|
|
}
|
|
}
|
|
return executionPlan{}, fmt.Errorf("invalid execution plan json")
|
|
}
|
|
|
|
func (a *Agent) executePlan(ctx context.Context, storeUserID string, userID int64, lang string, state *ExecutionState, onEvent func(event, data string)) (string, error) {
|
|
if onEvent != nil && len(state.Steps) > 0 {
|
|
onEvent(StreamEventPlan, formatPlanStatus(*state, lang))
|
|
}
|
|
|
|
for i := 0; i < plannerMaxIterations; i++ {
|
|
stepIndex := nextPendingStepIndex(state.Steps)
|
|
if stepIndex < 0 {
|
|
decisionStartedAt := time.Now()
|
|
decision, err := a.decideNextStep(ctx, userID, lang, *state)
|
|
a.logPlannerTiming(state.SessionID, userID, "decide_next_step", decisionStartedAt, err)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
steps := filterFreshDuplicateToolSteps(decision.Steps, *state, time.Now().UTC())
|
|
if len(steps) == 0 {
|
|
appendExecutionLog(state, Observation{
|
|
Kind: "decision_note",
|
|
Summary: "Skipped duplicate fresh tool calls from next-step decision",
|
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if err := a.saveExecutionState(*state); err != nil {
|
|
return "", err
|
|
}
|
|
continue
|
|
}
|
|
if hasRepeatedReasonLoop(*state, steps) {
|
|
return "", fmt.Errorf("repeated reasoning loop detected")
|
|
}
|
|
if decision.Goal != "" {
|
|
state.Goal = decision.Goal
|
|
}
|
|
base := len(completedSteps(state.Steps))
|
|
for idx := range steps {
|
|
if steps[idx].Type == "" {
|
|
return "", fmt.Errorf("next step decision missing step type")
|
|
}
|
|
if steps[idx].ID == "" {
|
|
steps[idx].ID = fmt.Sprintf("step_%d", base+idx+1)
|
|
}
|
|
if steps[idx].Title == "" {
|
|
steps[idx].Title = strings.ReplaceAll(steps[idx].ID, "_", " ")
|
|
}
|
|
if steps[idx].Status == "" {
|
|
steps[idx].Status = planStepStatusPending
|
|
}
|
|
}
|
|
state.Steps = append(completedSteps(state.Steps), steps...)
|
|
state.Status = executionStatusRunning
|
|
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if err := a.saveExecutionState(*state); err != nil {
|
|
return "", err
|
|
}
|
|
if onEvent != nil {
|
|
onEvent(StreamEventPlan, formatPlanStatus(*state, lang))
|
|
}
|
|
continue
|
|
}
|
|
|
|
step := &state.Steps[stepIndex]
|
|
step.Status = planStepStatusRunning
|
|
state.Status = executionStatusRunning
|
|
state.CurrentStepID = step.ID
|
|
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if onEvent != nil {
|
|
onEvent(StreamEventStepStart, formatStepStatus(*step, stepIndex, len(state.Steps), lang))
|
|
}
|
|
if err := a.saveExecutionState(*state); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
switch step.Type {
|
|
case planStepTypeTool:
|
|
if onEvent != nil {
|
|
onEvent(StreamEventTool, step.ToolName)
|
|
}
|
|
stepStartedAt := time.Now()
|
|
result := a.executePlanTool(ctx, storeUserID, userID, lang, *step)
|
|
a.logPlannerTiming(state.SessionID, userID, "tool:"+step.ToolName, stepStartedAt, nil)
|
|
summary := summarizeObservation(result)
|
|
referencesChanged := false
|
|
step.Status = planStepStatusCompleted
|
|
step.OutputSummary = summary
|
|
appendExecutionLog(state, Observation{
|
|
StepID: step.ID,
|
|
Kind: "tool_result",
|
|
Summary: summary,
|
|
RawJSON: result,
|
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
referencesChanged = updateCurrentReferencesFromToolResult(state, step.ToolName, result)
|
|
if referencesChanged {
|
|
a.log().Info("tool step updated references", "tool", step.ToolName, "session", state.SessionID)
|
|
}
|
|
case planStepTypeReason:
|
|
reasonStartedAt := time.Now()
|
|
reasoning, err := a.executeReasonStep(ctx, userID, lang, state.Goal, *state, *step)
|
|
a.logPlannerTiming(state.SessionID, userID, "reason_step", reasonStartedAt, err)
|
|
if err != nil {
|
|
step.Status = planStepStatusFailed
|
|
step.Error = err.Error()
|
|
state.Status = executionStatusFailed
|
|
state.LastError = err.Error()
|
|
if saveErr := a.saveExecutionState(*state); saveErr != nil {
|
|
a.log().Warn("failed to save execution state after reason step error", "error", saveErr)
|
|
}
|
|
return "", err
|
|
}
|
|
step.Status = planStepStatusCompleted
|
|
step.OutputSummary = reasoning
|
|
appendExecutionLog(state, Observation{
|
|
StepID: step.ID,
|
|
Kind: "reasoning",
|
|
Summary: reasoning,
|
|
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
case planStepTypeAskUser:
|
|
question := strings.TrimSpace(step.Instruction)
|
|
if question == "" {
|
|
if lang == "zh" {
|
|
question = "我还缺少一些信息,麻烦你补充一下。"
|
|
} else {
|
|
question = "I need a bit more information before I continue."
|
|
}
|
|
}
|
|
step.Status = planStepStatusCompleted
|
|
step.OutputSummary = question
|
|
state.Status = executionStatusWaitingUser
|
|
state.Waiting = buildWaitingState(*state, *step, question)
|
|
state.FinalAnswer = question
|
|
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if err := a.saveExecutionState(*state); err != nil {
|
|
return "", err
|
|
}
|
|
if onEvent != nil {
|
|
onEvent(StreamEventStepComplete, formatStepCompleteStatus(*step, lang))
|
|
onEvent(StreamEventDelta, question)
|
|
}
|
|
return question, nil
|
|
case planStepTypeRespond:
|
|
respondStartedAt := time.Now()
|
|
finalText, err := a.generateFinalPlanResponse(ctx, userID, lang, *state, step.Instruction)
|
|
a.logPlannerTiming(state.SessionID, userID, "respond_step", respondStartedAt, err)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
step.Status = planStepStatusCompleted
|
|
step.OutputSummary = finalText
|
|
state.Status = executionStatusCompleted
|
|
state.Waiting = nil
|
|
state.FinalAnswer = finalText
|
|
state.CurrentStepID = ""
|
|
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if err := a.saveExecutionState(*state); err != nil {
|
|
return "", err
|
|
}
|
|
if onEvent != nil {
|
|
onEvent(StreamEventStepComplete, formatStepCompleteStatus(*step, lang))
|
|
onEvent(StreamEventDelta, finalText)
|
|
}
|
|
return finalText, nil
|
|
default:
|
|
return "", fmt.Errorf("unsupported step type: %s", step.Type)
|
|
}
|
|
|
|
state.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
|
if err := a.saveExecutionState(*state); err != nil {
|
|
return "", err
|
|
}
|
|
if onEvent != nil {
|
|
onEvent(StreamEventStepComplete, formatStepCompleteStatus(*step, lang))
|
|
}
|
|
}
|
|
|
|
return "", fmt.Errorf("plan execution exceeded iteration limit")
|
|
}
|
|
|
|
type fetchedToolRecord struct {
|
|
ToolName string `json:"tool_name"`
|
|
ToolArgsJSON string `json:"tool_args_json"`
|
|
FetchedAt string `json:"fetched_at"`
|
|
AgeSeconds int64 `json:"age_seconds"`
|
|
}
|
|
|
|
func buildRecentlyFetchedData(state ExecutionState, now time.Time) []fetchedToolRecord {
|
|
state = normalizeExecutionState(state)
|
|
stepByID := make(map[string]PlanStep, len(state.Steps))
|
|
for _, step := range state.Steps {
|
|
stepByID[step.ID] = step
|
|
}
|
|
latest := map[string]fetchedToolRecord{}
|
|
for _, obs := range state.ExecutionLog {
|
|
if obs.Kind != "tool_result" {
|
|
continue
|
|
}
|
|
step, ok := stepByID[obs.StepID]
|
|
if !ok || step.ToolName == "" {
|
|
continue
|
|
}
|
|
sig := toolCallSignature(step.ToolName, step.ToolArgs)
|
|
createdAt := parseRFC3339(obs.CreatedAt)
|
|
record := fetchedToolRecord{
|
|
ToolName: step.ToolName,
|
|
ToolArgsJSON: toolArgsJSONString(step.ToolArgs),
|
|
FetchedAt: obs.CreatedAt,
|
|
AgeSeconds: int64(now.Sub(createdAt).Seconds()),
|
|
}
|
|
prev, exists := latest[sig]
|
|
if !exists || prev.FetchedAt < record.FetchedAt {
|
|
latest[sig] = record
|
|
}
|
|
}
|
|
out := make([]fetchedToolRecord, 0, len(latest))
|
|
for _, record := range latest {
|
|
if record.AgeSeconds < 0 {
|
|
record.AgeSeconds = 0
|
|
}
|
|
out = append(out, record)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func filterFreshDuplicateToolSteps(steps []PlanStep, state ExecutionState, now time.Time) []PlanStep {
|
|
if len(steps) == 0 {
|
|
return nil
|
|
}
|
|
fresh := make(map[string]struct{})
|
|
for _, item := range buildRecentlyFetchedData(state, now) {
|
|
if item.AgeSeconds <= 60 {
|
|
fresh[item.ToolName+"|"+item.ToolArgsJSON] = struct{}{}
|
|
}
|
|
}
|
|
out := make([]PlanStep, 0, len(steps))
|
|
for _, step := range steps {
|
|
if step.Type != planStepTypeTool {
|
|
out = append(out, step)
|
|
continue
|
|
}
|
|
sig := toolCallSignature(step.ToolName, step.ToolArgs)
|
|
if _, ok := fresh[sig]; ok {
|
|
continue
|
|
}
|
|
fresh[sig] = struct{}{}
|
|
out = append(out, step)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func hasRepeatedReasonLoop(state ExecutionState, steps []PlanStep) bool {
|
|
if len(steps) == 0 {
|
|
return false
|
|
}
|
|
last := lastCompletedStep(state.Steps)
|
|
if last == nil || last.Type != planStepTypeReason {
|
|
return false
|
|
}
|
|
for _, step := range steps {
|
|
if step.Type != planStepTypeReason {
|
|
return false
|
|
}
|
|
if stepSemanticKey(*last) != stepSemanticKey(step) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func lastCompletedStep(steps []PlanStep) *PlanStep {
|
|
for i := len(steps) - 1; i >= 0; i-- {
|
|
if steps[i].Status == planStepStatusCompleted {
|
|
return &steps[i]
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func stepSemanticKey(step PlanStep) string {
|
|
return strings.ToLower(strings.TrimSpace(
|
|
step.Type + "|" + step.ToolName + "|" + step.Title + "|" + step.Instruction,
|
|
))
|
|
}
|
|
|
|
func toolCallSignature(toolName string, args map[string]any) string {
|
|
return strings.TrimSpace(toolName) + "|" + toolArgsJSONString(args)
|
|
}
|
|
|
|
func toolArgsJSONString(args map[string]any) string {
|
|
if len(args) == 0 {
|
|
return "{}"
|
|
}
|
|
data, err := json.Marshal(args)
|
|
if err != nil {
|
|
return "{}"
|
|
}
|
|
return string(data)
|
|
}
|
|
|
|
func parseRFC3339(value string) time.Time {
|
|
t, err := time.Parse(time.RFC3339, strings.TrimSpace(value))
|
|
if err != nil {
|
|
return time.Time{}
|
|
}
|
|
return t
|
|
}
|
|
|
|
func (a *Agent) replanAfterStep(ctx context.Context, userID int64, lang string, state ExecutionState, completedStep PlanStep) (replannerDecision, error) {
|
|
obsJSON, _ := json.Marshal(buildObservationContext(state))
|
|
stepsJSON, _ := json.Marshal(state.Steps)
|
|
systemPrompt := `You are the replanning module for NOFXi.
|
|
Return JSON only.
|
|
|
|
Decide what to do after a plan step completed.
|
|
Allowed actions:
|
|
- continue
|
|
- replace_remaining
|
|
- ask_user
|
|
- finish
|
|
|
|
Rules:
|
|
- Use continue when the current remaining steps still make sense.
|
|
- Use replace_remaining when the observations materially change the remaining plan.
|
|
- Use ask_user when execution is blocked on missing user input.
|
|
- Use finish when there is enough information to answer and remaining steps are unnecessary.
|
|
- If action=replace_remaining, return a fresh list of remaining steps only.
|
|
- Keep plans short and safe.
|
|
- Never invent tools.`
|
|
|
|
userPrompt := fmt.Sprintf("Language: %s\nGoal: %s\nCompleted step: %s (%s)\nCompleted summary: %s\n\nCurrent steps JSON:\n%s\n\nObservations JSON:\n%s\n\nPersistent preferences:\n%s\n\nTask state:\n%s\n\nReturn JSON with this exact shape:\n{\"action\":\"continue|replace_remaining|ask_user|finish\",\"goal\":\"\",\"instruction\":\"\",\"question\":\"\",\"steps\":[{\"id\":\"step_x\",\"type\":\"tool|reason|ask_user|respond\",\"title\":\"\",\"tool_name\":\"\",\"tool_args\":{},\"instruction\":\"\",\"requires_confirmation\":false}]}", lang, state.Goal, completedStep.ID, completedStep.Type, completedStep.OutputSummary, string(stepsJSON), string(obsJSON), a.buildPersistentPreferencesContext(userID), buildTaskStateContext(a.getTaskState(userID)))
|
|
|
|
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerReplanTimeout)
|
|
defer cancel()
|
|
|
|
startedAt := time.Now()
|
|
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
|
|
Messages: []mcp.Message{
|
|
mcp.NewSystemMessage(systemPrompt),
|
|
mcp.NewUserMessage(userPrompt),
|
|
},
|
|
Ctx: stageCtx,
|
|
MaxTokens: intPtr(500),
|
|
})
|
|
a.logPlannerTiming(state.SessionID, userID, "replan_after_step_llm", startedAt, err)
|
|
if err != nil {
|
|
return replannerDecision{}, err
|
|
}
|
|
return parseReplannerDecisionJSON(raw)
|
|
}
|
|
|
|
func parseReplannerDecisionJSON(raw string) (replannerDecision, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
raw = strings.TrimPrefix(raw, "```json")
|
|
raw = strings.TrimPrefix(raw, "```")
|
|
raw = strings.TrimSuffix(raw, "```")
|
|
raw = strings.TrimSpace(raw)
|
|
|
|
var decision replannerDecision
|
|
if err := json.Unmarshal([]byte(raw), &decision); err == nil {
|
|
return normalizeReplannerDecision(decision), nil
|
|
}
|
|
start := strings.Index(raw, "{")
|
|
end := strings.LastIndex(raw, "}")
|
|
if start >= 0 && end > start {
|
|
if err := json.Unmarshal([]byte(raw[start:end+1]), &decision); err == nil {
|
|
return normalizeReplannerDecision(decision), nil
|
|
}
|
|
}
|
|
return replannerDecision{}, fmt.Errorf("invalid replanner decision json")
|
|
}
|
|
|
|
func normalizeReplannerDecision(decision replannerDecision) replannerDecision {
|
|
decision.Action = strings.TrimSpace(decision.Action)
|
|
decision.Goal = strings.TrimSpace(decision.Goal)
|
|
decision.Instruction = strings.TrimSpace(decision.Instruction)
|
|
decision.Question = strings.TrimSpace(decision.Question)
|
|
for i := range decision.Steps {
|
|
if decision.Steps[i].ID == "" {
|
|
decision.Steps[i].ID = fmt.Sprintf("step_%d", i+1)
|
|
}
|
|
if decision.Steps[i].Status == "" {
|
|
decision.Steps[i].Status = planStepStatusPending
|
|
}
|
|
decision.Steps[i].Type = strings.TrimSpace(decision.Steps[i].Type)
|
|
decision.Steps[i].Title = strings.TrimSpace(decision.Steps[i].Title)
|
|
decision.Steps[i].ToolName = strings.TrimSpace(decision.Steps[i].ToolName)
|
|
decision.Steps[i].Instruction = strings.TrimSpace(decision.Steps[i].Instruction)
|
|
}
|
|
return decision
|
|
}
|
|
|
|
func applyReplannerDecision(state *ExecutionState, decision replannerDecision) bool {
|
|
switch decision.Action {
|
|
case "", "continue":
|
|
return false
|
|
case "finish":
|
|
state.Steps = append(completedSteps(state.Steps), PlanStep{
|
|
ID: fmt.Sprintf("step_finish_%d", time.Now().UTC().UnixNano()),
|
|
Type: planStepTypeRespond,
|
|
Title: "final response",
|
|
Status: planStepStatusPending,
|
|
Instruction: decision.Instruction,
|
|
})
|
|
state.CurrentStepID = ""
|
|
if decision.Goal != "" {
|
|
state.Goal = decision.Goal
|
|
}
|
|
state.Waiting = nil
|
|
return true
|
|
case "ask_user":
|
|
question := decision.Question
|
|
if question == "" {
|
|
question = decision.Instruction
|
|
}
|
|
state.Steps = append(completedSteps(state.Steps), PlanStep{
|
|
ID: fmt.Sprintf("step_ask_%d", time.Now().UTC().UnixNano()),
|
|
Type: planStepTypeAskUser,
|
|
Title: "need user input",
|
|
Status: planStepStatusPending,
|
|
Instruction: question,
|
|
})
|
|
state.CurrentStepID = ""
|
|
if decision.Goal != "" {
|
|
state.Goal = decision.Goal
|
|
}
|
|
state.Waiting = buildWaitingState(*state, state.Steps[len(state.Steps)-1], question)
|
|
return true
|
|
case "replace_remaining":
|
|
if len(decision.Steps) == 0 {
|
|
return false
|
|
}
|
|
state.Steps = append(completedSteps(state.Steps), decision.Steps...)
|
|
state.CurrentStepID = ""
|
|
if decision.Goal != "" {
|
|
state.Goal = decision.Goal
|
|
}
|
|
state.Waiting = nil
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func shouldAttemptReplan(state ExecutionState, step PlanStep, referencesChanged bool) bool {
|
|
if step.Type != planStepTypeTool {
|
|
return false
|
|
}
|
|
if toolResultIndicatesError(step.OutputSummary) || toolResultSignalsDependencyGap(step.OutputSummary) {
|
|
return true
|
|
}
|
|
if referencesChanged {
|
|
return true
|
|
}
|
|
if !hasPendingWorkAfterStep(state.Steps) {
|
|
return false
|
|
}
|
|
switch step.ToolName {
|
|
case "manage_trader", "manage_strategy", "manage_model_config", "manage_exchange_config", "execute_trade":
|
|
return toolActionMayChangePlan(step.ToolArgs)
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func hasPendingWorkAfterStep(steps []PlanStep) bool {
|
|
for _, step := range steps {
|
|
if step.Status == planStepStatusPending {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func toolActionMayChangePlan(args map[string]any) bool {
|
|
action, _ := args["action"].(string)
|
|
switch strings.TrimSpace(action) {
|
|
case "create", "update", "delete", "start", "stop", "activate", "duplicate":
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func toolResultIndicatesError(summary string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(summary))
|
|
return strings.Contains(lower, `"error"`) || strings.Contains(lower, `"status":"error"`) || strings.Contains(lower, "failed to ")
|
|
}
|
|
|
|
func toolResultSignalsDependencyGap(summary string) bool {
|
|
lower := strings.ToLower(strings.TrimSpace(summary))
|
|
patterns := []string{
|
|
"is required", "invalid ai_model_id", "invalid exchange_id", "invalid strategy_id",
|
|
"ai model is disabled", "exchange is disabled", "not found", "missing",
|
|
}
|
|
return containsAnyKeyword(lower, patterns)
|
|
}
|
|
|
|
func completedSteps(steps []PlanStep) []PlanStep {
|
|
out := make([]PlanStep, 0, len(steps))
|
|
for _, step := range steps {
|
|
if step.Status == planStepStatusCompleted {
|
|
out = append(out, step)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (a *Agent) planningStatusText(lang string) string {
|
|
if lang == "zh" {
|
|
return "🧭 正在规划执行步骤..."
|
|
}
|
|
return "🧭 Planning the next execution steps..."
|
|
}
|
|
|
|
func formatPlanStatus(state ExecutionState, lang string) string {
|
|
parts := make([]string, 0, len(state.Steps))
|
|
for i, step := range state.Steps {
|
|
label := step.Title
|
|
if label == "" {
|
|
label = step.Type
|
|
}
|
|
parts = append(parts, fmt.Sprintf("%d.%s", i+1, label))
|
|
}
|
|
if lang == "zh" {
|
|
return fmt.Sprintf("🗺️ 计划: %s", strings.Join(parts, " -> "))
|
|
}
|
|
return fmt.Sprintf("🗺️ Plan: %s", strings.Join(parts, " -> "))
|
|
}
|
|
|
|
func formatStepStatus(step PlanStep, idx, total int, lang string) string {
|
|
label := step.Title
|
|
if label == "" {
|
|
label = step.Type
|
|
}
|
|
if lang == "zh" {
|
|
return fmt.Sprintf("▶️ 步骤 %d/%d: %s", idx+1, total, label)
|
|
}
|
|
return fmt.Sprintf("▶️ Step %d/%d: %s", idx+1, total, label)
|
|
}
|
|
|
|
func formatStepCompleteStatus(step PlanStep, lang string) string {
|
|
label := step.Title
|
|
if label == "" {
|
|
label = step.Type
|
|
}
|
|
if lang == "zh" {
|
|
return fmt.Sprintf("✅ 已完成: %s", label)
|
|
}
|
|
return fmt.Sprintf("✅ Completed: %s", label)
|
|
}
|
|
|
|
func formatReplanStatus(decision replannerDecision, lang string) string {
|
|
switch decision.Action {
|
|
case "replace_remaining":
|
|
if lang == "zh" {
|
|
return "🔄 已根据新结果更新后续步骤"
|
|
}
|
|
return "🔄 Updated the remaining steps based on new results"
|
|
case "ask_user":
|
|
if lang == "zh" {
|
|
return "📝 当前流程需要用户补充信息"
|
|
}
|
|
return "📝 This flow needs more user input"
|
|
case "finish":
|
|
if lang == "zh" {
|
|
return "🏁 已提前收敛到最终回复"
|
|
}
|
|
return "🏁 Converged early to the final response"
|
|
default:
|
|
if lang == "zh" {
|
|
return "🔄 已重新评估计划"
|
|
}
|
|
return "🔄 Re-evaluated the plan"
|
|
}
|
|
}
|
|
|
|
func (a *Agent) executePlanTool(ctx context.Context, storeUserID string, userID int64, lang string, step PlanStep) string {
|
|
argsJSON := "{}"
|
|
if len(step.ToolArgs) > 0 {
|
|
if data, err := json.Marshal(step.ToolArgs); err == nil {
|
|
argsJSON = string(data)
|
|
}
|
|
}
|
|
return a.handleToolCall(ctx, storeUserID, userID, lang, mcp.ToolCall{
|
|
ID: step.ID,
|
|
Type: "function",
|
|
Function: mcp.ToolCallFunction{
|
|
Name: step.ToolName,
|
|
Arguments: argsJSON,
|
|
},
|
|
})
|
|
}
|
|
|
|
func (a *Agent) executeReasonStep(ctx context.Context, userID int64, lang, goal string, state ExecutionState, step PlanStep) (string, error) {
|
|
obsJSON, _ := json.Marshal(buildObservationContext(state))
|
|
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerReasonTimeout)
|
|
defer cancel()
|
|
|
|
startedAt := time.Now()
|
|
resp, err := a.aiClient.CallWithRequest(&mcp.Request{
|
|
Messages: []mcp.Message{
|
|
mcp.NewSystemMessage("You are the reasoning module for NOFXi. Return one short paragraph only. No markdown, no bullet list."),
|
|
mcp.NewUserMessage(fmt.Sprintf("Language: %s\nGoal: %s\nReasoning task: %s\nObservations JSON: %s\nPersistent preferences: %s\nTask state: %s", lang, goal, step.Instruction, string(obsJSON), a.buildPersistentPreferencesContext(userID), buildTaskStateContext(a.getTaskState(userID)))),
|
|
},
|
|
Ctx: stageCtx,
|
|
})
|
|
a.logPlannerTiming(state.SessionID, userID, "reason_step_llm", startedAt, err)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return summarizeObservation(resp), nil
|
|
}
|
|
|
|
func (a *Agent) generateFinalPlanResponse(ctx context.Context, userID int64, lang string, state ExecutionState, instruction string) (string, error) {
|
|
obsJSON, _ := json.Marshal(buildObservationContext(state))
|
|
systemPrompt := a.buildSystemPrompt(lang)
|
|
if instruction == "" {
|
|
instruction = "Provide the best possible final response to the user based on the finished execution."
|
|
}
|
|
stageCtx, cancel := withPlannerStageTimeout(ctx, plannerFinalTimeout)
|
|
defer cancel()
|
|
startedAt := time.Now()
|
|
resp, err := a.aiClient.CallWithRequest(&mcp.Request{
|
|
Messages: []mcp.Message{
|
|
mcp.NewSystemMessage(systemPrompt),
|
|
mcp.NewSystemMessage("You are responding after a completed execution plan. Use the observations as the source of truth. Be concise and actionable."),
|
|
mcp.NewUserMessage(fmt.Sprintf("Goal: %s\nResponse instruction: %s\nObservations JSON: %s\nPersistent preferences: %s\nTask state: %s", state.Goal, instruction, string(obsJSON), a.buildPersistentPreferencesContext(userID), buildTaskStateContext(a.getTaskState(userID)))),
|
|
},
|
|
Ctx: stageCtx,
|
|
})
|
|
a.logPlannerTiming(state.SessionID, userID, "generate_final_response_llm", startedAt, err)
|
|
return resp, err
|
|
}
|
|
|
|
func (a *Agent) logPlannerTiming(sessionID string, userID int64, stage string, startedAt time.Time, err error) {
|
|
if stage == "" || startedAt.IsZero() {
|
|
return
|
|
}
|
|
attrs := []any{
|
|
"session_id", sessionID,
|
|
"user_id", userID,
|
|
"stage", stage,
|
|
"elapsed_ms", time.Since(startedAt).Milliseconds(),
|
|
}
|
|
if err != nil {
|
|
attrs = append(attrs, "error", err.Error())
|
|
}
|
|
a.log().Info("planner timing", attrs...)
|
|
}
|
|
|
|
func nextPendingStepIndex(steps []PlanStep) int {
|
|
for i := range steps {
|
|
if steps[i].Status == "" || steps[i].Status == planStepStatusPending {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func summarizeObservation(value string) string {
|
|
value = strings.TrimSpace(value)
|
|
if len(value) <= observationMaxLength {
|
|
return value
|
|
}
|
|
return strings.TrimSpace(value[:observationMaxLength]) + "..."
|
|
}
|
|
|
|
func (a *Agent) thinkAndActLegacy(ctx context.Context, userID int64, lang, text string, onEvent func(event, data string)) (string, error) {
|
|
systemPrompt := a.buildSystemPrompt(lang)
|
|
enrichment := a.gatherContext(text)
|
|
preferencesCtx := a.buildPersistentPreferencesContext(userID)
|
|
|
|
userPrompt := text
|
|
if preferencesCtx != "" {
|
|
userPrompt = preferencesCtx + "\n\n---\n" + userPrompt
|
|
}
|
|
if enrichment != "" {
|
|
userPrompt = text + "\n\n---\n[NOFXi System Context - real-time data for reference]\n" + enrichment
|
|
if preferencesCtx != "" {
|
|
userPrompt = preferencesCtx + "\n\n---\n" + userPrompt
|
|
}
|
|
}
|
|
|
|
messages := []mcp.Message{mcp.NewSystemMessage(systemPrompt)}
|
|
taskStateCtx := buildTaskStateContext(a.getTaskState(userID))
|
|
if isConfigOrTraderIntent(text) {
|
|
taskStateCtx = ""
|
|
}
|
|
if taskStateCtx != "" {
|
|
messages = append(messages, mcp.NewSystemMessage(taskStateCtx))
|
|
}
|
|
history := a.history.Get(userID)
|
|
if len(history) > 0 {
|
|
history = history[:len(history)-1]
|
|
}
|
|
for _, msg := range history {
|
|
messages = append(messages, mcp.NewMessage(msg.Role, msg.Content))
|
|
}
|
|
messages = append(messages, mcp.NewUserMessage(userPrompt))
|
|
|
|
tools := agentTools()
|
|
|
|
const maxToolRounds = 5
|
|
for round := 0; round < maxToolRounds; round++ {
|
|
req := &mcp.Request{
|
|
Messages: messages,
|
|
Tools: tools,
|
|
ToolChoice: "auto",
|
|
Ctx: ctx,
|
|
}
|
|
|
|
resp, err := a.aiClient.CallWithRequestFull(req)
|
|
if err != nil {
|
|
if round == 0 {
|
|
plainResp, plainErr := a.aiClient.CallWithRequest(&mcp.Request{Messages: messages, Ctx: ctx})
|
|
if plainErr != nil {
|
|
a.logger.Warn("legacy AI plain fallback failed", "error", plainErr, "user_id", userID)
|
|
return a.aiServiceFailure(lang, plainErr)
|
|
}
|
|
if onEvent != nil {
|
|
onEvent(StreamEventDelta, plainResp)
|
|
}
|
|
return plainResp, nil
|
|
}
|
|
a.logger.Warn("legacy AI tool round failed", "error", err, "user_id", userID, "round", round)
|
|
return a.aiServiceFailure(lang, err)
|
|
}
|
|
|
|
if len(resp.ToolCalls) == 0 {
|
|
if onEvent != nil {
|
|
onEvent(StreamEventDelta, resp.Content)
|
|
}
|
|
return resp.Content, nil
|
|
}
|
|
|
|
assistantMsg := mcp.Message{Role: "assistant", ToolCalls: resp.ToolCalls}
|
|
if resp.Content != "" {
|
|
assistantMsg.Content = resp.Content
|
|
}
|
|
messages = append(messages, assistantMsg)
|
|
|
|
for _, tc := range resp.ToolCalls {
|
|
if onEvent != nil {
|
|
onEvent(StreamEventTool, tc.Function.Name)
|
|
}
|
|
result := a.handleToolCall(ctx, storeUserIDFromContext(ctx), userID, lang, tc)
|
|
messages = append(messages, mcp.Message{
|
|
Role: "tool",
|
|
Content: result,
|
|
ToolCallID: tc.ID,
|
|
})
|
|
}
|
|
}
|
|
|
|
finalResp, err := a.aiClient.CallWithRequest(&mcp.Request{Messages: messages, Ctx: ctx})
|
|
if err != nil {
|
|
a.logger.Warn("legacy AI final response failed", "error", err, "user_id", userID)
|
|
return a.aiServiceFailure(lang, err)
|
|
}
|
|
if onEvent != nil {
|
|
onEvent(StreamEventDelta, finalResp)
|
|
}
|
|
return finalResp, nil
|
|
}
|