feat: supports spawn subagent as a background task that not block the main agent workflow (#5081)
* feat:为subagent添加后台任务参数 * ruff * fix: update terminology from 'handoff mission' to 'background task' and refactor related logic * fix: update terminology from 'background_mission' to 'background_task' in HandoffTool and related logic * fix(HandoffTool): update background_task description for clarity on usage --------- Co-authored-by: Soulter <905617992@qq.com>
This commit is contained in:
@@ -44,6 +44,14 @@ class HandoffTool(FunctionTool, Generic[TContext]):
|
||||
"type": "string",
|
||||
"description": "The input to be handed off to another agent. This should be a clear and concise request or task.",
|
||||
},
|
||||
"background_task": {
|
||||
"type": "boolean",
|
||||
"description": (
|
||||
"Defaults to false. "
|
||||
"Set to true if the task may take noticeable time, involves external tools, or the user does not need to wait. "
|
||||
"Use false only for quick, immediate tasks."
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -45,6 +45,13 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
|
||||
|
||||
"""
|
||||
if isinstance(tool, HandoffTool):
|
||||
is_bg = tool_args.pop("background_task", False)
|
||||
if is_bg:
|
||||
async for r in cls._execute_handoff_background(
|
||||
tool, run_context, **tool_args
|
||||
):
|
||||
yield r
|
||||
return
|
||||
async for r in cls._execute_handoff(tool, run_context, **tool_args):
|
||||
yield r
|
||||
return
|
||||
@@ -146,6 +153,86 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
|
||||
content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)]
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def _execute_handoff_background(
|
||||
cls,
|
||||
tool: HandoffTool,
|
||||
run_context: ContextWrapper[AstrAgentContext],
|
||||
**tool_args,
|
||||
):
|
||||
"""Execute a handoff as a background task.
|
||||
|
||||
Immediately yields a success response with a task_id, then runs
|
||||
the subagent asynchronously. When the subagent finishes, a
|
||||
``CronMessageEvent`` is created so the main LLM can inform the
|
||||
user of the result – the same pattern used by
|
||||
``_execute_background`` for regular background tasks.
|
||||
"""
|
||||
task_id = uuid.uuid4().hex
|
||||
|
||||
async def _run_handoff_in_background() -> None:
|
||||
try:
|
||||
await cls._do_handoff_background(
|
||||
tool=tool,
|
||||
run_context=run_context,
|
||||
task_id=task_id,
|
||||
**tool_args,
|
||||
)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.error(
|
||||
f"Background handoff {task_id} ({tool.name}) failed: {e!s}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
asyncio.create_task(_run_handoff_in_background())
|
||||
|
||||
text_content = mcp.types.TextContent(
|
||||
type="text",
|
||||
text=(
|
||||
f"Background task dedicated to subagent '{tool.agent.name}' submitted. task_id={task_id}. "
|
||||
f"The subagent '{tool.agent.name}' is working on the task on hehalf you. "
|
||||
f"You will be notified when it finishes."
|
||||
),
|
||||
)
|
||||
yield mcp.types.CallToolResult(content=[text_content])
|
||||
|
||||
@classmethod
|
||||
async def _do_handoff_background(
|
||||
cls,
|
||||
tool: HandoffTool,
|
||||
run_context: ContextWrapper[AstrAgentContext],
|
||||
task_id: str,
|
||||
**tool_args,
|
||||
) -> None:
|
||||
"""Run the subagent handoff and, on completion, wake the main agent."""
|
||||
result_text = ""
|
||||
try:
|
||||
async for r in cls._execute_handoff(tool, run_context, **tool_args):
|
||||
if isinstance(r, mcp.types.CallToolResult):
|
||||
for content in r.content:
|
||||
if isinstance(content, mcp.types.TextContent):
|
||||
result_text += content.text + "\n"
|
||||
except Exception as e:
|
||||
result_text = (
|
||||
f"error: Background task execution failed, internal error: {e!s}"
|
||||
)
|
||||
|
||||
event = run_context.context.event
|
||||
|
||||
await cls._wake_main_agent_for_background_result(
|
||||
run_context=run_context,
|
||||
task_id=task_id,
|
||||
tool_name=tool.name,
|
||||
result_text=result_text,
|
||||
tool_args=tool_args,
|
||||
note=(
|
||||
event.get_extra("background_note")
|
||||
or f"Background task for subagent '{tool.agent.name}' finished."
|
||||
),
|
||||
summary_name=f"Dedicated to subagent `{tool.agent.name}`",
|
||||
extra_result_fields={"subagent_name": tool.agent.name},
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def _execute_background(
|
||||
cls,
|
||||
@@ -154,12 +241,6 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
|
||||
task_id: str,
|
||||
**tool_args,
|
||||
) -> None:
|
||||
from astrbot.core.astr_main_agent import (
|
||||
MainAgentBuildConfig,
|
||||
_get_session_conv,
|
||||
build_main_agent,
|
||||
)
|
||||
|
||||
# run the tool
|
||||
result_text = ""
|
||||
try:
|
||||
@@ -177,21 +258,53 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
|
||||
f"error: Background task execution failed, internal error: {e!s}"
|
||||
)
|
||||
|
||||
event = run_context.context.event
|
||||
|
||||
await cls._wake_main_agent_for_background_result(
|
||||
run_context=run_context,
|
||||
task_id=task_id,
|
||||
tool_name=tool.name,
|
||||
result_text=result_text,
|
||||
tool_args=tool_args,
|
||||
note=(
|
||||
event.get_extra("background_note")
|
||||
or f"Background task {tool.name} finished."
|
||||
),
|
||||
summary_name=tool.name,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def _wake_main_agent_for_background_result(
|
||||
cls,
|
||||
run_context: ContextWrapper[AstrAgentContext],
|
||||
*,
|
||||
task_id: str,
|
||||
tool_name: str,
|
||||
result_text: str,
|
||||
tool_args: dict[str, T.Any],
|
||||
note: str,
|
||||
summary_name: str,
|
||||
extra_result_fields: dict[str, T.Any] | None = None,
|
||||
) -> None:
|
||||
from astrbot.core.astr_main_agent import (
|
||||
MainAgentBuildConfig,
|
||||
_get_session_conv,
|
||||
build_main_agent,
|
||||
)
|
||||
|
||||
event = run_context.context.event
|
||||
ctx = run_context.context.context
|
||||
|
||||
note = (
|
||||
event.get_extra("background_note")
|
||||
or f"Background task {tool.name} finished."
|
||||
)
|
||||
extras = {
|
||||
"background_task_result": {
|
||||
"task_id": task_id,
|
||||
"tool_name": tool.name,
|
||||
"result": result_text or "",
|
||||
"tool_args": tool_args,
|
||||
}
|
||||
task_result = {
|
||||
"task_id": task_id,
|
||||
"tool_name": tool_name,
|
||||
"result": result_text or "",
|
||||
"tool_args": tool_args,
|
||||
}
|
||||
if extra_result_fields:
|
||||
task_result.update(extra_result_fields)
|
||||
extras = {"background_task_result": task_result}
|
||||
|
||||
session = MessageSession.from_str(event.unified_msg_origin)
|
||||
cron_event = CronMessageEvent(
|
||||
context=ctx,
|
||||
@@ -222,8 +335,11 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
|
||||
)
|
||||
req.prompt = (
|
||||
"Proceed according to your system instructions. "
|
||||
"Output using same language as previous conversation."
|
||||
" After completing your task, summarize and output your actions and results."
|
||||
"Output using same language as previous conversation. "
|
||||
"If you need to deliver the result to the user immediately, "
|
||||
"you MUST use `send_message_to_user` tool to send the message directly to the user, "
|
||||
"otherwise the user will not see the result. "
|
||||
"After completing your task, summarize and output your actions and results. "
|
||||
)
|
||||
if not req.func_tool:
|
||||
req.func_tool = ToolSet()
|
||||
@@ -233,7 +349,7 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
|
||||
event=cron_event, plugin_context=ctx, config=config, req=req
|
||||
)
|
||||
if not result:
|
||||
logger.error("Failed to build main agent for background task job.")
|
||||
logger.error(f"Failed to build main agent for background task {tool_name}.")
|
||||
return
|
||||
|
||||
runner = result.agent_runner
|
||||
@@ -243,7 +359,7 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
|
||||
llm_resp = runner.get_final_llm_resp()
|
||||
task_meta = extras.get("background_task_result", {})
|
||||
summary_note = (
|
||||
f"[BackgroundTask] {task_meta.get('tool_name', tool.name)} "
|
||||
f"[BackgroundTask] {summary_name} "
|
||||
f"(task_id={task_meta.get('task_id', task_id)}) finished. "
|
||||
f"Result: {task_meta.get('result') or result_text or 'no content'}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user