fix: enforce admin guard for sandbox file transfer tools (#5402)
* fix: enforce admin guard for sandbox file transfer tools * refactor: deduplicate computer tools admin permission checks * fix: add missing space in permission error message
This commit is contained in:
@@ -11,6 +11,7 @@ from astrbot.core.message.components import File
|
||||
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path
|
||||
|
||||
from ..computer_client import get_booter
|
||||
from .permissions import check_admin_permission
|
||||
|
||||
# @dataclass
|
||||
# class CreateFileTool(FunctionTool):
|
||||
@@ -102,6 +103,8 @@ class FileUploadTool(FunctionTool):
|
||||
context: ContextWrapper[AstrAgentContext],
|
||||
local_path: str,
|
||||
) -> str | None:
|
||||
if permission_error := check_admin_permission(context, "File upload/download"):
|
||||
return permission_error
|
||||
sb = await get_booter(
|
||||
context.context.context,
|
||||
context.context.event.unified_msg_origin,
|
||||
@@ -161,6 +164,8 @@ class FileDownloadTool(FunctionTool):
|
||||
remote_path: str,
|
||||
also_send_to_user: bool = True,
|
||||
) -> ToolExecResult:
|
||||
if permission_error := check_admin_permission(context, "File upload/download"):
|
||||
return permission_error
|
||||
sb = await get_booter(
|
||||
context.context.context,
|
||||
context.context.event.unified_msg_origin,
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
from astrbot.core.agent.run_context import ContextWrapper
|
||||
from astrbot.core.astr_agent_context import AstrAgentContext
|
||||
|
||||
|
||||
def check_admin_permission(
|
||||
context: ContextWrapper[AstrAgentContext], operation_name: str
|
||||
) -> str | None:
|
||||
cfg = context.context.context.get_config(
|
||||
umo=context.context.event.unified_msg_origin
|
||||
)
|
||||
provider_settings = cfg.get("provider_settings", {})
|
||||
require_admin = provider_settings.get("computer_use_require_admin", True)
|
||||
if require_admin and context.context.event.role != "admin":
|
||||
return (
|
||||
f"error: Permission denied. {operation_name} is only allowed for admin users. "
|
||||
"Tell user to set admins in `AstrBot WebUI -> Config -> General Config` by adding their user ID to the admins list if they need this feature. "
|
||||
f"User's ID is: {context.context.event.get_sender_id()}. User's ID can be found by using /sid command."
|
||||
)
|
||||
return None
|
||||
@@ -7,6 +7,7 @@ from astrbot.core.agent.run_context import ContextWrapper
|
||||
from astrbot.core.agent.tool import ToolExecResult
|
||||
from astrbot.core.astr_agent_context import AstrAgentContext, AstrMessageEvent
|
||||
from astrbot.core.computer.computer_client import get_booter, get_local_booter
|
||||
from astrbot.core.computer.tools.permissions import check_admin_permission
|
||||
from astrbot.core.message.message_event_result import MessageChain
|
||||
|
||||
param_schema = {
|
||||
@@ -26,21 +27,6 @@ param_schema = {
|
||||
}
|
||||
|
||||
|
||||
def _check_admin_permission(context: ContextWrapper[AstrAgentContext]) -> str | None:
|
||||
cfg = context.context.context.get_config(
|
||||
umo=context.context.event.unified_msg_origin
|
||||
)
|
||||
provider_settings = cfg.get("provider_settings", {})
|
||||
require_admin = provider_settings.get("computer_use_require_admin", True)
|
||||
if require_admin and context.context.event.role != "admin":
|
||||
return (
|
||||
"error: Permission denied. Python execution is only allowed for admin users. "
|
||||
"Tell user to set admins in `AstrBot WebUI -> Config -> General Config` by adding their user ID to the admins list if they need this feature."
|
||||
f"User's ID is: {context.context.event.get_sender_id()}. User's ID can be found by using /sid command."
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
async def handle_result(result: dict, event: AstrMessageEvent) -> ToolExecResult:
|
||||
data = result.get("data", {})
|
||||
output = data.get("output", {})
|
||||
@@ -81,7 +67,7 @@ class PythonTool(FunctionTool):
|
||||
async def call(
|
||||
self, context: ContextWrapper[AstrAgentContext], code: str, silent: bool = False
|
||||
) -> ToolExecResult:
|
||||
if permission_error := _check_admin_permission(context):
|
||||
if permission_error := check_admin_permission(context, "Python execution"):
|
||||
return permission_error
|
||||
sb = await get_booter(
|
||||
context.context.context,
|
||||
@@ -104,7 +90,7 @@ class LocalPythonTool(FunctionTool):
|
||||
async def call(
|
||||
self, context: ContextWrapper[AstrAgentContext], code: str, silent: bool = False
|
||||
) -> ToolExecResult:
|
||||
if permission_error := _check_admin_permission(context):
|
||||
if permission_error := check_admin_permission(context, "Python execution"):
|
||||
return permission_error
|
||||
sb = get_local_booter()
|
||||
try:
|
||||
|
||||
@@ -7,21 +7,7 @@ from astrbot.core.agent.tool import ToolExecResult
|
||||
from astrbot.core.astr_agent_context import AstrAgentContext
|
||||
|
||||
from ..computer_client import get_booter, get_local_booter
|
||||
|
||||
|
||||
def _check_admin_permission(context: ContextWrapper[AstrAgentContext]) -> str | None:
|
||||
cfg = context.context.context.get_config(
|
||||
umo=context.context.event.unified_msg_origin
|
||||
)
|
||||
provider_settings = cfg.get("provider_settings", {})
|
||||
require_admin = provider_settings.get("computer_use_require_admin", True)
|
||||
if require_admin and context.context.event.role != "admin":
|
||||
return (
|
||||
"error: Permission denied. Shell execution is only allowed for admin users. "
|
||||
"Tell user to set admins in `AstrBot WebUI -> Config -> General Config` by adding their user ID to the admins list if they need this feature."
|
||||
f"User's ID is: {context.context.event.get_sender_id()}. User's ID can be found by using /sid command."
|
||||
)
|
||||
return None
|
||||
from .permissions import check_admin_permission
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -61,7 +47,7 @@ class ExecuteShellTool(FunctionTool):
|
||||
background: bool = False,
|
||||
env: dict = {},
|
||||
) -> ToolExecResult:
|
||||
if permission_error := _check_admin_permission(context):
|
||||
if permission_error := check_admin_permission(context, "Shell execution"):
|
||||
return permission_error
|
||||
|
||||
if self.is_local:
|
||||
|
||||
Reference in New Issue
Block a user