From f3e56f1769d5fd03da52bd81f364bc8f148e7ef5 Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Mon, 24 Mar 2025 13:51:57 +0800 Subject: [PATCH] 'commit' --- AI/WxMini/Start.py | 148 +++++++++++++++++++++++++++++------ AI/WxMini/X_Start.py | 179 ------------------------------------------- 2 files changed, 125 insertions(+), 202 deletions(-) delete mode 100644 AI/WxMini/X_Start.py diff --git a/AI/WxMini/Start.py b/AI/WxMini/Start.py index b0f78154..defadd3a 100644 --- a/AI/WxMini/Start.py +++ b/AI/WxMini/Start.py @@ -1,23 +1,70 @@ import os import uuid +import time +import jieba from fastapi import FastAPI, Form, HTTPException -from fastapi.middleware.cors import CORSMiddleware # 导入 CORSMiddleware from openai import OpenAI +from gensim.models import KeyedVectors +from contextlib import asynccontextmanager from TtsConfig import * from WxMini.OssUtil import upload_mp3_to_oss from WxMini.TtsUtil import TTS +from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager +from WxMini.Milvus.Utils.MilvusConnectionPool import * +from WxMini.Milvus.Config.MulvusConfig import * + +import jieba.analyse + +# 提取用户输入的关键词 +def extract_keywords(text, topK=3): + """ + 提取用户输入的关键词 + :param text: 用户输入的文本 + :param topK: 返回的关键词数量 + :return: 关键词列表 + """ + keywords = jieba.analyse.extract_tags(text, topK=topK) + return keywords + +# 初始化 Word2Vec 模型 +model_path = MS_MODEL_PATH +model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT) +print(f"模型加载成功,词向量维度: {model.vector_size}") + +# 初始化 Milvus 连接池 +milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) + +# 初始化集合管理器 +collection_name = MS_COLLECTION_NAME +collection_manager = MilvusCollectionManager(collection_name) + +# 将文本转换为嵌入向量 +def text_to_embedding(text): + words = jieba.lcut(text) # 使用 jieba 分词 + print(f"文本: {text}, 分词结果: {words}") + embeddings = [model[word] for word in words if word in model] + print(f"有效词向量数量: {len(embeddings)}") + if embeddings: + avg_embedding = sum(embeddings) / len(embeddings) + print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维 + return avg_embedding + else: + print("未找到有效词,返回零向量") + return [0.0] * model.vector_size + +# 使用 Lifespan Events 处理应用启动和关闭逻辑 +@asynccontextmanager +async def lifespan(app: FastAPI): + # 应用启动时加载集合到内存 + collection_manager.load_collection() + print(f"集合 '{collection_name}' 已加载到内存。") + yield + # 应用关闭时释放连接池 + milvus_pool.close() + print("Milvus 连接池已关闭。") # 初始化 FastAPI 应用 -app = FastAPI() - -# 添加跨域支持 -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # 允许所有来源,也可以指定具体的域名 - allow_credentials=True, - allow_methods=["*"], # 允许所有 HTTP 方法 - allow_headers=["*"], # 允许所有 HTTP 头 -) +app = FastAPI(lifespan=lifespan) # 初始化 OpenAI 客户端 client = OpenAI( @@ -25,21 +72,57 @@ client = OpenAI( base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) - @app.post("/reply") -async def reply(prompt: str = Form(...)): +async def reply(session_id: str = Form(...), prompt: str = Form(...)): """ 接收用户输入的 prompt,调用大模型并返回结果 + :param session_id: 用户会话 ID :param prompt: 用户输入的 prompt :return: 大模型的回复 """ try: - # 调用大模型 + # 从连接池中获取一个连接 + connection = milvus_pool.get_connection() + + # 将用户输入转换为嵌入向量 + current_embedding = text_to_embedding(prompt) + + # 查询与当前对话最相关的历史交互 + search_params = { + "metric_type": "L2", # 使用 L2 距离度量方式 + "params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数 + } + start_time = time.time() + results = collection_manager.search( + data=current_embedding, # 输入向量 + search_params=search_params, # 搜索参数 + limit=5 # 返回 5 条结果 + ) + end_time = time.time() + + # 构建历史交互提示词 + history_prompt = "" + if results: + for hits in results: + for hit in hits: + try: + # 查询非向量字段 + record = collection_manager.query_by_id(hit.id) + if record: + print(f"查询到的记录: {record}") + # 添加历史交互 + history_prompt += f"用户: {record['user_input']}\n大模型: {record['model_response']}\n" + except Exception as e: + print(f"查询失败: {e}") + + print(f"历史交互提示词: {history_prompt}") + + # 调用大模型,将历史交互作为提示词 response = client.chat.completions.create( model=MODEL_NAME, messages=[ - {"role": "system", "content": "你是一个非常好的聊天伙伴,可以疏导用户,帮他解压,一句控制在20字以内。"}, - {"role": "user", "content": prompt} + {"role": "system", "content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息。"}, + {"role": "user", "content": f"{history_prompt}用户: {prompt}"} # 将历史交互和当前输入一起发送 ], max_tokens=500 ) @@ -47,13 +130,25 @@ async def reply(prompt: str = Form(...)): # 提取生成的回复 if response.choices and response.choices[0].message.content: result = response.choices[0].message.content.strip() - # 调用tts进行生成mp3 - # 生成一个uuid的文件名 + + # 记录用户输入和大模型反馈到向量数据库 + timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + entities = [ + [session_id], # session_id + [prompt], # user_input + [result], # model_response + [timestamp], # timestamp + [current_embedding] # embedding + ] + collection_manager.insert_data(entities) + print("用户输入和大模型反馈已记录到向量数据库。") + + # 调用 TTS 生成 MP3 uuid_str = str(uuid.uuid4()) tts_file = "audio/" + uuid_str + ".mp3" t = TTS(tts_file) t.start(result) - # 文件上传到oss + # 文件上传到 OSS upload_mp3_to_oss(tts_file, tts_file) # 删除临时文件 try: @@ -61,17 +156,24 @@ async def reply(prompt: str = Form(...)): print(f"临时文件 {tts_file} 已删除") except Exception as e: print(f"删除临时文件失败: {e}") - # 完整的url + # 完整的 URL url = 'https://ylt.oss-cn-hangzhou.aliyuncs.com/' + tts_file - return {"success": True, "url": url} + return { + "success": True, + "url": url, + "search_time": end_time - start_time, # 返回查询耗时 + "response": result # 返回大模型的回复 + } else: raise HTTPException(status_code=500, detail="大模型未返回有效结果") except Exception as e: raise HTTPException(status_code=500, detail=f"调用大模型失败: {str(e)}") - + finally: + # 释放连接 + milvus_pool.release_connection(connection) # 运行 FastAPI 应用 if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=5500) \ No newline at end of file + uvicorn.run(app, host="0.0.0.0", port=5600) \ No newline at end of file diff --git a/AI/WxMini/X_Start.py b/AI/WxMini/X_Start.py deleted file mode 100644 index defadd3a..00000000 --- a/AI/WxMini/X_Start.py +++ /dev/null @@ -1,179 +0,0 @@ -import os -import uuid -import time -import jieba -from fastapi import FastAPI, Form, HTTPException -from openai import OpenAI -from gensim.models import KeyedVectors -from contextlib import asynccontextmanager -from TtsConfig import * -from WxMini.OssUtil import upload_mp3_to_oss -from WxMini.TtsUtil import TTS -from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager -from WxMini.Milvus.Utils.MilvusConnectionPool import * -from WxMini.Milvus.Config.MulvusConfig import * - -import jieba.analyse - -# 提取用户输入的关键词 -def extract_keywords(text, topK=3): - """ - 提取用户输入的关键词 - :param text: 用户输入的文本 - :param topK: 返回的关键词数量 - :return: 关键词列表 - """ - keywords = jieba.analyse.extract_tags(text, topK=topK) - return keywords - -# 初始化 Word2Vec 模型 -model_path = MS_MODEL_PATH -model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT) -print(f"模型加载成功,词向量维度: {model.vector_size}") - -# 初始化 Milvus 连接池 -milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) - -# 初始化集合管理器 -collection_name = MS_COLLECTION_NAME -collection_manager = MilvusCollectionManager(collection_name) - -# 将文本转换为嵌入向量 -def text_to_embedding(text): - words = jieba.lcut(text) # 使用 jieba 分词 - print(f"文本: {text}, 分词结果: {words}") - embeddings = [model[word] for word in words if word in model] - print(f"有效词向量数量: {len(embeddings)}") - if embeddings: - avg_embedding = sum(embeddings) / len(embeddings) - print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维 - return avg_embedding - else: - print("未找到有效词,返回零向量") - return [0.0] * model.vector_size - -# 使用 Lifespan Events 处理应用启动和关闭逻辑 -@asynccontextmanager -async def lifespan(app: FastAPI): - # 应用启动时加载集合到内存 - collection_manager.load_collection() - print(f"集合 '{collection_name}' 已加载到内存。") - yield - # 应用关闭时释放连接池 - milvus_pool.close() - print("Milvus 连接池已关闭。") - -# 初始化 FastAPI 应用 -app = FastAPI(lifespan=lifespan) - -# 初始化 OpenAI 客户端 -client = OpenAI( - api_key=MODEL_API_KEY, - base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", -) - -@app.post("/reply") -async def reply(session_id: str = Form(...), prompt: str = Form(...)): - """ - 接收用户输入的 prompt,调用大模型并返回结果 - :param session_id: 用户会话 ID - :param prompt: 用户输入的 prompt - :return: 大模型的回复 - """ - try: - # 从连接池中获取一个连接 - connection = milvus_pool.get_connection() - - # 将用户输入转换为嵌入向量 - current_embedding = text_to_embedding(prompt) - - # 查询与当前对话最相关的历史交互 - search_params = { - "metric_type": "L2", # 使用 L2 距离度量方式 - "params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数 - } - start_time = time.time() - results = collection_manager.search( - data=current_embedding, # 输入向量 - search_params=search_params, # 搜索参数 - limit=5 # 返回 5 条结果 - ) - end_time = time.time() - - # 构建历史交互提示词 - history_prompt = "" - if results: - for hits in results: - for hit in hits: - try: - # 查询非向量字段 - record = collection_manager.query_by_id(hit.id) - if record: - print(f"查询到的记录: {record}") - # 添加历史交互 - history_prompt += f"用户: {record['user_input']}\n大模型: {record['model_response']}\n" - except Exception as e: - print(f"查询失败: {e}") - - print(f"历史交互提示词: {history_prompt}") - - # 调用大模型,将历史交互作为提示词 - response = client.chat.completions.create( - model=MODEL_NAME, - messages=[ - {"role": "system", "content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息。"}, - {"role": "user", "content": f"{history_prompt}用户: {prompt}"} # 将历史交互和当前输入一起发送 - ], - max_tokens=500 - ) - - # 提取生成的回复 - if response.choices and response.choices[0].message.content: - result = response.choices[0].message.content.strip() - - # 记录用户输入和大模型反馈到向量数据库 - timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - entities = [ - [session_id], # session_id - [prompt], # user_input - [result], # model_response - [timestamp], # timestamp - [current_embedding] # embedding - ] - collection_manager.insert_data(entities) - print("用户输入和大模型反馈已记录到向量数据库。") - - # 调用 TTS 生成 MP3 - uuid_str = str(uuid.uuid4()) - tts_file = "audio/" + uuid_str + ".mp3" - t = TTS(tts_file) - t.start(result) - # 文件上传到 OSS - upload_mp3_to_oss(tts_file, tts_file) - # 删除临时文件 - try: - os.remove(tts_file) - print(f"临时文件 {tts_file} 已删除") - except Exception as e: - print(f"删除临时文件失败: {e}") - # 完整的 URL - url = 'https://ylt.oss-cn-hangzhou.aliyuncs.com/' + tts_file - return { - "success": True, - "url": url, - "search_time": end_time - start_time, # 返回查询耗时 - "response": result # 返回大模型的回复 - } - else: - raise HTTPException(status_code=500, detail="大模型未返回有效结果") - except Exception as e: - raise HTTPException(status_code=500, detail=f"调用大模型失败: {str(e)}") - finally: - # 释放连接 - milvus_pool.release_connection(connection) - -# 运行 FastAPI 应用 -if __name__ == "__main__": - import uvicorn - - uvicorn.run(app, host="0.0.0.0", port=5600) \ No newline at end of file