diff --git a/dsRag/StartEs.py b/dsRag/StartEs.py
index 94b9972c..e69de29b 100644
--- a/dsRag/StartEs.py
+++ b/dsRag/StartEs.py
@@ -1,318 +0,0 @@
-import os
-import subprocess
-import tempfile
-import urllib.parse
-import uuid
-import logging
-from contextlib import asynccontextmanager
-from io import BytesIO
-from logging.handlers import RotatingFileHandler
-from typing import List
-
-import jieba  # jieba tokenizer for Chinese word segmentation
-import uvicorn
-from fastapi import FastAPI, Request, HTTPException
-from fastapi.staticfiles import StaticFiles
-from gensim.models import KeyedVectors
-from pydantic import BaseModel, Field, ValidationError
-from starlette.responses import StreamingResponse
-
-from Config.Config import ES_CONFIG, MS_MODEL_PATH, MS_MODEL_LIMIT
-from ElasticSearch.Utils.ElasticsearchCollectionManager import ElasticsearchCollectionManager
-from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool
-from Util.ALiYunUtil import ALiYunUtil
-
-# Initialize logging
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-# Make sure the log directory exists
-os.makedirs('Logs', exist_ok=True)
-handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
-handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
-logger.addHandler(handler)
-
-# 1. Load the pre-trained Word2Vec model
-model = KeyedVectors.load_word2vec_format(MS_MODEL_PATH, binary=False, limit=MS_MODEL_LIMIT)
-logger.info(f"Model loaded, word vector dimension: {model.vector_size}")
-
-
-# Convert an HTML file to a Word document with pandoc
-def html_to_word_pandoc(html_file, output_file):
-    subprocess.run(['pandoc', html_file, '-o', output_file], check=True)
-
-
-@asynccontextmanager
-async def lifespan(app: FastAPI):
-    # Initialize the Elasticsearch connection pool
-    app.state.es_pool = ElasticsearchConnectionPool(
-        hosts=ES_CONFIG["hosts"],
-        basic_auth=ES_CONFIG["basic_auth"],
-        verify_certs=ES_CONFIG["verify_certs"],
-        max_connections=50
-    )
-    logger.info("Elasticsearch connection pool initialized")
-
-    # Initialize the collection manager
-    app.state.collection_manager = ElasticsearchCollectionManager(ES_CONFIG["index_name"])
-    # Borrow a connection and load the index
-    es_conn = app.state.es_pool.get_connection()
-    try:
-        app.state.collection_manager.load_collection(es_conn)
-    finally:
-        app.state.es_pool.release_connection(es_conn)
-
-    # Initialize the Aliyun LLM client
-    app.state.aliyun_util = ALiYunUtil()
-
-    yield
-    # Close the Elasticsearch connection pool on shutdown
-    app.state.es_pool.close()
-
-
-app = FastAPI(lifespan=lifespan)
-
-# Mount the static file directory
-app.mount("/static", StaticFiles(directory="Static"), name="static")
-
-
-# Convert text to an embedding vector
-def text_to_embedding(text):
-    words = jieba.lcut(text)  # Tokenize with jieba
-    logger.info(f"Text: {text}, tokens: {words}")
-    try:
-        embeddings = [model[word] for word in words if word in model]
-        logger.info(f"Number of in-vocabulary word vectors: {len(embeddings)}")
-        if embeddings:
-            avg_embedding = sum(embeddings) / len(embeddings)
-            logger.info(f"Averaged vector (first 5 dims): {avg_embedding[:5]}...")
-            return avg_embedding.tolist()  # return a plain list so the downstream isinstance check passes
-        else:
-            logger.warning("No in-vocabulary words found, returning a zero vector")
-            return [0.0] * model.vector_size
-    except Exception as e:
-        logger.error(f"Embedding conversion failed: {str(e)}")
-        return [0.0] * model.vector_size
-
-
-async def generate_stream(client, es_pool, collection_manager, query, documents):
-    # Borrow a connection from the pool
-    es_conn = es_pool.get_connection()
-    try:
-        # 1. Convert the query text to an embedding vector
-        current_embedding = text_to_embedding(query)
-
-        # 2. Build the tag filter (note: expr is built but never applied to the queries below)
-        if documents:
-            expr = {
-                "bool": {
-                    "should": [
-                        {"terms": {"tags.tags": [doc]}} for doc in documents
-                    ],
-                    "minimum_should_match": 1
-                }
-            }
-        else:
-            expr = None
-
-        # 3. Execute the hybrid search
-        if not isinstance(current_embedding, list):
-            logger.error(f"Unexpected current_embedding type: {type(current_embedding)}")
-            current_embedding = [0.0] * model.vector_size
-
-        # Lexical search on the content field (labelled "vector search" in the
-        # original; the embedding computed above is not actually used here)
-        vector_results = es_conn.search(
-            index=ES_CONFIG['index_name'],
-            body={
-                "query": {
-                    "match": {
-                        "content": {
-                            "query": query,
-                            "analyzer": "ik_smart"  # IK smart analyzer
-                        }
-                    }
-                },
-                "size": 5
-            }
-        )
-
-        # Exact text search
-        text_results = es_conn.search(
-            index="raw_texts",
-            body={
-                "query": {
-                    "match": {
-                        "text.keyword": query
-                    }
-                },
-                "size": 5
-            }
-        )
-
-        # Merge the two result sets
-        results = []
-        for hit in vector_results['hits']['hits']:
-            results.append({
-                "_source": hit['_source'],
-                "_score": hit['_score'],
-                "_id": hit['_id']
-            })
-
-        for hit in text_results['hits']['hits']:
-            results.append({
-                "_source": hit['_source'],
-                "_score": hit['_score'],
-                "_id": hit['_id']
-            })
-        logger.info(f"Search returned {len(results) if results else 0} hits")
-
-        # 4. Process the search results
-        logger.info("Most relevant knowledge base content:")
-        context = ""
-        if results:
-            for hit in results:
-                try:
-                    source = hit["_source"]
-                    score = hit["_score"]
-
-                    if score > 0.5:  # Relevance score threshold
-                        logger.info(f"ID: {hit['_id']}")
-                        logger.info(f"Tags: {source['tags']}")
-                        logger.info(f"User question: {source['user_input']}")
-                        logger.info(f"Timestamp: {source['timestamp']}")
-                        logger.info(f"Score: {score}")
-                        logger.info("-" * 40)
-
-                        full_content = source['tags'].get('full_content', source['user_input'])
-                        if isinstance(full_content, str):
-                            context = context + full_content
-                        else:
-                            logger.warning(f"Unexpected content type: {type(full_content)}")
-                    else:
-                        logger.warning(f"Score too low, skipping hit: {hit['_id']}")
-                        continue
-
-                except Exception as e:
-                    logger.error(f"Failed to process hit: {e}")
-        else:
-            logger.warning("No relevant history found; check the query parameters or the data.")
-
-        prompt = f"""
-Information retrieval and answering assistant
-Based on the following information about '{query}':
-
-Basic information
-- Language: Chinese
-- Description: retrieve information from the provided material and answer the question
-- Traits: extract key information quickly and accurately, answer clearly and concisely
-
-Relevant information
-{context}
-
-Answer requirements
-1. Answer quickly and accurately based on the given material; you may add some extra information, but do not repeat content.
-2. If no relevant information is provided, do not answer.
-3. If the relevant information is a poor match for the original question, do not answer either.
-4. Return the answer as HTML with appropriate paragraph, list, and heading tags.
-5. Ensure the content is clearly structured for frontend display.
-"""
-
-        # Call the Aliyun LLM
-        if len(context) > 0:
-            html_content = client.chat(prompt)
-            yield {"data": html_content}
-        else:
-            yield {"data": "No relevant information was found in the knowledge base, so this question cannot be answered."}
-    except Exception as e:
-        logger.error(f"Error while generating the answer: {str(e)}")
-        yield {"data": f"Error while generating the answer: {str(e)}"}
-    finally:
-        # Return the connection to the pool
-        es_pool.release_connection(es_conn)
-        logger.info("Elasticsearch connection released")
-
-
-class QueryRequest(BaseModel):
-    query: str = Field(..., description="The user's question")
-    documents: List[str] = Field(..., description="Documents uploaded by the user")
-
-
-# Note: defined but unused; /api/save-word reads 'html_content' from the raw JSON body instead
-class SaveWordRequest(BaseModel):
-    html: str = Field(..., description="HTML content to save as a Word document")
-
-
-@app.post("/api/save-word")
-async def save_to_word(request: Request):
-    temp_html = None
-    output_file = None
-    try:
-        # Parse the request data
-        try:
-            data = await request.json()
-            html_content = data.get('html_content', '')
-            if not html_content:
-                raise ValueError("Empty HTML content")
-        except Exception as e:
-            logger.error(f"Request parsing failed: {str(e)}")
-            raise HTTPException(status_code=400, detail=f"Invalid request: {str(e)}")
-
-        # Write the HTML to a temporary file
-        temp_html = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + ".html")
-        with open(temp_html, "w", encoding="utf-8") as f:
-            f.write(html_content)
-
-        # Convert with pandoc
-        output_file = os.path.join(tempfile.gettempdir(), "理想大模型问答.docx")
-        subprocess.run(['pandoc', temp_html, '-o', output_file], check=True)
-
-        # Read back the generated Word file
-        with open(output_file, "rb") as f:
-            stream = BytesIO(f.read())
-
-        # Return the response with an RFC 5987 encoded filename
-        encoded_filename = urllib.parse.quote("理想大模型问答.docx")
-        return StreamingResponse(
-            stream,
-            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
-            headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"})
-
-    except HTTPException:
-        raise
-    except Exception as e:
-        logger.error(f"Unexpected error: {str(e)}")
-        raise HTTPException(status_code=500, detail="Internal server error")
-    finally:
-        # Clean up the temporary files
-        try:
-            if temp_html and os.path.exists(temp_html):
-                os.remove(temp_html)
-            if output_file and os.path.exists(output_file):
-                os.remove(output_file)
-        except Exception as e:
-            logger.warning(f"Failed to clean up temp files: {str(e)}")
-
-
-@app.post("/api/rag")
-async def rag_stream(request: Request):
-    """RAG endpoint backed by Elasticsearch and the Aliyun LLM"""
-    try:
-        data = await request.json()
-        query_request = QueryRequest(**data)
-    except ValidationError as e:
-        logger.error(f"Request body validation failed: {e.errors()}")
-        raise HTTPException(status_code=422, detail=e.errors())
-    except Exception as e:
-        logger.error(f"Request parsing failed: {str(e)}")
-        raise HTTPException(status_code=400, detail="Invalid request format")
-    # generate_stream yields a single chunk in practice; return it as the JSON response
-    async for chunk in generate_stream(
-            request.app.state.aliyun_util,
-            request.app.state.es_pool,
-            request.app.state.collection_manager,
-            query_request.query,
-            query_request.documents
-    ):
-        return chunk
-
-
-if __name__ == "__main__":
-    logger.info("Starting the Elasticsearch hybrid search service")
-    uvicorn.run(app, host="0.0.0.0", port=8000)
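
Notes on the removed code follow.

The removed text_to_embedding boils down to a bag-of-words average of Word2Vec vectors. A minimal standalone sketch of that averaging step, assuming numpy is available; the model file name and vocabulary limit are hypothetical stand-ins for MS_MODEL_PATH and MS_MODEL_LIMIT:

    import jieba
    import numpy as np
    from gensim.models import KeyedVectors

    # Hypothetical path/limit standing in for MS_MODEL_PATH / MS_MODEL_LIMIT
    model = KeyedVectors.load_word2vec_format("models/word2vec.txt", binary=False, limit=10000)

    def text_to_embedding(text: str) -> list:
        # Keep only tokens that exist in the Word2Vec vocabulary
        vectors = [model[w] for w in jieba.lcut(text) if w in model]
        if not vectors:
            return [0.0] * model.vector_size  # no known token: fall back to zeros
        return np.mean(vectors, axis=0).tolist()  # element-wise average over tokens

Using np.mean and .tolist() keeps the return type a plain list, which is what the isinstance check in generate_stream expects; the original sum()/len() version returned a numpy array that would have been silently replaced by a zero vector.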
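As flagged in the comments above, generate_stream computes current_embedding but both searches it runs are lexical. If a true hybrid search is the goal, Elasticsearch 8.x can score a kNN clause alongside a BM25 query in a single request. A sketch under assumptions this diff does not confirm: an ES 8.x cluster, and an index mapping with a dense_vector field named "embedding" of the model's dimension:

    # Hybrid kNN + BM25 search; "embedding" is an assumed dense_vector field
    hybrid_body = {
        "knn": {
            "field": "embedding",
            "query_vector": current_embedding,  # produced by text_to_embedding(query)
            "k": 5,
            "num_candidates": 50,
        },
        "query": {
            "match": {"content": {"query": query, "analyzer": "ik_smart"}}
        },
        "size": 5,
    }
    results = es_conn.search(index=ES_CONFIG["index_name"], body=hybrid_body)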
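Separately, /api/rag is named rag_stream yet returns only the first yielded chunk as a plain JSON body. If streaming to the frontend is the intent, the generator can be wrapped in the already-imported StreamingResponse and emitted as Server-Sent Events; a sketch under that assumption (the SSE framing and the json import are additions, not part of the removed code):

    import json

    @app.post("/api/rag")
    async def rag_stream(request: Request):
        data = await request.json()
        query_request = QueryRequest(**data)

        async def event_source():
            # Forward each chunk from generate_stream as one SSE "data:" frame
            async for chunk in generate_stream(
                    request.app.state.aliyun_util,
                    request.app.state.es_pool,
                    request.app.state.collection_manager,
                    query_request.query,
                    query_request.documents):
                yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

        return StreamingResponse(event_source(), media_type="text/event-stream")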