feat: implement CronJob system with support for one-time tasks and enhanced UI for task management

This commit is contained in:
Soulter
2026-02-01 22:04:30 +08:00
parent 3f8d8b5033
commit f66edc8d45
8 changed files with 253 additions and 30 deletions
+18
View File
@@ -0,0 +1,18 @@
我需要让 Agent 能够在未来提醒自己去做某些事情,这样 Agent 能够主动地去完成一些任务,而不是等用户主动来下达命令。
你需要实现一个 CronJob 系统,允许 Agent 创建未来任务,并且在未来的某个时间点自动触发这些任务的执行.
CronJob 系统分为 BasicCronJob 和 ActiveAgentCronJob 两种类型。前者只是简单的提供一个定时任务功能(给插件用),而后者则允许 Agent 主动地去完成一些任务。BasicCronJob 不必多说,就是定时执行某个函数。对于 ActiveAgentCronJobAgent 应该可以主动管理(比如通过Tool来管理)这些 CronJobs,当添加的时候,Agent 可以给 CronJob 捎一段文字,以说明未来的自己需要做什么事情。比如说,Agent 在听到用户 “每天早上都给我整理一份今日早报” 之后,应该可以创建 Cron Job,并且自己写脚本来完成这个任务,并且注册 cron job。Agent 给未来的自己捎去的信息应该只是呈现为一段文字,这样可以保持设计简约。当触发后, CronJobManager 会调用 MainAgent 的一轮循环,MainAgent 通过上下文知道这是一个定时任务触发的循环,从而执行相应的操作。
此外,我还有一个需求,后台长任务。需要给当前的 FunctionTool 类增加一个属性,is_background_task: bool = False,插件可以通过这个属性来声明这是一个异步任务。这是为了解决一些 Tool 需要长时间运行的问题,比如 Deep Search tool 需要长时间搜索网页内容、Sub Agent 需要长时间运行来完成一个复杂任务。
基于上面的讨论,我觉得,应该:
1. 需要给当前的 FunctionTool 类增加一个属性is_background_task: bool = Falsetool runner 在执行这个 tool 的时候,如果发现是后台任务,就不等待结果返回,而是直接返回一个任务 ID (已经创建成功提示)的结果,tool runner 在后台继续执行这个任务。当任务完成之后,任务的结果回传给 MainAgent(其实就是再执行一次 main agent loop,但是上下文应该是最新的),并且 MainAgent 此时应该有 send_message_to_user 的工具,通过这个工具可以选择是否主动通知用户任务完成的结果。
2. 增加一个 CronJobManager 类,负责管理所有的定时任务。Agent 可以通过调用这个类的方法来创建、删除、修改定时任务。通过 cron expression 来定义触发条件。
3. CronJobManager 除了管理普通的定时任务(比如插件可能有一些自己的定时任务),还有一种特殊的任务类型,就是上面提到的主动型 Agent 任务。用户提需求,MainAgent 选择性地调用 CronJobManager 的方法来创建这些任务,并且在任务触发时,CronJobManager 的回调就是执行 MainAgent 的一轮循环(需要加 send_message_to_user tool),MainAgent 通过上下文知道这是一个定时任务触发的循环,从而执行相应的操作。
4. WebUI 需要增加 Cron Job 管理界面,用户可以在界面上查看、创建、修改、删除定时任务。对于主动型 Agent 任务,用户可以看到任务的描述、触发条件等信息。
5. 除此之外,现在的代码中已经有了 subagent 的管理。WebUI 可以创建 SubAgent,但是还没写完。除了结合上面我说的之外,你还需要将 SubAgent 与 Persona 结合起来——因为 Persona 是一个包含了 tool、skills、name、description 的完整体,所以 SubAgent 应该直接继承 Persona 的定义,而不是单独定义 SubAgent。SubAgent 本质上就是一个有特定角色和能力的 Persona!多么美妙的设计啊!
6. 为了实现大一统,is_background_task = True 的时候,后台任务也挂到 CronJobManager 上去管理,只不过这个是立即触发的任务,不需要等到未来某个时间点才触发罢了。
我希望设计尽可能简单,但是强大。
+33 -5
View File
@@ -7,6 +7,7 @@ from zoneinfo import ZoneInfo
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from astrbot import logger
from astrbot.core.agent.tool import ToolSet
@@ -91,13 +92,18 @@ class CronJobManager:
self,
*,
name: str,
cron_expression: str,
cron_expression: str | None,
payload: dict,
description: str | None = None,
timezone: str | None = None,
enabled: bool = True,
persistent: bool = True,
run_once: bool = False,
run_at: datetime | None = None,
) -> CronJob:
# If run_once with run_at, store run_at in payload for later reference.
if run_once and run_at:
payload = {**payload, "run_at": run_at.isoformat()}
job = await self.db.create_cron_job(
name=name,
job_type="active_agent",
@@ -107,6 +113,7 @@ class CronJobManager:
description=description,
enabled=enabled,
persistent=persistent,
run_once=run_once,
)
if enabled:
self._schedule_job(job)
@@ -148,7 +155,19 @@ class CronJobManager:
job.timezone,
job.job_id,
)
trigger = CronTrigger.from_crontab(job.cron_expression, timezone=tzinfo)
if job.run_once:
run_at_str = None
if isinstance(job.payload, dict):
run_at_str = job.payload.get("run_at")
run_at_str = run_at_str or job.cron_expression
if not run_at_str:
raise ValueError("run_once job missing run_at timestamp")
run_at = datetime.fromisoformat(run_at_str)
if run_at.tzinfo is None and tzinfo is not None:
run_at = run_at.replace(tzinfo=tzinfo)
trigger = DateTrigger(run_date=run_at, timezone=tzinfo)
else:
trigger = CronTrigger.from_crontab(job.cron_expression, timezone=tzinfo)
self.scheduler.add_job(
self._run_job,
id=job.job_id,
@@ -199,6 +218,9 @@ class CronJobManager:
last_error=last_error,
next_run_time=next_run,
)
if job.run_once:
# one-shot: remove after execution regardless of success
await self.delete_job(job_id)
async def _run_basic_job(self, job: CronJob):
handler = self._basic_handlers.get(job.job_id)
@@ -221,9 +243,13 @@ class CronJobManager:
"id": job.job_id,
"name": job.name,
"type": job.job_type,
"run_once": job.run_once,
"description": job.description,
"note": note,
"run_started_at": start_time.isoformat(),
"run_at": (
job.payload.get("run_at") if isinstance(job.payload, dict) else None
),
},
"cron_payload": payload,
}
@@ -273,11 +299,13 @@ class CronJobManager:
# judge user's role
umo = cron_event.unified_msg_origin
cfg = self.ctx.get_config(umo=umo)
cron_payload = extras.get("cron_payload", {}) if extras else {}
sender_id = cron_payload.get("sender_id")
admin_ids = cfg.get("admins_id", [])
if admin_ids:
cron_event.role = (
"admin" if cron_event.get_sender_id() in admin_ids else "member"
)
cron_event.role = "admin" if sender_id in admin_ids else "member"
if cron_payload.get("origin", "tool") == "api":
cron_event.role = "admin"
config = MainAgentBuildConfig(
tool_call_timeout=3600,
+2
View File
@@ -528,6 +528,7 @@ class BaseDatabase(abc.ABC):
description: str | None = None,
enabled: bool = True,
persistent: bool = True,
run_once: bool = False,
status: str | None = None,
job_id: str | None = None,
) -> CronJob:
@@ -546,6 +547,7 @@ class BaseDatabase(abc.ABC):
description: str | None = None,
enabled: bool | None = None,
persistent: bool | None = None,
run_once: bool | None = None,
status: str | None = None,
next_run_time: datetime.datetime | None = None,
last_run_at: datetime.datetime | None = None,
+1
View File
@@ -163,6 +163,7 @@ class CronJob(TimestampMixin, SQLModel, table=True):
payload: dict = Field(default_factory=dict, sa_type=JSON)
enabled: bool = Field(default=True)
persistent: bool = Field(default=True)
run_once: bool = Field(default=False)
status: str = Field(default="scheduled", max_length=32)
last_run_at: datetime | None = Field(default=None)
next_run_time: datetime | None = Field(default=None)
+4
View File
@@ -1594,6 +1594,7 @@ class SQLiteDatabase(BaseDatabase):
description: str | None = None,
enabled: bool = True,
persistent: bool = True,
run_once: bool = False,
status: str | None = None,
job_id: str | None = None,
) -> CronJob:
@@ -1609,6 +1610,7 @@ class SQLiteDatabase(BaseDatabase):
description=description,
enabled=enabled,
persistent=persistent,
run_once=run_once,
status=status or "scheduled",
)
if job_id:
@@ -1629,6 +1631,7 @@ class SQLiteDatabase(BaseDatabase):
description: str | None | object = CRON_FIELD_NOT_SET,
enabled: bool | None | object = CRON_FIELD_NOT_SET,
persistent: bool | None | object = CRON_FIELD_NOT_SET,
run_once: bool | None | object = CRON_FIELD_NOT_SET,
status: str | None | object = CRON_FIELD_NOT_SET,
next_run_time: datetime | None | object = CRON_FIELD_NOT_SET,
last_run_at: datetime | None | object = CRON_FIELD_NOT_SET,
@@ -1646,6 +1649,7 @@ class SQLiteDatabase(BaseDatabase):
"description": description,
"enabled": enabled,
"persistent": persistent,
"run_once": run_once,
"status": status,
"next_run_time": next_run_time,
"last_run_at": last_run_at,
+41 -12
View File
@@ -1,3 +1,4 @@
from datetime import datetime
from pydantic import Field
from pydantic.dataclasses import dataclass
@@ -10,8 +11,8 @@ from astrbot.core.astr_agent_context import AstrAgentContext
class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
name: str = "create_future_task"
description: str = (
"Create a future task for your future using a cron expression. "
"Use this when you or the user want recurring follow-up (e.g., daily report to self)."
"Create a future task for your future. Supports recurring cron expressions or one-time run_at datetime. "
"Use this when you or the user want scheduled follow-up or proactive actions."
)
parameters: dict = Field(
default_factory=lambda: {
@@ -19,7 +20,11 @@ class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
"properties": {
"cron_expression": {
"type": "string",
"description": "Cron expression defining when your future agent should wake (e.g., '0 8 * * *').",
"description": "Cron expression defining recurring schedule (e.g., '0 8 * * *').",
},
"run_at": {
"type": "string",
"description": "ISO datetime for one-time execution, e.g., 2026-02-02T08:00:00+08:00. Use with run_once=true.",
},
"note": {
"type": "string",
@@ -29,8 +34,12 @@ class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
"type": "string",
"description": "Optional label to recognize this future task.",
},
"run_once": {
"type": "boolean",
"description": "If true, the task will run only once and then be deleted. Use run_at to specify the time.",
},
},
"required": ["cron_expression", "note"],
"required": ["note"],
}
)
@@ -42,28 +51,48 @@ class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
return "error: cron manager is not available."
cron_expression = kwargs.get("cron_expression")
run_at = kwargs.get("run_at")
run_once = bool(kwargs.get("run_once", False))
note = str(kwargs.get("note", "")).strip()
name = str(kwargs.get("name") or "").strip() or "active_agent_task"
if not cron_expression or not note:
return "error: cron_expression and note are required."
if not note:
return "error: note is required."
if run_once and not run_at:
return "error: run_at is required when run_once=true."
if (not run_once) and not cron_expression:
return "error: cron_expression is required when run_once=false."
if run_once and cron_expression:
cron_expression = None
run_at_dt = None
if run_at:
try:
run_at_dt = datetime.fromisoformat(str(run_at))
except Exception:
return "error: run_at must be ISO datetime, e.g., 2026-02-02T08:00:00+08:00"
payload = {
"session": context.context.event.unified_msg_origin,
"sender_id": context.context.event.get_sender_id(),
"note": note,
"origin": "tool",
}
job = await cron_mgr.add_active_job(
name=name,
cron_expression=str(cron_expression),
cron_expression=str(cron_expression) if cron_expression else None,
payload=payload,
description=note,
run_once=run_once,
run_at=run_at_dt,
)
next_run = job.next_run_time
return (
f"Scheduled future task {job.job_id} ({job.name}) with expression '{cron_expression}'. "
f"You will be awakened at: {next_run}"
next_run = job.next_run_time or run_at_dt
suffix = (
f"one-time at {next_run}"
if run_once
else f"expression '{cron_expression}' (next {next_run})"
)
return f"Scheduled future task {job.job_id} ({job.name}) {suffix}."
@dataclass
@@ -125,7 +154,7 @@ class ListCronJobsTool(FunctionTool[AstrAgentContext]):
lines = []
for j in jobs:
lines.append(
f"{j.job_id} | {j.name} | {j.job_type} | enabled={j.enabled} | next={j.next_run_time}"
f"{j.job_id} | {j.name} | {j.job_type} | run_once={getattr(j, 'run_once', False)} | enabled={j.enabled} | next={j.next_run_time}"
)
return "\n".join(lines)
+35 -10
View File
@@ -31,6 +31,8 @@ class CronRoute(Route):
# expose note explicitly for UI (prefer payload.note then description)
payload = data.get("payload") or {}
data["note"] = payload.get("note") or data.get("description") or ""
data["run_at"] = payload.get("run_at")
data["run_once"] = data.get("run_once", False)
# status is internal; hide to avoid implying one-time completion for recurring jobs
data.pop("status", None)
return data
@@ -62,7 +64,6 @@ class CronRoute(Route):
if not isinstance(payload, dict):
return jsonify(Response().error("Invalid payload").__dict__)
job_type = payload.get("job_type", "active_agent")
name = payload.get("name") or "active_agent_task"
cron_expression = payload.get("cron_expression")
note = payload.get("note") or payload.get("description") or name
@@ -71,28 +72,43 @@ class CronRoute(Route):
provider_id = payload.get("provider_id")
timezone = payload.get("timezone")
enabled = bool(payload.get("enabled", True))
run_once = bool(payload.get("run_once", False))
run_at = payload.get("run_at")
if not cron_expression or not session:
if not session:
return jsonify(
Response().error("session is required").__dict__
)
if run_once and not run_at:
return jsonify(
Response().error("run_at is required when run_once=true").__dict__
)
if (not run_once) and not cron_expression:
return jsonify(
Response()
.error("cron_expression and session are required")
.error("cron_expression is required when run_once=false")
.__dict__
)
if run_once and cron_expression:
cron_expression = None # ignore cron when run_once specified
run_at_dt = None
if run_at:
try:
run_at_dt = datetime.fromisoformat(str(run_at))
except Exception:
return jsonify(
Response().error("run_at must be ISO datetime").__dict__
)
job_payload = {
"session": session,
"note": note,
"persona_id": persona_id,
"provider_id": provider_id,
"run_at": run_at,
"origin": "api",
}
if job_type != "active_agent":
return jsonify(
Response()
.error("Only active_agent jobs are supported now.")
.__dict__
)
job = await cron_mgr.add_active_job(
name=name,
cron_expression=cron_expression,
@@ -100,6 +116,8 @@ class CronRoute(Route):
description=note,
timezone=timezone,
enabled=enabled,
run_once=run_once,
run_at=run_at_dt,
)
return jsonify(Response().ok(data=self._serialize_job(job)).__dict__)
@@ -125,9 +143,16 @@ class CronRoute(Route):
"description": payload.get("description"),
"enabled": payload.get("enabled"),
"timezone": payload.get("timezone"),
"run_once": payload.get("run_once"),
"payload": payload.get("payload"),
}
# remove None values to avoid unwanted resets
updates = {k: v for k, v in updates.items() if v is not None}
if "run_at" in payload:
updates.setdefault("payload", {})
if updates["payload"] is None:
updates["payload"] = {}
updates["payload"]["run_at"] = payload.get("run_at")
job = await cron_mgr.update_job(job_id, **updates)
if not job:
+119 -3
View File
@@ -16,6 +16,7 @@
</div>
</div>
<div class="d-flex align-center" style="gap: 8px;">
<v-btn variant="tonal" color="primary" @click="openCreate">新建任务</v-btn>
<v-btn variant="tonal" color="primary" :loading="loading" @click="loadJobs">刷新</v-btn>
</div>
</div>
@@ -35,11 +36,16 @@
<div class="text-caption text-medium-emphasis">{{ item.description }}</div>
</template>
<template #item.type="{ item }">
<v-chip size="small" color="primary" variant="tonal">{{ item.job_type }}</v-chip>
<v-chip size="small" :color="item.run_once ? 'orange' : 'primary'" variant="tonal">
{{ item.run_once ? '一次性' : (item.job_type || 'active_agent') }}
</v-chip>
</template>
<template #item.cron_expression="{ item }">
<div>{{ item.cron_expression || '—' }}</div>
<div class="text-caption text-medium-emphasis">{{ item.timezone || 'local' }}</div>
<div v-if="item.run_once">{{ formatTime(item.run_at) }}</div>
<div v-else>
<div>{{ item.cron_expression || '—' }}</div>
<div class="text-caption text-medium-emphasis">{{ item.timezone || 'local' }}</div>
</div>
</template>
<template #item.next_run_time="{ item }">{{ formatTime(item.next_run_time) }}</template>
<template #item.last_run_at="{ item }">{{ formatTime(item.last_run_at) }}</template>
@@ -58,6 +64,50 @@
<v-snackbar v-model="snackbar.show" :color="snackbar.color" timeout="2600">
{{ snackbar.message }}
</v-snackbar>
<v-dialog v-model="createDialog" max-width="560">
<v-card>
<v-card-title class="text-h6">新建任务</v-card-title>
<v-card-text>
<v-switch v-model="newJob.run_once" label="一次性任务" inset color="primary" hide-details />
<v-text-field v-model="newJob.name" label="任务名称" variant="outlined" density="comfortable" />
<v-text-field v-model="newJob.note" label="任务说明" variant="outlined" density="comfortable" />
<v-text-field
v-if="!newJob.run_once"
v-model="newJob.cron_expression"
label="Cron 表达式"
placeholder="0 9 * * *"
variant="outlined"
density="comfortable"
/>
<v-text-field
v-else
v-model="newJob.run_at"
label="执行时间"
type="datetime-local"
variant="outlined"
density="comfortable"
/>
<v-text-field
v-model="newJob.session"
label="目标 session (platform_id:message_type:session_id)"
variant="outlined"
density="comfortable"
/>
<v-text-field
v-model="newJob.timezone"
label="时区(可选,如 Asia/Shanghai"
variant="outlined"
density="comfortable"
/>
<v-switch v-model="newJob.enabled" label="启用" inset color="primary" hide-details />
</v-card-text>
<v-card-actions class="justify-end">
<v-btn variant="text" @click="createDialog = false">取消</v-btn>
<v-btn variant="tonal" color="primary" :loading="creating" @click="createJob">创建</v-btn>
</v-card-actions>
</v-card>
</v-dialog>
</div>
</template>
@@ -68,6 +118,18 @@ import axios from 'axios'
const loading = ref(false)
const jobs = ref<any[]>([])
const proactivePlatforms = ref<{ id: string; name: string; display_name?: string }[]>([])
const createDialog = ref(false)
const creating = ref(false)
const newJob = ref({
run_once: false,
name: '',
note: '',
cron_expression: '',
run_at: '',
session: '',
timezone: '',
enabled: true
})
const snackbar = ref({ show: false, message: '', color: 'success' })
@@ -154,6 +216,60 @@ async function deleteJob(job: any) {
}
}
function openCreate() {
resetNewJob()
createDialog.value = true
}
function resetNewJob() {
newJob.value = {
run_once: false,
name: '',
note: '',
cron_expression: '',
run_at: '',
session: '',
timezone: '',
enabled: true
}
}
async function createJob() {
if (!newJob.value.session) {
toast('请填写 session', 'warning')
return
}
if (!newJob.value.note) {
toast('请填写说明', 'warning')
return
}
if (!newJob.value.run_once && !newJob.value.cron_expression) {
toast('请填写 Cron 表达式', 'warning')
return
}
if (newJob.value.run_once && !newJob.value.run_at) {
toast('请选择执行时间', 'warning')
return
}
creating.value = true
try {
const payload: any = { ...newJob.value }
const res = await axios.post('/api/cron/jobs', payload)
if (res.data.status === 'ok') {
toast('创建成功')
createDialog.value = false
resetNewJob()
await loadJobs()
} else {
toast(res.data.message || '创建失败', 'error')
}
} catch (e: any) {
toast(e?.response?.data?.message || '创建失败', 'error')
} finally {
creating.value = false
}
}
onMounted(() => {
loadJobs()
loadPlatforms()