import asyncio from contextlib import asynccontextmanager from logging.handlers import RotatingFileHandler import jieba # 导入 jieba 分词库 import uvicorn from fastapi import FastAPI, Request from openai import OpenAI from sse_starlette.sse import EventSourceResponse from starlette.staticfiles import StaticFiles from gensim.models import KeyedVectors from Config import Config from Milvus.Config.MulvusConfig import * from Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager from Milvus.Utils.MilvusConnectionPool import * from Milvus.Utils.MilvusConnectionPool import MilvusConnectionPool # 初始化日志 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5) handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) # 1. 加载预训练的 Word2Vec 模型 model_path = MS_MODEL_PATH # 替换为你的 Word2Vec 模型路径 model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT) print(f"模型加载成功,词向量维度: {model.vector_size}") @asynccontextmanager async def lifespan(app: FastAPI): # 初始化Milvus连接池 app.state.milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) # 初始化集合管理器 app.state.collection_manager = MilvusCollectionManager(MS_COLLECTION_NAME) app.state.collection_manager.load_collection() # 初始化DeepSeek客户端 app.state.deepseek_client = OpenAI( api_key=Config.DEEPSEEK_API_KEY, base_url=Config.DEEPSEEK_URL ) yield # 关闭Milvus连接池 app.state.milvus_pool.close() app = FastAPI(lifespan=lifespan) # 将文本转换为嵌入向量 def text_to_embedding(text): words = jieba.lcut(text) # 使用 jieba 分词 print(f"文本: {text}, 分词结果: {words}") embeddings = [model[word] for word in words if word in model] print(f"有效词向量数量: {len(embeddings)}") if embeddings: avg_embedding = sum(embeddings) / len(embeddings) print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维 return avg_embedding else: print("未找到有效词,返回零向量") return [0.0] * model.vector_size async def generate_stream(client, milvus_pool, collection_manager, query): """生成SSE流""" # 从连接池获取连接 connection = milvus_pool.get_connection() try: # 1. 将查询文本转换为向量 current_embedding = text_to_embedding(query) # 2. 搜索相关数据 search_params = { "metric_type": "L2", # 使用 L2 距离度量方式 "params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数 } # 7. 将文本转换为嵌入向量 results = collection_manager.search(current_embedding, search_params, limit=10) # 返回 2 条结果 # 3. 处理搜索结果 print("最相关的历史对话:") context="" if results: for hits in results: for hit in hits: try: # 查询非向量字段 record = collection_manager.query_by_id(hit.id) print(f"ID: {hit.id}") print(f"会话 ID: {record['person_id']}") print(f"用户问题: {record['user_input']}") context=context+record['user_input'] print(f"大模型回复: {record['model_response']}") print(f"时间: {record['timestamp']}") print(f"距离: {hit.distance}") print("-" * 40) # 分隔线 except Exception as e: print(f"查询失败: {e}") else: print("未找到相关历史对话,请检查查询参数或数据。") prompt = f"""根据以下关于'{query}'的相关信息,整理一份结构化的报告: 要求: 1. 分章节组织内容 2. 包含关键数据和事实 3. 语言简洁专业 相关信息: {context}""" response = client.chat.completions.create( model="deepseek-chat", messages=[ {"role": "system", "content": "你是一个专业的文档整理助手"}, {"role": "user", "content": prompt} ], temperature=0.3, stream=True ) for chunk in response: if chunk.choices[0].delta.content: yield {"data": chunk.choices[0].delta.content} await asyncio.sleep(0.01) except Exception as e: yield {"data": f"生成报告时出错: {str(e)}"} finally: # 释放连接 milvus_pool.release_connection(connection) """ http://10.10.21.22:8000/api/rag?query=小学数学中有哪些模型 """ @app.get("/api/rag") async def rag_stream(query: str, request: Request): """RAG+DeepSeek流式接口""" return EventSourceResponse( generate_stream( request.app.state.deepseek_client, request.app.state.milvus_pool, request.app.state.collection_manager, query ) ) app.mount("/static", StaticFiles(directory="Static"), name="static") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)