Compare commits

...

6 Commits

Author SHA1 Message Date
Soulter 3c8ec2f42e 📦 release: v3.5.7 2025-05-05 12:47:21 -04:00
Soulter 7e193f7f52 Merge pull request #1473 from AstrBotDevs/feat-wechat-kf
Feature: 支持接入微信客服
2025-05-06 00:15:37 +08:00
Soulter 7069b02929 chore: add license 2025-05-05 12:11:55 -04:00
Soulter 66995db927 feat: 支持微信客服图片消息 2025-05-05 12:08:23 -04:00
Soulter c36054ca1b feat: 微信客服支持文本消息 2025-05-05 11:53:50 -04:00
Soulter 3e07fbf3dc feat: 微信客服 2025-05-05 11:32:35 -04:00
7 changed files with 652 additions and 55 deletions
+8 -2
View File
@@ -2,7 +2,7 @@
如需修改配置,请在 `data/cmd_config.json` 中修改或者在管理面板中可视化修改。
"""
VERSION = "3.5.6"
VERSION = "3.5.7"
DB_PATH = "data/data_v3.db"
# 默认配置
@@ -159,6 +159,7 @@ CONFIG_METADATA_2 = {
"secret": "",
"token": "",
"encoding_aes_key": "",
"kf_name": "",
"api_base_url": "https://qyapi.weixin.qq.com/cgi-bin/",
"callback_server_host": "0.0.0.0",
"port": 6195,
@@ -193,6 +194,11 @@ CONFIG_METADATA_2 = {
},
},
"items": {
"kf_name": {
"description": "微信客服账号名",
"type": "string",
"hint": "可选。微信客服账号名(不是 ID)。可在 https://kf.weixin.qq.com/kf/frame#/accounts 获取"
},
"telegram_token": {
"description": "Bot Token",
"type": "string",
@@ -237,7 +243,7 @@ CONFIG_METADATA_2 = {
"secret": {
"description": "secret",
"type": "string",
"hint": "必填项。QQ 官方机器人平台的 secret。如何获取请参考文档。",
"hint": "必填项。",
},
"enable_group_c2c": {
"description": "启用消息列表单聊",
@@ -2,6 +2,7 @@ import sys
import uuid
import asyncio
import quart
import aiohttp
from astrbot.api.platform import (
Platform,
@@ -20,10 +21,14 @@ from requests import Response
from wechatpy.enterprise.crypto import WeChatCrypto
from wechatpy.enterprise import WeChatClient
from wechatpy.enterprise.messages import TextMessage, ImageMessage, VoiceMessage
from wechatpy.messages import BaseMessage
from wechatpy.exceptions import InvalidSignatureException
from wechatpy.enterprise import parse_message
from .wecom_event import WecomPlatformEvent
from .wecom_kf import WeChatKF
from .wecom_kf_message import WeChatKFMessage
if sys.version_info >= (3, 12):
from typing import override
else:
@@ -131,9 +136,40 @@ class WecomPlatformAdapter(Platform):
self.config["corpid"].strip(),
self.config["secret"].strip(),
)
self.client.API_BASE_URL = self.api_base_url
async def callback(msg):
# 微信客服
self.kf_name = self.config.get("kf_name", None)
if self.kf_name:
# inject
self.wechat_kf_api = WeChatKF(client=self.client)
self.wechat_kf_message_api = WeChatKFMessage(self.client)
self.client.kf = self.wechat_kf_api
self.client.kf_message = self.wechat_kf_message_api
self.client.API_BASE_URL = self.api_base_url
async def callback(msg: BaseMessage):
if msg.type == "unknown" and msg._data["Event"] == "kf_msg_or_event":
def get_latest_msg_item() -> dict | None:
token = msg._data["Token"]
kfid = msg._data["OpenKfId"]
has_more = 1
ret = {}
while has_more:
ret = self.wechat_kf_api.sync_msg(token, kfid)
has_more = ret["has_more"]
msg_list = ret.get("msg_list", [])
if msg_list:
return msg_list[-1]
return None
msg_new = await asyncio.get_event_loop().run_in_executor(
None, get_latest_msg_item
)
if msg_new:
await self.convert_wechat_kf_message(msg_new)
return
await self.convert_message(msg)
self.server.callback = callback
@@ -153,9 +189,39 @@ class WecomPlatformAdapter(Platform):
@override
async def run(self):
loop = asyncio.get_event_loop()
if self.kf_name:
try:
acc_list = (
await loop.run_in_executor(
None, self.wechat_kf_api.get_account_list
)
).get("account_list", [])
logger.debug(f"获取到微信客服列表: {str(acc_list)}")
for acc in acc_list:
name = acc.get("name", None)
if name != self.kf_name:
continue
open_kfid = acc.get("open_kfid", None)
if not open_kfid:
logger.error("获取微信客服失败,open_kfid 为空。")
logger.debug(f"Found open_kfid: {str(open_kfid)}")
kf_url = (
await loop.run_in_executor(
None,
self.wechat_kf_api.add_contact_way,
open_kfid,
"astrbot_placeholder",
)
).get("url", "")
logger.info(
f"请打开以下链接,在微信扫码以获取客服微信: https://api.cl2wm.cn/api/qrcode/code?text={kf_url}"
)
except Exception as e:
logger.error(e)
await self.server.start_polling()
async def convert_message(self, msg):
async def convert_message(self, msg: BaseMessage) -> AstrBotMessage | None:
abm = AstrBotMessage()
if msg.type == "text":
assert isinstance(msg, TextMessage)
@@ -218,10 +284,42 @@ class WecomPlatformAdapter(Platform):
abm.timestamp = msg.time
abm.session_id = abm.sender.user_id
abm.raw_message = msg
else:
logger.warning(f"暂未实现的事件: {msg.type}")
return
logger.info(f"abm: {abm}")
await self.handle_msg(abm)
async def convert_wechat_kf_message(self, msg: dict) -> AstrBotMessage | None:
msgtype = msg.get("msgtype", None)
external_userid = msg.get("external_userid", None)
abm = AstrBotMessage()
abm.raw_message = msg
abm.raw_message["_wechat_kf_flag"] = None # 方便处理
abm.self_id = msg["open_kfid"]
abm.sender = MessageMember(external_userid, external_userid)
abm.session_id = external_userid
abm.type = MessageType.FRIEND_MESSAGE
if msgtype == "text":
text = msg.get("text", {}).get("content", "").strip()
abm.message = [Plain(text=text)]
abm.message_str = text
elif msgtype == "image":
media_id = msg.get("image", {}).get("media_id", "")
resp: Response = await asyncio.get_event_loop().run_in_executor(
None, self.client.media.download, media_id
)
path = f"data/temp/wechat_kf_{media_id}.jpg"
with open(path, "wb") as f:
f.write(resp.content)
abm.message = [Image(file=path, url=path)]
abm.message_str = "[图片]"
else:
logger.warning(f"未实现的微信客服消息事件: {msg}")
return
await self.handle_msg(abm)
async def handle_msg(self, message: AstrBotMessage):
message_event = WecomPlatformEvent(
message_str=message.message_str,
@@ -4,6 +4,7 @@ from astrbot.api.event import AstrMessageEvent, MessageChain
from astrbot.api.platform import AstrBotMessage, PlatformMetadata
from astrbot.api.message_components import Plain, Image, Record
from wechatpy.enterprise import WeChatClient
from .wecom_kf_message import WeChatKFMessage
from astrbot.api import logger
@@ -52,19 +53,29 @@ class WecomPlatformEvent(AstrMessageEvent):
if start + 2048 >= len(plain):
result.append(plain[start:])
break
# 向前搜索分割标点符号
end = min(start + 2048, len(plain))
cut_position = end
for i in range(end, start, -1):
if i < len(plain) and plain[i-1] in ["", "", "", ".", "!", "?", "\n", ";", ""]:
if i < len(plain) and plain[i - 1] in [
"",
"",
"",
".",
"!",
"?",
"\n",
";",
"",
]:
cut_position = i
break
# 没找到合适的位置分割, 直接切分
if cut_position == end and end < len(plain):
cut_position = end
result.append(plain[start:cut_position])
start = cut_position
@@ -73,57 +84,97 @@ class WecomPlatformEvent(AstrMessageEvent):
async def send(self, message: MessageChain):
message_obj = self.message_obj
for comp in message.chain:
if isinstance(comp, Plain):
# Split long text messages if needed
plain_chunks = await self.split_plain(comp.text)
for chunk in plain_chunks:
self.client.message.send_text(
message_obj.self_id, message_obj.session_id, chunk
)
await asyncio.sleep(0.5) # Avoid sending too fast
elif isinstance(comp, Image):
img_path = await comp.convert_to_file_path()
is_wechat_kf = hasattr(self.client, "kf_message")
if is_wechat_kf:
# 微信客服
kf_message_api = getattr(self.client, "kf_message", None)
if not kf_message_api:
logger.warning("未找到微信客服发送消息方法。")
return
assert isinstance(kf_message_api, WeChatKFMessage)
user_id = self.get_sender_id()
for comp in message.chain:
if isinstance(comp, Plain):
# Split long text messages if needed
plain_chunks = await self.split_plain(comp.text)
for chunk in plain_chunks:
kf_message_api.send_text(user_id, self.get_self_id(), chunk)
await asyncio.sleep(0.5) # Avoid sending too fast
elif isinstance(comp, Image):
img_path = await comp.convert_to_file_path()
with open(img_path, "rb") as f:
try:
response = self.client.media.upload("image", f)
except Exception as e:
logger.error(f"企业微信上传图片失败: {e}")
await self.send(
MessageChain().message(f"企业微信上传图片失败: {e}")
with open(img_path, "rb") as f:
try:
response = self.client.media.upload("image", f)
except Exception as e:
logger.error(f"微信客服上传图片失败: {e}")
await self.send(
MessageChain().message(f"微信客服上传图片失败: {e}")
)
return
logger.debug(f"微信客服上传图片返回: {response}")
kf_message_api.send_image(
user_id,
self.get_self_id(),
response["media_id"],
)
return
logger.info(f"企业微信上传图片返回: {response}")
self.client.message.send_image(
message_obj.self_id,
message_obj.session_id,
response["media_id"],
)
elif isinstance(comp, Record):
record_path = await comp.convert_to_file_path()
# 转成amr
record_path_amr = f"data/temp/{uuid.uuid4()}.amr"
pydub.AudioSegment.from_wav(record_path).export(
record_path_amr, format="amr"
)
else:
logger.warning(f"还没实现这个消息类型的发送逻辑: {comp.type}")
else:
# 企业微信应用
for comp in message.chain:
if isinstance(comp, Plain):
# Split long text messages if needed
plain_chunks = await self.split_plain(comp.text)
for chunk in plain_chunks:
self.client.message.send_text(
message_obj.self_id, message_obj.session_id, chunk
)
await asyncio.sleep(0.5) # Avoid sending too fast
elif isinstance(comp, Image):
img_path = await comp.convert_to_file_path()
with open(record_path_amr, "rb") as f:
try:
response = self.client.media.upload("voice", f)
except Exception as e:
logger.error(f"企业微信上传语音失败: {e}")
await self.send(
MessageChain().message(f"企业微信上传语音失败: {e}")
with open(img_path, "rb") as f:
try:
response = self.client.media.upload("image", f)
except Exception as e:
logger.error(f"企业微信上传图片失败: {e}")
await self.send(
MessageChain().message(f"企业微信上传图片失败: {e}")
)
return
logger.debug(f"企业微信上传图片返回: {response}")
self.client.message.send_image(
message_obj.self_id,
message_obj.session_id,
response["media_id"],
)
return
logger.info(f"企业微信上传语音返回: {response}")
self.client.message.send_voice(
message_obj.self_id,
message_obj.session_id,
response["media_id"],
elif isinstance(comp, Record):
record_path = await comp.convert_to_file_path()
# 转成amr
record_path_amr = f"data/temp/{uuid.uuid4()}.amr"
pydub.AudioSegment.from_wav(record_path).export(
record_path_amr, format="amr"
)
with open(record_path_amr, "rb") as f:
try:
response = self.client.media.upload("voice", f)
except Exception as e:
logger.error(f"企业微信上传语音失败: {e}")
await self.send(
MessageChain().message(f"企业微信上传语音失败: {e}")
)
return
logger.info(f"企业微信上传语音返回: {response}")
self.client.message.send_voice(
message_obj.self_id,
message_obj.session_id,
response["media_id"],
)
else:
logger.warning(f"还没实现这个消息类型的发送逻辑: {comp.type}")
await super().send(message)
async def send_streaming(self, generator, use_fallback: bool = False):
@@ -0,0 +1,278 @@
# -*- coding: utf-8 -*-
"""
The MIT License (MIT)
Copyright (c) 2014-2020 messense
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from wechatpy.client.api.base import BaseWeChatAPI
class WeChatKF(BaseWeChatAPI):
"""
微信客服接口
https://work.weixin.qq.com/api/doc/90000/90135/94670
"""
def sync_msg(self, token, open_kfid, cursor="", limit=1000):
"""
微信客户发送的消息、接待人员在企业微信回复的消息、发送消息接口发送失败事件(如被用户拒收)
、客户点击菜单消息的回复消息,可以通过该接口获取具体的消息内容和事件。不支持读取通过发送消息接口发送的消息。
支持的消息类型:文本、图片、语音、视频、文件、位置、链接、名片、小程序、事件。
:param token: 回调事件返回的token字段,10分钟内有效;可不填,如果不填接口有严格的频率限制。不多于128字节
:param open_kfid: 客服帐号ID
:param cursor: 上一次调用时返回的next_cursor,第一次拉取可以不填。不多于64字节
:param limit: 期望请求的数据量,默认值和最大值都为1000。
注意:可能会出现返回条数少于limit的情况,需结合返回的has_more字段判断是否继续请求。
:return: 接口调用结果
"""
data = {"token": token, "cursor": cursor, "limit": limit, "open_kfid": open_kfid}
return self._post("kf/sync_msg", data=data)
def get_service_state(self, open_kfid, external_userid):
"""
获取会话状态
ID 状态 说明
0 未处理 新会话接入。可选择:1.直接用API自动回复消息。2.放进待接入池等待接待人员接待。3.指定接待人员进行接待
1 由智能助手接待 可使用API回复消息。可选择转入待接入池或者指定接待人员处理。
2 待接入池排队中 在待接入池中排队等待接待人员接入。可选择转为指定人员接待
3 由人工接待 人工接待中。可选择结束会话
4 已结束 会话已经结束。不允许变更会话状态,等待用户重新发起咨询
:param open_kfid: 客服帐号ID
:param external_userid: 微信客户的external_userid
:return: 接口调用结果
"""
data = {
"open_kfid": open_kfid,
"external_userid": external_userid,
}
return self._post("kf/service_state/get", data=data)
def trans_service_state(self, open_kfid, external_userid, service_state, servicer_userid=""):
"""
变更会话状态
:param open_kfid: 客服帐号ID
:param external_userid: 微信客户的external_userid
:param service_state: 当前的会话状态,状态定义参考概述中的表格
:return: 接口调用结果
"""
data = {
"open_kfid": open_kfid,
"external_userid": external_userid,
"service_state": service_state,
}
if servicer_userid:
data["servicer_userid"] = servicer_userid
return self._post("kf/service_state/trans", data=data)
def get_servicer_list(self, open_kfid):
"""
获取接待人员列表
:param open_kfid: 客服帐号ID
:return: 接口调用结果
"""
data = {
"open_kfid": open_kfid,
}
return self._get("kf/servicer/list", params=data)
def add_servicer(self, open_kfid, userid_list):
"""
添加接待人员
添加指定客服帐号的接待人员。
:param open_kfid: 客服帐号ID
:param userid_list: 接待人员userid列表
:return: 接口调用结果
"""
if not isinstance(userid_list, list):
userid_list = [userid_list]
data = {
"open_kfid": open_kfid,
"userid_list": userid_list,
}
return self._post("kf/servicer/add", data=data)
def del_servicer(self, open_kfid, userid_list):
"""
删除接待人员
从客服帐号删除接待人员
:param open_kfid: 客服帐号ID
:param userid_list: 接待人员userid列表
:return: 接口调用结果
"""
if not isinstance(userid_list, list):
userid_list = [userid_list]
data = {
"open_kfid": open_kfid,
"userid_list": userid_list,
}
return self._post("kf/servicer/del", data=data)
def batchget_customer(self, external_userid_list):
"""
客户基本信息获取
:param external_userid_list: external_userid列表
:return: 接口调用结果
"""
if not isinstance(external_userid_list, list):
external_userid_list = [external_userid_list]
data = {
"external_userid_list": external_userid_list,
}
return self._post("kf/customer/batchget", data=data)
def get_account_list(self):
"""
获取客服帐号列表
:return: 接口调用结果
"""
return self._get("kf/account/list")
def add_contact_way(self, open_kfid, scene):
"""
获取客服帐号链接
:param open_kfid: 客服帐号ID
:param scene: 场景值,字符串类型,由开发者自定义。不多于32字节;字符串取值范围(正则表达式)[0-9a-zA-Z_-]*
:return: 接口调用结果
"""
data = {"open_kfid": open_kfid, "scene": scene}
return self._post("kf/add_contact_way", data=data)
def get_upgrade_service_config(self):
"""
获取配置的专员与客户群
:return: 接口调用结果
"""
return self._get("kf/customer/get_upgrade_service_config")
def upgrade_service(self, open_kfid, external_userid, service_type, member=None, groupchat=None):
"""
为客户升级为专员或客户群服务
:param open_kfid: 客服帐号ID
:param external_userid: 微信客户的external_userid
:param service_type: 表示是升级到专员服务还是客户群服务。1:专员服务。2:客户群服务
:param member: 推荐的服务专员,type等于1时有效
:param groupchat: 推荐的客户群,type等于2时有效
:return: 接口调用结果
"""
data = {
"open_kfid": open_kfid,
"external_userid": external_userid,
"type": service_type,
}
if service_type == 1:
data["member"] = member
else:
data["groupchat"] = groupchat
return self._post("kf/customer/upgrade_service", data=data)
def cancel_upgrade_service(self, open_kfid, external_userid):
"""
为客户取消推荐
:param open_kfid: 客服帐号ID
:param external_userid: 微信客户的external_userid
:return: 接口调用结果
"""
data = {"open_kfid": open_kfid, "external_userid": external_userid}
return self._post("kf/customer/cancel_upgrade_service", data=data)
def send_msg_on_event(self, code, msgtype, msg_content, msgid=None):
"""
当特定的事件回调消息包含code字段,可以此code为凭证,调用该接口给用户发送相应事件场景下的消息,如客服欢迎语。
支持发送消息类型:文本、菜单消息。
:param code: 事件响应消息对应的code。通过事件回调下发,仅可使用一次。
:param msgtype: 消息类型。对不同的msgtype,有相应的结构描述,详见消息类型
:param msg_content: 目前支持文本与菜单消息,具体查看文档
:param msgid: 消息ID。如果请求参数指定了msgid,则原样返回,否则系统自动生成并返回。不多于32字节;
字符串取值范围(正则表达式)[0-9a-zA-Z_-]*
:return: 接口调用结果
"""
data = {"code": code, "msgtype": msgtype}
if msgid:
data["msgid"] = msgid
data.update(msg_content)
return self._post("kf/send_msg_on_event", data=data)
def get_corp_statistic(self, start_time, end_time, open_kfid=None):
"""
获取「客户数据统计」企业汇总数据
:param start_time: 开始时间
:param end_time: 结束时间
:param open_kfid: 客服帐号ID
:return: 接口调用结果
"""
data = {"open_kfid": open_kfid, "start_time": start_time, "end_time": end_time}
return self._post("kf/get_corp_statistic", data=data)
def get_servicer_statistic(self, start_time, end_time, open_kfid=None, servicer_userid=None):
"""
获取「客户数据统计」接待人员明细数据
:param start_time: 开始时间
:param end_time: 结束时间
:param open_kfid: 客服帐号ID
:param servicer_userid: 接待人员
:return: 接口调用结果
"""
data = {
"open_kfid": open_kfid,
"servicer_userid": servicer_userid,
"start_time": start_time,
"end_time": end_time,
}
return self._post("kf/get_servicer_statistic", data=data)
def account_update(self, open_kfid, name, media_id):
"""
修改客服账号
:param open_kfid: 客服帐号ID
:param name: 客服名称
:param media_id: 客服头像临时素材
:return: 接口调用结果
"""
data = {"open_kfid": open_kfid, "name": name, "media_id": media_id}
return self._post("kf/account/update", data=data)
@@ -0,0 +1,159 @@
"""
The MIT License (MIT)
Copyright (c) 2014-2020 messense
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from optionaldict import optionaldict
from wechatpy.client.api.base import BaseWeChatAPI
class WeChatKFMessage(BaseWeChatAPI):
"""
发送微信客服消息
https://work.weixin.qq.com/api/doc/90000/90135/94677
支持:
* 文本消息
* 图片消息
* 语音消息
* 视频消息
* 文件消息
* 图文链接
* 小程序
* 菜单消息
* 地理位置
"""
def send(self, user_id, open_kfid, msgid="", msg=None):
"""
当微信客户处于“新接入待处理”或“由智能助手接待”状态下,可调用该接口给用户发送消息。
注意仅当微信客户在主动发送消息给客服后的48小时内,企业可发送消息给客户,最多可发送5条消息;若用户继续发送消息,企业可再次下发消息。
支持发送消息类型:文本、图片、语音、视频、文件、图文、小程序、菜单消息、地理位置。
:param user_id: 指定接收消息的客户UserID
:param open_kfid: 指定发送消息的客服帐号ID
:param msgid: 指定消息ID
:param tag_ids: 标签ID列表。
:param msg: 发送消息的 dict 对象
:type msg: dict | None
:return: 接口调用结果
"""
msg = msg or {}
data = {
"touser": user_id,
"open_kfid": open_kfid,
}
if msgid:
data["msgid"] = msgid
data.update(msg)
return self._post("kf/send_msg", data=data)
def send_text(self, user_id, open_kfid, content, msgid=""):
return self.send(
user_id,
open_kfid,
msgid,
msg={"msgtype": "text", "text": {"content": content}},
)
def send_image(self, user_id, open_kfid, media_id, msgid=""):
return self.send(
user_id,
open_kfid,
msgid,
msg={"msgtype": "image", "image": {"media_id": media_id}},
)
def send_voice(self, user_id, open_kfid, media_id, msgid=""):
return self.send(
user_id,
open_kfid,
msgid,
msg={"msgtype": "voice", "voice": {"media_id": media_id}},
)
def send_video(self, user_id, open_kfid, media_id, msgid=""):
video_data = optionaldict()
video_data["media_id"] = media_id
return self.send(
user_id,
open_kfid,
msgid,
msg={"msgtype": "video", "video": dict(video_data)},
)
def send_file(self, user_id, open_kfid, media_id, msgid=""):
return self.send(
user_id,
open_kfid,
msgid,
msg={"msgtype": "file", "file": {"media_id": media_id}},
)
def send_articles_link(self, user_id, open_kfid, article, msgid=""):
articles_data = {
"title": article["title"],
"desc": article["desc"],
"url": article["url"],
"thumb_media_id": article["thumb_media_id"],
}
return self.send(
user_id,
open_kfid,
msgid,
msg={"msgtype": "news", "link": {"link": articles_data}},
)
def send_msgmenu(self, user_id, open_kfid, head_content, menu_list, tail_content, msgid=""):
return self.send(
user_id,
open_kfid,
msgid,
msg={
"msgtype": "msgmenu",
"msgmenu": {"head_content": head_content, "list": menu_list, "tail_content": tail_content},
},
)
def send_location(self, user_id, open_kfid, name, address, latitude, longitude, msgid=""):
return self.send(
user_id,
open_kfid,
msgid,
msg={
"msgtype": "location",
"msgmenu": {"name": name, "address": address, "latitude": latitude, "longitude": longitude},
},
)
def send_miniprogram(self, user_id, open_kfid, appid, title, thumb_media_id, pagepath, msgid=""):
return self.send(
user_id,
open_kfid,
msgid,
msg={
"msgtype": "miniprogram",
"msgmenu": {"appid": appid, "title": title, "thumb_media_id": thumb_media_id, "pagepath": pagepath},
},
)
+5
View File
@@ -0,0 +1,5 @@
# What's Changed
> Gewechat 已经停止维护,此版本提供了 `微信客服` 的接入方式,可以在直接微信内聊天。这是微信官方推出的接入方式,因此没有风控风险。详见 [AstrBot 接入企业微信](https://astrbot.app/deploy/platform/wecom.html)。此接入方式处于测试阶段,有问题请及时在 GitHub 上提交 Issue。
1. 支持接入微信客服。
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "AstrBot"
version = "3.5.6"
version = "3.5.7"
description = "易上手的多平台 LLM 聊天机器人及开发框架"
readme = "README.md"
requires-python = ">=3.10"