Files
nofx/agent/workflow.go
T
lky-spec 3ca95b294d feat: port NOFXi agent module onto latest dev base (#1485)
* feat: integrate NOFXi agent into dev

* Enhance NOFXi agent workflow and diagnostics
2026-04-21 23:47:55 +08:00

522 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package agent
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"nofx/mcp"
)
const (
workflowTaskPending = "pending"
workflowTaskRunning = "running"
workflowTaskCompleted = "completed"
workflowTaskFailed = "failed"
)
type WorkflowTask struct {
ID string `json:"id,omitempty"`
Skill string `json:"skill,omitempty"`
Action string `json:"action,omitempty"`
Request string `json:"request,omitempty"`
DependsOn []string `json:"depends_on,omitempty"`
Status string `json:"status,omitempty"`
Error string `json:"error,omitempty"`
}
type WorkflowSession struct {
UserID int64 `json:"user_id"`
OriginalRequest string `json:"original_request,omitempty"`
Tasks []WorkflowTask `json:"tasks,omitempty"`
UpdatedAt string `json:"updated_at,omitempty"`
}
type workflowDecomposition struct {
Tasks []WorkflowTask `json:"tasks"`
}
func workflowSessionConfigKey(userID int64) string {
return fmt.Sprintf("agent_workflow_session_%d", userID)
}
func normalizeWorkflowSession(session WorkflowSession) WorkflowSession {
session.OriginalRequest = strings.TrimSpace(session.OriginalRequest)
normalized := make([]WorkflowTask, 0, len(session.Tasks))
for i, task := range session.Tasks {
task.ID = strings.TrimSpace(task.ID)
if task.ID == "" {
task.ID = fmt.Sprintf("task_%d", i+1)
}
task.Skill = strings.TrimSpace(task.Skill)
task.Action = normalizeAtomicSkillAction(task.Skill, task.Action)
task.Request = strings.TrimSpace(task.Request)
task.DependsOn = cleanStringList(task.DependsOn)
task.Status = strings.TrimSpace(task.Status)
if task.Status == "" {
task.Status = workflowTaskPending
}
task.Error = strings.TrimSpace(task.Error)
if task.Skill == "" || task.Action == "" || task.Request == "" {
continue
}
normalized = append(normalized, task)
}
session.Tasks = normalized
if len(session.Tasks) == 0 {
return WorkflowSession{}
}
if session.UpdatedAt == "" {
session.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
}
return session
}
func (a *Agent) getWorkflowSession(userID int64) WorkflowSession {
if a.store == nil {
return WorkflowSession{}
}
raw, err := a.store.GetSystemConfig(workflowSessionConfigKey(userID))
if err != nil || strings.TrimSpace(raw) == "" {
return WorkflowSession{}
}
var session WorkflowSession
if err := json.Unmarshal([]byte(raw), &session); err != nil {
return WorkflowSession{}
}
return normalizeWorkflowSession(session)
}
func (a *Agent) saveWorkflowSession(userID int64, session WorkflowSession) {
if a.store == nil {
return
}
session = normalizeWorkflowSession(session)
if len(session.Tasks) == 0 {
_ = a.store.SetSystemConfig(workflowSessionConfigKey(userID), "")
return
}
session.UserID = userID
session.UpdatedAt = time.Now().UTC().Format(time.RFC3339)
data, err := json.Marshal(session)
if err != nil {
return
}
_ = a.store.SetSystemConfig(workflowSessionConfigKey(userID), string(data))
}
func (a *Agent) clearWorkflowSession(userID int64) {
if a.store == nil {
return
}
_ = a.store.SetSystemConfig(workflowSessionConfigKey(userID), "")
}
func hasActiveWorkflowSession(session WorkflowSession) bool {
if len(session.Tasks) == 0 {
return false
}
for _, task := range session.Tasks {
if task.Status == workflowTaskPending || task.Status == workflowTaskRunning {
return true
}
}
return false
}
func nextRunnableWorkflowTask(session WorkflowSession) (WorkflowTask, int, bool) {
for i, task := range session.Tasks {
if task.Status != workflowTaskPending && task.Status != workflowTaskRunning {
continue
}
depsReady := true
for _, dep := range task.DependsOn {
ok := false
for _, candidate := range session.Tasks {
if candidate.ID == dep && candidate.Status == workflowTaskCompleted {
ok = true
break
}
}
if !ok {
depsReady = false
break
}
}
if depsReady {
return task, i, true
}
}
return WorkflowTask{}, -1, false
}
func supportedWorkflowSkill(skill, action string) bool {
skill = strings.TrimSpace(skill)
action = normalizeAtomicSkillAction(skill, action)
if skill == "" || action == "" {
return false
}
if _, ok := getSkillDAG(skill, action); ok {
return true
}
switch skill {
case "trader_management", "strategy_management", "model_management", "exchange_management":
switch action {
case "create", "query_list", "query_detail", "query_running", "activate":
return true
}
}
return false
}
func (a *Agent) tryWorkflowIntent(ctx context.Context, storeUserID string, userID int64, lang, text string, onEvent func(event, data string)) (string, bool, error) {
if session := a.getWorkflowSession(userID); hasActiveWorkflowSession(session) {
return a.handleWorkflowSession(ctx, storeUserID, userID, lang, text, session, onEvent)
}
decomposition, err := a.decomposeWorkflowIntent(ctx, userID, lang, text)
if err != nil || len(decomposition.Tasks) <= 1 {
return "", false, err
}
session := WorkflowSession{
UserID: userID,
OriginalRequest: text,
Tasks: decomposition.Tasks,
}
a.saveWorkflowSession(userID, session)
return a.handleWorkflowSession(ctx, storeUserID, userID, lang, text, session, onEvent)
}
func (a *Agent) handleWorkflowSession(ctx context.Context, storeUserID string, userID int64, lang, text string, session WorkflowSession, onEvent func(event, data string)) (string, bool, error) {
if isExplicitFlowAbort(text) {
a.clearSkillSession(userID)
a.clearWorkflowSession(userID)
if lang == "zh" {
return "已取消当前任务流。", true, nil
}
return "Cancelled the current workflow.", true, nil
}
if activeSkill := a.getSkillSession(userID); strings.TrimSpace(activeSkill.Name) != "" {
answer, handled := a.tryHardSkill(ctx, storeUserID, userID, lang, text, onEvent)
if !handled {
return "", false, nil
}
session = a.getWorkflowSession(userID)
if hasActiveWorkflowSession(session) && strings.TrimSpace(a.getSkillSession(userID).Name) == "" {
session = markCurrentWorkflowTask(session, workflowTaskCompleted, "")
a.saveWorkflowSession(userID, session)
if final, done, err := a.maybeAdvanceWorkflow(ctx, storeUserID, userID, lang, session, onEvent); done || err != nil {
if final != "" && answer != "" {
return answer + "\n\n" + final, true, err
}
if answer != "" {
return answer, true, err
}
return final, true, err
}
}
return answer, true, nil
}
return a.maybeAdvanceWorkflow(ctx, storeUserID, userID, lang, session, onEvent)
}
func (a *Agent) maybeAdvanceWorkflow(ctx context.Context, storeUserID string, userID int64, lang string, session WorkflowSession, onEvent func(event, data string)) (string, bool, error) {
task, index, ok := nextRunnableWorkflowTask(session)
if !ok {
summary := a.generateWorkflowSummary(ctx, userID, lang, session)
a.clearWorkflowSession(userID)
if summary == "" {
if lang == "zh" {
summary = "已完成当前任务流。"
} else {
summary = "Completed the current workflow."
}
}
if onEvent != nil {
onEvent(StreamEventPlan, summary)
onEvent(StreamEventDelta, summary)
}
return summary, true, nil
}
session.Tasks[index].Status = workflowTaskRunning
a.saveWorkflowSession(userID, session)
taskSession := skillSession{Name: task.Skill, Action: task.Action, Phase: "collecting"}
a.saveSkillSession(userID, taskSession)
if onEvent != nil {
onEvent(StreamEventPlan, a.formatWorkflowStatus(lang, session))
onEvent(StreamEventTool, "workflow:"+task.Skill+":"+task.Action)
}
answer, handled := a.tryHardSkill(ctx, storeUserID, userID, lang, task.Request, onEvent)
if !handled {
session.Tasks[index].Status = workflowTaskFailed
session.Tasks[index].Error = "task_not_handled"
a.saveWorkflowSession(userID, session)
return "", false, nil
}
if strings.TrimSpace(a.getSkillSession(userID).Name) == "" {
session = a.getWorkflowSession(userID)
session = markCurrentWorkflowTask(session, workflowTaskCompleted, "")
a.saveWorkflowSession(userID, session)
if more, ok, err := a.maybeAdvanceWorkflow(ctx, storeUserID, userID, lang, session, onEvent); ok || err != nil {
if answer != "" && more != "" {
return answer + "\n\n" + more, true, err
}
if answer != "" {
return answer, true, err
}
return more, true, err
}
}
return answer, true, nil
}
func markCurrentWorkflowTask(session WorkflowSession, status, errMsg string) WorkflowSession {
for i := range session.Tasks {
if session.Tasks[i].Status == workflowTaskRunning {
session.Tasks[i].Status = status
session.Tasks[i].Error = strings.TrimSpace(errMsg)
return session
}
}
return session
}
func (a *Agent) formatWorkflowStatus(lang string, session WorkflowSession) string {
parts := make([]string, 0, len(session.Tasks))
for _, task := range session.Tasks {
label := task.Request
if label == "" {
label = task.Skill + ":" + task.Action
}
switch task.Status {
case workflowTaskCompleted:
label = "✓ " + label
case workflowTaskRunning:
label = "→ " + label
default:
label = "· " + label
}
parts = append(parts, label)
}
if lang == "zh" {
return "任务流:" + strings.Join(parts, " | ")
}
return "Workflow: " + strings.Join(parts, " | ")
}
func (a *Agent) generateWorkflowSummary(ctx context.Context, userID int64, lang string, session WorkflowSession) string {
completed := make([]string, 0, len(session.Tasks))
for _, task := range session.Tasks {
if task.Status == workflowTaskCompleted {
completed = append(completed, task.Request)
}
}
if len(completed) == 0 {
return ""
}
if a.aiClient == nil {
if lang == "zh" {
return "已完成这些任务:" + strings.Join(completed, "")
}
return "Completed these tasks: " + strings.Join(completed, "; ")
}
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
defer cancel()
systemPrompt := `You are summarizing a finished workflow for NOFXi.
Return one short user-facing summary in the user's language.
Do not mention internal DAG, scheduler, or JSON.`
userPrompt := fmt.Sprintf("Language: %s\nOriginal request: %s\nCompleted tasks:\n- %s", lang, session.OriginalRequest, strings.Join(completed, "\n- "))
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewUserMessage(userPrompt),
},
Ctx: stageCtx,
})
if err != nil {
if lang == "zh" {
return "已完成这些任务:" + strings.Join(completed, "")
}
return "Completed these tasks: " + strings.Join(completed, "; ")
}
return strings.TrimSpace(raw)
}
func (a *Agent) decomposeWorkflowIntent(ctx context.Context, userID int64, lang, text string) (workflowDecomposition, error) {
if !looksLikeMultiTaskIntent(text) {
return workflowDecomposition{}, nil
}
if a.aiClient != nil {
if dec, err := a.decomposeWorkflowIntentWithLLM(ctx, userID, lang, text); err == nil && len(dec.Tasks) > 1 {
return dec, nil
}
}
return a.decomposeWorkflowIntentFallback(text), nil
}
func looksLikeMultiTaskIntent(text string) bool {
lower := strings.ToLower(strings.TrimSpace(text))
if lower == "" {
return false
}
connectors := []string{"", ",", "然后", "再", "并且", "并", "同时", "and", "then"}
count := 0
for _, c := range connectors {
if strings.Contains(lower, c) {
count++
}
}
return count > 0
}
func (a *Agent) decomposeWorkflowIntentWithLLM(ctx context.Context, userID int64, lang, text string) (workflowDecomposition, error) {
stageCtx, cancel := withPlannerStageTimeout(ctx, directReplyTimeout)
defer cancel()
systemPrompt := `You decompose one NOFXi user request into a small task graph.
Return JSON only. No markdown.
Only use these skills: trader_management, strategy_management, model_management, exchange_management.
Only use one atomic action per task.
Each task must include:
- id
- skill
- action
- request
- depends_on (array, may be empty)
If the request is effectively a single task, return one task only.`
userPrompt := fmt.Sprintf("Language: %s\nUser request: %s", lang, text)
raw, err := a.aiClient.CallWithRequest(&mcp.Request{
Messages: []mcp.Message{
mcp.NewSystemMessage(systemPrompt),
mcp.NewUserMessage(userPrompt),
},
Ctx: stageCtx,
})
if err != nil {
return workflowDecomposition{}, err
}
return parseWorkflowDecomposition(raw)
}
func parseWorkflowDecomposition(raw string) (workflowDecomposition, error) {
raw = strings.TrimSpace(raw)
raw = strings.TrimPrefix(raw, "```json")
raw = strings.TrimPrefix(raw, "```")
raw = strings.TrimSuffix(raw, "```")
raw = strings.TrimSpace(raw)
var out workflowDecomposition
if err := json.Unmarshal([]byte(raw), &out); err == nil {
out = normalizeWorkflowDecomposition(out)
return out, nil
}
start := strings.Index(raw, "{")
end := strings.LastIndex(raw, "}")
if start >= 0 && end > start {
if err := json.Unmarshal([]byte(raw[start:end+1]), &out); err == nil {
out = normalizeWorkflowDecomposition(out)
return out, nil
}
}
return workflowDecomposition{}, fmt.Errorf("invalid workflow json")
}
func normalizeWorkflowDecomposition(out workflowDecomposition) workflowDecomposition {
normalized := make([]WorkflowTask, 0, len(out.Tasks))
for i, task := range out.Tasks {
task.ID = strings.TrimSpace(task.ID)
if task.ID == "" {
task.ID = fmt.Sprintf("task_%d", i+1)
}
task.Skill = strings.TrimSpace(task.Skill)
task.Action = normalizeAtomicSkillAction(task.Skill, task.Action)
task.Request = strings.TrimSpace(task.Request)
task.DependsOn = cleanStringList(task.DependsOn)
if !supportedWorkflowSkill(task.Skill, task.Action) || task.Request == "" {
continue
}
task.Status = workflowTaskPending
normalized = append(normalized, task)
}
out.Tasks = normalized
return out
}
func (a *Agent) decomposeWorkflowIntentFallback(text string) workflowDecomposition {
segments := splitWorkflowSegments(text)
tasks := make([]WorkflowTask, 0, len(segments))
for i, segment := range segments {
task, ok := classifyWorkflowTask(segment)
if !ok {
continue
}
task.ID = fmt.Sprintf("task_%d", i+1)
task.Status = workflowTaskPending
if len(tasks) > 0 {
task.DependsOn = []string{tasks[len(tasks)-1].ID}
}
tasks = append(tasks, task)
}
return workflowDecomposition{Tasks: tasks}
}
func splitWorkflowSegments(text string) []string {
parts := []string{strings.TrimSpace(text)}
separators := []string{"", ",", "然后", "再", "并且", "同时", " and then ", " then ", " and "}
for _, sep := range separators {
next := make([]string, 0, len(parts))
for _, part := range parts {
split := strings.Split(part, sep)
for _, candidate := range split {
candidate = strings.TrimSpace(candidate)
if candidate != "" {
next = append(next, candidate)
}
}
}
parts = next
}
return parts
}
func classifyWorkflowTask(text string) (WorkflowTask, bool) {
segment := strings.TrimSpace(text)
if segment == "" {
return WorkflowTask{}, false
}
switch {
case detectCreateTraderSkill(segment):
return WorkflowTask{Skill: "trader_management", Action: "create", Request: segment}, true
case detectTraderManagementIntent(segment):
action := normalizeAtomicSkillAction("trader_management", detectManagementAction(segment, "trader"))
if supportedWorkflowSkill("trader_management", action) {
return WorkflowTask{Skill: "trader_management", Action: action, Request: segment}, true
}
case detectExchangeManagementIntent(segment):
action := normalizeAtomicSkillAction("exchange_management", detectManagementAction(segment, "exchange"))
if supportedWorkflowSkill("exchange_management", action) {
return WorkflowTask{Skill: "exchange_management", Action: action, Request: segment}, true
}
case detectModelManagementIntent(segment):
action := normalizeAtomicSkillAction("model_management", detectManagementAction(segment, "model"))
if supportedWorkflowSkill("model_management", action) {
return WorkflowTask{Skill: "model_management", Action: action, Request: segment}, true
}
case detectStrategyManagementIntent(segment):
action := normalizeAtomicSkillAction("strategy_management", detectManagementAction(segment, "strategy"))
if action == "" && wantsStrategyDetails(segment) {
action = "query_detail"
}
if supportedWorkflowSkill("strategy_management", action) {
return WorkflowTask{Skill: "strategy_management", Action: action, Request: segment}, true
}
}
return WorkflowTask{}, false
}