refactor: 重构部分代码 #61

This commit is contained in:
Soulter
2023-04-01 01:02:16 +08:00
parent 1a137a8639
commit e6f6bee7ee
7 changed files with 403 additions and 305 deletions
+106 -295
View File
@@ -167,6 +167,7 @@ def upload():
def initBot(cfg, prov):
global chatgpt, provider, rev_chatgpt, baidu_judge, rev_ernie, rev_edgegpt
global reply_prefix, now_personality, gpt_config, config, uniqueSession, frequency_count, frequency_time,announcement, direct_message_mode, version
global command_openai_official, command_rev_chatgpt, command_rev_edgegpt
provider = prov
config = cfg
@@ -177,37 +178,42 @@ def initBot(cfg, prov):
# 语言模型提供商
if prov == REV_CHATGPT:
if 'account' in cfg['rev_ChatGPT']:
from addons.revChatGPT.revchatgpt import revChatGPT
from model.provider.provider_rev_chatgpt import ProviderRevChatGPT
from model.command.command_rev_chatgpt import CommandRevChatGPT
for i in range(0, len(cfg['rev_ChatGPT']['account'])):
try:
print(f"[System] 创建rev_ChatGPT负载{str(i)}: " + str(cfg['rev_ChatGPT']['account'][i]))
revstat = {
'obj': revChatGPT(cfg['rev_ChatGPT']['account'][i]),
'obj': ProviderRevChatGPT(cfg['rev_ChatGPT']['account'][i]),
'busy': False
}
rev_chatgpt.append(revstat)
except:
print("[System] 创建rev_ChatGPT负载失败")
command_rev_chatgpt = CommandRevChatGPT(rev_chatgpt)
if REV_CHATGPT in reply_prefix_config:
reply_prefix = reply_prefix_config[REV_CHATGPT]
else:
input("[System-err] 请退出本程序, 然后在配置文件中填写rev_ChatGPT相关配置")
elif prov == OPENAI_OFFICIAL:
from model.provider.provider_openai_official import ProviderOpenAIOfficial
from model.command.command_openai_official import CommandOpenAIOfficial
chatgpt = ProviderOpenAIOfficial(cfg['openai'])
command_openai_official = CommandOpenAIOfficial(chatgpt)
if OPENAI_OFFICIAL in reply_prefix_config:
reply_prefix = reply_prefix_config[OPENAI_OFFICIAL]
elif prov == REV_ERNIE:
from addons.revERNIE import revernie
rev_ernie = revernie.wx
# elif prov == REV_ERNIE:
# from addons.revERNIE import revernie
# rev_ernie = revernie.wx
elif prov == REV_EDGEGPT:
from addons.revEdgeGPT import revedgegpt
rev_edgegpt = revedgegpt.revEdgeGPT()
from model.provider.provider_rev_edgegpt import ProviderRevEdgeGPT
from model.command.command_rev_edgegpt import CommandRevEdgeGPT
rev_edgegpt = ProviderRevEdgeGPT()
command_rev_edgegpt = CommandRevEdgeGPT(rev_edgegpt)
if REV_EDGEGPT in reply_prefix_config:
reply_prefix = reply_prefix_config[REV_EDGEGPT]
# 百度内容审核
if 'baidu_aip' in cfg and 'enable' in cfg['baidu_aip'] and cfg['baidu_aip']['enable']:
try:
@@ -357,34 +363,6 @@ def send_qq_msg(message, res, image_mode=False, msg_ref = None):
asyncio.run_coroutine_threadsafe(message.reply(file_image='tmp_image.jpg', content=""), client.loop)
'''
获取缓存的会话
'''
def get_prompts_by_cache_list(cache_data_list, divide=False, paging=False, size=5, page=1):
prompts = ""
if paging:
page_begin = (page-1)*size
page_end = page*size
if page_begin < 0:
page_begin = 0
if page_end > len(cache_data_list):
page_end = len(cache_data_list)
cache_data_list = cache_data_list[page_begin:page_end]
for item in cache_data_list:
prompts += str(item['user']['role']) + ":\n" + str(item['user']['content']) + "\n"
prompts += str(item['AI']['role']) + ":\n" + str(item['AI']['content']) + "\n"
if divide:
prompts += "----------\n"
return prompts
def get_user_usage_tokens(cache_list):
usage_tokens = 0
for item in cache_list:
usage_tokens += int(item['single_tokens'])
return usage_tokens
'''
检查发言频率
'''
@@ -446,21 +424,21 @@ def oper_msg(message, at=False, msg_ref = None):
else:
name = "频道"
command_type = -1
# 特殊指令
if qq_msg == "/继续":
qq_msg = "继续"
# 普通指令
else:
# 如果第一个字符是/,则为指令
if qq_msg[0] == "/":
res, go, command_type = command_oper(qq_msg, message, session_id, name, user_id, user_name, at)
send_qq_msg(message, res)
if not go:
return
if command_type == 1 and 'prompt' in now_personality:
# 设置人格
qq_msg = now_personality['prompt']
# command_type = -1
# # 特殊指令
# if qq_msg == "/继续":
# qq_msg = "继续"
# # 普通指令
# else:
# # 如果第一个字符是/,则为指令
# if qq_msg[0] == "/":
# res, go, command_type = command_oper(qq_msg, message, session_id, name, user_id, user_name, at)
# send_qq_msg(message, res)
# if not go:
# return
# if command_type == 1 and 'prompt' in now_personality:
# # 设置人格
# qq_msg = now_personality['prompt']
# 这里是预设,你可以按需更改
if qq_msg.strip() == 'hello' or qq_msg.strip() == '你好' or qq_msg.strip() == '':
@@ -488,66 +466,60 @@ def oper_msg(message, at=False, msg_ref = None):
send_qq_msg(message, f"你的提问得到的回复未通过【百度AI内容审核】服务,不予回复。\n\n{msg}", msg_ref=msg_ref)
return
chatgpt_res = "[Error] 占位符"
chatgpt_res = ""
if provider == OPENAI_OFFICIAL:
# ChatGPT API 回复倾向(人格)
# if command_type == 1:
# record_obj["user"]["role"] = "system"
# record_obj_img["user"]["role"] = "system"
if qq_msg[0] == '':
print("[Debug] 画图模式")
# 请求chatGPT获得结果
try:
chatgpt_res = chatgpt.image_chat(qq_msg)
# print(chatgpt_res)
for i in range(len(chatgpt_res)):
send_qq_msg(message, chatgpt_res[i], image_mode=True)
# print(chatgpt_res)
except (BaseException) as e:
print("[System-Err] OpenAI API错误。原因如下:\n" + str(e))
if 'exceeded' in str(e):
send_qq_msg(message,
f"OpenAI API错误。原因:\n{str(e)} \n超额了。可自己搭建一个机器人(Github仓库:QQChannelChatGPT)")
return
else:
f_res = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(e),
flags=re.MULTILINE)
f_res = f_res.replace(".", "·")
send_qq_msg(message, f"OpenAI API错误。原因如下:\n{f_res} \n前往官方频道反馈~")
return
else:
# 请求chatGPT获得结果
try:
chatgpt_res = reply_prefix + chatgpt.text_chat(qq_msg, session_id)
except (BaseException) as e:
print("[System-Err] OpenAI API错误。原因如下:\n"+str(e))
if 'exceeded' in str(e):
send_qq_msg(message, f"OpenAI API错误。原因:\n{str(e)} \n超额了。可自己搭建一个机器人(Github仓库:QQChannelChatGPT)")
return
else:
f_res = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(e), flags=re.MULTILINE)
f_res = f_res.replace(".", "·")
send_qq_msg(message, f"OpenAI API错误。原因如下:\n{f_res} \n前往官方频道反馈~")
return
hit, command_result = command_openai_official.check_command(qq_msg, session_id, user_name)
print(f"{hit} {command_result}")
if hit:
if command_result != None and command_result[0]:
send_qq_msg(message, command_result[1], msg_ref=msg_ref)
else:
send_qq_msg(message, f"指令调用错误: \n{command_result[1]}", msg_ref=msg_ref)
return
# 请求chatGPT获得结果
try:
chatgpt_res = reply_prefix + chatgpt.text_chat(qq_msg, session_id)
except (BaseException) as e:
print("[System-Err] OpenAI API错误。原因如下:\n"+str(e))
if 'exceeded' in str(e):
send_qq_msg(message, f"OpenAI API错误。原因:\n{str(e)} \n超额了。可自己搭建一个机器人(Github仓库:QQChannelChatGPT)")
return
else:
f_res = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(e), flags=re.MULTILINE)
f_res = f_res.replace(".", "·")
send_qq_msg(message, f"OpenAI API错误。原因如下\n{f_res} \n前往官方频道反馈~")
return
elif provider == REV_CHATGPT:
hit, success, res = command_rev_chatgpt.check_command(qq_msg)
if hit:
if success:
send_qq_msg(message, res, msg_ref=msg_ref)
else:
send_qq_msg(message, f"指令调用错误: \n{res}", msg_ref=msg_ref)
return
try:
chatgpt_res = reply_prefix+str(get_rev_ChatGPT_response(qq_msg))
except BaseException as e:
print("[System-Err] Rev ChatGPT API错误。原因如下:\n"+str(e))
send_qq_msg(message, f"Rev ChatGPT API错误。原因如下:\n{str(e)} \n前往官方频道反馈~")
return
elif provider == REV_ERNIE:
try:
chatgpt_res = reply_prefix+str(rev_ernie.chatViaSelenium(qq_msg))
except BaseException as e:
print("[System-Err] Rev ERNIE API错误。原因如下:\n"+str(e))
send_qq_msg(message, f"Rev ERNIE API错误。原因如下:\n{str(e)} \n前往官方频道反馈~")
return
# elif provider == REV_ERNIE:
# try:
# chatgpt_res = reply_prefix+str(rev_ernie.chatViaSelenium(qq_msg))
# except BaseException as e:
# print("[System-Err] Rev ERNIE API错误。原因如下:\n"+str(e))
# send_qq_msg(message, f"Rev ERNIE API错误。原因如下:\n{str(e)} \n前往官方频道反馈~")
# return
elif provider == REV_EDGEGPT:
hit, success, res = command_rev_chatgpt.check_command(qq_msg)
if hit:
if success:
send_qq_msg(message, res, msg_ref=msg_ref)
else:
send_qq_msg(message, f"指令调用错误: \n{res}", msg_ref=msg_ref)
return
try:
if rev_edgegpt.is_busy():
send_qq_msg(message, f"[RevBing] 正忙,请稍后再试",msg_ref=msg_ref)
@@ -559,49 +531,48 @@ def oper_msg(message, at=False, msg_ref = None):
print("[System-Err] Rev NewBing API错误。原因如下:\n"+str(e))
send_qq_msg(message, f"Rev NewBing API错误。原因如下:\n{str(e)} \n前往官方频道反馈~")
return
# 记录日志
logf.write(f"{reply_prefix} {str(chatgpt_res)}\n")
logf.flush()
if qq_msg[0] != '':
# 敏感过滤
# 过滤不合适的词
judged_res = chatgpt_res
for i in uw.unfit_words:
judged_res = re.sub(i, "***", judged_res)
# 百度内容审核服务二次审核
if baidu_judge != None:
check, msg = baidu_judge.judge(judged_res)
if not check:
send_qq_msg(message, f"你的提问得到的回复【百度内容审核】未通过,不予回复。\n\n{msg}", msg_ref=msg_ref)
return
# 发送qq信息
# 敏感过滤
# 过滤不合适的词
judged_res = chatgpt_res
for i in uw.unfit_words:
judged_res = re.sub(i, "***", judged_res)
# 百度内容审核服务二次审核
if baidu_judge != None:
check, msg = baidu_judge.judge(judged_res)
if not check:
send_qq_msg(message, f"你的提问得到的回复【百度内容审核】未通过,不予回复。\n\n{msg}", msg_ref=msg_ref)
return
# 发送qq信息
try:
# 防止被qq频道过滤消息
gap_chatgpt_res = judged_res.replace(".", " . ")
send_qq_msg(message, ''+gap_chatgpt_res, msg_ref=msg_ref)
# 发送信息
except BaseException as e:
print("QQ频道API错误: \n"+str(e))
f_res = ""
for t in chatgpt_res:
f_res += t + ' '
try:
# 防止被qq频道过滤消息
gap_chatgpt_res = judged_res.replace(".", " . ")
send_qq_msg(message, ''+gap_chatgpt_res, msg_ref=msg_ref)
# 发送信息
send_qq_msg(message, ''+f_res, msg_ref=msg_ref)
# send(message, f"QQ频道API错误:{str(e)}\n下面是格式化后的回答:\n{f_res}")
except BaseException as e:
print("QQ频道API错误: \n"+str(e))
f_res = ""
for t in chatgpt_res:
f_res += t + ' '
try:
send_qq_msg(message, ''+f_res, msg_ref=msg_ref)
# send(message, f"QQ频道API错误:{str(e)}\n下面是格式化后的回答:\n{f_res}")
except BaseException as e:
# 如果还是不行则过滤url
f_res = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(e), flags=re.MULTILINE)
f_res = f_res.replace(".", "·")
send_qq_msg(message, ''+f_res, msg_ref=msg_ref)
# send(message, f"QQ频道API错误:{str(e)}\n下面是格式化后的回答:\n{f_res}")
# 如果还是不行则过滤url
f_res = re.sub(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b', '[被隐藏的链接]', str(e), flags=re.MULTILINE)
f_res = f_res.replace(".", "·")
send_qq_msg(message, ''+f_res, msg_ref=msg_ref)
# send(message, f"QQ频道API错误:{str(e)}\n下面是格式化后的回答:\n{f_res}")
'''
获取统计信息
'''
def get_stat():
def get_stat(self):
try:
f = open(abs_path+"configs/stat", "r", encoding="utf-8")
fjson = json.loads(f.read())
@@ -624,164 +595,4 @@ def get_stat():
session_count += 1
return guild_count, guild_msg_count, guild_direct_msg_count, session_count
except:
return -1, -1, -1, -1
'''
指令处理
'''
def command_oper(qq_msg, message, session_id, name, user_id, user_name, at):
go = False # 是否处理完指令后继续执行msg_oper后面的代码
msg = ''
global session_dict, now_personality, provider, rev_edgegpt, client
# 指令返回值,/set设置人格是1
type = -1
# 指令控制
if qq_msg == "/reset" or qq_msg == "/重置":
msg = ''
if provider == REV_EDGEGPT and rev_edgegpt is not None:
asyncio.run_coroutine_threadsafe(rev_edgegpt.reset(), client.loop).result()
else:
session_dict[session_id] = []
if at:
msg = f"{name}(id: {session_id})的历史记录重置成功\n\n{announcement}"
else:
msg = f"你的历史记录重置成功"
if qq_msg[:4] == "/his":
if provider == REV_CHATGPT:
msg = "[QQChannelChatGPT]当前使用的语言模型提供商是Rev_ChatGPT, 不支持查看历史记录"
return msg, go, type
#分页,每页5条
msg = ''
size_per_page = 3
page = 1
if qq_msg[5:]:
page = int(qq_msg[5:])
# 检查是否有过历史记录
if session_id not in session_dict:
msg = f"{name} 的历史记录为空"
l = session_dict[session_id]
max_page = len(l)//size_per_page + 1 if len(l)%size_per_page != 0 else len(l)//size_per_page
p = get_prompts_by_cache_list(session_dict[session_id], divide=True, paging=True, size=size_per_page, page=page)
if at:
msg=f"{name}的历史记录如下:\n{p}\n{page}页 | 共{max_page}\n*输入/his 2跳转到第2页"
else:
msg=f"历史记录如下:\n{p}\n{page}页 | 共{max_page}\n*输入/his 2跳转到第2页\n\n{announcement}"
if qq_msg == "/token":
if provider == REV_CHATGPT:
msg = "[QQChannelChatGPT]当前使用的语言模型提供商是Rev_ChatGPT, 不支持使用此指令"
return msg, go, type
msg = ''
if at:
msg=f"{name} 会话的token数: {get_user_usage_tokens(session_dict[session_id])}\n系统最大缓存token数: {max_tokens}"
else:
msg=f"会话的token数: {get_user_usage_tokens(session_dict[session_id])}\n系统最大缓存token数: {max_tokens}"
if qq_msg == '/gpt':
if provider == REV_CHATGPT:
msg = "[QQChannelChatGPT]当前使用的语言模型提供商是Rev_ChatGPT, 不支持使用此指令"
return msg, go, type
global gpt_config
msg=f"OpenAI GPT配置:\n {gpt_config}"
if qq_msg == "/status" or qq_msg == "/状态":
chatgpt_cfg_str = ""
key_stat = chatgpt.get_key_stat()
key_list = chatgpt.get_key_list()
index = 1
max = 9000000
gg_count = 0
total = 0
tag = ''
for key in key_stat.keys():
sponsor = ''
total += key_stat[key]['used']
if key_stat[key]['exceed']:
gg_count += 1
continue
if 'sponsor' in key_stat[key]:
sponsor = key_stat[key]['sponsor']
chatgpt_cfg_str += f" |-{index}: {key_stat[key]['used']}/{max} {sponsor}赞助{tag}\n"
index += 1
msg = f"⭐使用情况({str(gg_count)}个已用):\n{chatgpt_cfg_str}⏰全频道已用{total}tokens\n{announcement}"
if qq_msg == "/count" or qq_msg == "/统计":
guild_count, guild_msg_count, guild_direct_msg_count, session_count = get_stat()
msg = f"当前会话数: {len(session_dict)}\n共有频道数: {guild_count} \n共有消息数: {guild_msg_count}\n私信数: {guild_direct_msg_count}\n历史会话数: {session_count}"
if qq_msg == "/help":
ol_version = 'Unknown'
try:
global version
res = requests.get("https://soulter.top/channelbot/update.json")
res_obj = json.loads(res.text)
ol_version = res_obj['version']
except BaseException:
pass
msg = f"[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\n当前版本:{version}\n最新版本:{str(ol_version)}\n请及时更新!\n\n指令面板:\n/status 查看机器人key状态\n/count 查看机器人统计信息\n/reset 重置会话\n/his 查看历史记录\n/token 查看会话token数\n/help 查看帮助\n/set 人格指令菜单\n/key 动态添加key"
if qq_msg[:4] == "/key":
if len(qq_msg) == 4:
msg = "感谢您赞助key,key为官方API使用,请以以下格式赞助:\n/key xxxxx"
key = qq_msg[5:]
send_qq_msg(message, "收到!正在核验...")
if chatgpt.check_key(key):
msg = f"*★,°*:.☆( ̄▽ ̄)/$:*.°★* 。\n该Key被验证为有效。感谢{user_name}赞助~"
chatgpt.append_key(key, user_name)
else:
msg = "该Key被验证为无效。也许是输入错误了,或者重试。"
if qq_msg[:6] == "/unset":
now_personality = {}
session_dict[session_id] = []
msg = "已清除人格并重置历史记录。"
if qq_msg[:4] == "/set":
if len(qq_msg) == 4:
np = ''
if "name" in now_personality:
np=now_personality["name"]
msg = f"【由Github项目QQChannelChatGPT支持】\n\n【人格文本由PlexPt开源项目awesome-chatgpt-prompts-zh提供】\n\n这个是人格设置指令。\n设置人格: \n/set 人格名。例如/set 编剧\n人格列表: /set list\n人格详细信息: /set view 人格名\n自定义人格: /set 人格文本\n清除人格: /unset\n【当前人格】: {np}"
elif qq_msg[5:] == "list":
per_dict = personalities
msg = "人格列表:\n"
for key in per_dict.keys():
msg += f" |-{key}\n"
msg += '\n\n*输入/set view 人格名查看人格详细信息'
msg += '\n\n*不定时更新人格库,请及时更新本项目。'
elif qq_msg[5:9] == "view":
ps = qq_msg[10:]
ps = ps.strip()
per_dict = personalities
if ps in per_dict:
msg = f"人格{ps}的详细信息:\n"
msg += f"{per_dict[ps]}\n"
else:
msg = f"人格{ps}不存在"
else:
ps = qq_msg[5:]
ps = ps.strip()
per_dict = personalities
if ps in per_dict:
now_personality = {
'name': ps,
'prompt': per_dict[ps]
}
session_dict[session_id] = []
msg = f"人格{ps}已设置,请耐心等待机器人回复第一条信息。"
go = True
type = 1
else:
now_personality = {
'name': '自定义人格',
'prompt': ps
}
session_dict[session_id] = []
msg = f"你的自定义人格已设置。 \n人格信息: {ps}\n请耐心等待机器人回复第一条信息。"
go = True
type = 1
return msg, go, type
return -1, -1, -1, -1
+26 -5
View File
@@ -1,15 +1,27 @@
import abc
import json
import requests
from model.provider.provider import Provider
class Command:
def __init__(self):
pass
def __init__(self, provider: Provider):
self.provider = Provider
@abc.abstractmethod
def check_command(self, message):
pass
if message.startswith("reset") or message.startswith("重置"):
return True, self.reset()
elif message.startswith("help") or message.startswith("帮助"):
return True, self.help()
return False, None
def reset(self):
return False
res = self.provider.forget()
if res:
return res, "重置成功"
else:
return res, "重置失敗"
def set(self):
return False
@@ -21,7 +33,16 @@ class Command:
return False
def help(self):
return False
ol_version = 'Unknown'
try:
global version
res = requests.get("https://soulter.top/channelbot/update.json")
res_obj = json.loads(res.text)
ol_version = res_obj['version']
except BaseException:
pass
return True, f"[Github项目名: QQChannelChatGPT,有问题请前往提交issue,欢迎Star此项目~]\n\n当前版本:{version}\n最新版本:{str(ol_version)}\n请及时更新!\n\n指令面板:\n/status 查看机器人key状态\n/count 查看机器人统计信息\n/reset 重置会话\n/his 查看历史记录\n/token 查看会话token数\n/help 查看帮助\n/set 人格指令菜单\n/key 动态添加key"
def status(self):
return False
+169
View File
@@ -0,0 +1,169 @@
from model.command.command import Command
from model.provider.provider_openai_official import ProviderOpenAIOfficial
from cores.qqbot.personality import personalities
class CommandOpenAIOfficial(Command):
def __init__(self, provider: ProviderOpenAIOfficial):
self.provider = provider
def check_command(self, message: str, session_id: str, user_name: str):
if message.startswith("reset") or message.startswith("重置"):
return True, self.reset(session_id)
elif message.startswith("his") or message.startswith("历史"):
return True, self.his(message)
elif message.startswith("token"):
return True, self.token(session_id)
elif message.startswith("gpt"):
return True, self.gpt()
elif message.startswith("status") or message.startswith("状态"):
return True, self.status()
elif message.startswith("count") or message.startswith("统计"):
return True, self.count()
elif message.startswith("help") or message.startswith("帮助"):
return True, self.help()
elif message.startswith("key") or message.startswith("动态添加key"):
return True, self.key(message, user_name)
elif message.startswith("unset"):
return True, self.set(session_id)
elif message.startswith("set"):
return True, self.set(message, session_id)
elif message.startswith(""):
return True, self.draw(message)
return False, None
def reset(self, session_id: str):
self.provider.forget(session_id)
return True, "重置成功"
def his(self, message: str, session_id: str, name: str):
#分页,每页5条
msg = ''
size_per_page = 3
page = 1
if message[4:]:
page = int(message[4:])
# 检查是否有过历史记录
if session_id not in self.provider.session_dict:
msg = f"{name} 的历史记录为空"
l = self.provider.session_dict[session_id]
max_page = len(l)//size_per_page + 1 if len(l)%size_per_page != 0 else len(l)//size_per_page
p = self.provider.get_prompts_by_cache_list(self.provider.session_dict[session_id], divide=True, paging=True, size=size_per_page, page=page)
return True, f"历史记录如下:\n{p}\n{page}页 | 共{max_page}\n*输入/his 2跳转到第2页"
def token(self, session_id: str):
return True, f"会话的token数: {self.provider.get_user_usage_tokens(self.provider.session_dict[session_id])}\n系统最大缓存token数: {self.provider.max_tokens}"
def gpt(self):
return True, f"OpenAI GPT配置:\n {self.provider.chatGPT_configs}"
def status(self):
chatgpt_cfg_str = ""
key_stat = self.provider.get_key_stat()
index = 1
max = 9000000
gg_count = 0
total = 0
tag = ''
for key in key_stat.keys():
sponsor = ''
total += key_stat[key]['used']
if key_stat[key]['exceed']:
gg_count += 1
continue
if 'sponsor' in key_stat[key]:
sponsor = key_stat[key]['sponsor']
chatgpt_cfg_str += f" |-{index}: {key_stat[key]['used']}/{max} {sponsor}赞助{tag}\n"
index += 1
return True, f"⭐使用情况({str(gg_count)}个已用):\n{chatgpt_cfg_str}⏰全频道已用{total}tokens"
def count(self):
guild_count, guild_msg_count, guild_direct_msg_count, session_count = self.provider.get_stat()
return True, f"当前会话数: {len(self.provider.session_dict)}\n共有频道数: {guild_count} \n共有消息数: {guild_msg_count}\n私信数: {guild_direct_msg_count}\n历史会话数: {session_count}"
def key(self, message: str, user_name: str):
l = message.split(" ")
if len(l) == 1:
msg = "感谢您赞助key,key为官方API使用,请以以下格式赞助:\n/key xxxxx"
return True, msg
key = l[1]
if self.provider.check_key(key):
self.provider.append_key(key, user_name)
return True, f"*★,°*:.☆( ̄▽ ̄)/$:*.°★* 。\n该Key被验证为有效。感谢{user_name}赞助~"
else:
return True, "该Key被验证为无效。也许是输入错误了,或者重试。"
def unset(self, session_id: str):
self.provider.now_personality = {}
self.provider.forget(session_id)
return True, "已清除人格并重置历史记录。"
def set(self, message: str, session_id: str):
l = message.split(" ")
if len(l) == 1:
return True, f"【由Github项目QQChannelChatGPT支持】\n\n【人格文本由PlexPt开源项目awesome-chatgpt-prompts-zh提供】\n\n这个是人格设置指令。\n设置人格: \n/set 人格名。例如/set 编剧\n人格列表: /set list\n人格详细信息: /set view 人格名\n自定义人格: /set 人格文本\n清除人格: /unset\n【当前人格】: {np}"
elif l[1] == "list":
msg = "人格列表:\n"
for key in personalities.keys():
msg += f" |-{key}\n"
msg += '\n\n*输入/set view 人格名查看人格详细信息'
msg += '\n*不定时更新人格库,请及时更新本项目。'
return True, msg
elif l[1] == "view":
if len(l) == 2:
return True, "请输入/set view 人格名"
ps = l[2].strip()
if ps in personalities:
msg = f"人格{ps}的详细信息:\n"
msg += f"{personalities[ps]}\n"
else:
msg = f"人格{ps}不存在"
return True, msg
else:
ps = l[1].strip()
if ps in personalities:
self.provider.now_personality = {
'name': ps,
'prompt': personalities[ps]
}
self.provider.session_dict[session_id] = []
new_record = {
"user": {
"role": "system",
"content": personalities[ps],
},
'usage_tokens': 0,
'single-tokens': 0
}
self.provider.session_dict[session_id].append(new_record)
return True, f"人格{ps}已设置."
else:
self.provider.now_personality = {
'name': '自定义人格',
'prompt': ps
}
new_record = {
"user": {
"role": "system",
"content": ps,
},
'usage_tokens': 0,
'single-tokens': 0
}
self.provider.session_dict[session_id] = []
self.provider.session_dict[session_id].append(new_record)
return True, f"自定义人格已设置。 \n人格信息: {ps}"
def draw(self, message):
try:
img_url = self.provider.image_chat(message)
return True, img_url
except Exception as e:
if 'exceeded' in str(e):
return f"OpenAI API错误。原因:\n{str(e)} \n超额了。可自己搭建一个机器人(Github仓库:QQChannelChatGPT)"
return False, f"图片生成失败: {e}"
+16
View File
@@ -0,0 +1,16 @@
from model.command.command import Command
from model.provider.provider_rev_chatgpt import ProviderRevChatGPT
class CommandRevChatGPT(Command):
def __init__(self, provider: ProviderRevChatGPT):
super().__init__(provider)
def check_command(self, message: str):
hit, res = super().check_command(message)
if hit:
return res
return False, None
+14
View File
@@ -0,0 +1,14 @@
from model.command.command import Command
from model.provider.provider_rev_edgegpt import ProviderRevEdgeGPT
class CommandRevEdgeGPT(Command):
def __init__(self, provider: ProviderRevEdgeGPT):
super().__init__(provider)
def check_command(self, message: str):
hit, res = super().check_command(message)
if hit:
return res
return False, None
+69 -5
View File
@@ -63,6 +63,10 @@ class ProviderOpenAIOfficial(Provider):
# 创建转储定时器线程
threading.Thread(target=self.dump_history, daemon=True).start()
# 人格
self.now_personality = {}
# 转储历史记录的定时器~ Soulter
def dump_history(self):
time.sleep(10)
@@ -149,7 +153,7 @@ class ProviderOpenAIOfficial(Provider):
while t > self.max_tokens:
if index >= len(cache_data_list):
break
# 保留倾向(人格信息
# 保留人格信息
if 'user' in cache_data_list[index] and cache_data_list[index]['user']['role'] != 'system':
t -= int(cache_data_list[index]['single_tokens'])
del cache_data_list[index]
@@ -211,7 +215,67 @@ class ProviderOpenAIOfficial(Provider):
raise BaseException("连接超时")
return image_url
def forget(self, session_id) -> bool:
self.session_dict[session_id] = []
return True
'''
获取缓存的会话
'''
def get_prompts_by_cache_list(cache_data_list, divide=False, paging=False, size=5, page=1):
prompts = ""
if paging:
page_begin = (page-1)*size
page_end = page*size
if page_begin < 0:
page_begin = 0
if page_end > len(cache_data_list):
page_end = len(cache_data_list)
cache_data_list = cache_data_list[page_begin:page_end]
for item in cache_data_list:
prompts += str(item['user']['role']) + ":\n" + str(item['user']['content']) + "\n"
prompts += str(item['AI']['role']) + ":\n" + str(item['AI']['content']) + "\n"
if divide:
prompts += "----------\n"
return prompts
def get_user_usage_tokens(cache_list):
usage_tokens = 0
for item in cache_list:
usage_tokens += int(item['single_tokens'])
return usage_tokens
'''
获取统计信息
'''
def get_stat(self):
try:
f = open(abs_path+"configs/stat", "r", encoding="utf-8")
fjson = json.loads(f.read())
f.close()
guild_count = 0
guild_msg_count = 0
guild_direct_msg_count = 0
for k,v in fjson.items():
guild_count += 1
guild_msg_count += v['count']
guild_direct_msg_count += v['direct_count']
session_count = 0
f = open(abs_path+"configs/session", "r", encoding="utf-8")
fjson = json.loads(f.read())
f.close()
for k,v in fjson.items():
session_count += 1
return guild_count, guild_msg_count, guild_direct_msg_count, session_count
except:
return -1, -1, -1, -1
# 包装信息
def wrap(self, prompt, session_id):
# 获得缓存信息
@@ -226,12 +290,12 @@ class ProviderOpenAIOfficial(Provider):
}
req_list = []
for i in context:
req_list.append(i['user'])
req_list.append(i['AI'])
if 'user' in i:
req_list.append(i['user'])
if 'AI' in i:
req_list.append(i['AI'])
req_list.append(new_record['user'])
return context, new_record, req_list
def handle_switch_key(self, req):
# messages = [{"role": "user", "content": prompt}]
+3
View File
@@ -11,6 +11,9 @@ class ProviderRevEdgeGPT(Provider):
cookies = json.load(f)
self.bot = Chatbot(cookies=cookies)
def is_busy(self):
return self.busy
async def forget(self):
try:
await self.bot.reset()