🎨: clean codes

This commit is contained in:
Soulter
2024-04-21 22:20:23 +08:00
parent 03bb932f8f
commit f66091e08f
22 changed files with 671 additions and 479 deletions
+33 -23
View File
@@ -3,30 +3,35 @@ import json
import util.general_utils as gu
import time
class FuncCallJsonFormatError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
class FuncNotFoundError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return self.msg
class FuncCall():
def __init__(self, provider) -> None:
self.func_list = []
self.provider = provider
def add_func(self, name: str = None, func_args: list = None, desc: str = None, func_obj = None) -> None:
def add_func(self, name: str = None, func_args: list = None, desc: str = None, func_obj=None) -> None:
if name == None or func_args == None or desc == None or func_obj == None:
raise FuncCallJsonFormatError("name, func_args, desc must be provided.")
raise FuncCallJsonFormatError(
"name, func_args, desc must be provided.")
params = {
"type": "object", # hardcore here
"type": "object", # hardcore here
"properties": {}
}
for param in func_args:
@@ -51,7 +56,7 @@ class FuncCall():
"description": f["description"],
})
return json.dumps(_l, indent=intent, ensur_ascii=False)
def get_func(self) -> list:
_l = []
for f in self.func_list:
@@ -64,8 +69,8 @@ class FuncCall():
}
})
return _l
def func_call(self, question, func_definition, is_task = False, tasks = None, taskindex = -1, is_summary = True, session_id = None):
def func_call(self, question, func_definition, is_task=False, tasks=None, taskindex=-1, is_summary=True, session_id=None):
funccall_prompt = """
我正实现function call功能,该功能旨在让你变成给定的问题到给定的函数的解析器(意味着你不是创造函数)。
@@ -120,7 +125,8 @@ class FuncCall():
res = self.provider.text_chat(prompt, session_id)
if res.find('```') != -1:
res = res[res.find('```json') + 7: res.rfind('```')]
gu.log("REVGPT func_call json result", bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
gu.log("REVGPT func_call json result",
bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
print(res)
res = json.loads(res)
break
@@ -151,11 +157,13 @@ class FuncCall():
func_target = func["func_obj"]
break
if func_target == None:
raise FuncNotFoundError(f"Request function {func_name} not found.")
raise FuncNotFoundError(
f"Request function {func_name} not found.")
t_res = str(func_target(**args))
invoke_func_res += f"{func_name} 调用结果:\n```\n{t_res}\n```\n"
invoke_func_res_list.append(invoke_func_res)
gu.log(f"[FUNC| {func_name} invoked]", bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
gu.log(f"[FUNC| {func_name} invoked]",
bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
# print(str(t_res))
if is_summary:
@@ -181,12 +189,16 @@ class FuncCall():
try:
res = self.provider.text_chat(after_prompt, session_id)
# 截取```之间的内容
gu.log("DEBUG BEGIN", bg=gu.BG_COLORS["yellow"], fg=gu.FG_COLORS["white"])
gu.log(
"DEBUG BEGIN", bg=gu.BG_COLORS["yellow"], fg=gu.FG_COLORS["white"])
print(res)
gu.log("DEBUG END", bg=gu.BG_COLORS["yellow"], fg=gu.FG_COLORS["white"])
gu.log(
"DEBUG END", bg=gu.BG_COLORS["yellow"], fg=gu.FG_COLORS["white"])
if res.find('```') != -1:
res = res[res.find('```json') + 7: res.rfind('```')]
gu.log("REVGPT after_func_call json result", bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
res = res[res.find('```json') +
7: res.rfind('```')]
gu.log("REVGPT after_func_call json result",
bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
after_prompt_res = res
after_prompt_res = json.loads(after_prompt_res)
break
@@ -197,7 +209,8 @@ class FuncCall():
if "The message you submitted was too long" in str(e):
# 如果返回的内容太长了,那么就截取一部分
time.sleep(3)
invoke_func_res = invoke_func_res[:int(len(invoke_func_res) / 2)]
invoke_func_res = invoke_func_res[:int(
len(invoke_func_res) / 2)]
after_prompt = """
函数返回以下内容:"""+invoke_func_res+"""
请以AI助手的身份结合返回的内容对用户提问做详细全面的回答。
@@ -218,11 +231,13 @@ class FuncCall():
if "func_call_again" in after_prompt_res and after_prompt_res["func_call_again"]:
# 如果需要重新调用函数
# 重新调用函数
gu.log("REVGPT func_call_again", bg=gu.BG_COLORS["purple"], fg=gu.FG_COLORS["white"])
gu.log("REVGPT func_call_again",
bg=gu.BG_COLORS["purple"], fg=gu.FG_COLORS["white"])
res = self.func_call(question, func_definition)
return res, True
gu.log("REVGPT func callback:", bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
gu.log("REVGPT func callback:",
bg=gu.BG_COLORS["green"], fg=gu.FG_COLORS["white"])
# print(after_prompt_res["res"])
return after_prompt_res["res"], True
else:
@@ -230,8 +245,3 @@ class FuncCall():
else:
# print(res["res"])
return res["res"], False
+25 -15
View File
@@ -9,8 +9,8 @@ from readability import Document
from bs4 import BeautifulSoup
from openai.types.chat.chat_completion_message_tool_call import Function
from util.function_calling.func_call import (
FuncCall,
FuncCallJsonFormatError,
FuncCall,
FuncCallJsonFormatError,
FuncNotFoundError
)
from model.provider.provider import Provider
@@ -22,6 +22,7 @@ def tidy_text(text: str) -> str:
'''
return text.strip().replace("\n", " ").replace("\r", " ").replace(" ", " ")
def special_fetch_zhihu(link: str) -> str:
'''
function-calling 函数, 用于获取知乎文章的内容
@@ -43,6 +44,7 @@ def special_fetch_zhihu(link: str) -> str:
raise Exception("zhihu none")
return tidy_text(r.text)
def google_web_search(keyword) -> str:
'''
获取 google 搜索结果, 得到 title、desc、link
@@ -66,6 +68,7 @@ def google_web_search(keyword) -> str:
return web_keyword_search_via_bing(keyword)
return ret
def web_keyword_search_via_bing(keyword) -> str:
'''
获取bing搜索结果, 得到 title、desc、link
@@ -104,7 +107,8 @@ def web_keyword_search_via_bing(keyword) -> str:
res += f"# No.{str(result_cnt + 1)}\ntitle: {title}\nurl: {link}\ncontent: {desc}\n\n"
result_cnt += 1
if result_cnt > 5: break
if result_cnt > 5:
break
# if len(_detail_store) >= 3:
# continue
@@ -122,16 +126,18 @@ def web_keyword_search_via_bing(keyword) -> str:
except Exception as e:
print(f"bing parse err: {str(e)}")
if result_cnt == 0: break
if result_cnt == 0:
break
return res
except Exception as e:
# gu.log(f"bing fetch err: {str(e)}")
_cnt += 1
time.sleep(1)
# gu.log("fail to fetch bing info, using sougou.")
return web_keyword_search_via_sougou(keyword)
def web_keyword_search_via_sougou(keyword) -> str:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
@@ -141,7 +147,7 @@ def web_keyword_search_via_sougou(keyword) -> str:
response = requests.get(url, headers=headers)
response.encoding = "utf-8"
soup = BeautifulSoup(response.text, "html.parser")
res = []
results = soup.find("div", class_="results")
for i in results.find_all("div", class_="vrwrap"):
@@ -154,7 +160,7 @@ def web_keyword_search_via_sougou(keyword) -> str:
"title": title,
"link": link,
})
if len(res) >= 5: # 限制5条
if len(res) >= 5: # 限制5条
break
except Exception as e:
pass
@@ -173,6 +179,7 @@ def web_keyword_search_via_sougou(keyword) -> str:
ret += f"\n网页内容: {str(_detail_store)}"
return ret
def fetch_website_content(url):
# gu.log(f"fetch_website_content: {url}", tag="fetch_website_content", level=gu.LEVEL_DEBUG)
headers = {
@@ -188,6 +195,7 @@ def fetch_website_content(url):
ret = tidy_text(soup.get_text())
return ret
async def web_search(question, provider: Provider, session_id, official_fc=False):
'''
official_fc: 使用官方 function-calling
@@ -197,17 +205,17 @@ async def web_search(question, provider: Provider, session_id, official_fc=False
"type": "string",
"name": "keyword",
"description": "google search query (分词,尽量保留所有信息)"
}],
"通过搜索引擎搜索。如果问题需要获取近期、实时的消息,在网页上搜索(如天气、新闻或任何需要通过网页获取信息的问题),则调用此函数;如果没有,不要调用此函数。",
web_keyword_search_via_bing
}],
"通过搜索引擎搜索。如果问题需要获取近期、实时的消息,在网页上搜索(如天气、新闻或任何需要通过网页获取信息的问题),则调用此函数;如果没有,不要调用此函数。",
web_keyword_search_via_bing
)
new_func_call.add_func("fetch_website_content", [{
"type": "string",
"name": "url",
"description": "网址"
}],
"获取网页的内容。如果问题带有合法的网页链接(例如: `帮我总结一下 https://github.com 的内容`), 就调用此函数。如果没有,不要调用此函数。",
fetch_website_content
}],
"获取网页的内容。如果问题带有合法的网页链接(例如: `帮我总结一下 https://github.com 的内容`), 就调用此函数。如果没有,不要调用此函数。",
fetch_website_content
)
question1 = f"{question} \n> hint: 最多只能调用1个function, 并且存在不会调用任何function的可能性。"
has_func = False
@@ -282,9 +290,11 @@ async def web_search(question, provider: Provider, session_id, official_fc=False
except Exception as e:
print(e)
_c += 1
if _c == 3: raise e
if _c == 3:
raise e
if "The message you submitted was too long" in str(e):
await provider.forget(session_id)
function_invoked_ret = function_invoked_ret[:int(len(function_invoked_ret) / 2)]
function_invoked_ret = function_invoked_ret[:int(
len(function_invoked_ret) / 2)]
time.sleep(3)
return function_invoked_ret