202 lines
7.2 KiB
Python
202 lines
7.2 KiB
Python
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from pydantic import Field
|
|
from pydantic.dataclasses import dataclass
|
|
|
|
from astrbot.core.agent.run_context import ContextWrapper
|
|
from astrbot.core.agent.tool import FunctionTool, ToolExecResult
|
|
from astrbot.core.astr_agent_context import AstrAgentContext
|
|
|
|
|
|
def _extract_job_session(job: Any) -> str | None:
|
|
payload = getattr(job, "payload", None)
|
|
if not isinstance(payload, dict):
|
|
return None
|
|
session = payload.get("session")
|
|
return str(session) if session is not None else None
|
|
|
|
|
|
@dataclass
|
|
class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
|
|
name: str = "create_future_task"
|
|
description: str = (
|
|
"Create a future task for your future. Supports recurring cron expressions or one-time run_at datetime. "
|
|
"Use this when you or the user want scheduled follow-up or proactive actions."
|
|
)
|
|
parameters: dict = Field(
|
|
default_factory=lambda: {
|
|
"type": "object",
|
|
"properties": {
|
|
"cron_expression": {
|
|
"type": "string",
|
|
"description": "Cron expression defining recurring schedule (e.g., '0 8 * * *' or '0 23 * * mon-fri'). Prefer named weekdays like 'mon-fri' or 'sat,sun' instead of numeric day-of-week ranges such as '1-5' to avoid ambiguity across cron implementations.",
|
|
},
|
|
"run_at": {
|
|
"type": "string",
|
|
"description": "ISO datetime for one-time execution, e.g., 2026-02-02T08:00:00+08:00. Use with run_once=true.",
|
|
},
|
|
"note": {
|
|
"type": "string",
|
|
"description": "Detailed instructions for your future agent to execute when it wakes.",
|
|
},
|
|
"name": {
|
|
"type": "string",
|
|
"description": "Optional label to recognize this future task.",
|
|
},
|
|
"run_once": {
|
|
"type": "boolean",
|
|
"description": "If true, the task will run only once and then be deleted. Use run_at to specify the time.",
|
|
},
|
|
},
|
|
"required": ["note"],
|
|
}
|
|
)
|
|
|
|
async def call(
|
|
self, context: ContextWrapper[AstrAgentContext], **kwargs
|
|
) -> ToolExecResult:
|
|
cron_mgr = context.context.context.cron_manager
|
|
if cron_mgr is None:
|
|
return "error: cron manager is not available."
|
|
|
|
cron_expression = kwargs.get("cron_expression")
|
|
run_at = kwargs.get("run_at")
|
|
run_once = bool(kwargs.get("run_once", False))
|
|
note = str(kwargs.get("note", "")).strip()
|
|
name = str(kwargs.get("name") or "").strip() or "active_agent_task"
|
|
|
|
if not note:
|
|
return "error: note is required."
|
|
if run_once and not run_at:
|
|
return "error: run_at is required when run_once=true."
|
|
if (not run_once) and not cron_expression:
|
|
return "error: cron_expression is required when run_once=false."
|
|
if run_once and cron_expression:
|
|
cron_expression = None
|
|
run_at_dt = None
|
|
if run_at:
|
|
try:
|
|
run_at_dt = datetime.fromisoformat(str(run_at))
|
|
except Exception:
|
|
return "error: run_at must be ISO datetime, e.g., 2026-02-02T08:00:00+08:00"
|
|
|
|
payload = {
|
|
"session": context.context.event.unified_msg_origin,
|
|
"sender_id": context.context.event.get_sender_id(),
|
|
"note": note,
|
|
"origin": "tool",
|
|
}
|
|
|
|
job = await cron_mgr.add_active_job(
|
|
name=name,
|
|
cron_expression=str(cron_expression) if cron_expression else None,
|
|
payload=payload,
|
|
description=note,
|
|
run_once=run_once,
|
|
run_at=run_at_dt,
|
|
)
|
|
next_run = job.next_run_time or run_at_dt
|
|
suffix = (
|
|
f"one-time at {next_run}"
|
|
if run_once
|
|
else f"expression '{cron_expression}' (next {next_run})"
|
|
)
|
|
return f"Scheduled future task {job.job_id} ({job.name}) {suffix}."
|
|
|
|
|
|
@dataclass
|
|
class DeleteCronJobTool(FunctionTool[AstrAgentContext]):
|
|
name: str = "delete_future_task"
|
|
description: str = "Delete a future task (cron job) by its job_id."
|
|
parameters: dict = Field(
|
|
default_factory=lambda: {
|
|
"type": "object",
|
|
"properties": {
|
|
"job_id": {
|
|
"type": "string",
|
|
"description": "The job_id returned when the job was created.",
|
|
}
|
|
},
|
|
"required": ["job_id"],
|
|
}
|
|
)
|
|
|
|
async def call(
|
|
self, context: ContextWrapper[AstrAgentContext], **kwargs
|
|
) -> ToolExecResult:
|
|
cron_mgr = context.context.context.cron_manager
|
|
if cron_mgr is None:
|
|
return "error: cron manager is not available."
|
|
current_umo = context.context.event.unified_msg_origin
|
|
job_id = kwargs.get("job_id")
|
|
if not job_id:
|
|
return "error: job_id is required."
|
|
job = await cron_mgr.db.get_cron_job(str(job_id))
|
|
if not job:
|
|
return f"error: cron job {job_id} not found."
|
|
if _extract_job_session(job) != current_umo:
|
|
return "error: you can only delete future tasks in the current umo."
|
|
await cron_mgr.delete_job(str(job_id))
|
|
return f"Deleted cron job {job_id}."
|
|
|
|
|
|
@dataclass
|
|
class ListCronJobsTool(FunctionTool[AstrAgentContext]):
|
|
name: str = "list_future_tasks"
|
|
description: str = "List existing future tasks (cron jobs) for inspection."
|
|
parameters: dict = Field(
|
|
default_factory=lambda: {
|
|
"type": "object",
|
|
"properties": {
|
|
"job_type": {
|
|
"type": "string",
|
|
"description": "Optional filter: basic or active_agent.",
|
|
}
|
|
},
|
|
}
|
|
)
|
|
|
|
async def call(
|
|
self, context: ContextWrapper[AstrAgentContext], **kwargs
|
|
) -> ToolExecResult:
|
|
cron_mgr = context.context.context.cron_manager
|
|
if cron_mgr is None:
|
|
return "error: cron manager is not available."
|
|
current_umo = context.context.event.unified_msg_origin
|
|
job_type = kwargs.get("job_type")
|
|
jobs = [
|
|
job
|
|
for job in await cron_mgr.list_jobs(job_type)
|
|
if _extract_job_session(job) == current_umo
|
|
]
|
|
if not jobs:
|
|
return "No cron jobs found."
|
|
lines = []
|
|
for j in jobs:
|
|
lines.append(
|
|
f"{j.job_id} | {j.name} | {j.job_type} | run_once={getattr(j, 'run_once', False)} | enabled={j.enabled} | next={j.next_run_time}"
|
|
)
|
|
return "\n".join(lines)
|
|
|
|
|
|
CREATE_CRON_JOB_TOOL = CreateActiveCronTool()
|
|
DELETE_CRON_JOB_TOOL = DeleteCronJobTool()
|
|
LIST_CRON_JOBS_TOOL = ListCronJobsTool()
|
|
|
|
|
|
def get_all_tools() -> list[FunctionTool]:
|
|
"""Return all cron-related tools for registration."""
|
|
return [CREATE_CRON_JOB_TOOL, DELETE_CRON_JOB_TOOL, LIST_CRON_JOBS_TOOL]
|
|
|
|
|
|
__all__ = [
|
|
"CREATE_CRON_JOB_TOOL",
|
|
"DELETE_CRON_JOB_TOOL",
|
|
"LIST_CRON_JOBS_TOOL",
|
|
"CreateActiveCronTool",
|
|
"DeleteCronJobTool",
|
|
"ListCronJobsTool",
|
|
"get_all_tools",
|
|
]
|