4ff07e3c74
* feat: support fallback image parsing for quoted messages * fix: fallback parse quoted images when reply chain has placeholders * style: format network utils with ruff * test: expand quoted parser coverage and improve fallback diagnostics * fix: fallback to text-only retry when image requests fail * fix: tighten image fallback and resolve nested quoted forwards * refactor: simplify quoted message extraction and dedupe images * fix: harden quoted parsing and openai error candidates * fix: harden quoted image ref normalization * refactor: organize quoted parser settings and logging * fix: cap quoted fallback images and avoid retry loops * refactor: split quoted message parser into focused modules * refactor: share onebot segment parsing logic * refactor: unify quoted message parsing flow * feat: move quoted parser tuning to provider settings * fix: add missing i18n metadata for quoted parser settings * chore: refine forwarded message setting labels
495 lines
15 KiB
Python
495 lines
15 KiB
Python
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from astrbot.core.message.components import Image, Plain, Reply
|
|
from astrbot.core.utils.quoted_message_parser import (
|
|
extract_quoted_message_images,
|
|
extract_quoted_message_text,
|
|
)
|
|
|
|
|
|
class _DummyAPI:
|
|
def __init__(
|
|
self,
|
|
responses: dict[tuple[str, str], dict],
|
|
param_responses: dict[tuple[str, tuple[tuple[str, str], ...]], dict]
|
|
| None = None,
|
|
):
|
|
self._responses = responses
|
|
self._param_responses = param_responses or {}
|
|
|
|
async def call_action(self, action: str, **params):
|
|
param_key = (action, tuple(sorted((k, str(v)) for k, v in params.items())))
|
|
if param_key in self._param_responses:
|
|
return self._param_responses[param_key]
|
|
|
|
msg_id = params.get("message_id")
|
|
if msg_id is None:
|
|
msg_id = params.get("id")
|
|
key = (action, str(msg_id))
|
|
if key not in self._responses:
|
|
raise RuntimeError(f"no mock response for {key}")
|
|
return self._responses[key]
|
|
|
|
|
|
class _FailIfCalledAPI:
|
|
async def call_action(self, action: str, **params):
|
|
raise AssertionError(
|
|
f"call_action should not be called, got action={action}, params={params}"
|
|
)
|
|
|
|
|
|
def _make_event(
|
|
reply: Reply,
|
|
responses: dict[tuple[str, str], dict] | None = None,
|
|
param_responses: dict[tuple[str, tuple[tuple[str, str], ...]], dict] | None = None,
|
|
):
|
|
if responses is None:
|
|
responses = {}
|
|
if param_responses is None:
|
|
param_responses = {}
|
|
return SimpleNamespace(
|
|
message_obj=SimpleNamespace(message=[reply]),
|
|
bot=SimpleNamespace(api=_DummyAPI(responses, param_responses)),
|
|
get_group_id=lambda: "",
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_text_from_reply_chain():
|
|
reply = Reply(id="1", chain=[Plain(text="quoted content")], message_str="")
|
|
event = _make_event(reply)
|
|
text = await extract_quoted_message_text(event)
|
|
assert text == "quoted content"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_text_no_reply_component():
|
|
event = SimpleNamespace(
|
|
message_obj=SimpleNamespace(message=[Plain(text="unquoted message")]),
|
|
bot=SimpleNamespace(api=_DummyAPI({}, {})),
|
|
get_group_id=lambda: "",
|
|
)
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert text is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_no_reply_component():
|
|
event = SimpleNamespace(
|
|
message_obj=SimpleNamespace(message=[Plain(text="unquoted message")]),
|
|
bot=SimpleNamespace(api=_FailIfCalledAPI()),
|
|
get_group_id=lambda: "",
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("reply_id", [None, ""])
|
|
async def test_extract_quoted_message_text_reply_without_id_does_not_call_get_msg(
|
|
reply_id: str | None,
|
|
):
|
|
reply = Reply(
|
|
id="placeholder", chain=[Plain(text="quoted content")], message_str=""
|
|
)
|
|
object.__setattr__(reply, "id", reply_id)
|
|
event = SimpleNamespace(
|
|
message_obj=SimpleNamespace(message=[reply]),
|
|
bot=SimpleNamespace(api=_FailIfCalledAPI()),
|
|
get_group_id=lambda: "",
|
|
)
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert text == "quoted content"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_text_fallback_get_msg_and_forward():
|
|
reply = Reply(id="100", chain=None, message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
(
|
|
"get_msg",
|
|
"100",
|
|
): {
|
|
"data": {
|
|
"message": [
|
|
{"type": "text", "data": {"text": "parent"}},
|
|
{"type": "forward", "data": {"id": "fwd_1"}},
|
|
]
|
|
}
|
|
},
|
|
(
|
|
"get_forward_msg",
|
|
"fwd_1",
|
|
): {
|
|
"data": {
|
|
"messages": [
|
|
{
|
|
"sender": {"nickname": "Alice"},
|
|
"message": [{"type": "text", "data": {"text": "hello"}}],
|
|
},
|
|
{
|
|
"sender": {"nickname": "Bob"},
|
|
"message": [
|
|
{"type": "image", "data": {"url": "http://img"}},
|
|
{"type": "text", "data": {"text": "world"}},
|
|
],
|
|
},
|
|
]
|
|
}
|
|
},
|
|
},
|
|
)
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert text is not None
|
|
assert "parent" in text
|
|
assert "Alice: hello" in text
|
|
assert "Bob: [Image]world" in text
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"placeholder_text",
|
|
[
|
|
"[Forward Message]",
|
|
"[转发消息]",
|
|
"[合并转发]",
|
|
"Alice: [Forward Message]",
|
|
"(Alice): [转发消息]",
|
|
"[Forward Message]\n[转发消息]",
|
|
"Alice: [Forward Message]\n(Bob): [合并转发]",
|
|
"[转发消息]\n\n[合并转发]",
|
|
],
|
|
)
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_text_forward_placeholder_variants_trigger_fallback(
|
|
placeholder_text: str,
|
|
):
|
|
reply = Reply(id="400", chain=[Plain(text=placeholder_text)], message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "400"): {
|
|
"data": {
|
|
"message": [
|
|
{"type": "text", "data": {"text": "Bob: "}},
|
|
{"type": "image", "data": {}},
|
|
{"type": "text", "data": {"text": "world"}},
|
|
]
|
|
}
|
|
}
|
|
},
|
|
)
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert "Bob: [Image]world" in text
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_text_mixed_placeholder_does_not_trigger_fallback():
|
|
reply = Reply(
|
|
id="402",
|
|
chain=[Plain(text="Alice: [Forward Message]\nreal text")],
|
|
message_str="",
|
|
)
|
|
event = SimpleNamespace(
|
|
message_obj=SimpleNamespace(message=[reply]),
|
|
bot=SimpleNamespace(api=_FailIfCalledAPI()),
|
|
get_group_id=lambda: "",
|
|
)
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert text is not None
|
|
assert "[Forward Message]" in text
|
|
assert "real text" in text
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_text_forward_placeholder_fallback_failure():
|
|
reply = Reply(id="401", chain=[Plain(text="[Forward Message]")], message_str="")
|
|
event = _make_event(reply, responses={})
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert text == "[Forward Message]"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_text_multimsg_malformed_config_does_not_raise():
|
|
reply = Reply(id="402", chain=None, message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "402"): {
|
|
"data": {
|
|
"message": [
|
|
{
|
|
"type": "json",
|
|
"data": {
|
|
"data": (
|
|
'{"app":"com.tencent.multimsg",'
|
|
'"config":"oops","meta":{}}'
|
|
)
|
|
},
|
|
},
|
|
{"type": "text", "data": {"text": "still works"}},
|
|
]
|
|
}
|
|
}
|
|
},
|
|
)
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert text == "still works"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_from_reply_chain():
|
|
reply = Reply(
|
|
id="1",
|
|
chain=[
|
|
Plain(text="quoted"),
|
|
Image(file="https://img.example.com/a.jpg"),
|
|
],
|
|
message_str="",
|
|
)
|
|
event = _make_event(reply)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == ["https://img.example.com/a.jpg"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_fallback_get_msg_direct_url():
|
|
reply = Reply(id="200", chain=None, message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "200"): {
|
|
"data": {
|
|
"message": [
|
|
{
|
|
"type": "image",
|
|
"data": {"url": "https://img.example.com/direct.jpg"},
|
|
}
|
|
]
|
|
}
|
|
}
|
|
},
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == ["https://img.example.com/direct.jpg"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_data_image_ref_normalized_to_base64():
|
|
data_image_ref = "data:image/png;base64,abcd1234=="
|
|
reply = Reply(id="201", chain=None, message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "201"): {
|
|
"data": {
|
|
"message": [
|
|
{"type": "image", "data": {"url": data_image_ref}},
|
|
]
|
|
}
|
|
}
|
|
},
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == ["base64://abcd1234=="]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_file_url_with_query_string():
|
|
url_with_query = "https://img.example.com/direct.jpg?token=abc123#frag"
|
|
reply = Reply(id="205", chain=None, message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "205"): {
|
|
"data": {
|
|
"message": [
|
|
{
|
|
"type": "file",
|
|
"data": {
|
|
"url": url_with_query,
|
|
"name": "direct.jpg",
|
|
},
|
|
}
|
|
]
|
|
}
|
|
}
|
|
},
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == [url_with_query]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_non_image_local_path_is_ignored(tmp_path):
|
|
non_image_file = tmp_path / "secret.txt"
|
|
non_image_file.write_text("not an image", encoding="utf-8")
|
|
|
|
reply = Reply(
|
|
id="placeholder", chain=[Image(file=str(non_image_file))], message_str=""
|
|
)
|
|
object.__setattr__(reply, "id", None)
|
|
event = SimpleNamespace(
|
|
message_obj=SimpleNamespace(message=[reply]),
|
|
bot=SimpleNamespace(api=_FailIfCalledAPI()),
|
|
get_group_id=lambda: "",
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_chain_placeholder_triggers_fallback():
|
|
reply = Reply(id="210", chain=[Plain(text="[Forward Message]")], message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "210"): {
|
|
"data": {
|
|
"message": [
|
|
{
|
|
"type": "image",
|
|
"data": {
|
|
"url": "https://img.example.com/from-fallback.jpg"
|
|
},
|
|
}
|
|
]
|
|
}
|
|
}
|
|
},
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == ["https://img.example.com/from-fallback.jpg"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_fallback_resolve_file_id_with_get_image():
|
|
reply = Reply(id="300", chain=None, message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "300"): {
|
|
"data": {"message": [{"type": "image", "data": {"file": "abc123.jpg"}}]}
|
|
}
|
|
},
|
|
param_responses={
|
|
("get_image", (("file", "abc123.jpg"),)): {
|
|
"data": {"url": "https://img.example.com/resolved.jpg"}
|
|
}
|
|
},
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == ["https://img.example.com/resolved.jpg"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_images_deduplicates_across_sources():
|
|
dup_url = "https://img.example.com/dup.jpg"
|
|
chain_only_url = "https://img.example.com/only-chain.jpg"
|
|
get_msg_only_url = "https://img.example.com/only-get-msg.jpg"
|
|
forward_only_url = "https://img.example.com/only-forward.jpg"
|
|
|
|
reply = Reply(
|
|
id="310",
|
|
chain=[Image(file=dup_url), Image(file=chain_only_url)],
|
|
message_str="",
|
|
)
|
|
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "310"): {
|
|
"data": {
|
|
"message": [
|
|
{"type": "image", "data": {"url": dup_url}},
|
|
{"type": "image", "data": {"url": get_msg_only_url}},
|
|
{"type": "forward", "data": {"id": "999"}},
|
|
]
|
|
}
|
|
},
|
|
("get_forward_msg", "999"): {
|
|
"data": {
|
|
"messages": [
|
|
{
|
|
"sender": {"nickname": "Tester"},
|
|
"message": [
|
|
{"type": "image", "data": {"url": dup_url}},
|
|
{"type": "image", "data": {"url": forward_only_url}},
|
|
],
|
|
}
|
|
]
|
|
}
|
|
},
|
|
},
|
|
)
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == [
|
|
dup_url,
|
|
chain_only_url,
|
|
get_msg_only_url,
|
|
forward_only_url,
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_extract_quoted_message_nested_forward_id_is_resolved():
|
|
nested_image = "https://img.example.com/nested.jpg"
|
|
reply = Reply(id="320", chain=[Plain(text="[Forward Message]")], message_str="")
|
|
event = _make_event(
|
|
reply,
|
|
responses={
|
|
("get_msg", "320"): {
|
|
"data": {"message": [{"type": "forward", "data": {"id": "fwd_1"}}]}
|
|
},
|
|
("get_forward_msg", "fwd_1"): {
|
|
"data": {
|
|
"messages": [
|
|
{
|
|
"sender": {"nickname": "Alice"},
|
|
"message": [{"type": "forward", "data": {"id": "fwd_2"}}],
|
|
}
|
|
]
|
|
}
|
|
},
|
|
("get_forward_msg", "fwd_2"): {
|
|
"data": {
|
|
"messages": [
|
|
{
|
|
"sender": {"nickname": "Bob"},
|
|
"message": [
|
|
{"type": "text", "data": {"text": "deep"}},
|
|
{"type": "image", "data": {"url": nested_image}},
|
|
],
|
|
}
|
|
]
|
|
}
|
|
},
|
|
},
|
|
)
|
|
|
|
text = await extract_quoted_message_text(event)
|
|
assert text is not None
|
|
assert "Bob: deep" in text
|
|
|
|
images = await extract_quoted_message_images(event)
|
|
assert images == [nested_image]
|