feat: add cron job management tools and dashboard integration

- Implemented proactive cron job tools in InternalAgentSubStage for scheduling tasks.
- Created SendMessageToUserTool for sending messages to users based on cron job triggers.
- Added CreateActiveCronTool, DeleteCronJobTool, and ListCronJobsTool for cron job management.
- Introduced CronRoute for handling cron job API requests in the dashboard.
- Developed CronJobPage.vue for managing cron jobs in the dashboard UI.
- Updated SubAgentPage.vue to include persona selection for subagents.
This commit is contained in:
Soulter
2026-01-31 17:08:37 +08:00
parent 831c2150d6
commit 4ea865f017
26 changed files with 1375 additions and 214 deletions
@@ -123,7 +123,29 @@ class ProcessLLMRequest:
continue
if a.get("enabled", True) is False:
continue
persona_tools = None
persona_id = a.get("persona_id")
if persona_id:
persona_tools = next(
(
p.get("tools")
for p in self.ctx.persona_manager.personas_v3
if p["name"] == persona_id
),
None,
)
tools = a.get("tools", [])
if persona_tools is not None:
tools = persona_tools
if tools is None:
assigned_tools.update(
[
tool.name
for tool in tmgr.func_list
if not isinstance(tool, HandoffTool)
]
)
continue
if not isinstance(tools, list):
continue
for t in tools:
+2 -1
View File
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Generic
from typing import Any, Generic
from .hooks import BaseAgentRunHooks
from .run_context import TContext
@@ -12,3 +12,4 @@ class Agent(Generic[TContext]):
instructions: str | None = None
tools: list[str | FunctionTool] | None = None
run_hooks: BaseAgentRunHooks[TContext] | None = None
begin_dialogs: list[Any] | None = None
+5
View File
@@ -58,6 +58,11 @@ class FunctionTool(ToolSchema, Generic[TContext]):
Whether the tool is active. This field is a special field for AstrBot.
You can ignore it when integrating with other frameworks.
"""
is_background_task: bool = False
"""
Declare this tool as a background task. Background tasks return immediately
with a task identifier while the real work continues asynchronously.
"""
def __repr__(self):
return f"FuncTool(name={self.name}, parameters={self.parameters}, description={self.description})"
+100 -2
View File
@@ -2,6 +2,7 @@ import asyncio
import inspect
import traceback
import typing as T
import uuid
import mcp
@@ -9,14 +10,17 @@ from astrbot import logger
from astrbot.core.agent.handoff import HandoffTool
from astrbot.core.agent.mcp_client import MCPTool
from astrbot.core.agent.run_context import ContextWrapper
from astrbot.core.agent.message import Message
from astrbot.core.agent.tool import FunctionTool, ToolSet
from astrbot.core.agent.tool_executor import BaseFunctionToolExecutor
from astrbot.core.astr_agent_context import AstrAgentContext
from astrbot.core.cron.events import CronMessageEvent
from astrbot.core.message.message_event_result import (
CommandResult,
MessageChain,
MessageEventResult,
)
from astrbot.core.platform.message_session import MessageSession
from astrbot.core.provider.register import llm_tools
@@ -43,6 +47,31 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
yield r
return
elif tool.is_background_task:
task_id = uuid.uuid4().hex
async def _run_in_background():
try:
await cls._execute_background(
tool=tool,
run_context=run_context,
task_id=task_id,
**tool_args,
)
except Exception as e: # noqa: BLE001
logger.error(
f"Background task {task_id} failed: {e!s}",
exc_info=True,
)
asyncio.create_task(_run_in_background())
text_content = mcp.types.TextContent(
type="text",
text=f"Background task submitted. task_id={task_id}",
)
yield mcp.types.CallToolResult(content=[text_content])
return
else:
async for r in cls._execute_local(tool, run_context, **tool_args):
yield r
@@ -80,12 +109,29 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
prov_id = getattr(
tool, "provider_id", None
) or await ctx.get_current_chat_provider_id(umo)
# prepare begin dialogs
contexts = None
dialogs = tool.agent.begin_dialogs
if dialogs:
contexts = []
for dialog in dialogs:
try:
contexts.append(
dialog
if isinstance(dialog, Message)
else Message.model_validate(dialog)
)
except Exception:
continue
llm_resp = await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
system_prompt=tool.agent.instructions,
tools=toolset,
contexts=contexts,
max_steps=30,
run_hooks=tool.agent.run_hooks,
)
@@ -93,11 +139,63 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)]
)
@classmethod
async def _execute_background(
cls,
tool: FunctionTool,
run_context: ContextWrapper[AstrAgentContext],
task_id: str,
**tool_args,
):
# run the tool
result_text = ""
try:
async for r in cls._execute_local(
tool, run_context, tool_call_timeout=3600, **tool_args
):
# collect results, currently we just collect the text results
if isinstance(r, mcp.types.CallToolResult):
result_text = ""
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background task execution failed, internal error: {e!s}"
)
event = run_context.context.event
ctx = run_context.context.context
note = (
event.get_extra("background_note")
or f"Background task {tool.name} finished."
)
extras = {
"background_task_result": {
"task_id": task_id,
"tool_name": tool.name,
"result": result_text or "",
"tool_args": tool_args,
}
}
session = MessageSession.from_str(event.unified_msg_origin)
cron_event = CronMessageEvent(
context=ctx,
session=session,
message=note,
extras=extras,
message_type=session.message_type,
)
ctx.get_event_queue().put_nowait(cron_event)
@classmethod
async def _execute_local(
cls,
tool: FunctionTool,
run_context: ContextWrapper[AstrAgentContext],
*,
tool_call_timeout: int | None = None,
**tool_args,
):
event = run_context.context.event
@@ -138,7 +236,7 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
try:
resp = await asyncio.wait_for(
anext(wrapper),
timeout=run_context.tool_call_timeout,
timeout=tool_call_timeout or run_context.tool_call_timeout,
)
if resp is not None:
if isinstance(resp, mcp.types.CallToolResult):
@@ -170,7 +268,7 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
yield None
except asyncio.TimeoutError:
raise Exception(
f"tool {tool.name} execution timeout after {run_context.tool_call_timeout} seconds.",
f"tool {tool.name} execution timeout after {tool_call_timeout or run_context.tool_call_timeout} seconds.",
)
except StopAsyncIteration:
break
+22 -4
View File
@@ -25,6 +25,7 @@ from astrbot.core.db import BaseDatabase
from astrbot.core.knowledge_base.kb_mgr import KnowledgeBaseManager
from astrbot.core.persona_mgr import PersonaManager
from astrbot.core.pipeline.scheduler import PipelineContext, PipelineScheduler
from astrbot.core.cron import CronJobManager
from astrbot.core.platform.manager import PlatformManager
from astrbot.core.platform_message_history_mgr import PlatformMessageHistoryManager
from astrbot.core.provider.manager import ProviderManager
@@ -57,6 +58,7 @@ class AstrBotCoreLifecycle:
# Optional orchestrator that registers dynamic handoff tools (transfer_to_*)
# from provider_settings.subagent_orchestrator.
self.subagent_orchestrator: SubAgentOrchestrator | None = None
self.cron_manager: CronJobManager | None = None
# 设置代理
proxy_config = self.astrbot_config.get("http_proxy", "")
@@ -77,7 +79,7 @@ class AstrBotCoreLifecycle:
del os.environ["no_proxy"]
logger.debug("HTTP proxy cleared")
def _init_or_reload_subagent_orchestrator(self) -> None:
async def _init_or_reload_subagent_orchestrator(self) -> None:
"""Create (if needed) and reload the subagent orchestrator from config.
This keeps lifecycle wiring in one place while allowing the orchestrator
@@ -87,8 +89,9 @@ class AstrBotCoreLifecycle:
if self.subagent_orchestrator is None:
self.subagent_orchestrator = SubAgentOrchestrator(
self.provider_manager.llm_tools,
self.persona_mgr,
)
self.subagent_orchestrator.reload_from_config(
await self.subagent_orchestrator.reload_from_config(
self.astrbot_config.get("subagent_orchestrator", {}),
)
except Exception as e:
@@ -159,6 +162,9 @@ class AstrBotCoreLifecycle:
# 初始化知识库管理器
self.kb_manager = KnowledgeBaseManager(self.provider_manager)
# 初始化 CronJob 管理器
self.cron_manager = CronJobManager(self.star_context, self.db)
# 初始化提供给插件的上下文
self.star_context = Context(
self.event_queue,
@@ -171,6 +177,7 @@ class AstrBotCoreLifecycle:
self.persona_mgr,
self.astrbot_config_mgr,
self.kb_manager,
self.cron_manager,
)
# 初始化插件管理器
@@ -198,7 +205,7 @@ class AstrBotCoreLifecycle:
)
# Dynamic subagents (handoff tools) from config.
self._init_or_reload_subagent_orchestrator()
await self._init_or_reload_subagent_orchestrator()
# 记录启动时间
self.start_time = int(time.time())
@@ -221,13 +228,21 @@ class AstrBotCoreLifecycle:
self.event_bus.dispatch(),
name="event_bus",
)
cron_task = None
if self.cron_manager:
cron_task = asyncio.create_task(
self.cron_manager.start(),
name="cron_manager",
)
# 把插件中注册的所有协程函数注册到事件总线中并执行
extra_tasks = []
for task in self.star_context._register_tasks:
extra_tasks.append(asyncio.create_task(task, name=task.__name__)) # type: ignore
tasks_ = [event_bus_task, *extra_tasks]
tasks_ = [event_bus_task, *(extra_tasks if extra_tasks else [])]
if cron_task:
tasks_.append(cron_task)
for task in tasks_:
self.curr_tasks.append(
asyncio.create_task(self._task_wrapper(task), name=task.get_name()),
@@ -283,6 +298,9 @@ class AstrBotCoreLifecycle:
for task in self.curr_tasks:
task.cancel()
if self.cron_manager:
await self.cron_manager.shutdown()
for plugin in self.plugin_manager.context.get_all_stars():
try:
await self.plugin_manager._terminate_plugin(plugin)
+3
View File
@@ -0,0 +1,3 @@
from .manager import CronJobManager
__all__ = ["CronJobManager"]
+66
View File
@@ -0,0 +1,66 @@
import time
import uuid
from typing import Any
from astrbot.core.message.components import Plain
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.platform.astr_message_event import AstrMessageEvent
from astrbot.core.platform.astrbot_message import AstrBotMessage, MessageMember
from astrbot.core.platform.message_session import MessageSession
from astrbot.core.platform.message_type import MessageType
from astrbot.core.platform.platform_metadata import PlatformMetadata
class CronMessageEvent(AstrMessageEvent):
"""Synthetic event used when a cron job triggers the main agent loop."""
def __init__(
self,
*,
context,
session: MessageSession,
message: str,
sender_id: str = "astrbot",
sender_name: str = "Scheduler",
extras: dict[str, Any] | None = None,
message_type: MessageType = MessageType.FRIEND_MESSAGE,
):
platform_meta = PlatformMetadata(
name="cron",
description="CronJob",
id=session.platform_id,
)
msg_obj = AstrBotMessage()
msg_obj.type = message_type
msg_obj.self_id = sender_id
msg_obj.session_id = session.session_id
msg_obj.message_id = uuid.uuid4().hex
msg_obj.sender = MessageMember(user_id=session.session_id, nickname=sender_name)
msg_obj.message = [Plain(message)]
msg_obj.message_str = message
msg_obj.raw_message = message
msg_obj.timestamp = int(time.time())
super().__init__(message, msg_obj, platform_meta, session.session_id)
# Ensure we use the original session for sending messages
self.session = session
self.context_obj = context
self.is_at_or_wake_command = True
self.is_wake = True
if extras:
self._extras.update(extras)
async def send(self, message: MessageChain):
if message is None:
return
await self.context_obj.send_message(self.session, message)
await super().send(message)
async def send_streaming(self, generator, use_fallback: bool = False):
async for chain in generator:
await self.send(chain)
__all__ = ["CronMessageEvent"]
+256
View File
@@ -0,0 +1,256 @@
import asyncio
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable
from zoneinfo import ZoneInfo
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from astrbot import logger
from astrbot.core.cron.events import CronMessageEvent
from astrbot.core.db import BaseDatabase
from astrbot.core.db.po import CronJob
from astrbot.core.platform.message_session import MessageSession
class CronJobManager:
"""Central scheduler for BasicCronJob and ActiveAgentCronJob."""
def __init__(self, ctx, db: BaseDatabase):
self.ctx = ctx
self.db = db
self.scheduler = AsyncIOScheduler()
self._basic_handlers: dict[str, Callable[..., Any]] = {}
self._lock = asyncio.Lock()
self._started = False
async def start(self):
async with self._lock:
if self._started:
return
self.scheduler.start()
self._started = True
await self.sync_from_db()
async def shutdown(self):
async with self._lock:
if not self._started:
return
self.scheduler.shutdown(wait=False)
self._started = False
async def sync_from_db(self):
jobs = await self.db.list_cron_jobs()
for job in jobs:
if not job.enabled or not job.persistent:
continue
if job.job_type == "basic" and job.job_id not in self._basic_handlers:
logger.warning(
"Skip scheduling basic cron job %s due to missing handler.",
job.job_id,
)
continue
self._schedule_job(job)
async def add_basic_job(
self,
*,
name: str,
cron_expression: str,
handler: Callable[..., Any | Awaitable[Any]],
description: str | None = None,
timezone: str | None = None,
payload: dict | None = None,
enabled: bool = True,
persistent: bool = False,
) -> CronJob:
job = await self.db.create_cron_job(
name=name,
job_type="basic",
cron_expression=cron_expression,
timezone=timezone,
payload=payload or {},
description=description,
enabled=enabled,
persistent=persistent,
)
self._basic_handlers[job.job_id] = handler
if enabled:
self._schedule_job(job)
return job
async def add_active_job(
self,
*,
name: str,
cron_expression: str,
payload: dict,
description: str | None = None,
timezone: str | None = None,
enabled: bool = True,
persistent: bool = True,
) -> CronJob:
job = await self.db.create_cron_job(
name=name,
job_type="active_agent",
cron_expression=cron_expression,
timezone=timezone,
payload=payload,
description=description,
enabled=enabled,
persistent=persistent,
)
if enabled:
self._schedule_job(job)
return job
async def update_job(self, job_id: str, **kwargs) -> CronJob | None:
job = await self.db.update_cron_job(job_id, **kwargs)
if not job:
return None
self._remove_scheduled(job_id)
if job.enabled:
self._schedule_job(job)
return job
async def delete_job(self, job_id: str) -> None:
self._remove_scheduled(job_id)
self._basic_handlers.pop(job_id, None)
await self.db.delete_cron_job(job_id)
async def list_jobs(self, job_type: str | None = None) -> list[CronJob]:
return await self.db.list_cron_jobs(job_type)
def _remove_scheduled(self, job_id: str):
if self.scheduler.get_job(job_id):
self.scheduler.remove_job(job_id)
def _schedule_job(self, job: CronJob):
if not self._started:
self.scheduler.start()
self._started = True
try:
tzinfo = None
if job.timezone:
try:
tzinfo = ZoneInfo(job.timezone)
except Exception:
logger.warning(
"Invalid timezone %s for cron job %s, fallback to system.",
job.timezone,
job.job_id,
)
trigger = CronTrigger.from_crontab(job.cron_expression, timezone=tzinfo)
self.scheduler.add_job(
self._run_job,
id=job.job_id,
trigger=trigger,
args=[job.job_id],
replace_existing=True,
misfire_grace_time=30,
)
asyncio.create_task(
self.db.update_cron_job(
job.job_id, next_run_time=self._get_next_run_time(job.job_id)
)
)
except Exception as e:
logger.error(f"Failed to schedule cron job {job.job_id}: {e!s}")
def _get_next_run_time(self, job_id: str):
aps_job = self.scheduler.get_job(job_id)
return aps_job.next_run_time if aps_job else None
async def _run_job(self, job_id: str):
job = await self.db.get_cron_job(job_id)
if not job or not job.enabled:
return
start_time = datetime.now(timezone.utc)
await self.db.update_cron_job(
job_id, status="running", last_run_at=start_time, last_error=None
)
status = "completed"
last_error = None
try:
if job.job_type == "basic":
await self._run_basic_job(job)
elif job.job_type == "active_agent":
await self._run_active_agent_job(job)
else:
raise ValueError(f"Unknown cron job type: {job.job_type}")
except Exception as e: # noqa: BLE001
status = "failed"
last_error = str(e)
logger.error(f"Cron job {job_id} failed: {e!s}", exc_info=True)
finally:
next_run = self._get_next_run_time(job_id)
await self.db.update_cron_job(
job_id,
status=status,
last_run_at=start_time,
last_error=last_error,
next_run_time=next_run,
)
async def _run_basic_job(self, job: CronJob):
handler = self._basic_handlers.get(job.job_id)
if not handler:
raise RuntimeError(f"Basic cron job handler not found for {job.job_id}")
payload = job.payload or {}
result = handler(**payload) if payload else handler()
if asyncio.iscoroutine(result):
await result
async def _run_active_agent_job(self, job: CronJob):
payload = job.payload or {}
session_str = payload.get("session")
if not session_str:
raise ValueError("ActiveAgentCronJob missing session.")
note = payload.get("note") or job.description or job.name
extras = {
"cron_job": {
"id": job.job_id,
"name": job.name,
"type": job.job_type,
"description": job.description,
"note": note,
},
"cron_payload": payload,
}
await self._dispatch_agent_event(
message=note,
session_str=session_str,
extras=extras,
)
async def _dispatch_agent_event(
self,
*,
message: str,
session_str: str,
extras: dict | None = None,
):
try:
session = (
session_str
if isinstance(session_str, MessageSession)
else MessageSession.from_str(session_str)
)
except Exception as e: # noqa: BLE001
logger.error(f"Invalid session for cron job: {e}")
return
cron_event = CronMessageEvent(
context=self.ctx,
session=session,
message=message,
extras=extras or {},
message_type=session.message_type,
)
await self.ctx.get_event_queue().put(cron_event)
__all__ = ["CronJobManager"]
+58
View File
@@ -13,6 +13,7 @@ from astrbot.core.db.po import (
CommandConfig,
CommandConflict,
ConversationV2,
CronJob,
Persona,
PersonaFolder,
PlatformMessageHistory,
@@ -511,6 +512,63 @@ class BaseDatabase(abc.ABC):
"""Get paginated session conversations with joined conversation and persona details, support search and platform filter."""
...
# ====
# Cron Job Management
# ====
@abc.abstractmethod
async def create_cron_job(
self,
name: str,
job_type: str,
cron_expression: str | None,
*,
timezone: str | None = None,
payload: dict | None = None,
description: str | None = None,
enabled: bool = True,
persistent: bool = True,
status: str | None = None,
job_id: str | None = None,
) -> CronJob:
"""Create and persist a cron job definition."""
...
@abc.abstractmethod
async def update_cron_job(
self,
job_id: str,
*,
name: str | None = None,
cron_expression: str | None = None,
timezone: str | None = None,
payload: dict | None = None,
description: str | None = None,
enabled: bool | None = None,
persistent: bool | None = None,
status: str | None = None,
next_run_time: datetime.datetime | None = None,
last_run_at: datetime.datetime | None = None,
last_error: str | None = None,
) -> CronJob | None:
"""Update fields of a cron job by job_id."""
...
@abc.abstractmethod
async def delete_cron_job(self, job_id: str) -> None:
"""Delete a cron job by its public job_id."""
...
@abc.abstractmethod
async def get_cron_job(self, job_id: str) -> CronJob | None:
"""Fetch a cron job by job_id."""
...
@abc.abstractmethod
async def list_cron_jobs(self, job_type: str | None = None) -> list[CronJob]:
"""List cron jobs, optionally filtered by job_type."""
...
# ====
# Platform Session Management
# ====
+30
View File
@@ -139,6 +139,36 @@ class Persona(TimestampMixin, SQLModel, table=True):
)
class CronJob(TimestampMixin, SQLModel, table=True):
"""Cron job definition for scheduler and WebUI management."""
__tablename__: str = "cron_jobs"
id: int | None = Field(
default=None,
primary_key=True,
sa_column_kwargs={"autoincrement": True},
)
job_id: str = Field(
max_length=64,
nullable=False,
unique=True,
default_factory=lambda: str(uuid.uuid4()),
)
name: str = Field(max_length=255, nullable=False)
description: str | None = Field(default=None, sa_type=Text)
job_type: str = Field(max_length=32, nullable=False) # basic | active_agent | background
cron_expression: str | None = Field(default=None, max_length=255)
timezone: str | None = Field(default=None, max_length=64)
payload: dict = Field(default_factory=dict, sa_type=JSON)
enabled: bool = Field(default=True)
persistent: bool = Field(default=True)
status: str = Field(default="scheduled", max_length=32)
last_run_at: datetime | None = Field(default=None)
next_run_time: datetime | None = Field(default=None)
last_error: str | None = Field(default=None, sa_type=Text)
class Preference(TimestampMixin, SQLModel, table=True):
"""This class represents preferences for bots."""
+116
View File
@@ -15,6 +15,7 @@ from astrbot.core.db.po import (
CommandConfig,
CommandConflict,
ConversationV2,
CronJob,
Persona,
PersonaFolder,
PlatformMessageHistory,
@@ -33,6 +34,7 @@ from astrbot.core.db.po import (
NOT_GIVEN = T.TypeVar("NOT_GIVEN")
TxResult = T.TypeVar("TxResult")
CRON_FIELD_NOT_SET = object()
class SQLiteDatabase(BaseDatabase):
@@ -1576,3 +1578,117 @@ class SQLiteDatabase(BaseDatabase):
),
)
return result.scalar_one_or_none()
# ====
# Cron Job Management
# ====
async def create_cron_job(
self,
name: str,
job_type: str,
cron_expression: str | None,
*,
timezone: str | None = None,
payload: dict | None = None,
description: str | None = None,
enabled: bool = True,
persistent: bool = True,
status: str | None = None,
job_id: str | None = None,
) -> CronJob:
async with self.get_db() as session:
session: AsyncSession
async with session.begin():
job = CronJob(
name=name,
job_type=job_type,
cron_expression=cron_expression,
timezone=timezone,
payload=payload or {},
description=description,
enabled=enabled,
persistent=persistent,
status=status or "scheduled",
)
if job_id:
job.job_id = job_id
session.add(job)
await session.flush()
await session.refresh(job)
return job
async def update_cron_job(
self,
job_id: str,
*,
name: str | None | object = CRON_FIELD_NOT_SET,
cron_expression: str | None | object = CRON_FIELD_NOT_SET,
timezone: str | None | object = CRON_FIELD_NOT_SET,
payload: dict | None | object = CRON_FIELD_NOT_SET,
description: str | None | object = CRON_FIELD_NOT_SET,
enabled: bool | None | object = CRON_FIELD_NOT_SET,
persistent: bool | None | object = CRON_FIELD_NOT_SET,
status: str | None | object = CRON_FIELD_NOT_SET,
next_run_time: datetime | None | object = CRON_FIELD_NOT_SET,
last_run_at: datetime | None | object = CRON_FIELD_NOT_SET,
last_error: str | None | object = CRON_FIELD_NOT_SET,
) -> CronJob | None:
async with self.get_db() as session:
session: AsyncSession
async with session.begin():
updates: dict = {}
for key, val in {
"name": name,
"cron_expression": cron_expression,
"timezone": timezone,
"payload": payload,
"description": description,
"enabled": enabled,
"persistent": persistent,
"status": status,
"next_run_time": next_run_time,
"last_run_at": last_run_at,
"last_error": last_error,
}.items():
if val is CRON_FIELD_NOT_SET:
continue
updates[key] = val
stmt = (
update(CronJob)
.where(col(CronJob.job_id) == job_id)
.values(**updates)
.execution_options(synchronize_session="fetch")
)
await session.execute(stmt)
result = await session.execute(
select(CronJob).where(col(CronJob.job_id) == job_id)
)
return result.scalar_one_or_none()
async def delete_cron_job(self, job_id: str) -> None:
async with self.get_db() as session:
session: AsyncSession
async with session.begin():
await session.execute(
delete(CronJob).where(col(CronJob.job_id) == job_id)
)
async def get_cron_job(self, job_id: str) -> CronJob | None:
async with self.get_db() as session:
session: AsyncSession
result = await session.execute(
select(CronJob).where(col(CronJob.job_id) == job_id)
)
return result.scalar_one_or_none()
async def list_cron_jobs(self, job_type: str | None = None) -> list[CronJob]:
async with self.get_db() as session:
session: AsyncSession
query = select(CronJob)
if job_type:
query = query.where(col(CronJob.job_type) == job_type)
query = query.order_by(desc(CronJob.created_at))
result = await session.execute(query)
return list(result.scalars().all())
@@ -47,9 +47,15 @@ from ...utils import (
SANDBOX_MODE_PROMPT,
TOOL_CALL_PROMPT,
TOOL_CALL_PROMPT_SKILLS_LIKE_MODE,
SEND_MESSAGE_TO_USER_TOOL,
decoded_blocked,
retrieve_knowledge_base,
)
from astrbot.core.tools.cron_tools import (
CREATE_CRON_JOB_TOOL,
DELETE_CRON_JOB_TOOL,
LIST_CRON_JOBS_TOOL,
)
class InternalAgentSubStage(Stage):
@@ -500,6 +506,44 @@ class InternalAgentSubStage(Stage):
req.func_tool.add_tool(FILE_DOWNLOAD_TOOL)
req.system_prompt += f"\n{SANDBOX_MODE_PROMPT}\n"
def _proactive_cron_job_tools(
self, req: ProviderRequest, event: AstrMessageEvent
) -> None:
"""Inject cron job context and tools into the provider request for proactive scheduling."""
if req.func_tool is None:
req.func_tool = ToolSet()
req.func_tool.add_tool(CREATE_CRON_JOB_TOOL)
req.func_tool.add_tool(DELETE_CRON_JOB_TOOL)
req.func_tool.add_tool(LIST_CRON_JOBS_TOOL)
cron_meta = event.get_extra("cron_job")
if cron_meta:
# The message event is triggered by a known cron job
if req.func_tool is None:
req.func_tool = ToolSet()
req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)
job_name = cron_meta.get("name", "scheduled task")
note = cron_meta.get("note") or cron_meta.get("description") or ""
req.system_prompt += (
f"\n[Scheduler Context] This turn is triggered automatically by cron job "
f'"{job_name}" (type: {cron_meta.get("type", "unknown")}). '
"Act proactively based on the provided note and current context. "
"If you want to proactively notify the user, call `send_message_to_user` with a concise message.\n"
)
if note:
req.system_prompt += f"[Scheduler Note]: {note}\n"
if bg := event.get_extra("background_task_result"):
# The message event is triggered after a background task done
result_text = bg.get("result") or ""
if req.func_tool is None:
req.func_tool = ToolSet()
req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)
if result_text:
req.system_prompt += f"\n[Background Task Result] {result_text}\n"
async def process(
self, event: AstrMessageEvent, provider_wake_prefix: str
) -> AsyncGenerator[None, None]:
@@ -14,6 +14,8 @@ from astrbot.core.computer.tools import (
LocalPythonTool,
PythonTool,
)
from astrbot.core.message.message_event_result import MessageChain
from astrbot.core.platform.message_session import MessageSession
from astrbot.core.star.context import Context
LLM_SAFETY_MODE_SYSTEM_PROMPT = """You are running in Safe Mode.
@@ -128,6 +130,53 @@ class KnowledgeBaseQueryTool(FunctionTool[AstrAgentContext]):
return result
@dataclass
class SendMessageToUserTool(FunctionTool[AstrAgentContext]):
name: str = "send_message_to_user"
description: str = (
"Send a short, proactive message to the user. "
"Use this to deliver scheduled/background task results or important updates without waiting for a new user prompt."
)
parameters: dict = Field(
default_factory=lambda: {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "What you want to tell the user.",
},
"session": {
"type": "string",
"description": "Optional target session in format platform_id:message_type:session_id. Defaults to current session.",
},
},
"required": ["message"],
}
)
async def call(
self, context: ContextWrapper[AstrAgentContext], **kwargs
) -> ToolExecResult:
message = str(kwargs.get("message", "")).strip()
session = kwargs.get("session") or context.context.event.unified_msg_origin
if not message:
return "error: message is empty."
try:
target_session = (
MessageSession.from_str(session) if isinstance(session, str) else session
)
except Exception as e:
return f"error: invalid session: {e}"
await context.context.context.send_message(
target_session,
MessageChain().message(message),
)
return f"Message sent to session {target_session}"
async def retrieve_knowledge_base(
query: str,
umo: str,
@@ -205,6 +254,7 @@ async def retrieve_knowledge_base(
KNOWLEDGE_BASE_QUERY_TOOL = KnowledgeBaseQueryTool()
SEND_MESSAGE_TO_USER_TOOL = SendMessageToUserTool()
EXECUTE_SHELL_TOOL = ExecuteShellTool()
LOCAL_EXECUTE_SHELL_TOOL = ExecuteShellTool(is_local=True)
+4
View File
@@ -10,6 +10,7 @@ from astrbot.core.agent.message import Message
from astrbot.core.agent.runners.tool_loop_agent_runner import ToolLoopAgentRunner
from astrbot.core.agent.tool import ToolSet
from astrbot.core.astrbot_config_mgr import AstrBotConfigManager
from astrbot.core.cron.manager import CronJobManager
from astrbot.core.config.astrbot_config import AstrBotConfig
from astrbot.core.conversation_mgr import ConversationManager
from astrbot.core.db import BaseDatabase
@@ -65,6 +66,7 @@ class Context:
persona_manager: PersonaManager,
astrbot_config_mgr: AstrBotConfigManager,
knowledge_base_manager: KnowledgeBaseManager,
cron_manager: CronJobManager,
):
self._event_queue = event_queue
"""事件队列。消息平台通过事件队列传递消息事件。"""
@@ -86,6 +88,8 @@ class Context:
"""配置文件管理器(非webui)"""
self.kb_manager = knowledge_base_manager
"""知识库管理器"""
self.cron_manager = cron_manager
"""Cron job manager, initialized by core lifecycle."""
async def llm_generate(
self,
+30 -5
View File
@@ -6,6 +6,7 @@ from astrbot import logger
from astrbot.core.agent.agent import Agent
from astrbot.core.agent.handoff import HandoffTool
from astrbot.core.astr_agent_context import AstrAgentContext
from astrbot.core.persona_mgr import PersonaManager
from astrbot.core.provider.func_tool_manager import FunctionToolManager
@@ -16,10 +17,11 @@ class SubAgentOrchestrator:
Execution happens via HandoffTool in FunctionToolExecutor.
"""
def __init__(self, tool_mgr: FunctionToolManager):
def __init__(self, tool_mgr: FunctionToolManager, persona_mgr: PersonaManager):
self._tool_mgr = tool_mgr
self._persona_mgr = persona_mgr
def reload_from_config(self, cfg: dict[str, Any]) -> None:
async def reload_from_config(self, cfg: dict[str, Any]) -> None:
enabled = bool(cfg.get("main_enable", False))
if not enabled:
@@ -46,21 +48,44 @@ class SubAgentOrchestrator:
if not name:
continue
persona_id = item.get("persona_id")
persona_data = None
if persona_id:
try:
persona_data = await self._persona_mgr.get_persona(persona_id)
except StopIteration:
logger.warning(
"SubAgent persona %s not found, fallback to inline prompt.",
persona_id,
)
instructions = str(item.get("system_prompt", "")).strip()
public_description = str(item.get("public_description", "")).strip()
provider_id = item.get("provider_id")
if provider_id is not None:
provider_id = str(provider_id).strip() or None
tools = item.get("tools", [])
if not isinstance(tools, list):
begin_dialogs = None
if persona_data:
instructions = persona_data.system_prompt or instructions
begin_dialogs = persona_data.begin_dialogs
tools = persona_data.tools
if public_description == "" and persona_data.system_prompt:
public_description = persona_data.system_prompt[:120]
if tools is None:
tools = None
elif not isinstance(tools, list):
tools = []
tools = [str(t).strip() for t in tools if str(t).strip()]
else:
tools = [str(t).strip() for t in tools if str(t).strip()]
agent = Agent[AstrAgentContext](
name=name,
instructions=instructions,
tools=tools,
tools=tools, # type: ignore
)
agent.begin_dialogs = begin_dialogs
# The tool description should be a short description for the main LLM,
# while the subagent system prompt can be longer/more specific.
handoff = HandoffTool(
+144
View File
@@ -0,0 +1,144 @@
from pydantic import Field
from pydantic.dataclasses import dataclass
from astrbot.core.agent.run_context import ContextWrapper
from astrbot.core.agent.tool import FunctionTool, ToolExecResult
from astrbot.core.astr_agent_context import AstrAgentContext
@dataclass
class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
name: str = "create_cron_job"
description: str = (
"Create a scheduled active agent task using a cron expression. "
"Use this when the user asks for recurring tasks (e.g., daily reports)."
)
parameters: dict = Field(
default_factory=lambda: {
"type": "object",
"properties": {
"cron_expression": {
"type": "string",
"description": "Cron expression defining when to trigger (e.g., '0 8 * * *').",
},
"note": {
"type": "string",
"description": "Instruction for the future agent run when the job triggers.",
},
"name": {
"type": "string",
"description": "Optional job name for identification.",
},
},
"required": ["cron_expression", "note"],
}
)
async def call(
self, context: ContextWrapper[AstrAgentContext], **kwargs
) -> ToolExecResult:
cron_mgr = context.context.context.cron_manager
if cron_mgr is None:
return "error: cron manager is not available."
cron_expression = kwargs.get("cron_expression")
note = str(kwargs.get("note", "")).strip()
name = str(kwargs.get("name") or "").strip() or "active_agent_task"
if not cron_expression or not note:
return "error: cron_expression and note are required."
payload = {
"session": context.context.event.unified_msg_origin,
"note": note,
}
job = await cron_mgr.add_active_job(
name=name,
cron_expression=str(cron_expression),
payload=payload,
description=note,
)
next_run = job.next_run_time
return (
f"Scheduled cron job {job.job_id} ({job.name}) with expression '{cron_expression}'. "
f"Next run: {next_run}"
)
@dataclass
class DeleteCronJobTool(FunctionTool[AstrAgentContext]):
name: str = "delete_cron_job"
description: str = "Delete a cron job by its job_id."
parameters: dict = Field(
default_factory=lambda: {
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The job_id returned when the job was created.",
}
},
"required": ["job_id"],
}
)
async def call(
self, context: ContextWrapper[AstrAgentContext], **kwargs
) -> ToolExecResult:
cron_mgr = context.context.context.cron_manager
if cron_mgr is None:
return "error: cron manager is not available."
job_id = kwargs.get("job_id")
if not job_id:
return "error: job_id is required."
await cron_mgr.delete_job(str(job_id))
return f"Deleted cron job {job_id}."
@dataclass
class ListCronJobsTool(FunctionTool[AstrAgentContext]):
name: str = "list_cron_jobs"
description: str = "List existing cron jobs for inspection."
parameters: dict = Field(
default_factory=lambda: {
"type": "object",
"properties": {
"job_type": {
"type": "string",
"description": "Optional filter: basic or active_agent.",
}
},
}
)
async def call(
self, context: ContextWrapper[AstrAgentContext], **kwargs
) -> ToolExecResult:
cron_mgr = context.context.context.cron_manager
if cron_mgr is None:
return "error: cron manager is not available."
job_type = kwargs.get("job_type")
jobs = await cron_mgr.list_jobs(job_type)
if not jobs:
return "No cron jobs found."
lines = []
for j in jobs:
lines.append(
f"{j.job_id} | {j.name} | {j.job_type} | enabled={j.enabled} | next={j.next_run_time}"
)
return "\n".join(lines)
CREATE_CRON_JOB_TOOL = CreateActiveCronTool()
DELETE_CRON_JOB_TOOL = DeleteCronJobTool()
LIST_CRON_JOBS_TOOL = ListCronJobsTool()
__all__ = [
"CREATE_CRON_JOB_TOOL",
"DELETE_CRON_JOB_TOOL",
"LIST_CRON_JOBS_TOOL",
"CreateActiveCronTool",
"DeleteCronJobTool",
"ListCronJobsTool",
]
+2
View File
@@ -5,6 +5,7 @@ from .chatui_project import ChatUIProjectRoute
from .command import CommandRoute
from .config import ConfigRoute
from .conversation import ConversationRoute
from .cron import CronRoute
from .file import FileRoute
from .knowledge_base import KnowledgeBaseRoute
from .log import LogRoute
@@ -27,6 +28,7 @@ __all__ = [
"CommandRoute",
"ConfigRoute",
"ConversationRoute",
"CronRoute",
"FileRoute",
"KnowledgeBaseRoute",
"LogRoute",
+130
View File
@@ -0,0 +1,130 @@
import traceback
from datetime import datetime
from quart import jsonify, request
from astrbot.core import logger
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from .route import Response, Route, RouteContext
class CronRoute(Route):
def __init__(self, context: RouteContext, core_lifecycle: AstrBotCoreLifecycle) -> None:
super().__init__(context)
self.core_lifecycle = core_lifecycle
self.routes = [
("/cron/jobs", ("GET", self.list_jobs)),
("/cron/jobs", ("POST", self.create_job)),
("/cron/jobs/<job_id>", ("PATCH", self.update_job)),
("/cron/jobs/<job_id>", ("DELETE", self.delete_job)),
]
self.register_routes()
def _serialize_job(self, job):
data = job.model_dump() if hasattr(job, "model_dump") else job.__dict__
for k in ["created_at", "updated_at", "last_run_at", "next_run_time"]:
if isinstance(data.get(k), datetime):
data[k] = data[k].isoformat()
return data
async def list_jobs(self):
try:
cron_mgr = self.core_lifecycle.cron_manager
if cron_mgr is None:
return jsonify(Response().error("Cron manager not initialized").__dict__)
job_type = request.args.get("type")
jobs = await cron_mgr.list_jobs(job_type)
data = [self._serialize_job(j) for j in jobs]
return jsonify(Response().ok(data=data).__dict__)
except Exception as e: # noqa: BLE001
logger.error(traceback.format_exc())
return jsonify(Response().error(f"Failed to list jobs: {e!s}").__dict__)
async def create_job(self):
try:
cron_mgr = self.core_lifecycle.cron_manager
if cron_mgr is None:
return jsonify(Response().error("Cron manager not initialized").__dict__)
payload = await request.json
if not isinstance(payload, dict):
return jsonify(Response().error("Invalid payload").__dict__)
job_type = payload.get("job_type", "active_agent")
name = payload.get("name") or "active_agent_task"
cron_expression = payload.get("cron_expression")
note = payload.get("note") or payload.get("description") or name
session = payload.get("session")
persona_id = payload.get("persona_id")
provider_id = payload.get("provider_id")
timezone = payload.get("timezone")
enabled = bool(payload.get("enabled", True))
if not cron_expression or not session:
return jsonify(Response().error("cron_expression and session are required").__dict__)
job_payload = {
"session": session,
"note": note,
"persona_id": persona_id,
"provider_id": provider_id,
}
if job_type != "active_agent":
return jsonify(
Response().error("Only active_agent jobs are supported now.").__dict__
)
job = await cron_mgr.add_active_job(
name=name,
cron_expression=cron_expression,
payload=job_payload,
description=note,
timezone=timezone,
enabled=enabled,
)
return jsonify(Response().ok(data=self._serialize_job(job)).__dict__)
except Exception as e: # noqa: BLE001
logger.error(traceback.format_exc())
return jsonify(Response().error(f"Failed to create job: {e!s}").__dict__)
async def update_job(self, job_id: str):
try:
cron_mgr = self.core_lifecycle.cron_manager
if cron_mgr is None:
return jsonify(Response().error("Cron manager not initialized").__dict__)
payload = await request.json
if not isinstance(payload, dict):
return jsonify(Response().error("Invalid payload").__dict__)
updates = {
"name": payload.get("name"),
"cron_expression": payload.get("cron_expression"),
"description": payload.get("description"),
"enabled": payload.get("enabled"),
"timezone": payload.get("timezone"),
}
# remove None values to avoid unwanted resets
updates = {k: v for k, v in updates.items() if v is not None}
job = await cron_mgr.update_job(job_id, **updates)
if not job:
return jsonify(Response().error("Job not found").__dict__)
return jsonify(Response().ok(data=self._serialize_job(job)).__dict__)
except Exception as e: # noqa: BLE001
logger.error(traceback.format_exc())
return jsonify(Response().error(f"Failed to update job: {e!s}").__dict__)
async def delete_job(self, job_id: str):
try:
cron_mgr = self.core_lifecycle.cron_manager
if cron_mgr is None:
return jsonify(Response().error("Cron manager not initialized").__dict__)
await cron_mgr.delete_job(job_id)
return jsonify(Response().ok(message="deleted").__dict__)
except Exception as e: # noqa: BLE001
logger.error(traceback.format_exc())
return jsonify(Response().error(f"Failed to delete job: {e!s}").__dict__)
+1
View File
@@ -61,6 +61,7 @@ class SubAgentRoute(Route):
for a in data["agents"]:
if isinstance(a, dict):
a.setdefault("provider_id", None)
a.setdefault("persona_id", None)
return jsonify(Response().ok(data=data).__dict__)
except Exception as e:
logger.error(traceback.format_exc())
+1
View File
@@ -90,6 +90,7 @@ class AstrBotDashboard:
core_lifecycle,
)
self.persona_route = PersonaRoute(self.context, db, core_lifecycle)
self.cron_route = CronRoute(self.context, core_lifecycle)
self.t2i_route = T2iRoute(self.context, core_lifecycle)
self.kb_route = KnowledgeBaseRoute(self.context, core_lifecycle)
self.platform_route = PlatformRoute(self.context, core_lifecycle)
@@ -8,6 +8,7 @@
"toolUse": "MCP Tools",
"config": "Config",
"chat": "Chat",
"cron": "Cron Jobs",
"extension": "Extensions",
"conversation": "Conversations",
"sessionManagement": "Custom Rules",
@@ -9,6 +9,7 @@
"extension": "插件",
"config": "配置文件",
"chat": "聊天",
"cron": "定时任务",
"conversation": "对话数据",
"sessionManagement": "自定义规则",
"console": "平台日志",
@@ -31,4 +32,4 @@
"selectVersion": "选择版本",
"current": "当前"
}
}
}
@@ -57,6 +57,11 @@ const sidebarItem: menu[] = [
icon: 'mdi-vector-link',
to: '/subagent'
},
{
title: 'core.navigation.cron',
icon: 'mdi-clock-outline',
to: '/cron'
},
{
title: 'core.navigation.conversation',
icon: 'mdi-database',
+5
View File
@@ -61,6 +61,11 @@ const MainRoutes = {
path: '/subagent',
component: () => import('@/views/SubAgentPage.vue')
},
{
name: 'CronJobs',
path: '/cron',
component: () => import('@/views/CronJobPage.vue')
},
{
name: 'Console',
path: '/console',
+236
View File
@@ -0,0 +1,236 @@
<template>
<div class="cron-page">
<div class="d-flex align-center justify-space-between mb-4">
<div>
<h2 class="text-h5 font-weight-bold">Cron Job 管理</h2>
<div class="text-body-2 text-medium-emphasis">查看创建与管理定时任务ActiveAgent & 后台任务</div>
</div>
<div class="d-flex align-center" style="gap: 8px;">
<v-btn variant="tonal" color="primary" :loading="loading" @click="loadJobs">刷新</v-btn>
</div>
</div>
<v-card class="rounded-lg mb-6" variant="flat">
<v-card-text>
<div class="text-subtitle-1 font-weight-bold mb-3">新建主动型 Agent 定时任务</div>
<v-row dense>
<v-col cols="12" md="4">
<v-text-field v-model="form.name" label="任务名称" variant="outlined" density="comfortable" hide-details />
</v-col>
<v-col cols="12" md="4">
<v-text-field v-model="form.cron_expression" label="Cron 表达式" variant="outlined" density="comfortable" placeholder="0 8 * * *" hide-details />
<div class="text-caption text-medium-emphasis mt-1">使用标准 5 Cron0 8 * * * 表示每天 8:00</div>
</v-col>
<v-col cols="12" md="4">
<v-text-field v-model="form.session" label="Session (platform:type:id)" variant="outlined" density="comfortable" placeholder="webchat:friend:SESSION_ID" hide-details />
<div class="text-caption text-medium-emphasis mt-1">从聊天侧栏或 Session 管理中复制 unified_msg_origin</div>
</v-col>
<v-col cols="12">
<v-textarea v-model="form.note" label="给未来 Agent 的说明" variant="outlined" rows="3" auto-grow hide-details />
</v-col>
<v-col cols="12" md="4">
<v-text-field v-model="form.persona_id" label="Persona (可选)" variant="outlined" density="comfortable" hide-details />
</v-col>
<v-col cols="12" md="4">
<v-text-field v-model="form.provider_id" label="Provider ID (可选)" variant="outlined" density="comfortable" hide-details />
</v-col>
<v-col cols="12" md="4">
<v-text-field v-model="form.timezone" label="时区 (可选, 例如 Asia/Shanghai)" variant="outlined" density="comfortable" hide-details />
</v-col>
<v-col cols="12" md="3">
<v-switch v-model="form.enabled" inset color="primary" label="启用" hide-details />
</v-col>
<v-col cols="12" class="d-flex justify-end">
<v-btn color="primary" variant="flat" :loading="saving" @click="createJob">创建任务</v-btn>
</v-col>
</v-row>
</v-card-text>
</v-card>
<v-card class="rounded-lg" variant="flat">
<v-card-text>
<div class="d-flex align-center justify-space-between mb-3">
<div class="text-subtitle-1 font-weight-bold">已注册任务</div>
</div>
<v-alert v-if="!jobs.length && !loading" type="info" variant="tonal">暂无定时任务</v-alert>
<v-data-table
:items="jobs"
:headers="headers"
:loading="loading"
item-key="job_id"
density="comfortable"
class="elevation-0"
>
<template #item.name="{ item }">
<div class="font-weight-medium">{{ item.name }}</div>
<div class="text-caption text-medium-emphasis">{{ item.description }}</div>
</template>
<template #item.type="{ item }">
<v-chip size="small" color="primary" variant="tonal">{{ item.job_type }}</v-chip>
</template>
<template #item.cron_expression="{ item }">
<div>{{ item.cron_expression || '—' }}</div>
<div class="text-caption text-medium-emphasis">{{ item.timezone || 'local' }}</div>
</template>
<template #item.next_run_time="{ item }">{{ formatTime(item.next_run_time) }}</template>
<template #item.status="{ item }">
<v-chip :color="statusColor(item.status)" size="small" variant="flat">{{ item.status }}</v-chip>
</template>
<template #item.actions="{ item }">
<div class="d-flex" style="gap: 8px;">
<v-switch
v-model="item.enabled"
inset
density="compact"
hide-details
color="primary"
@change="toggleJob(item)"
/>
<v-btn size="small" variant="text" color="primary" @click="deleteJob(item)">删除</v-btn>
</div>
</template>
</v-data-table>
</v-card-text>
</v-card>
<v-snackbar v-model="snackbar.show" :color="snackbar.color" timeout="2600">
{{ snackbar.message }}
</v-snackbar>
</div>
</template>
<script setup lang="ts">
import { onMounted, ref } from 'vue'
import axios from 'axios'
const loading = ref(false)
const saving = ref(false)
const jobs = ref<any[]>([])
const form = ref({
name: 'active_agent_task',
cron_expression: '',
session: '',
note: '',
persona_id: '',
provider_id: '',
timezone: '',
enabled: true
})
const snackbar = ref({ show: false, message: '', color: 'success' })
const headers = [
{ title: '名称', key: 'name', minWidth: 200 },
{ title: '类型', key: 'type', width: 110 },
{ title: 'Cron', key: 'cron_expression', minWidth: 160 },
{ title: '下一次执行', key: 'next_run_time', minWidth: 160 },
{ title: '状态', key: 'status', width: 120 },
{ title: '操作', key: 'actions', width: 160, sortable: false }
]
function toast(message: string, color: 'success' | 'error' | 'warning' = 'success') {
snackbar.value = { show: true, message, color }
}
function formatTime(val: any): string {
if (!val) return '—'
try {
return new Date(val).toLocaleString()
} catch (e) {
return String(val)
}
}
function statusColor(status: string) {
switch ((status || '').toLowerCase()) {
case 'running':
return 'blue'
case 'failed':
return 'error'
case 'completed':
return 'success'
default:
return 'secondary'
}
}
async function loadJobs() {
loading.value = true
try {
const res = await axios.get('/api/cron/jobs')
if (res.data.status === 'ok') {
jobs.value = Array.isArray(res.data.data) ? res.data.data : []
} else {
toast(res.data.message || '获取任务失败', 'error')
}
} catch (e: any) {
toast(e?.response?.data?.message || '获取任务失败', 'error')
} finally {
loading.value = false
}
}
async function createJob() {
if (!form.value.cron_expression || !form.value.session || !form.value.note) {
toast('请填写 cron、session 和说明', 'warning')
return
}
saving.value = true
try {
const payload = { ...form.value, job_type: 'active_agent' }
const res = await axios.post('/api/cron/jobs', payload)
if (res.data.status === 'ok') {
toast('创建成功')
await loadJobs()
} else {
toast(res.data.message || '创建失败', 'error')
}
} catch (e: any) {
toast(e?.response?.data?.message || '创建失败', 'error')
} finally {
saving.value = false
}
}
async function toggleJob(job: any) {
try {
const res = await axios.patch(`/api/cron/jobs/${job.job_id}`, { enabled: job.enabled })
if (res.data.status !== 'ok') {
toast(res.data.message || '更新失败', 'error')
await loadJobs()
}
} catch (e: any) {
toast(e?.response?.data?.message || '更新失败', 'error')
await loadJobs()
}
}
async function deleteJob(job: any) {
try {
const res = await axios.delete(`/api/cron/jobs/${job.job_id}`)
if (res.data.status === 'ok') {
toast('已删除')
jobs.value = jobs.value.filter((j) => j.job_id !== job.job_id)
} else {
toast(res.data.message || '删除失败', 'error')
}
} catch (e: any) {
toast(e?.response?.data?.message || '删除失败', 'error')
}
}
onMounted(() => {
loadJobs()
})
</script>
<style scoped>
.cron-page {
padding: 20px;
padding-top: 8px;
padding-bottom: 40px;
}
</style>
+40 -201
View File
@@ -124,85 +124,43 @@
class="subagent-provider"
/>
</v-col>
<v-col cols="12">
<v-col cols="12" md="6">
<v-autocomplete
v-model="agent.__tool_group"
:items="toolGroupOptions"
v-model="agent.persona_id"
:items="personaOptions"
item-title="title"
item-value="value"
label="选择插件/来源"
label="选择 Persona"
variant="outlined"
density="comfortable"
class="subagent-tools"
:loading="toolsLoading"
:disabled="toolsLoading"
clearable
@update:modelValue="onGroupChanged(agent)"
:loading="personaLoading"
:disabled="personaLoading"
hint="SubAgent 将直接继承所选 Persona 的系统设定与工具。"
persistent-hint
/>
</v-col>
<v-col cols="12">
<v-autocomplete
v-model="agent.__tool_group_selected"
:items="getToolOptionsByGroup(agent.__tool_group)"
item-title="title"
item-value="value"
label="选择该插件下的工具(多选)"
<v-col cols="12" md="6">
<v-text-field
v-model="agent.public_description"
label="对主 LLM 的描述(用于决定是否 handoff)"
variant="outlined"
density="comfortable"
class="subagent-tools"
multiple
chips
closable-chips
:menu-props="{ maxHeight: 380 }"
:max-chips="8"
:loading="toolsLoading"
:disabled="toolsLoading || !agent.__tool_group"
clearable
@update:modelValue="syncGroupSelectionToAgentTools(agent)"
hint="这段会作为 transfer_to_* 工具的描述给主 LLM 看,建议简短明确。"
persistent-hint
/>
<div class="text-caption text-medium-emphasis mt-1">
已分配{{ (agent.tools || []).length }} 个工具
</div>
</v-col>
</v-row>
<v-textarea
v-model="agent.public_description"
label="对主 LLM 的描述(用于决定是否 handoff"
variant="outlined"
rows="3"
auto-grow
hint="这段会作为 transfer_to_* 工具的描述给主 LLM 看,建议简短明确。"
persistent-hint
/>
<v-textarea
v-model="agent.system_prompt"
label="SubAgent System Prompt(该 SubAgent 自己的指令)"
variant="outlined"
rows="4"
auto-grow
hint="这段只给该 SubAgent 自己作为 system prompt 使用,可以更长、更严格。"
persistent-hint
class="mt-3"
/>
<div class="mt-3">
<div class="text-caption text-medium-emphasis">预览 LLM 将看到的 handoff 工具</div>
<div class="d-flex align-center" style="gap: 8px; flex-wrap: wrap;">
<v-chip size="small" variant="outlined" color="primary">
transfer_to_{{ agent.name || '...' }}
</v-chip>
<v-chip
v-for="t in (agent.tools || [])"
:key="t"
size="small"
variant="tonal"
color="secondary"
>
{{ t }}
<v-chip size="small" variant="tonal" color="secondary" v-if="agent.persona_id">
Persona: {{ agent.persona_id }}
</v-chip>
</div>
</div>
@@ -224,25 +182,13 @@ import { onMounted, ref } from 'vue'
import axios from 'axios'
import ProviderSelector from '@/components/shared/ProviderSelector.vue'
type ToolOption = { title: string; value: string }
type ToolGroup = {
key: string
label: string
options: ToolOption[]
}
type SubAgentItem = {
__key: string
name: string
persona_id: string
public_description: string
system_prompt: string
tools: string[]
enabled: boolean
provider_id?: string
// UI-only: current tool group selection state
__tool_group?: string
__tool_group_selected?: string[]
}
type MainMode = 'disabled' | 'unassigned_to_main' | 'handoff_only'
@@ -254,7 +200,6 @@ type SubAgentConfig = {
const loading = ref(false)
const saving = ref(false)
const toolsLoading = ref(false)
const snackbar = ref({
show: false,
@@ -277,59 +222,8 @@ const cfg = ref<SubAgentConfig>({
agents: []
})
const toolGroups = ref<ToolGroup[]>([])
const toolGroupOptions = ref<{ title: string; value: string }[]>([])
function modulePathToLabel(mp: unknown): string {
const raw = (mp ?? '').toString().trim()
if (!raw) return '其他/未归类'
// Typical module paths look like:
// - data.plugins.<plugin_name>.main
// - astrbot.builtin_stars.<star_name>.main
// - astrbot.plugins.<plugin_name>.main
// We strip common prefixes and the trailing ".main" for display.
const trimmed = raw.replace(/\.main$/, '')
if (trimmed.startsWith('data.plugins.')) return trimmed.replace(/^data\.plugins\./, '')
if (trimmed.startsWith('astrbot.builtin_stars.')) return `builtin: ${trimmed.replace(/^astrbot\.builtin_stars\./, '')}`
if (trimmed.startsWith('astrbot.plugins.')) return trimmed.replace(/^astrbot\.plugins\./, '')
if (raw.startsWith('plugins.')) return raw.replace(/^plugins\./, '')
if (raw.startsWith('builtin_stars.')) return `builtin: ${raw.replace(/^builtin_stars\./, '')}`
if (raw.startsWith('core.')) return `core: ${raw.replace(/^core\./, '')}`
return raw
}
function rebuildToolGroupOptions() {
toolGroupOptions.value = toolGroups.value.map(g => ({ title: g.label, value: g.key }))
}
function getToolOptionsByGroup(groupKey: string | undefined): ToolOption[] {
if (!groupKey) return []
return toolGroups.value.find(g => g.key === groupKey)?.options ?? []
}
function onGroupChanged(agent: SubAgentItem) {
// When switching groups, reflect already-assigned tools for that group.
const groupOptions = getToolOptionsByGroup(agent.__tool_group)
const allowed = new Set(groupOptions.map(o => o.value))
agent.__tool_group_selected = (agent.tools || []).filter(t => allowed.has(t))
}
function syncGroupSelectionToAgentTools(agent: SubAgentItem) {
const groupOptions = getToolOptionsByGroup(agent.__tool_group)
const allowed = new Set(groupOptions.map(o => o.value))
const selected = Array.isArray(agent.__tool_group_selected)
? agent.__tool_group_selected
: []
// Replace only tools belonging to this group; keep tools from other groups intact.
const kept = (agent.tools || []).filter(t => !allowed.has(t))
const merged = [...kept, ...selected.filter(t => allowed.has(t))]
const seen = new Set<string>()
agent.tools = merged.filter(t => (seen.has(t) ? false : (seen.add(t), true)))
}
const personaOptions = ref<{ title: string; value: string }[]>([])
const personaLoading = ref(false)
function normalizeConfig(raw: any): SubAgentConfig {
const main_enable = !!raw?.main_enable
@@ -341,23 +235,19 @@ function normalizeConfig(raw: any): SubAgentConfig {
const agents: SubAgentItem[] = agentsRaw.map((a: any, i: number) => {
const name = (a?.name ?? '').toString()
const persona_id = (a?.persona_id ?? '').toString()
const public_description = (a?.public_description ?? '').toString()
const system_prompt = (a?.system_prompt ?? '').toString()
const tools = Array.isArray(a?.tools) ? a.tools.map((x: any) => String(x)) : []
const enabled = a?.enabled !== false
const provider_id = (a?.provider_id ?? undefined) as (string | undefined)
return {
__key: `${Date.now()}_${i}_${Math.random().toString(16).slice(2)}`,
name,
persona_id,
public_description,
system_prompt,
tools,
enabled
,
provider_id,
__tool_group: undefined,
__tool_group_selected: []
provider_id
}
})
@@ -380,66 +270,21 @@ async function loadConfig() {
}
}
async function loadTools() {
toolsLoading.value = true
async function loadPersonas() {
personaLoading.value = true
try {
// Prefer our dedicated endpoint (includes handler_module_path)
const res = await axios.get('/api/subagent/available-tools')
const res = await axios.get('/api/persona/list')
if (res.data.status === 'ok') {
const list = Array.isArray(res.data.data) ? res.data.data : []
const groups = new Map<string, ToolOption[]>()
for (const t of list) {
if (!t?.name) continue
const name = String(t.name)
const desc = (t.description ?? '').toString().trim()
const mp = (t.handler_module_path ?? '').toString()
const key = mp || '__other__'
const options = groups.get(key) ?? []
options.push({ title: desc ? `${name}${desc}` : name, value: name })
groups.set(key, options)
}
toolGroups.value = Array.from(groups.entries())
.map(([key, options]) => ({
key,
label: modulePathToLabel(key === '__other__' ? '' : key),
options: options.sort((a, b) => a.value.localeCompare(b.value))
}))
.sort((a, b) => a.label.localeCompare(b.label))
rebuildToolGroupOptions()
} else {
toast(res.data.message || '获取工具列表失败', 'error')
}
} catch {
// Fallback to existing tools list endpoint
try {
const res2 = await axios.get('/api/tools/list')
if (res2.data.status === 'ok') {
const list = Array.isArray(res2.data.data) ? res2.data.data : []
const options = list
.filter((t: any) => !!t?.name)
.map((t: any) => {
const name = String(t.name)
const desc = (t.description ?? '').toString().trim()
return { title: desc ? `${name}${desc}` : name, value: name }
})
.sort((a: ToolOption, b: ToolOption) => a.value.localeCompare(b.value))
toolGroups.value = [
{
key: '__all__',
label: '全部工具',
options
}
]
rebuildToolGroupOptions()
}
} catch {
toast('获取工具列表失败', 'error')
personaOptions.value = list.map((p: any) => ({
title: p.persona_id,
value: p.persona_id
}))
}
} catch (e: any) {
toast(e?.response?.data?.message || '获取 Persona 列表失败', 'error')
} finally {
toolsLoading.value = false
personaLoading.value = false
}
}
@@ -447,13 +292,10 @@ function addAgent() {
cfg.value.agents.push({
__key: `${Date.now()}_${Math.random().toString(16).slice(2)}`,
name: '',
persona_id: '',
public_description: '',
system_prompt: '',
tools: [],
enabled: true,
provider_id: undefined,
__tool_group: undefined,
__tool_group_selected: []
provider_id: undefined
})
}
@@ -479,6 +321,10 @@ function validateBeforeSave(): boolean {
return false
}
seen.add(name)
if (!a.persona_id) {
toast(`SubAgent ${name} 未选择 Persona`, 'warning')
return false
}
}
return true
}
@@ -494,9 +340,8 @@ async function save() {
main_tools_policy: mode,
agents: cfg.value.agents.map(a => ({
name: a.name,
persona_id: a.persona_id,
public_description: a.public_description,
system_prompt: a.system_prompt,
tools: a.tools,
enabled: a.enabled,
provider_id: a.provider_id
}))
@@ -516,13 +361,7 @@ async function save() {
}
async function reload() {
await Promise.all([loadConfig(), loadTools()])
// Initialize UI-only selections after tools load.
for (const a of cfg.value.agents) {
if (!a.__tool_group) a.__tool_group = undefined
if (!Array.isArray(a.__tool_group_selected)) a.__tool_group_selected = []
}
await Promise.all([loadConfig(), loadPersonas()])
}
onMounted(() => {