diff --git a/dsRag/Start.py b/dsRag/Start.py
index d700c8dc..b50b5a41 100644
--- a/dsRag/Start.py
+++ b/dsRag/Start.py
@@ -1,41 +1,41 @@
 import asyncio
 import logging
 from contextlib import asynccontextmanager
 from logging.handlers import RotatingFileHandler
+import jieba  # jieba Chinese word segmentation library
 import uvicorn
-from fastapi import FastAPI, UploadFile, File, Request
-from sse_starlette.sse import EventSourceResponse
-from elasticsearch import Elasticsearch
+from fastapi import FastAPI, Request
 from openai import OpenAI
-
-from Dao.KbDao import KbDao
-from Util.MySQLUtil import init_mysql_pool
+from sse_starlette.sse import EventSourceResponse
+from starlette.staticfiles import StaticFiles
+from gensim.models import KeyedVectors
 from Config import Config
-from fastapi.staticfiles import StaticFiles
+from Milvus.Config.MulvusConfig import *
+from Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
+from Milvus.Utils.MilvusConnectionPool import *
+from Milvus.Utils.MilvusConnectionPool import MilvusConnectionPool
 
 # Initialize logging
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
-handler = RotatingFileHandler('Logs/start.log', maxBytes=1024*1024, backupCount=5)
+handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
 handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
 logger.addHandler(handler)
 
+# Load the pre-trained Word2Vec model
+model_path = MS_MODEL_PATH  # replace with the path to your Word2Vec model
+model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
+print(f"模型加载成功,词向量维度: {model.vector_size}")
+
+
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    # Initialize the MySQL connection pool
-    app.state.kb_dao = KbDao(await init_mysql_pool())
+    # Initialize the Milvus connection pool
+    app.state.milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
 
-    # Initialize the Elasticsearch connection
-    import urllib3
-    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
-
-    # Pass verify_certs=False when creating the ES connection
-    app.state.es = Elasticsearch(
-        hosts=Config.ES_CONFIG['hosts'],
-        basic_auth=Config.ES_CONFIG['basic_auth'],
-        verify_certs=False  # disable certificate verification
-    )
+    # Initialize the collection manager
+    app.state.collection_manager = MilvusCollectionManager(MS_COLLECTION_NAME)
+    app.state.collection_manager.load_collection()
 
     # Initialize the DeepSeek client
     app.state.deepseek_client = OpenAI(
@@ -43,110 +43,66 @@ async def lifespan(app: FastAPI):
         base_url=Config.DEEPSEEK_URL
     )
     yield
-    # Close the MySQL connection pool
-    await app.state.kb_dao.mysql_pool.close()
-
-app = FastAPI(lifespan=lifespan)
-
-# Knowledge base CRUD endpoints
-@app.get("/kb")
-async def list_kbs():
-    """List all knowledge bases"""
-    return await app.state.kb_dao.list_kbs()
 
-@app.post("/kb")
-async def create_kb(kb: dict):
-    """Create a knowledge base"""
-    return await app.state.kb_dao.create_kb(kb)
+    # Close the Milvus connection pool
+    app.state.milvus_pool.close()
 
-@app.get("/kb/{kb_id}")
-async def read_kb(kb_id: int):
-    """Get knowledge base details"""
-    return await app.state.kb_dao.get_kb(kb_id)
-
-@app.post("/kb/update/{kb_id}")
-async def update_kb(kb_id: int, kb: dict):
-    """Update knowledge base info"""
-    return await app.state.kb_dao.update_kb(kb_id, kb)
-
-@app.delete("/kb/{kb_id}")
-async def delete_kb(kb_id: int):
-    """Delete a knowledge base"""
-    return await app.state.kb_dao.delete_kb(kb_id)
-
-# Knowledge base file CRUD endpoints
-@app.post("/kb_file")
-async def create_kb_file(file: dict):
-    """Create a knowledge base file record"""
-    return await app.state.kb_dao.create_kb_file(file)
-
-@app.get("/kb_files/{file_id}")
-async def read_kb_file(file_id: int):
-    """Get file details"""
-    return await app.state.kb_dao.get_kb_file(file_id)
-
-@app.post("/kb_files/update/{file_id}")
-async def update_kb_file(file_id: int, file: dict):
-    """Update file info"""
-    return await app.state.kb_dao.update_kb_file(file_id, file)
-
-@app.delete("/kb_files/{file_id}")
-async def delete_kb_file(file_id: int):
-    """Delete a file record"""
-    return await app.state.kb_dao.delete_kb_file(file_id)
-
-# File upload endpoint
-@app.post("/upload")
-async def upload_file(kb_id: int, file: UploadFile = File(...)):
-    """Handle a file upload"""
-    return await app.state.kb_dao.handle_upload(kb_id, file)
-
-def search_related_data(es, query):
-    """Search for data related to the query"""
-    # Vector search
-    vector_results = es.search(
-        index=Config.ES_CONFIG['default_index'],
-        body={
-            "query": {
-                "match": {
-                    "content": {
-                        "query": query,
-                        "analyzer": "ik_smart"
-                    }
-                }
-            },
-            "size": 5
-        }
-    )
-
-    # Exact text search
-    text_results = es.search(
-        index="raw_texts",
-        body={
-            "query": {
-                "match": {
-                    "text.keyword": query
-                }
-            },
-            "size": 5
-        }
-    )
-
-    # Merge the results
-    context = ""
-    for hit in vector_results['hits']['hits']:
-        context += f"向量相似度结果(score={hit['_score']}):\n{hit['_source']['text']}\n\n"
-
-    for hit in text_results['hits']['hits']:
-        context += f"文本精确匹配结果(score={hit['_score']}):\n{hit['_source']['text']}\n\n"
-
-    return context
+app = FastAPI(lifespan=lifespan)
 
-async def generate_stream(client, es, query):
+# Convert a text into an embedding vector
+def text_to_embedding(text):
+    words = jieba.lcut(text)  # tokenize with jieba
+    print(f"文本: {text}, 分词结果: {words}")
+    embeddings = [model[word] for word in words if word in model]
+    print(f"有效词向量数量: {len(embeddings)}")
+    if embeddings:
+        avg_embedding = sum(embeddings) / len(embeddings)
+        print(f"生成的平均向量: {avg_embedding[:5]}...")  # print the first 5 dimensions
+        return avg_embedding
+    else:
+        print("未找到有效词,返回零向量")
+        return [0.0] * model.vector_size
+
+async def generate_stream(client, milvus_pool, collection_manager, query):
     """Generate the SSE stream"""
-    context = search_related_data(es, query)
+    # Get a connection from the pool
+    connection = milvus_pool.get_connection()
 
-    prompt = f"""根据以下关于'{query}'的相关信息,整理一份结构化的报告:
+    try:
+        # 1. Convert the query text into a vector
+        current_embedding = text_to_embedding(query)
+
+        # 2. Search for related data
+        search_params = {
+            "metric_type": "L2",  # use L2 distance
+            "params": {"nprobe": MS_NPROBE}  # nprobe parameter for IVF_FLAT
+        }
+        # Query the collection with the embedding
+        results = collection_manager.search(current_embedding, search_params, limit=10)  # return the top 10 matches
+
+        # 3. Process the search results
+        print("最相关的历史对话:")
+        context = ""
+        if results:
+            for hits in results:
+                for hit in hits:
+                    try:
+                        # Fetch the non-vector fields by ID
+                        record = collection_manager.query_by_id(hit.id)
+                        print(f"ID: {hit.id}")
+                        print(f"会话 ID: {record['person_id']}")
+                        print(f"用户问题: {record['user_input']}")
+                        context += record['user_input']
+                        print(f"大模型回复: {record['model_response']}")
+                        print(f"时间: {record['timestamp']}")
+                        print(f"距离: {hit.distance}")
+                        print("-" * 40)  # separator line
+                    except Exception as e:
+                        print(f"查询失败: {e}")
+        else:
+            print("未找到相关历史对话,请检查查询参数或数据。")
+
+        prompt = f"""根据以下关于'{query}'的相关信息,整理一份结构化的报告:
 要求:
 1. 分章节组织内容
 2. 包含关键数据和事实
@@ -154,8 +110,7 @@ async def generate_stream(client, es, query):
 
 相关信息:
 {context}"""
-
-    try:
+
         response = client.chat.completions.create(
             model="deepseek-chat",
             messages=[
@@ -172,12 +127,22 @@ async def generate_stream(client, es, query):
                 await asyncio.sleep(0.01)
     except Exception as e:
         yield {"data": f"生成报告时出错: {str(e)}"}
-
+    finally:
+        # Release the connection back to the pool
+        milvus_pool.release_connection(connection)
+
+"""
+http://10.10.21.22:8000/api/rag?query=小学数学中有哪些模型
+"""
 @app.get("/api/rag")
 async def rag_stream(query: str, request: Request):
     """RAG + DeepSeek streaming endpoint"""
     return EventSourceResponse(
-        generate_stream(request.app.state.deepseek_client, request.app.state.es, query)
+        generate_stream(
+            request.app.state.deepseek_client,
+            request.app.state.milvus_pool,
+            request.app.state.collection_manager,
+            query
+        )
     )
 
 app.mount("/static", StaticFiles(directory="Static"), name="static")
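Below is a minimal, hypothetical smoke test for the new `/api/rag` SSE route introduced in this change. It is only a sketch: the base URL (`http://localhost:8000`), the `httpx` dependency, and the helper name `stream_report` are assumptions rather than part of the diff; the query string reuses the example from the module-level note.

```python
# Hypothetical client-side smoke test for the /api/rag SSE endpoint.
# Assumptions: Start.py is serving on localhost:8000 and httpx is installed.
import httpx

BASE_URL = "http://localhost:8000"  # assumed host/port; adjust to your deployment


def stream_report(query: str) -> None:
    """Print SSE data chunks from /api/rag as they arrive."""
    with httpx.stream("GET", f"{BASE_URL}/api/rag",
                      params={"query": query}, timeout=None) as response:
        response.raise_for_status()
        for line in response.iter_lines():
            # sse_starlette frames each event as a "data: <payload>" line
            if line.startswith("data:"):
                print(line[len("data:"):].strip())


if __name__ == "__main__":
    stream_report("小学数学中有哪些模型")
```

Each printed chunk corresponds to one delta yielded by `generate_stream`, so this also gives a quick way to confirm that the Milvus lookup and the DeepSeek stream are wired together correctly.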