import logging
import os
import subprocess
import tempfile
import urllib.parse
import uuid
from contextlib import asynccontextmanager
from io import BytesIO
from logging.handlers import RotatingFileHandler
from typing import List

import jieba  # jieba Chinese word-segmentation library
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from gensim.models import KeyedVectors
from pydantic import BaseModel, Field, ValidationError
from starlette.responses import StreamingResponse

from Config.Config import MS_MODEL_PATH, MS_MODEL_LIMIT, MS_HOST, MS_PORT, MS_MAX_CONNECTIONS, MS_NPROBE, \
    MS_COLLECTION_NAME, ES_CONFIG
from Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from Milvus.Utils.MilvusConnectionPool import *
from Milvus.Utils.MilvusConnectionPool import MilvusConnectionPool
from Util.ALiYunUtil import ALiYunUtil
from Util.EsSearchUtil import EsSearchUtil

# Initialise logging (rotating file handler, 1 MB per file, 5 backups)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)


# Convert an HTML file to a Word document via pandoc
def html_to_word_pandoc(html_file, output_file):
    subprocess.run(['pandoc', html_file, '-o', output_file])


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialise the Aliyun LLM client on startup
    app.state.aliyun_util = ALiYunUtil()
    yield
    # Release resources on shutdown
    await app.state.aliyun_util.close()


app = FastAPI(lifespan=lifespan)

# Mount the static file directory
app.mount("/static", StaticFiles(directory="Static"), name="static")


class QueryRequest(BaseModel):
    query: str = Field(..., description="The user's query")
    documents: List[str] = Field(..., description="Documents uploaded by the user")


class SaveWordRequest(BaseModel):
    html: str = Field(..., description="HTML content to save as a Word document")


@app.post("/api/save-word")
async def save_to_word(request: Request):
    temp_html = None
    output_file = None
    try:
        # Parse request data
        try:
            data = await request.json()
            html_content = data.get('html_content', '')
            if not html_content:
                raise ValueError("Empty HTML content")
        except Exception as e:
            logger.error(f"Request parsing failed: {str(e)}")
            raise HTTPException(status_code=400, detail=f"Invalid request: {str(e)}")

        # Write the HTML content to a temporary file
        temp_html = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + ".html")
        with open(temp_html, "w", encoding="utf-8") as f:
            f.write(html_content)

        # Convert to .docx with pandoc
        output_file = os.path.join(tempfile.gettempdir(), "【理想大模型】问答.docx")
        subprocess.run(['pandoc', temp_html, '-o', output_file], check=True)

        # Read the generated Word file into memory
        with open(output_file, "rb") as f:
            stream = BytesIO(f.read())

        # Return the file as a download (RFC 5987 encoded filename)
        encoded_filename = urllib.parse.quote("【理想大模型】问答.docx")
        return StreamingResponse(
            stream,
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"})

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")
    finally:
        # Clean up temporary files
        try:
            if temp_html and os.path.exists(temp_html):
                os.remove(temp_html)
            if output_file and os.path.exists(output_file):
                os.remove(output_file)
        except Exception as e:
            logger.warning(f"Failed to clean up temp files: {str(e)}")


@app.post("/api/rag")
async def rag_stream(request: Request):
    try:
        data = await request.json()
        query = data.get('query', '')
        query_tags = data.get('tags', [])

        # Get an EsSearchUtil instance
        es_search_util = EsSearchUtil(ES_CONFIG)

        # Run hybrid search (vector + keyword)
        es_conn = es_search_util.es_pool.get_connection()
        try:
            # Vector search: cosine similarity against the 'embedding' field,
            # restricted to documents that match at least one of the query tags
            query_embedding = es_search_util.text_to_embedding(query)
            vector_results = es_conn.search(
                index=ES_CONFIG['index_name'],
                body={
                    "query": {
                        "script_score": {
                            "query": {
                                "bool": {
                                    "should": [
                                        {"terms": {"tags.tags": query_tags}}
                                    ],
                                    "minimum_should_match": 1
                                }
                            },
                            "script": {
                                "source": "double score = cosineSimilarity(params.query_vector, 'embedding'); return score >= 0 ? score : 0",
                                "params": {"query_vector": query_embedding}
                            }
                        }
                    },
                    "size": 3
                }
            )

            # Keyword search: full-text match on user_input, filtered by tags
            text_results = es_conn.search(
                index=ES_CONFIG['index_name'],
                body={
                    "query": {
                        "bool": {
                            "must": [
                                {"match": {"user_input": query}},
                                {"terms": {"tags.tags": query_tags}}
                            ]
                        }
                    },
                    "size": 3
                }
            )

            # Merge both result sets
            results = {
                "vector_results": [hit['_source'] for hit in vector_results['hits']['hits']],
                "text_results": [hit['_source'] for hit in text_results['hits']['hits']]
            }

            return results
        finally:
            es_search_util.es_pool.release_connection(es_conn)

    except Exception as e:
        logger.error(f"RAG search error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
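
# ---------------------------------------------------------------------------
# Usage sketch (commented out, not executed): one way a client could call the
# two endpoints above. Illustrative only; the host/port assume the uvicorn
# defaults at the bottom of this file, and the `requests` library is an
# assumed client-side dependency (the server itself does not require it).
#
#   import requests
#
#   # 1. Convert rendered answer HTML into a downloadable Word document.
#   resp = requests.post(
#       "http://localhost:8000/api/save-word",
#       json={"html_content": "<h1>Hello</h1><p>Example body</p>"},
#   )
#   with open("answer.docx", "wb") as f:
#       f.write(resp.content)
#
#   # 2. Hybrid (vector + keyword) retrieval over Elasticsearch.
#   resp = requests.post(
#       "http://localhost:8000/api/rag",
#       json={"query": "example question", "tags": ["demo"]},
#   )
#   hits = resp.json()
#   print(hits["vector_results"], hits["text_results"])
# ---------------------------------------------------------------------------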