perf: enhance file access security in get_file method

Co-authored-by: anka-afk <1350989414@qq.com>
This commit is contained in:
Raven95676
2025-05-29 21:03:51 +08:00
parent c2f1227700
commit 39aea11c22
+23 -28
View File
@@ -60,29 +60,26 @@ class ChatRoute(Route):
if not filename:
return Response().error("Missing key: filename").__dict__
# Prevent path traversal attacks by extracting just the basename
filename_ = os.path.basename(filename)
# Check if the filename contains suspicious patterns
if (
filename_ != filename
or ".." in filename
or "/" in filename
or "\\" in filename
):
return Response().error("Invalid filename").__dict__
try:
with open(os.path.join(self.imgs_dir, filename_), "rb") as f:
if filename_.endswith(".wav"):
file_path = os.path.join(self.imgs_dir, os.path.basename(filename))
real_file_path = os.path.realpath(file_path)
real_imgs_dir = os.path.realpath(self.imgs_dir)
if not real_file_path.startswith(real_imgs_dir):
return Response().error("Invalid file path").__dict__
with open(real_file_path, "rb") as f:
filename_ext = os.path.splitext(filename)[1].lower()
if filename_ext == ".wav":
return QuartResponse(f.read(), mimetype="audio/wav")
elif filename_.split(".")[-1] in self.supported_imgs:
elif filename_ext[1:] in self.supported_imgs:
return QuartResponse(f.read(), mimetype="image/jpeg")
else:
return QuartResponse(f.read())
except FileNotFoundError:
return Response().error("File not found").__dict__
except (FileNotFoundError, OSError):
return Response().error("File access error").__dict__
async def post_image(self):
post_data = await request.files
@@ -138,17 +135,15 @@ class ChatRoute(Route):
self.curr_user_cid[username] = conversation_id
await web_chat_queue.put(
(
username,
conversation_id,
{
"message": message,
"image_url": image_url, # list
"audio_url": audio_url,
},
)
)
await web_chat_queue.put((
username,
conversation_id,
{
"message": message,
"image_url": image_url, # list
"audio_url": audio_url,
},
))
# 持久化
conversation = self.db.get_conversation_by_user_id(username, conversation_id)