feat: 新增自定义指令/keyword

This commit is contained in:
Soulter
2023-04-08 20:21:49 +08:00
parent 45bb30692d
commit 9bca158174
5 changed files with 134 additions and 110 deletions
+85 -85
View File
@@ -77,6 +77,8 @@ gpt_config = {}
baidu_judge = None
# 回复前缀
reply_prefix = {}
# 关键词回复
keywords = {}
def new_sub_thread(func, args=()):
@@ -162,7 +164,7 @@ def upload():
def initBot(cfg, prov):
global chatgpt, provider, rev_chatgpt, baidu_judge, rev_edgegpt, chosen_provider
global reply_prefix, gpt_config, config, uniqueSession, frequency_count, frequency_time,announcement, direct_message_mode, version
global command_openai_official, command_rev_chatgpt, command_rev_edgegpt,reply_prefix
global command_openai_official, command_rev_chatgpt, command_rev_edgegpt,reply_prefix, keywords
provider = prov
config = cfg
if 'reply_prefix' in cfg:
@@ -192,6 +194,11 @@ def initBot(cfg, prov):
command_openai_official = CommandOpenAIOfficial(chatgpt)
chosen_provider = OPENAI_OFFICIAL
# 得到关键词
if os.path.exists("keyword.json"):
with open("keyword.json", 'r', encoding='utf-8') as f:
keywords = json.load(f)
# 检查provider设置偏好
if os.path.exists("provider_preference.txt"):
with open("provider_preference.txt", 'r', encoding='utf-8') as f:
@@ -351,8 +358,9 @@ def oper_msg(message, at=False, msg_ref = None):
session_id = ''
user_id = message.author.id
user_name = message.author.username
global chosen_provider, reply_prefix
print(chosen_provider)
global chosen_provider, reply_prefix, keywords
hit = False # 是否命中指令
command_result = ()
# 检查发言频率
if not check_frequency(user_id):
@@ -378,11 +386,13 @@ def oper_msg(message, at=False, msg_ref = None):
qq_msg = message.content
session_id = user_id
# 这里是预设
if qq_msg.strip() == 'hello' or qq_msg.strip() == '你好' or qq_msg.strip() == '':
send_qq_msg(message, f"你好呀🥰,输入/help查看指令噢", msg_ref=msg_ref)
return
# 关键词回复
for k in keywords:
if qq_msg == k:
send_qq_msg(message, keywords[k], msg_ref=msg_ref)
return
# 关键词拦截器
for i in uw.unfit_words_q:
matches = re.match(i, qq_msg.strip(), re.I | re.M)
@@ -415,16 +425,75 @@ def oper_msg(message, at=False, msg_ref = None):
chatgpt_res = ""
if chosen_provider == OPENAI_OFFICIAL:
# 检查指令
hit, command_result = command_openai_official.check_command(qq_msg, session_id, user_name)
print(f"{hit} {command_result}")
# hit: 是否触发指令
if hit:
if command_result != None and command_result[0]:
# 是否是画图模式
if len(command_result) == 3 and command_result[2] == 'image':
# hit: 是否触发指令.
if not hit:
# 请求ChatGPT获得结果
try:
chatgpt_res = chatgpt.text_chat(qq_msg, session_id)
if OPENAI_OFFICIAL in reply_prefix:
chatgpt_res = reply_prefix[OPENAI_OFFICIAL] + chatgpt_res
except (BaseException) as e:
print("[System-Err] OpenAI API错误。原因如下:\n"+str(e))
if 'exceeded' in str(e):
send_qq_msg(message, f"OpenAI API错误。原因:\n{str(e)} \n超额了。可自己搭建一个机器人(Github仓库:QQChannelChatGPT)")
return
else:
f_res = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(e), flags=re.MULTILINE)
f_res = f_res.replace(".", "·")
send_qq_msg(message, f"OpenAI API错误。原因如下:\n{f_res} \n前往官方频道反馈~")
return
elif chosen_provider == REV_CHATGPT:
hit, command_result = command_rev_chatgpt.check_command(qq_msg)
if not hit:
try:
chatgpt_res = str(rev_chatgpt.text_chat(qq_msg))
if REV_CHATGPT in reply_prefix:
chatgpt_res = reply_prefix[REV_CHATGPT] + chatgpt_res
except BaseException as e:
print("[System-Err] Rev ChatGPT API错误。原因如下:\n"+str(e))
send_qq_msg(message, f"Rev ChatGPT API错误。原因如下: \n{str(e)} \n前往官方频道反馈~")
return
elif chosen_provider == REV_EDGEGPT:
hit, command_result = command_rev_edgegpt.check_command(qq_msg, client.loop)
if not hit:
try:
if rev_edgegpt.is_busy():
send_qq_msg(message, f"[RevBing] 正忙,请稍后再试",msg_ref=msg_ref)
return
else:
res, res_code = asyncio.run_coroutine_threadsafe(rev_edgegpt.text_chat(qq_msg), client.loop).result()
if res_code == 0: # bing不想继续话题,重置会话后重试。
send_qq_msg(message, f"Bing不想继续话题了, 正在自动重置会话并重试。", msg_ref=msg_ref)
asyncio.run_coroutine_threadsafe(rev_edgegpt.forget(), client.loop).result()
res, res_code = asyncio.run_coroutine_threadsafe(rev_edgegpt.text_chat(qq_msg), client.loop).result()
if res_code == 0: # bing还是不想继续话题,大概率说明提问有问题。
send_qq_msg(message, f"Bing仍然不想继续话题, 请检查您的提问。", msg_ref=msg_ref)
return
chatgpt_res = str(res)
if REV_EDGEGPT in reply_prefix:
chatgpt_res = reply_prefix[REV_EDGEGPT] + chatgpt_res
except BaseException as e:
print("[System-Err] Rev NewBing API错误。原因如下:\n"+str(e))
send_qq_msg(message, f"Rev NewBing API错误。原因如下:\n{str(e)} \n前往官方频道反馈~")
return
# 指令回复
if hit:
# 检查指令. command_result是一个元组:(指令调用是否成功, 指令返回的文本结果, 指令类型)
if command_result != None:
command = command_result[2]
if command == "keyword":
with open("keyword.json", "r", encoding="utf-8") as f:
keywords = json.load(f)
if command_result[0]:
# 是否是画图指令
if len(command_result) == 3 and command_result[2] == 'draw':
for i in command_result[1]:
send_qq_msg(message, i, image_mode=True, msg_ref=command_result[2])
send_qq_msg(message, i, image_mode=True, msg_ref=msg_ref)
else:
try:
send_qq_msg(message, command_result[1], msg_ref=msg_ref)
@@ -433,77 +502,8 @@ def oper_msg(message, at=False, msg_ref = None):
send_qq_msg(message, t, msg_ref=msg_ref)
else:
send_qq_msg(message, f"指令调用错误: \n{command_result[1]}", msg_ref=msg_ref)
return
# 请求chatGPT获得结果
try:
return
chatgpt_res = chatgpt.text_chat(qq_msg, session_id)
if OPENAI_OFFICIAL in reply_prefix:
chatgpt_res = reply_prefix[OPENAI_OFFICIAL] + chatgpt_res
except (BaseException) as e:
print("[System-Err] OpenAI API错误。原因如下:\n"+str(e))
if 'exceeded' in str(e):
send_qq_msg(message, f"OpenAI API错误。原因:\n{str(e)} \n超额了。可自己搭建一个机器人(Github仓库:QQChannelChatGPT)")
return
else:
f_res = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(e), flags=re.MULTILINE)
f_res = f_res.replace(".", "·")
send_qq_msg(message, f"OpenAI API错误。原因如下:\n{f_res} \n前往官方频道反馈~")
return
elif chosen_provider == REV_CHATGPT:
hit, command_result = command_rev_chatgpt.check_command(qq_msg)
if hit:
if command_result != None and command_result[0]:
try:
send_qq_msg(message, command_result[1], msg_ref=msg_ref)
except BaseException as e:
t = command_result[1].replace(".", " . ")
send_qq_msg(message, t, msg_ref=msg_ref)
else:
send_qq_msg(message, f"指令调用错误: \n{command_result[1]}", msg_ref=msg_ref)
return
try:
chatgpt_res = str(rev_chatgpt.text_chat(qq_msg))
if REV_CHATGPT in reply_prefix:
chatgpt_res = reply_prefix[REV_CHATGPT] + chatgpt_res
except BaseException as e:
print("[System-Err] Rev ChatGPT API错误。原因如下:\n"+str(e))
send_qq_msg(message, f"Rev ChatGPT API错误。原因如下: \n{str(e)} \n前往官方频道反馈~")
return
elif chosen_provider == REV_EDGEGPT:
hit, command_result = command_rev_edgegpt.check_command(qq_msg, client.loop)
if hit:
if command_result != None and command_result[0]:
try:
send_qq_msg(message, command_result[1], msg_ref=msg_ref)
except BaseException as e:
t = command_result[1].replace(".", " . ")
send_qq_msg(message, t, msg_ref=msg_ref)
else:
send_qq_msg(message, f"指令调用错误: \n{command_result[1]}", msg_ref=msg_ref)
return
try:
if rev_edgegpt.is_busy():
send_qq_msg(message, f"[RevBing] 正忙,请稍后再试",msg_ref=msg_ref)
return
else:
res, res_code = asyncio.run_coroutine_threadsafe(rev_edgegpt.text_chat(qq_msg), client.loop).result()
if res_code == 0: # bing不想继续话题,重置会话后重试。
send_qq_msg(message, f"Bing不想继续话题了, 正在自动重置会话并重试。", msg_ref=msg_ref)
asyncio.run_coroutine_threadsafe(rev_edgegpt.forget(), client.loop).result()
res, res_code = asyncio.run_coroutine_threadsafe(rev_edgegpt.text_chat(qq_msg), client.loop).result()
if res_code == 0: # bing还是不想继续话题,大概率说明提问有问题。
send_qq_msg(message, f"Bing仍然不想继续话题, 请检查您的提问。", msg_ref=msg_ref)
return
chatgpt_res = str(res)
if REV_EDGEGPT in reply_prefix:
chatgpt_res = reply_prefix[REV_EDGEGPT] + chatgpt_res
except BaseException as e:
print("[System-Err] Rev NewBing API错误。原因如下:\n"+str(e))
send_qq_msg(message, f"Rev NewBing API错误。原因如下:\n{str(e)} \n前往官方频道反馈~")
return
# 记录日志
logf.write(f"{reply_prefix} {str(chatgpt_res)}\n")
logf.flush()
+21 -3
View File
@@ -27,6 +27,24 @@ class Command:
return True
return False
def keyword(self, message: str):
if len(message.split(" ")) != 3:
return True, "【设置关键词/关键指令回复】示例:\nkeyword hi 你好\n当发送hi的时候会回复你好\nkeyword /hi 你好\n当发送/hi时会回复你好", "keyword"
l = message.split(" ")
try:
if os.path.exists("keyword.json"):
with open("keyword.json", "r", encoding="utf-8") as f:
keyword = json.load(f)
keyword[l[1]] = l[2]
else:
keyword = {l[1]: l[2]}
with open("keyword.json", "w", encoding="utf-8") as f:
json.dump(keyword, f, ensure_ascii=False, indent=4)
return True, "设置成功: "+l[1]+" -> "+l[2], "keyword"
except BaseException as e:
return False, "设置失败: "+str(e), "keyword"
def update(self, message: str):
l = message.split(" ")
if len(l) == 1:
@@ -48,7 +66,7 @@ class Command:
index+=1
remote_commit_hash = origin.refs.master.commit.hexsha[:6]
return True, f"当前版本: {now_commit.hexsha[:6]}\n最新版本: {remote_commit_hash}\n\n最新3条commit:\n{str(commits_log)}\n使用update latest更新至最新版本\n"
return True, f"当前版本: {now_commit.hexsha[:6]}\n最新版本: {remote_commit_hash}\n\n最新3条commit:\n{str(commits_log)}\n使用update latest更新至最新版本\n", "update"
else:
if l[1] == "latest":
pash_tag = ""
@@ -83,7 +101,7 @@ class Command:
# os.execl(py, py, *sys.argv)
except BaseException as e:
return False, "更新失败: "+str(e)
return False, "更新失败: "+str(e), "update"
def reset(self):
return False
@@ -98,7 +116,7 @@ class Command:
return False
def help(self):
return True, f"[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\n指令面板:\nstatus 查看机器人key状态\ncount 查看机器人统计信息\nreset 重置会话\nhis 查看历史记录\ntoken 查看会话token数\nhelp 查看帮助\nset 人格指令菜单\nkey 动态添加key"
return True, f"[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\n指令面板:\nstatus 查看机器人key状态\ncount 查看机器人统计信息\nreset 重置会话\nhis 查看历史记录\ntoken 查看会话token数\nhelp 查看帮助\nset 人格指令菜单\nkey 动态添加key", "help"
def status(self):
return False
+20 -18
View File
@@ -31,13 +31,15 @@ class CommandOpenAIOfficial(Command):
return True, self.update(message)
elif self.command_start_with(message, ""):
return True, self.draw(message)
elif self.command_start_with(message, "keyword"):
return True, self.keyword(message)
return False, None
def reset(self, session_id: str):
self.provider.forget(session_id)
return True, "重置成功"
return True, "重置成功", "reset"
def his(self, message: str, session_id: str, name: str):
#分页,每页5条
@@ -49,17 +51,17 @@ class CommandOpenAIOfficial(Command):
# 检查是否有过历史记录
if session_id not in self.provider.session_dict:
msg = f"历史记录为空"
return True, msg
return True, msg, "his"
l = self.provider.session_dict[session_id]
max_page = len(l)//size_per_page + 1 if len(l)%size_per_page != 0 else len(l)//size_per_page
p = self.provider.get_prompts_by_cache_list(self.provider.session_dict[session_id], divide=True, paging=True, size=size_per_page, page=page)
return True, f"历史记录如下:\n{p}\n{page}页 | 共{max_page}\n*输入/his 2跳转到第2页"
return True, f"历史记录如下:\n{p}\n{page}页 | 共{max_page}\n*输入/his 2跳转到第2页", "his"
def token(self, session_id: str):
return True, f"会话的token数: {self.provider.get_user_usage_tokens(self.provider.session_dict[session_id])}\n系统最大缓存token数: {self.provider.max_tokens}"
return True, f"会话的token数: {self.provider.get_user_usage_tokens(self.provider.session_dict[session_id])}\n系统最大缓存token数: {self.provider.max_tokens}", "token"
def gpt(self):
return True, f"OpenAI GPT配置:\n {self.provider.chatGPT_configs}"
return True, f"OpenAI GPT配置:\n {self.provider.chatGPT_configs}", "gpt"
def status(self):
chatgpt_cfg_str = ""
@@ -79,52 +81,52 @@ class CommandOpenAIOfficial(Command):
sponsor = key_stat[key]['sponsor']
chatgpt_cfg_str += f" |-{index}: {key_stat[key]['used']}/{max} {sponsor}赞助{tag}\n"
index += 1
return True, f"⭐使用情况({str(gg_count)}个已用):\n{chatgpt_cfg_str}⏰全频道已用{total}tokens"
return True, f"⭐使用情况({str(gg_count)}个已用):\n{chatgpt_cfg_str}⏰全频道已用{total}tokens", "status"
def count(self):
guild_count, guild_msg_count, guild_direct_msg_count, session_count = self.provider.get_stat()
return True, f"当前会话数: {len(self.provider.session_dict)}\n共有频道数: {guild_count} \n共有消息数: {guild_msg_count}\n私信数: {guild_direct_msg_count}\n历史会话数: {session_count}"
return True, f"当前会话数: {len(self.provider.session_dict)}\n共有频道数: {guild_count} \n共有消息数: {guild_msg_count}\n私信数: {guild_direct_msg_count}\n历史会话数: {session_count}", "count"
def key(self, message: str, user_name: str):
l = message.split(" ")
if len(l) == 1:
msg = "感谢您赞助key,key为官方API使用,请以以下格式赞助:\n/key xxxxx"
return True, msg
return True, msg, "key"
key = l[1]
if self.provider.check_key(key):
self.provider.append_key(key, user_name)
return True, f"*★,°*:.☆( ̄▽ ̄)/$:*.°★* 。\n该Key被验证为有效。感谢{user_name}赞助~"
else:
return True, "该Key被验证为无效。也许是输入错误了,或者重试。"
return True, "该Key被验证为无效。也许是输入错误了,或者重试。", "key"
def unset(self, session_id: str):
self.provider.now_personality = {}
self.provider.forget(session_id)
return True, "已清除人格并重置历史记录。"
return True, "已清除人格并重置历史记录。", "unset"
def set(self, message: str, session_id: str):
l = message.split(" ")
if len(l) == 1:
return True, f"【由Github项目QQChannelChatGPT支持】\n\n【人格文本由PlexPt开源项目awesome-chatgpt-pr \
ompts-zh提供】\n\n这个是人格设置指令。\n设置人格: \n/set 人格名。例如/set 编剧\n人格列表: /set list\n人格详细信息: \
/set view 人格名\n自定义人格: /set 人格文本\n清除人格: /unset\n【当前人格】: {str(self.provider.now_personality)}"
/set view 人格名\n自定义人格: /set 人格文本\n清除人格: /unset\n【当前人格】: {str(self.provider.now_personality)}", "set"
elif l[1] == "list":
msg = "人格列表:\n"
for key in personalities.keys():
msg += f" |-{key}\n"
msg += '\n\n*输入/set view 人格名查看人格详细信息'
msg += '\n*不定时更新人格库,请及时更新本项目。'
return True, msg
return True, msg, "set"
elif l[1] == "view":
if len(l) == 2:
return True, "请输入/set view 人格名"
return True, "请输入/set view 人格名", "set"
ps = l[2].strip()
if ps in personalities:
msg = f"人格{ps}的详细信息:\n"
msg += f"{personalities[ps]}\n"
else:
msg = f"人格{ps}不存在"
return True, msg
return True, msg, "set"
else:
ps = l[1].strip()
if ps in personalities:
@@ -142,7 +144,7 @@ class CommandOpenAIOfficial(Command):
'single-tokens': 0
}
self.provider.session_dict[session_id].append(new_record)
return True, f"人格{ps}已设置."
return True, f"人格{ps}已设置.", "set"
else:
self.provider.now_personality = {
'name': '自定义人格',
@@ -158,17 +160,17 @@ class CommandOpenAIOfficial(Command):
}
self.provider.session_dict[session_id] = []
self.provider.session_dict[session_id].append(new_record)
return True, f"自定义人格已设置。 \n人格信息: {ps}"
return True, f"自定义人格已设置。 \n人格信息: {ps}", "set"
def draw(self, message):
try:
# 画图模式传回3个参数
img_url = self.provider.image_chat(message)
return True, img_url, "image"
return True, img_url, "draw"
except Exception as e:
if 'exceeded' in str(e):
return f"OpenAI API错误。原因:\n{str(e)} \n超额了。可自己搭建一个机器人(Github仓库:QQChannelChatGPT)"
return False, f"图片生成失败: {e}"
return False, f"图片生成失败: {e}", "draw"
+3 -1
View File
@@ -10,8 +10,10 @@ class CommandRevChatGPT(Command):
return True, self.help()
elif self.command_start_with(message, "update"):
return True, self.update(message)
elif self.command_start_with(message, "keyword"):
return True, self.keyword(message)
return False, None
def help(self):
return True, "[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\nRevChatGPT指令面板:\n当前语言模型RevChatGPT未实现任何指令\n"
return True, "[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\nRevChatGPT指令面板:\n当前语言模型RevChatGPT未实现任何指令\n", "help"
+5 -3
View File
@@ -12,16 +12,18 @@ class CommandRevEdgeGPT(Command):
return True, self.help()
elif self.command_start_with(message, "update"):
return True, self.update(message)
elif self.command_start_with(message, "keyword"):
return True, self.keyword(message)
return False, None
def reset(self, loop):
res = asyncio.run_coroutine_threadsafe(self.provider.forget(), loop).result()
print(res)
if res:
return res, "重置成功"
return res, "重置成功", "reset"
else:
return res, "重置失败"
return res, "重置失败", "reset"
def help(self):
return True, "[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\nRevBing指令面板:\nreset: 重置\nhelp: 帮助"
return True, "[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\nRevBing指令面板:\nreset: 重置\nhelp: 帮助", "help"