Compare commits

...

1 Commits

Author SHA1 Message Date
Soulter 6948fac7b6 feat: support bwrap/seatbelt-based local sandbox runtime for Computer Use 2026-02-27 00:20:01 +08:00
12 changed files with 267 additions and 78 deletions
+1 -1
View File
@@ -106,7 +106,7 @@ class FunctionToolExecutor(BaseFunctionToolExecutor[AstrAgentContext]):
FILE_UPLOAD_TOOL.name: FILE_UPLOAD_TOOL,
FILE_DOWNLOAD_TOOL.name: FILE_DOWNLOAD_TOOL,
}
if runtime == "local":
if runtime in {"local", "local_sandboxed"}:
return {
LOCAL_EXECUTE_SHELL_TOOL.name: LOCAL_EXECUTE_SHELL_TOOL,
LOCAL_PYTHON_TOOL.name: LOCAL_PYTHON_TOOL,
+2 -2
View File
@@ -110,7 +110,7 @@ class MainAgentBuildConfig:
to prevent LLM output harmful information"""
safety_mode_strategy: str = "system_prompt"
computer_use_runtime: str = "local"
"""The runtime for agent computer use: none, local, or sandbox."""
"""The runtime for agent computer use: none, local, local_sandboxed, or sandbox."""
sandbox_cfg: dict = field(default_factory=dict)
add_cron_tools: bool = True
"""This will add cron job management tools to the main agent for proactive cron job execution."""
@@ -1050,7 +1050,7 @@ async def build_main_agent(
if config.computer_use_runtime == "sandbox":
_apply_sandbox_tools(config, req, req.session_id)
elif config.computer_use_runtime == "local":
elif config.computer_use_runtime in {"local", "local_sandboxed"}:
_apply_local_env_tools(req)
agent_runner = AgentRunner()
+208 -56
View File
@@ -2,22 +2,24 @@ from __future__ import annotations
import asyncio
import os
import re
import shlex
import shutil
import subprocess
import sys
import uuid
from dataclasses import dataclass
from typing import Any
from pathlib import Path
from typing import Any, Literal
from astrbot.api import logger
from astrbot.core.utils.astrbot_path import (
get_astrbot_data_path,
get_astrbot_root,
get_astrbot_temp_path,
)
from astrbot.core.utils.astrbot_path import get_astrbot_root
from ..olayer import FileSystemComponent, PythonComponent, ShellComponent
from .base import ComputerBooter
SandboxBackend = Literal["none", "bwrap", "seatbelt"]
_BLOCKED_COMMAND_PATTERNS = [
" rm -rf ",
" rm -fr ",
@@ -40,20 +42,132 @@ def _is_safe_command(command: str) -> bool:
return not any(pat in cmd for pat in _BLOCKED_COMMAND_PATTERNS)
def _ensure_safe_path(path: str) -> str:
abs_path = os.path.abspath(path)
allowed_roots = [
os.path.abspath(get_astrbot_root()),
os.path.abspath(get_astrbot_data_path()),
os.path.abspath(get_astrbot_temp_path()),
]
if not any(abs_path.startswith(root) for root in allowed_roots):
raise PermissionError("Path is outside the allowed computer roots.")
return abs_path
def _escape_seatbelt_string(raw: str) -> str:
return raw.replace("\\", "\\\\").replace('"', '\\"')
def _session_workspace_name(session_id: str) -> str:
safe_prefix = re.sub(r"[^A-Za-z0-9._-]+", "_", session_id).strip("._-")
if not safe_prefix:
safe_prefix = "session"
safe_prefix = safe_prefix[:40]
suffix = uuid.uuid5(uuid.NAMESPACE_DNS, session_id).hex[:12]
return f"{safe_prefix}_{suffix}"
def _detect_sandbox_backend() -> SandboxBackend:
if sys.platform.startswith("linux"):
if shutil.which("bwrap"):
return "bwrap"
raise RuntimeError("Local runtime requires 'bwrap' on Linux.")
if sys.platform == "darwin":
if shutil.which("sandbox-exec"):
return "seatbelt"
raise RuntimeError("Local runtime requires 'sandbox-exec' on macOS.")
return "none"
@dataclass(frozen=True)
class LocalSandboxPolicy:
workspace: Path
backend: SandboxBackend
sandboxed: bool
default_cwd: Path
@classmethod
def build_default(cls, session_id: str, sandboxed: bool) -> LocalSandboxPolicy:
workspace_root_raw = os.environ.get(
"ASTRBOT_LOCAL_WORKSPACE_ROOT"
) or os.environ.get("ASTRBOT_LOCAL_WORKSPACE", "~/.astrbot/workspace")
workspace_root = Path(workspace_root_raw).expanduser().resolve()
workspace = workspace_root / _session_workspace_name(session_id)
default_cwd = workspace if sandboxed else Path(get_astrbot_root()).resolve()
return cls(
workspace=workspace,
backend=_detect_sandbox_backend() if sandboxed else "none",
sandboxed=sandboxed,
default_cwd=default_cwd,
)
def ensure_workspace(self) -> None:
try:
self.workspace.mkdir(parents=True, exist_ok=True)
except PermissionError as exc:
raise RuntimeError(
"Cannot create local workspace. "
"Set ASTRBOT_LOCAL_WORKSPACE_ROOT to a writable path."
) from exc
def resolve_path(self, path: str, base: Path | None = None) -> Path:
raw = Path(path).expanduser()
resolved = raw if raw.is_absolute() else (base or self.default_cwd) / raw
return resolved.resolve()
def ensure_writable_path(self, path: str) -> Path:
abs_path = self.resolve_path(path)
if self.sandboxed and not abs_path.is_relative_to(self.workspace):
raise PermissionError(
f"Write path is outside workspace: {self.workspace.as_posix()}"
)
return abs_path
def normalize_working_dir(self, cwd: str | None) -> Path:
target = self.resolve_path(cwd) if cwd else self.default_cwd
if not target.exists():
raise FileNotFoundError(f"Working directory does not exist: {target}")
if not target.is_dir():
raise NotADirectoryError(f"Working directory is not a directory: {target}")
return target
def wrap_command(self, command: list[str], working_dir: Path) -> list[str]:
if not self.sandboxed:
return command
if self.backend == "bwrap":
return [
"bwrap",
"--die-with-parent",
"--new-session",
"--ro-bind",
"/",
"/",
"--bind",
str(self.workspace),
str(self.workspace),
"--proc",
"/proc",
"--dev",
"/dev",
"--chdir",
str(working_dir),
"--",
*command,
]
if self.backend == "seatbelt":
workspace_escaped = _escape_seatbelt_string(str(self.workspace))
profile = "\n".join(
[
"(version 1)",
"(deny default)",
'(import "system.sb")',
"(allow process*)",
"(allow file-read*)",
f'(allow file-write* (subpath "{workspace_escaped}"))',
"(allow network*)",
]
)
return ["sandbox-exec", "-p", profile, *command]
raise RuntimeError("Sandbox backend is not available for local_sandboxed mode.")
@dataclass
class LocalShellComponent(ShellComponent):
policy: LocalSandboxPolicy
async def exec(
self,
command: str,
@@ -67,41 +181,58 @@ class LocalShellComponent(ShellComponent):
raise PermissionError("Blocked unsafe shell command.")
def _run() -> dict[str, Any]:
shell_command = (
["/bin/sh", "-lc", command] if shell else shlex.split(command)
)
run_env = os.environ.copy()
if env:
run_env.update({str(k): str(v) for k, v in env.items()})
working_dir = _ensure_safe_path(cwd) if cwd else get_astrbot_root()
working_dir = self.policy.normalize_working_dir(cwd)
wrapped_command = self.policy.wrap_command(shell_command, working_dir)
if background:
proc = subprocess.Popen(
command,
shell=shell,
wrapped_command,
shell=False,
cwd=working_dir,
env=run_env,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
return {"pid": proc.pid, "stdout": "", "stderr": "", "exit_code": None}
result = subprocess.run(
command,
shell=shell,
cwd=working_dir,
env=run_env,
timeout=timeout,
capture_output=True,
text=True,
)
return {
"stdout": result.stdout,
"stderr": result.stderr,
"exit_code": result.returncode,
}
try:
result = subprocess.run(
wrapped_command,
shell=False,
cwd=working_dir,
env=run_env,
timeout=timeout,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
)
return {
"stdout": result.stdout,
"stderr": result.stderr,
"exit_code": result.returncode,
}
except subprocess.TimeoutExpired:
timeout_seconds = timeout if timeout is not None else "configured"
return {
"stdout": "",
"stderr": f"Execution timed out after {timeout_seconds} seconds.",
"exit_code": 124,
}
return await asyncio.to_thread(_run)
@dataclass
class LocalPythonComponent(PythonComponent):
policy: LocalSandboxPolicy
async def exec(
self,
code: str,
@@ -110,9 +241,13 @@ class LocalPythonComponent(PythonComponent):
silent: bool = False,
) -> dict[str, Any]:
def _run() -> dict[str, Any]:
python_command = [os.environ.get("PYTHON", sys.executable), "-c", code]
working_dir = self.policy.normalize_working_dir(None)
wrapped_command = self.policy.wrap_command(python_command, working_dir)
try:
result = subprocess.run(
[os.environ.get("PYTHON", sys.executable), "-c", code],
wrapped_command,
cwd=working_dir,
timeout=timeout,
capture_output=True,
text=True,
@@ -138,23 +273,25 @@ class LocalPythonComponent(PythonComponent):
@dataclass
class LocalFileSystemComponent(FileSystemComponent):
policy: LocalSandboxPolicy
async def create_file(
self, path: str, content: str = "", mode: int = 0o644
) -> dict[str, Any]:
def _run() -> dict[str, Any]:
abs_path = _ensure_safe_path(path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, "w", encoding="utf-8") as f:
abs_path = self.policy.ensure_writable_path(path)
abs_path.parent.mkdir(parents=True, exist_ok=True)
with abs_path.open("w", encoding="utf-8") as f:
f.write(content)
os.chmod(abs_path, mode)
return {"success": True, "path": abs_path}
abs_path.chmod(mode)
return {"success": True, "path": str(abs_path)}
return await asyncio.to_thread(_run)
async def read_file(self, path: str, encoding: str = "utf-8") -> dict[str, Any]:
def _run() -> dict[str, Any]:
abs_path = _ensure_safe_path(path)
with open(abs_path, encoding=encoding) as f:
abs_path = self.policy.resolve_path(path)
with abs_path.open(encoding=encoding) as f:
content = f.read()
return {"success": True, "content": content}
@@ -164,22 +301,22 @@ class LocalFileSystemComponent(FileSystemComponent):
self, path: str, content: str, mode: str = "w", encoding: str = "utf-8"
) -> dict[str, Any]:
def _run() -> dict[str, Any]:
abs_path = _ensure_safe_path(path)
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, mode, encoding=encoding) as f:
abs_path = self.policy.ensure_writable_path(path)
abs_path.parent.mkdir(parents=True, exist_ok=True)
with abs_path.open(mode, encoding=encoding) as f:
f.write(content)
return {"success": True, "path": abs_path}
return {"success": True, "path": str(abs_path)}
return await asyncio.to_thread(_run)
async def delete_file(self, path: str) -> dict[str, Any]:
def _run() -> dict[str, Any]:
abs_path = _ensure_safe_path(path)
if os.path.isdir(abs_path):
abs_path = self.policy.ensure_writable_path(path)
if abs_path.is_dir():
shutil.rmtree(abs_path)
else:
os.remove(abs_path)
return {"success": True, "path": abs_path}
abs_path.unlink()
return {"success": True, "path": str(abs_path)}
return await asyncio.to_thread(_run)
@@ -187,8 +324,8 @@ class LocalFileSystemComponent(FileSystemComponent):
self, path: str = ".", show_hidden: bool = False
) -> dict[str, Any]:
def _run() -> dict[str, Any]:
abs_path = _ensure_safe_path(path)
entries = os.listdir(abs_path)
abs_path = self.policy.resolve_path(path)
entries = [entry.name for entry in abs_path.iterdir()]
if not show_hidden:
entries = [e for e in entries if not e.startswith(".")]
return {"success": True, "entries": entries}
@@ -197,13 +334,28 @@ class LocalFileSystemComponent(FileSystemComponent):
class LocalBooter(ComputerBooter):
def __init__(self) -> None:
self._fs = LocalFileSystemComponent()
self._python = LocalPythonComponent()
self._shell = LocalShellComponent()
def __init__(self, session_id: str, sandboxed: bool = False) -> None:
self._session_id = session_id
self._policy = LocalSandboxPolicy.build_default(
session_id=session_id, sandboxed=sandboxed
)
if sandboxed:
self._policy.ensure_workspace()
if sandboxed and self._policy.backend == "none":
logger.warning(
f"Local runtime sandbox backend is unavailable on {sys.platform}. "
"Only filesystem tools are restricted to workspace."
)
self._fs = LocalFileSystemComponent(policy=self._policy)
self._python = LocalPythonComponent(policy=self._policy)
self._shell = LocalShellComponent(policy=self._policy)
async def boot(self, session_id: str) -> None:
logger.info(f"Local computer booter initialized for session: {session_id}")
logger.info(
f"Local computer booter initialized for session: {session_id} "
f"(sandboxed={self._policy.sandboxed}, "
f"backend={self._policy.backend}, workspace={self._policy.workspace})"
)
async def shutdown(self) -> None:
logger.info("Local computer booter shutdown complete.")
+6 -6
View File
@@ -15,7 +15,7 @@ from .booters.base import ComputerBooter
from .booters.local import LocalBooter
session_booter: dict[str, ComputerBooter] = {}
local_booter: ComputerBooter | None = None
local_booters: dict[tuple[str, bool], ComputerBooter] = {}
async def _sync_skills_to_sandbox(booter: ComputerBooter) -> None:
@@ -104,8 +104,8 @@ async def get_booter(
return session_booter[session_id]
def get_local_booter() -> ComputerBooter:
global local_booter
if local_booter is None:
local_booter = LocalBooter()
return local_booter
def get_local_booter(session_id: str, sandboxed: bool = False) -> ComputerBooter:
key = (session_id, sandboxed)
if key not in local_booters:
local_booters[key] = LocalBooter(session_id=session_id, sandboxed=sandboxed)
return local_booters[key]
+13 -2
View File
@@ -83,7 +83,10 @@ class PythonTool(FunctionTool):
@dataclass
class LocalPythonTool(FunctionTool):
name: str = "astrbot_execute_python"
description: str = "Execute codes in a Python environment."
description: str = (
"Execute code in a local Python environment. "
"In local_sandboxed runtime, writes are restricted to ~/.astrbot/workspace/<session>."
)
parameters: dict = field(default_factory=lambda: param_schema)
@@ -92,7 +95,15 @@ class LocalPythonTool(FunctionTool):
) -> ToolExecResult:
if permission_error := check_admin_permission(context, "Python execution"):
return permission_error
sb = get_local_booter()
event = context.context.event
cfg = context.context.context.get_config(umo=event.unified_msg_origin)
runtime = str(
cfg.get("provider_settings", {}).get("computer_use_runtime", "local")
)
sb = get_local_booter(
event.unified_msg_origin,
sandboxed=runtime == "local_sandboxed",
)
try:
result = await sb.python.exec(code, silent=silent)
return await handle_result(result, context.context.event)
+27 -5
View File
@@ -13,14 +13,21 @@ from .permissions import check_admin_permission
@dataclass
class ExecuteShellTool(FunctionTool):
name: str = "astrbot_execute_shell"
description: str = "Execute a command in the shell."
description: str = (
"Execute a command in the shell. "
"In local_sandboxed runtime, writes are restricted to ~/.astrbot/workspace/<session>."
)
parameters: dict = field(
default_factory=lambda: {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute. Equal to 'cd {working_dir} && {your_command}'.",
"description": "The shell command to execute.",
},
"cwd": {
"type": "string",
"description": "Optional working directory for command execution.",
},
"background": {
"type": "boolean",
@@ -44,21 +51,36 @@ class ExecuteShellTool(FunctionTool):
self,
context: ContextWrapper[AstrAgentContext],
command: str,
cwd: str | None = None,
background: bool = False,
env: dict = {},
) -> ToolExecResult:
if permission_error := check_admin_permission(context, "Shell execution"):
return permission_error
event = context.context.event
cfg = context.context.context.get_config(umo=event.unified_msg_origin)
runtime = str(
cfg.get("provider_settings", {}).get("computer_use_runtime", "local")
)
if self.is_local:
sb = get_local_booter()
sb = get_local_booter(
event.unified_msg_origin,
sandboxed=runtime == "local_sandboxed",
)
else:
sb = await get_booter(
context.context.context,
context.context.event.unified_msg_origin,
event.unified_msg_origin,
)
try:
result = await sb.shell.exec(command, background=background, env=env)
result = await sb.shell.exec(
command,
cwd=cwd,
background=background,
env=env,
)
return json.dumps(result)
except Exception as e:
return f"Error executing command: {str(e)}"
+2 -2
View File
@@ -2772,8 +2772,8 @@ CONFIG_METADATA_3 = {
"provider_settings.computer_use_runtime": {
"description": "Computer Use Runtime",
"type": "string",
"options": ["none", "local", "sandbox"],
"labels": ["", "本地", "沙箱"],
"options": ["none", "local", "local_sandboxed", "sandbox"],
"labels": ["", "本地", "本地(沙箱增强)", "沙箱"],
"hint": "选择 Computer Use 运行环境。",
},
"provider_settings.computer_use_require_admin": {
+4
View File
@@ -180,6 +180,10 @@ class PlatformManager:
from .sources.line.line_adapter import (
LinePlatformAdapter, # noqa: F401
)
case "email":
from .sources.email.email_adapter import (
EmailPlatformAdapter, # noqa: F401
)
except (ImportError, ModuleNotFoundError) as e:
logger.error(
f"加载平台适配器 {platform_config['type']} 失败,原因:{e}。请检查依赖库是否安装。提示:可以在 管理面板->平台日志->安装Pip库 中安装依赖库。",
@@ -147,7 +147,7 @@
"provider_settings": {
"computer_use_runtime": {
"description": "Computer Use Runtime",
"hint": "sandbox means running in a sandbox environment, local means running in a local environment, none means disabling Computer Use. If skills are uploaded, choosing none will cause them to not be usable by the Agent."
"hint": "sandbox means running in a remote sandbox environment, local means running directly on the local machine, local_sandboxed means local execution with OS-level sandboxing (bwrap/seatbelt), and none means disabling Computer Use. If skills are uploaded, choosing none will cause them to not be usable by the Agent."
},
"computer_use_require_admin": {
"description": "Require AstrBot Admin Permission",
@@ -225,7 +225,7 @@
"deleteSuccess": "Deleted successfully",
"deleteFailed": "Delete failed",
"runtimeNoneWarning": "Computer Use runtime is set to None; Skills may not run correctly because no runtime is enabled.",
"runtimeHint": "Set the Computer Use runtime to Local or Sandbox in settings so AstrBot can use your Skills."
"runtimeHint": "Set the Computer Use runtime to Local, Local Sandboxed, or Sandbox in settings so AstrBot can use your Skills."
},
"card": {
"actions": {
@@ -150,7 +150,7 @@
"provider_settings": {
"computer_use_runtime": {
"description": "运行环境",
"hint": "sandbox 代表在沙箱环境中运行, local 代表在本地环境中运行, none 代表不启用。如果上传了 skills,选择 none 会导致其无法被 Agent 正常使用。"
"hint": "sandbox 代表在远程沙箱环境中运行, local 代表在本地直接运行, local_sandboxed 代表本地运行但使用系统沙箱(bwrap/seatbelt)增强隔离, none 代表不启用。如果上传了 skills,选择 none 会导致其无法被 Agent 正常使用。"
},
"computer_use_require_admin": {
"description": "需要 AstrBot 管理员权限",
@@ -225,7 +225,7 @@
"deleteSuccess": "删除成功",
"deleteFailed": "删除失败",
"runtimeNoneWarning": "Computer Use 运行环境为无,Skills 可能无法正确被 Agent 运行,因为没有启用运行环境。",
"runtimeHint": "需要在配置的 “使用电脑能力” 中将运行环境设置为 “local” 或 “sandbox” 才能让 AstrBot 正常使用你提供的 Skills。"
"runtimeHint": "需要在配置的 “使用电脑能力” 中将运行环境设置为 “local”、“local_sandboxed” 或 “sandbox” 才能让 AstrBot 正常使用你提供的 Skills。"
},
"card": {
"actions": {