mirror of
https://github.com/laoxong/nofx.git
synced 2026-06-04 01:48:22 +08:00
3ca95b294d
* feat: integrate NOFXi agent into dev * Enhance NOFXi agent workflow and diagnostics
522 lines
16 KiB
Go
522 lines
16 KiB
Go
package agent
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strings"
|
||
"time"
|
||
|
||
"nofx/mcp"
|
||
)
|
||
|
||
// Status values stored in WorkflowTask.Status.
const (
	// workflowTaskPending is the status given to tasks that have no status yet.
	workflowTaskPending = "pending"
	// workflowTaskRunning marks the task that is currently being worked on.
	workflowTaskRunning = "running"
	// workflowTaskCompleted marks a finished task; dependants wait for this status.
	workflowTaskCompleted = "completed"
	// workflowTaskFailed marks a task that could not be handled.
	workflowTaskFailed = "failed"
)
|
||
|
||
// WorkflowTask is a single step of a workflow. Every field is omitted from
// the JSON encoding when it holds its zero value.
type WorkflowTask struct {
	// ID identifies the task inside its session; DependsOn entries refer to it.
	ID string `json:"id,omitempty"`
	// Skill and Action select what is run for this task.
	Skill  string `json:"skill,omitempty"`
	Action string `json:"action,omitempty"`
	// Request is the text passed to the skill when the task is started.
	Request string `json:"request,omitempty"`
	// DependsOn lists the IDs of tasks that must be completed first.
	DependsOn []string `json:"depends_on,omitempty"`
	// Status is one of the workflowTask* status values.
	Status string `json:"status,omitempty"`
	// Error carries a short failure reason, if any.
	Error string `json:"error,omitempty"`
}
|
||
|
||
type WorkflowSession struct {
|
||
UserID int64 `json:"user_id"`
|
||
OriginalRequest string `json:"original_request,omitempty"`
|
||
Tasks []WorkflowTask `json:"tasks,omitempty"`
|
||
UpdatedAt string `json:"updated_at,omitempty"`
|
||
}
|
||
|
||
type workflowDecomposition struct {
|
||
Tasks []WorkflowTask `json:"tasks"`
|
||
}
|
||
|
||
// workflowSessionConfigKey returns the system-config key under which the
// workflow session of the given user is stored.
func workflowSessionConfigKey(userID int64) string {
	const keyPrefix = "agent_workflow_session_"
	return keyPrefix + fmt.Sprint(userID)
}
|
||
|
||
func normalizeWorkflowSession(session WorkflowSession) WorkflowSession {
|
||
session.OriginalRequest = strings.TrimSpace(session.OriginalRequest)
|
||
normalized := make([]WorkflowTask, 0, len(session.Tasks))
|
||
for i, task := range session.Tasks {
|
||
task.ID = strings.TrimSpace(task.ID)
|
||
if task.ID == "" {
|
||
task.ID = fmt.Sprintf("task_%d", i+1)
|
||
}
|
||
task.Skill = strings.TrimSpace(task.Skill)
|
||
task.Action = normalizeAtomicSkillAction(task.Skill, task.Action)
|
||
task.Request = strings.TrimSpace(task.Request)
|
||
task.DependsOn = cleanStringList(task.DependsOn)
|
||
task.Status = strings.TrimSpace(task.Status)
|
||
if task.Status == "" {
|
||
task.Status = workflowTaskPending
|
||
}
|
||
task.Error = strings.TrimSpace(task.Error)
|
||
if task.Skill == "" || task.Action == "" || task.Request == "" {
|
||
continue
|
||
}
|
||
normalized = append(normalized, task)
|
||
}
|
||
session.Tasks = normalized
|
||
if len(session.Tasks) == 0 {
|
||
return WorkflowSession{}
|
||
}
|
||
if session.UpdatedAt == "" {
|
||
session.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
||
}
|
||
return session
|
||
}
|
||
|
||
func (a *Agent) getWorkflowSession(userID int64) WorkflowSession {
|
||
if a.store == nil {
|
||
return WorkflowSession{}
|
||
}
|
||
raw, err := a.store.GetSystemConfig(workflowSessionConfigKey(userID))
|
||
if err != nil || strings.TrimSpace(raw) == "" {
|
||
return WorkflowSession{}
|
||
}
|
||
var session WorkflowSession
|
||
if err := json.Unmarshal([]byte(raw), &session); err != nil {
|
||
return WorkflowSession{}
|
||
}
|
||
return normalizeWorkflowSession(session)
|
||
}
|
||
|
||
func (a *Agent) saveWorkflowSession(userID int64, session WorkflowSession) {
|
||
if a.store == nil {
|
||
return
|
||
}
|
||
session = normalizeWorkflowSession(session)
|
||
if len(session.Tasks) == 0 {
|
||
_ = a.store.SetSystemConfig(workflowSessionConfigKey(userID), "")
|
||
return
|
||
}
|
||
session.UserID = userID
|
||
session.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
|
||
data, err := json.Marshal(session)
|
||
if err != nil {
|
||
return
|
||
}
|
||
_ = a.store.SetSystemConfig(workflowSessionConfigKey(userID), string(data))
|
||
}
|
||
|
||
func (a *Agent) clearWorkflowSession(userID int64) {
|
||
if a.store == nil {
|
||
return
|
||
}
|
||
_ = a.store.SetSystemConfig(workflowSessionConfigKey(userID), "")
|
||
}
|
||
|
||
func hasActiveWorkflowSession(session WorkflowSession) bool {
|
||
if len(session.Tasks) == 0 {
|
||
return false
|
||
}
|
||
for _, task := range session.Tasks {
|
||
if task.Status == workflowTaskPending || task.Status == workflowTaskRunning {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func nextRunnableWorkflowTask(session WorkflowSession) (WorkflowTask, int, bool) {
|
||
for i, task := range session.Tasks {
|
||
if task.Status != workflowTaskPending && task.Status != workflowTaskRunning {
|
||
continue
|
||
}
|
||
depsReady := true
|
||
for _, dep := range task.DependsOn {
|
||
ok := false
|
||
for _, candidate := range session.Tasks {
|
||
if candidate.ID == dep && candidate.Status == workflowTaskCompleted {
|
||
ok = true
|
||
break
|
||
}
|
||
}
|
||
if !ok {
|
||
depsReady = false
|
||
break
|
||
}
|
||
}
|
||
if depsReady {
|
||
return task, i, true
|
||
}
|
||
}
|
||
return WorkflowTask{}, -1, false
|
||
}
|
||
|
||
func supportedWorkflowSkill(skill, action string) bool {
|
||
skill = strings.TrimSpace(skill)
|
||
action = normalizeAtomicSkillAction(skill, action)
|
||
if skill == "" || action == "" {
|
||
return false
|
||
}
|
||
if _, ok := getSkillDAG(skill, action); ok {
|
||
return true
|
||
}
|
||
switch skill {
|
||
case "trader_management", "strategy_management", "model_management", "exchange_management":
|
||
switch action {
|
||
case "create", "query_list", "query_detail", "query_running", "activate":
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func (a *Agent) tryWorkflowIntent(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, bool, error) {
|
||
if session := a.getWorkflowSession(userID); hasActiveWorkflowSession(session) {
|
||
return a.handleWorkflowSession(ctx, storeUserID, userID, lang, text, session, onEvent)
|
||
}
|
||
|
||
decomposition, err := a.decomposeWorkflowIntent(ctx, userID, lang, text)
|
||
if err != nil || len(decomposition.Tasks) <= 1 {
|
||
return "", false, err
|
||
}
|
||
session := WorkflowSession{
|
||
UserID: userID,
|
||
OriginalRequest: text,
|
||
Tasks: decomposition.Tasks,
|
||
}
|
||
a.saveWorkflowSession(userID, session)
|
||
return a.handleWorkflowSession(ctx, storeUserID, userID, lang, text, session, onEvent)
|
||
}
|
||
|
||
func (a *Agent) handleWorkflowSession(ctx context.Context, storeUserID string, userID int64, lang, text string, session WorkflowSession, onEvent func(event, data string)) (string, bool, error) {
|
||
if isExplicitFlowAbort(text) {
|
||
a.clearSkillSession(userID)
|
||
a.clearWorkflowSession(userID)
|
||
if lang == "zh" {
|
||
return "已取消当前任务流。", true, nil
|
||
}
|
||
return "Cancelled the current workflow.", true, nil
|
||
}
|
||
|
||
if activeSkill := a.getSkillSession(userID); strings.TrimSpace(activeSkill.Name) != "" {
|
||
answer, handled := a.tryHardSkill(ctx, storeUserID, userID, lang, text, onEvent)
|
||
if !handled {
|
||
return "", false, nil
|
||
}
|
||
session = a.getWorkflowSession(userID)
|
||
if hasActiveWorkflowSession(session) && strings.TrimSpace(a.getSkillSession(userID).Name) == "" {
|
||
session = markCurrentWorkflowTask(session, workflowTaskCompleted, "")
|
||
a.saveWorkflowSession(userID, session)
|
||
if final, done, err := a.maybeAdvanceWorkflow(ctx, storeUserID, userID, lang, session, onEvent); done || err != nil {
|
||
if final != "" && answer != "" {
|
||
return answer + "\n\n" + final, true, err
|
||
}
|
||
if answer != "" {
|
||
return answer, true, err
|
||
}
|
||
return final, true, err
|
||
}
|
||
}
|
||
return answer, true, nil
|
||
}
|
||
|
||
return a.maybeAdvanceWorkflow(ctx, storeUserID, userID, lang, session, onEvent)
|
||
}
|
||
|
||
func (a *Agent) maybeAdvanceWorkflow(ctx context.Context, storeUserID string, userID int64, lang string, session WorkflowSession, onEvent func(event, data string)) (string, bool, error) {
|
||
task, index, ok := nextRunnableWorkflowTask(session)
|
||
if !ok {
|
||
summary := a.generateWorkflowSummary(ctx, userID, lang, session)
|
||
a.clearWorkflowSession(userID)
|
||
if summary == "" {
|
||
if lang == "zh" {
|
||
summary = "已完成当前任务流。"
|
||
} else {
|
||
summary = "Completed the current workflow."
|
||
}
|
||
}
|
||
if onEvent != nil {
|
||
onEvent(StreamEventPlan, summary)
|
||
onEvent(StreamEventDelta, summary)
|
||
}
|
||
return summary, true, nil
|
||
}
|
||
|
||
session.Tasks[index].Status = workflowTaskRunning
|
||
a.saveWorkflowSession(userID, session)
|
||
taskSession := skillSession{Name: task.Skill, Action: task.Action, Phase: "collecting"}
|
||
a.saveSkillSession(userID, taskSession)
|
||
|
||
if onEvent != nil {
|
||
onEvent(StreamEventPlan, a.formatWorkflowStatus(lang, session))
|
||
onEvent(StreamEventTool, "workflow:"+task.Skill+":"+task.Action)
|
||
}
|
||
|
||
answer, handled := a.tryHardSkill(ctx, storeUserID, userID, lang, task.Request, onEvent)
|
||
if !handled {
|
||
session.Tasks[index].Status = workflowTaskFailed
|
||
session.Tasks[index].Error = "task_not_handled"
|
||
a.saveWorkflowSession(userID, session)
|
||
return "", false, nil
|
||
}
|
||
|
||
if strings.TrimSpace(a.getSkillSession(userID).Name) == "" {
|
||
session = a.getWorkflowSession(userID)
|
||
session = markCurrentWorkflowTask(session, workflowTaskCompleted, "")
|
||
a.saveWorkflowSession(userID, session)
|
||
if more, ok, err := a.maybeAdvanceWorkflow(ctx, storeUserID, userID, lang, session, onEvent); ok || err != nil {
|
||
if answer != "" && more != "" {
|
||
return answer + "\n\n" + more, true, err
|
||
}
|
||
if answer != "" {
|
||
return answer, true, err
|
||
}
|
||
return more, true, err
|
||
}
|
||
}
|
||
return answer, true, nil
|
||
}
|
||
|
||
func markCurrentWorkflowTask(session WorkflowSession, status, errMsg string) WorkflowSession {
|
||
for i := range session.Tasks {
|
||
if session.Tasks[i].Status == workflowTaskRunning {
|
||
session.Tasks[i].Status = status
|
||
session.Tasks[i].Error = strings.TrimSpace(errMsg)
|
||
return session
|
||
}
|
||
}
|
||
return session
|
||
}
|
||
|
||
func (a *Agent) formatWorkflowStatus(lang string, session WorkflowSession) string {
|
||
parts := make([]string, 0, len(session.Tasks))
|
||
for _, task := range session.Tasks {
|
||
label := task.Request
|
||
if label == "" {
|
||
label = task.Skill + ":" + task.Action
|
||
}
|
||
switch task.Status {
|
||
case workflowTaskCompleted:
|
||
label = "✓ " + label
|
||
case workflowTaskRunning:
|
||
label = "→ " + label
|
||
default:
|
||
label = "· " + label
|
||
}
|
||
parts = append(parts, label)
|
||
}
|
||
if lang == "zh" {
|
||
return "任务流:" + strings.Join(parts, " | ")
|
||
}
|
||
return "Workflow: " + strings.Join(parts, " | ")
|
||
}
|
||
|
||
func (a *Agent) generateWorkflowSummary(ctx context.Context, userID int64, lang string, session WorkflowSession) string {
|
||
completed := make([]string, 0, len(session.Tasks))
|
||
for _, task := range session.Tasks {
|
||
if task.Status == workflowTaskCompleted {
|
||
completed = append(completed, task.Request)
|
||
}
|
||
}
|
||
if len(completed) == 0 {
|
||
return ""
|
||
}
|
||
if a.aiClient == nil {
|
||
if lang == "zh" {
|
||
return "已完成这些任务:" + strings.Join(completed, ";")
|
||
}
|
||
return "Completed these tasks: " + strings.Join(completed, "; ")
|
||
}
|
||
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
|
||
defer cancel()
|
||
systemPrompt := `You are summarizing a finished workflow for NOFXi.
|
||
Return one short user-facing summary in the user's language.
|
||
Do not mention internal DAG, scheduler, or JSON.`
|
||
userPrompt := fmt.Sprintf("Language: %s\nOriginal request: %s\nCompleted tasks:\n- %s", lang, session.OriginalRequest, strings.Join(completed, "\n- "))
|
||
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
|
||
Messages: []mcp.Message{
|
||
mcp.NewSystemMessage(systemPrompt),
|
||
mcp.NewUserMessage(userPrompt),
|
||
},
|
||
Ctx: stageCtx,
|
||
})
|
||
if err != nil {
|
||
if lang == "zh" {
|
||
return "已完成这些任务:" + strings.Join(completed, ";")
|
||
}
|
||
return "Completed these tasks: " + strings.Join(completed, "; ")
|
||
}
|
||
return strings.TrimSpace(raw)
|
||
}
|
||
|
||
func (a *Agent) decomposeWorkflowIntent(ctx context.Context, userID int64, lang, text string) (workflowDecomposition, error) {
|
||
if !looksLikeMultiTaskIntent(text) {
|
||
return workflowDecomposition{}, nil
|
||
}
|
||
if a.aiClient != nil {
|
||
if dec, err := a.decomposeWorkflowIntentWithLLM(ctx, userID, lang, text); err == nil && len(dec.Tasks) > 1 {
|
||
return dec, nil
|
||
}
|
||
}
|
||
return a.decomposeWorkflowIntentFallback(text), nil
|
||
}
|
||
|
||
// looksLikeMultiTaskIntent is a cheap pre-filter that reports whether text
// contains a connector suggesting more than one task.
//
// Punctuation and Chinese connectors are matched as substrings. The English
// connectors "and" and "then" must appear as whole words, so that words such
// as "candles", "standard" or "authentication" do not count as connectors.
// Matching is case-insensitive; blank text is never a multi-task request.
func looksLikeMultiTaskIntent(text string) bool {
	lower := strings.ToLower(strings.TrimSpace(text))
	if lower == "" {
		return false
	}

	// Both the ASCII and the full-width comma separate tasks.
	for _, connector := range []string{",", "，", "然后", "再", "并且", "并", "同时"} {
		if strings.Contains(lower, connector) {
			return true
		}
	}
	for _, word := range []string{"and", "then"} {
		if containsWorkflowConnectorWord(lower, word) {
			return true
		}
	}
	return false
}

// containsWorkflowConnectorWord reports whether word occurs in text without an
// ASCII letter, digit or underscore directly before or after it. Non-ASCII
// neighbours (for example Chinese characters) count as word boundaries.
func containsWorkflowConnectorWord(text, word string) bool {
	if word == "" {
		return false
	}
	isWordByte := func(b byte) bool {
		return (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || (b >= '0' && b <= '9') || b == '_'
	}
	for from := 0; from < len(text); {
		offset := strings.Index(text[from:], word)
		if offset < 0 {
			return false
		}
		start := from + offset
		end := start + len(word)
		boundaryBefore := start == 0 || !isWordByte(text[start-1])
		boundaryAfter := end == len(text) || !isWordByte(text[end])
		if boundaryBefore && boundaryAfter {
			return true
		}
		from = start + 1
	}
	return false
}
|
||
|
||
func (a *Agent) decomposeWorkflowIntentWithLLM(ctx context.Context, userID int64, lang, text string) (workflowDecomposition, error) {
|
||
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
|
||
defer cancel()
|
||
systemPrompt := `You decompose one NOFXi user request into a small task graph.
|
||
Return JSON only. No markdown.
|
||
Only use these skills: trader_management, strategy_management, model_management, exchange_management.
|
||
Only use one atomic action per task.
|
||
Each task must include:
|
||
- id
|
||
- skill
|
||
- action
|
||
- request
|
||
- depends_on (array, may be empty)
|
||
If the request is effectively a single task, return one task only.`
|
||
userPrompt := fmt.Sprintf("Language: %s\nUser request: %s", lang, text)
|
||
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
|
||
Messages: []mcp.Message{
|
||
mcp.NewSystemMessage(systemPrompt),
|
||
mcp.NewUserMessage(userPrompt),
|
||
},
|
||
Ctx: stageCtx,
|
||
})
|
||
if err != nil {
|
||
return workflowDecomposition{}, err
|
||
}
|
||
return parseWorkflowDecomposition(raw)
|
||
}
|
||
|
||
func parseWorkflowDecomposition(raw string) (workflowDecomposition, error) {
|
||
raw = strings.TrimSpace(raw)
|
||
raw = strings.TrimPrefix(raw, "```json")
|
||
raw = strings.TrimPrefix(raw, "```")
|
||
raw = strings.TrimSuffix(raw, "```")
|
||
raw = strings.TrimSpace(raw)
|
||
var out workflowDecomposition
|
||
if err := json.Unmarshal([]byte(raw), &out); err == nil {
|
||
out = normalizeWorkflowDecomposition(out)
|
||
return out, nil
|
||
}
|
||
start := strings.Index(raw, "{")
|
||
end := strings.LastIndex(raw, "}")
|
||
if start >= 0 && end > start {
|
||
if err := json.Unmarshal([]byte(raw[start:end+1]), &out); err == nil {
|
||
out = normalizeWorkflowDecomposition(out)
|
||
return out, nil
|
||
}
|
||
}
|
||
return workflowDecomposition{}, fmt.Errorf("invalid workflow json")
|
||
}
|
||
|
||
func normalizeWorkflowDecomposition(out workflowDecomposition) workflowDecomposition {
|
||
normalized := make([]WorkflowTask, 0, len(out.Tasks))
|
||
for i, task := range out.Tasks {
|
||
task.ID = strings.TrimSpace(task.ID)
|
||
if task.ID == "" {
|
||
task.ID = fmt.Sprintf("task_%d", i+1)
|
||
}
|
||
task.Skill = strings.TrimSpace(task.Skill)
|
||
task.Action = normalizeAtomicSkillAction(task.Skill, task.Action)
|
||
task.Request = strings.TrimSpace(task.Request)
|
||
task.DependsOn = cleanStringList(task.DependsOn)
|
||
if !supportedWorkflowSkill(task.Skill, task.Action) || task.Request == "" {
|
||
continue
|
||
}
|
||
task.Status = workflowTaskPending
|
||
normalized = append(normalized, task)
|
||
}
|
||
out.Tasks = normalized
|
||
return out
|
||
}
|
||
|
||
func (a *Agent) decomposeWorkflowIntentFallback(text string) workflowDecomposition {
|
||
segments := splitWorkflowSegments(text)
|
||
tasks := make([]WorkflowTask, 0, len(segments))
|
||
for i, segment := range segments {
|
||
task, ok := classifyWorkflowTask(segment)
|
||
if !ok {
|
||
continue
|
||
}
|
||
task.ID = fmt.Sprintf("task_%d", i+1)
|
||
task.Status = workflowTaskPending
|
||
if len(tasks) > 0 {
|
||
task.DependsOn = []string{tasks[len(tasks)-1].ID}
|
||
}
|
||
tasks = append(tasks, task)
|
||
}
|
||
return workflowDecomposition{Tasks: tasks}
|
||
}
|
||
|
||
// splitWorkflowSegments cuts text into candidate task segments.
//
// The text is split successively on each separator: the ASCII and full-width
// commas, the Chinese connectors, and the English connectors surrounded by
// spaces. Segments are trimmed and empty ones are dropped, so blank input
// yields an empty slice. " and then " is applied before " then " and " and "
// so the longer connector is consumed as a whole.
func splitWorkflowSegments(text string) []string {
	parts := []string{strings.TrimSpace(text)}
	// The list used to contain the ASCII comma twice; the second entry is the
	// full-width comma, so requests written with Chinese punctuation are split too.
	separators := []string{",", "，", "然后", "再", "并且", "同时", " and then ", " then ", " and "}
	for _, sep := range separators {
		next := make([]string, 0, len(parts))
		for _, part := range parts {
			for _, candidate := range strings.Split(part, sep) {
				if candidate = strings.TrimSpace(candidate); candidate != "" {
					next = append(next, candidate)
				}
			}
		}
		parts = next
	}
	return parts
}
|
||
|
||
func classifyWorkflowTask(text string) (WorkflowTask, bool) {
|
||
segment := strings.TrimSpace(text)
|
||
if segment == "" {
|
||
return WorkflowTask{}, false
|
||
}
|
||
switch {
|
||
case detectCreateTraderSkill(segment):
|
||
return WorkflowTask{Skill: "trader_management", Action: "create", Request: segment}, true
|
||
case detectTraderManagementIntent(segment):
|
||
action := normalizeAtomicSkillAction("trader_management", detectManagementAction(segment, "trader"))
|
||
if supportedWorkflowSkill("trader_management", action) {
|
||
return WorkflowTask{Skill: "trader_management", Action: action, Request: segment}, true
|
||
}
|
||
case detectExchangeManagementIntent(segment):
|
||
action := normalizeAtomicSkillAction("exchange_management", detectManagementAction(segment, "exchange"))
|
||
if supportedWorkflowSkill("exchange_management", action) {
|
||
return WorkflowTask{Skill: "exchange_management", Action: action, Request: segment}, true
|
||
}
|
||
case detectModelManagementIntent(segment):
|
||
action := normalizeAtomicSkillAction("model_management", detectManagementAction(segment, "model"))
|
||
if supportedWorkflowSkill("model_management", action) {
|
||
return WorkflowTask{Skill: "model_management", Action: action, Request: segment}, true
|
||
}
|
||
case detectStrategyManagementIntent(segment):
|
||
action := normalizeAtomicSkillAction("strategy_management", detectManagementAction(segment, "strategy"))
|
||
if action == "" && wantsStrategyDetails(segment) {
|
||
action = "query_detail"
|
||
}
|
||
if supportedWorkflowSkill("strategy_management", action) {
|
||
return WorkflowTask{Skill: "strategy_management", Action: action, Request: segment}, true
|
||
}
|
||
}
|
||
return WorkflowTask{}, false
|
||
}
|