This commit is contained in:
2025-08-31 08:44:03 +08:00
parent 09a5aa166e
commit 6b241749ae
3 changed files with 29 additions and 118 deletions

View File

@@ -1,7 +1,9 @@
from openai import OpenAI, AsyncOpenAI
from Config.Config import *
import sys
from openai import AsyncOpenAI
from Config.Config import *
# 保留原有的prompt定义
# prompt=""" |
@@ -46,6 +48,7 @@ async def get_xueban_response_async(query_text: str, stream: bool = True):
绝不:
- 长篇大论,叽叽歪歪
- 长时间严肃对话
- 每次回答不要太长控制在3分钟以内
"""
# 打开文件读取知识内容
f = open(r"D:\dsWork\dsProject\dsLightRag\static\YunXiao.txt", "r", encoding="utf-8")
@@ -87,117 +90,3 @@ async def get_xueban_response_async(query_text: str, stream: bool = True):
except Exception as e:
print(f"大模型请求异常: {str(e)}", file=sys.stderr)
yield f"处理请求时发生异常: {str(e)}"
# 同步获取大模型响应
def get_xueban_response(query_text: str, stream: bool = True):
"""
获取学伴角色的大模型响应
@param query_text: 查询文本
@param stream: 是否使用流式输出
@return: 完整响应文本
"""
client = OpenAI(
api_key=LLM_API_KEY,
base_url=LLM_BASE_URL,
)
# 创建请求
completion = client.chat.completions.create(
model=LLM_MODEL_NAME,
messages=[
{'role': 'system', 'content': prompt.strip()},
{'role': 'user', 'content': query_text}
],
stream=stream
)
full_response = []
if stream:
for chunk in completion:
# 提取当前块的内容
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response.append(content)
# 实时输出内容,不换行
print(content, end='', flush=True)
else:
# 非流式处理
full_response.append(completion.choices[0].message.content)
return ''.join(full_response)
# 测试用例 main 函数
def main():
"""
测试学伴工具接口的主函数
"""
print("===== 测试学伴工具接口 =====")
# 测试同步接口
test_sync_interface()
# 测试异步接口
import asyncio
print("\n测试异步接口...")
asyncio.run(test_async_interface())
print("\n===== 测试完成 =====")
def test_sync_interface():
"""测试同步接口"""
print("\n测试同步接口...")
# 测试问题
questions = [
"你是谁?",
"讲个冷笑话",
"你男朋友是做什么的?"
]
for question in questions:
print(f"\n问题: {question}")
try:
# 调用同步接口获取响应
print("获取学伴响应中...")
response = get_xueban_response(question, stream=False)
print(f"学伴响应: {response}")
# 简单验证响应
assert response.strip(), "响应内容为空"
print("✅ 同步接口测试通过")
except Exception as e:
print(f"❌ 同步接口测试失败: {str(e)}")
async def test_async_interface():
"""测试异步接口"""
# 测试问题
questions = [
"你是谁?",
"讲个冷笑话",
"你男朋友是做什么的?"
]
for question in questions:
print(f"\n问题: {question}")
try:
# 调用异步接口获取响应
print("获取学伴响应中...")
response_generator = get_xueban_response_async(question, stream=False)
response = ""
async for chunk in response_generator:
response += chunk
print(f"学伴响应: {response}")
# 简单验证响应
assert response.strip(), "响应内容为空"
print("✅ 异步接口测试通过")
except Exception as e:
print(f"❌ 异步接口测试失败: {str(e)}")
if __name__ == "__main__":
main()