From 8f06b260ef1702092eeed326870eee3b750150ff Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Wed, 25 Jun 2025 09:03:05 +0800 Subject: [PATCH] 'commit' --- dsRag/Doc/8、Docling.txt | 5 ++ dsRag/Start.py | 104 ++++++++++++++++++++++++++++++++++- dsRag/StartSSE.py | 112 -------------------------------------- 3 files changed, 107 insertions(+), 114 deletions(-) create mode 100644 dsRag/Doc/8、Docling.txt delete mode 100644 dsRag/StartSSE.py diff --git a/dsRag/Doc/8、Docling.txt b/dsRag/Doc/8、Docling.txt new file mode 100644 index 00000000..f6dd9f3b --- /dev/null +++ b/dsRag/Doc/8、Docling.txt @@ -0,0 +1,5 @@ +# 官网 +https://github.com/DS4SD/docling + +# 文档解析神器 Docling 上线,快速转换文件格式 +https://blog.csdn.net/XLionXxxx/article/details/144915203 diff --git a/dsRag/Start.py b/dsRag/Start.py index 667df41f..7db62625 100644 --- a/dsRag/Start.py +++ b/dsRag/Start.py @@ -4,12 +4,16 @@ from contextlib import asynccontextmanager from logging.handlers import RotatingFileHandler import uvicorn -from fastapi import FastAPI, UploadFile, File +from fastapi import FastAPI, UploadFile, File, Request +from sse_starlette.sse import EventSourceResponse +from elasticsearch import Elasticsearch +from openai import OpenAI from Dao.KbDao import KbDao from Util.MySQLUtil import init_mysql_pool - +from Config import Config from fastapi.staticfiles import StaticFiles +import urllib3 # 初始化日志 logger = logging.getLogger(__name__) @@ -23,6 +27,23 @@ async def lifespan(app: FastAPI): # 初始化数据库连接池 app.state.kb_dao = KbDao(await init_mysql_pool()) + # 初始化ES连接 + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # 初始化ES连接时添加verify_certs=False + app.state.es = Elasticsearch( + hosts=Config.ES_CONFIG['hosts'], + basic_auth=Config.ES_CONFIG['basic_auth'], + verify_certs=False # 禁用证书验证 + ) + + # 初始化DeepSeek客户端 + app.state.deepseek_client = OpenAI( + api_key=Config.DEEPSEEK_API_KEY, + base_url=Config.DEEPSEEK_URL + ) + # 启动文档处理任务 async def document_processor(): while True: @@ -97,6 +118,85 @@ async def upload_file(kb_id: int, file: UploadFile = File(...)): """文件上传接口""" return await app.state.kb_dao.handle_upload(kb_id, file) +def search_related_data(es, query): + """搜索与查询相关的数据""" + # 向量搜索 + vector_results = es.search( + index=Config.ES_CONFIG['default_index'], + body={ + "query": { + "match": { + "content": { + "query": query, + "analyzer": "ik_smart" + } + } + }, + "size": 5 + } + ) + + # 文本精确搜索 + text_results = es.search( + index="raw_texts", + body={ + "query": { + "match": { + "text.keyword": query + } + }, + "size": 5 + } + ) + + # 合并结果 + context = "" + for hit in vector_results['hits']['hits']: + context += f"向量相似度结果(score={hit['_score']}):\n{hit['_source']['text']}\n\n" + + for hit in text_results['hits']['hits']: + context += f"文本精确匹配结果(score={hit['_score']}):\n{hit['_source']['text']}\n\n" + + return context + +async def generate_stream(client, es, query): + """生成SSE流""" + context = search_related_data(es, query) + + prompt = f"""根据以下关于'{query}'的相关信息,整理一份结构化的报告: +要求: +1. 分章节组织内容 +2. 包含关键数据和事实 +3. 语言简洁专业 + +相关信息: +{context}""" + + try: + response = client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": "你是一个专业的文档整理助手"}, + {"role": "user", "content": prompt} + ], + temperature=0.3, + stream=True + ) + + for chunk in response: + if chunk.choices[0].delta.content: + yield {"data": chunk.choices[0].delta.content} + await asyncio.sleep(0.01) + except Exception as e: + yield {"data": f"生成报告时出错: {str(e)}"} + +@app.get("/api/rag") +async def rag_stream(query: str, request: Request): + """RAG+DeepSeek流式接口""" + return EventSourceResponse( + generate_stream(request.app.state.deepseek_client, request.app.state.es, query) + ) + app.mount("/static", StaticFiles(directory="Static"), name="static") if __name__ == "__main__": diff --git a/dsRag/StartSSE.py b/dsRag/StartSSE.py deleted file mode 100644 index 0e186788..00000000 --- a/dsRag/StartSSE.py +++ /dev/null @@ -1,112 +0,0 @@ -""" -conda activate rag -pip install fastapi uvicorn sse-starlette -""" -import asyncio - -from elasticsearch import Elasticsearch -from fastapi import FastAPI -from openai import OpenAI -from sse_starlette.sse import EventSourceResponse - -from Config import Config - -app = FastAPI() - -# 初始化ES连接 -es = Elasticsearch( - hosts=Config.ES_CONFIG['hosts'], - basic_auth=Config.ES_CONFIG['basic_auth'], - verify_certs=Config.ES_CONFIG['verify_certs'] -) - -# 初始化DeepSeek客户端 -client = OpenAI( - api_key=Config.DEEPSEEK_API_KEY, - base_url=Config.DEEPSEEK_URL -) - -def search_related_data(query): - """搜索与查询相关的数据""" - # 向量搜索 - vector_results = es.search( - index=Config.ES_CONFIG['default_index'], - body={ - "query": { - "match": { - "content": { - "query": query, - "analyzer": "ik_smart" - } - } - }, - "size": 5 - } - ) - - # 文本精确搜索 - text_results = es.search( - index="raw_texts", - body={ - "query": { - "match": { - "text.keyword": query - } - }, - "size": 5 - } - ) - - # 合并结果 - context = "" - for hit in vector_results['hits']['hits']: - context += f"向量相似度结果(score={hit['_score']}):\n{hit['_source']['text']}\n\n" - - for hit in text_results['hits']['hits']: - context += f"文本精确匹配结果(score={hit['_score']}):\n{hit['_source']['text']}\n\n" - - return context - -async def generate_stream(query): - """生成SSE流""" - context = search_related_data(query) - - prompt = f"""根据以下关于'{query}'的相关信息,整理一份结构化的报告: -要求: -1. 分章节组织内容 -2. 包含关键数据和事实 -3. 语言简洁专业 - -相关信息: -{context}""" - - try: - response = client.chat.completions.create( - model="deepseek-chat", - messages=[ - {"role": "system", "content": "你是一个专业的文档整理助手"}, - {"role": "user", "content": prompt} - ], - temperature=0.3, - stream=True - ) - - for chunk in response: - if chunk.choices[0].delta.content: - yield {"data": chunk.choices[0].delta.content} - await asyncio.sleep(0.01) - except Exception as e: - yield {"data": f"生成报告时出错: {str(e)}"} - -@app.get("/api/rag") -async def rag_stream(query: str): - """RAG+DeepSeek流式接口""" - return EventSourceResponse(generate_stream(query)) - -""" -http://localhost:8000/api/rag?query=整理云南省初中在校生情况文档 -http://10.10.21.20:8000/api/rag?query=整理云南省初中在校生情况文档 -""" -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file