From 39aea11c22f6ae9b5b72d44f7f0f817db3551802 Mon Sep 17 00:00:00 2001 From: Raven95676 Date: Thu, 29 May 2025 21:03:51 +0800 Subject: [PATCH] perf: enhance file access security in get_file method Co-authored-by: anka-afk <1350989414@qq.com> --- astrbot/dashboard/routes/chat.py | 51 ++++++++++++++------------------ 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/astrbot/dashboard/routes/chat.py b/astrbot/dashboard/routes/chat.py index b61a60f42..95ac98da6 100644 --- a/astrbot/dashboard/routes/chat.py +++ b/astrbot/dashboard/routes/chat.py @@ -60,29 +60,26 @@ class ChatRoute(Route): if not filename: return Response().error("Missing key: filename").__dict__ - # Prevent path traversal attacks by extracting just the basename - filename_ = os.path.basename(filename) - - # Check if the filename contains suspicious patterns - if ( - filename_ != filename - or ".." in filename - or "/" in filename - or "\\" in filename - ): - return Response().error("Invalid filename").__dict__ - try: - with open(os.path.join(self.imgs_dir, filename_), "rb") as f: - if filename_.endswith(".wav"): + file_path = os.path.join(self.imgs_dir, os.path.basename(filename)) + real_file_path = os.path.realpath(file_path) + real_imgs_dir = os.path.realpath(self.imgs_dir) + + if not real_file_path.startswith(real_imgs_dir): + return Response().error("Invalid file path").__dict__ + + with open(real_file_path, "rb") as f: + filename_ext = os.path.splitext(filename)[1].lower() + + if filename_ext == ".wav": return QuartResponse(f.read(), mimetype="audio/wav") - elif filename_.split(".")[-1] in self.supported_imgs: + elif filename_ext[1:] in self.supported_imgs: return QuartResponse(f.read(), mimetype="image/jpeg") else: return QuartResponse(f.read()) - except FileNotFoundError: - return Response().error("File not found").__dict__ + except (FileNotFoundError, OSError): + return Response().error("File access error").__dict__ async def post_image(self): post_data = await request.files @@ -138,17 +135,15 @@ class ChatRoute(Route): self.curr_user_cid[username] = conversation_id - await web_chat_queue.put( - ( - username, - conversation_id, - { - "message": message, - "image_url": image_url, # list - "audio_url": audio_url, - }, - ) - ) + await web_chat_queue.put(( + username, + conversation_id, + { + "message": message, + "image_url": image_url, # list + "audio_url": audio_url, + }, + )) # ζŒδΉ…εŒ– conversation = self.db.get_conversation_by_user_id(username, conversation_id)