main
HuangHai 4 weeks ago
parent c13e5fac52
commit 0cfe9b1877

@ -1,318 +0,0 @@
import os
import subprocess
import tempfile
import urllib.parse
import uuid
import logging
from contextlib import asynccontextmanager
from io import BytesIO
from logging.handlers import RotatingFileHandler
from typing import List
import jieba # 导入 jieba 分词库
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from gensim.models import KeyedVectors
from pydantic import BaseModel, Field, ValidationError
from starlette.responses import StreamingResponse
from Config.Config import ES_CONFIG, MS_MODEL_PATH, MS_MODEL_LIMIT
from ElasticSearch.Utils.ElasticsearchCollectionManager import ElasticsearchCollectionManager
from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool
from Util.ALiYunUtil import ALiYunUtil
# 初始化日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# 确保日志目录存在
os.makedirs('Logs', exist_ok=True)
handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# 1. 加载预训练的 Word2Vec 模型
model = KeyedVectors.load_word2vec_format(MS_MODEL_PATH, binary=False, limit=MS_MODEL_LIMIT)
logger.info(f"模型加载成功,词向量维度: {model.vector_size}")
# 将HTML文件转换为Word文件
def html_to_word_pandoc(html_file, output_file):
subprocess.run(['pandoc', html_file, '-o', output_file])
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化Elasticsearch连接池
app.state.es_pool = ElasticsearchConnectionPool(
hosts=ES_CONFIG["hosts"],
basic_auth=ES_CONFIG["basic_auth"],
verify_certs=ES_CONFIG["verify_certs"],
max_connections=50
)
logger.info("Elasticsearch连接池初始化完成")
# 初始化集合管理器
app.state.collection_manager = ElasticsearchCollectionManager(ES_CONFIG["index_name"])
# 获取连接并加载索引
es_conn = app.state.es_pool.get_connection()
try:
app.state.collection_manager.load_collection(es_conn)
finally:
app.state.es_pool.release_connection(es_conn)
# 初始化阿里云大模型工具
app.state.aliyun_util = ALiYunUtil()
yield
# 关闭Elasticsearch连接池
app.state.es_pool.close()
app = FastAPI(lifespan=lifespan)
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory="Static"), name="static")
# 将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词
print(f"文本: {text}, 分词结果: {words}")
try:
embeddings = [model[word] for word in words if word in model]
logger.info(f"有效词向量数量: {len(embeddings)}")
if embeddings:
avg_embedding = sum(embeddings) / len(embeddings)
logger.info(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
return avg_embedding
else:
logger.warning("未找到有效词,返回零向量")
return [0.0] * model.vector_size
except Exception as e:
logger.error(f"向量转换失败: {str(e)}")
return [0.0] * model.vector_size
async def generate_stream(client, es_pool, collection_manager, query, documents):
# 从连接池获取连接
es_conn = es_pool.get_connection()
try:
# 1. 将查询文本转换为向量
current_embedding = text_to_embedding(query)
# 2. 构建搜索参数
if documents:
expr = {
"bool": {
"should": [
{"terms": {"tags.tags": [doc]}} for doc in documents
],
"minimum_should_match": 1
}
}
else:
expr = None
# 3. 执行混合搜索
if not isinstance(current_embedding, list):
logger.error(f"current_embedding类型错误: {type(current_embedding)}")
current_embedding = [0.0] * model.vector_size
# 向量搜索
vector_results = es_conn.search(
index=ES_CONFIG['index_name'],
body={
"query": {
"match": {
"content": {
"query": query,
"analyzer": "ik_smart" # 指定分词器
}
}
},
"size": 5
}
)
# 文本精确搜索
text_results = es_conn.search(
index="raw_texts",
body={
"query": {
"match": {
"text.keyword": query
}
},
"size": 5
}
)
# 合并结果
results = []
for hit in vector_results['hits']['hits']:
results.append({
"_source": hit['_source'],
"_score": hit['_score'],
"_id": hit['_id']
})
for hit in text_results['hits']['hits']:
results.append({
"_source": hit['_source'],
"_score": hit['_score'],
"_id": hit['_id']
})
logger.info(f"搜索返回结果数量: {len(results) if results else 0}")
# 3. 处理搜索结果
logger.info("最相关的知识库内容:")
context = ""
if results:
for hit in results:
try:
source = hit["_source"]
score = hit["_score"]
if score > 0.5: # 设置相似度阈值
logger.info(f"ID: {hit['_id']}")
logger.info(f"标签: {source['tags']}")
logger.info(f"用户问题: {source['user_input']}")
logger.info(f"时间: {source['timestamp']}")
logger.info(f"分数: {score}")
logger.info("-" * 40)
full_content = source['tags'].get('full_content', source['user_input'])
if isinstance(full_content, str):
context = context + full_content
else:
logger.warning(f"Unexpected content type: {type(full_content)}")
else:
logger.warning(f"分数太低,忽略此结果: {hit['_id']}")
continue
except Exception as e:
logger.error(f"查询失败: {e}")
else:
logger.warning("未找到相关历史对话,请检查查询参数或数据。")
prompt = f"""
信息检索与回答助手
根据以下关于'{query}'的相关信息
基本信息
- 语言: 中文
- 描述: 根据提供的材料检索信息并回答问题
- 特点: 快速准确提取关键信息清晰简洁地回答
相关信息
{context}
回答要求
1. 依托给定的资料快速准确地回答问题可以添加一些额外的信息但请勿重复内容
2. 如果未提供相关信息请不要回答
3. 如果发现相关信息与原来的问题契合度低也不要回答
4. 使用HTML格式返回包含适当的段落列表和标题标签
5. 确保内容结构清晰便于前端展示
"""
# 调用阿里云大模型
if len(context) > 0:
html_content = client.chat(prompt)
yield {"data": html_content}
else:
yield {"data": "没有在知识库中找到相关的信息,无法回答此问题。"}
except Exception as e:
logger.error(f"生成报告时出错: {str(e)}")
yield {"data": f"生成报告时出错: {str(e)}"}
finally:
# 释放连接
es_pool.release_connection(es_conn)
logger.info("Elasticsearch连接已释放")
class QueryRequest(BaseModel):
query: str = Field(..., description="用户查询的问题")
documents: List[str] = Field(..., description="用户上传的文档")
class SaveWordRequest(BaseModel):
html: str = Field(..., description="要保存为Word的HTML内容")
@app.post("/api/save-word")
async def save_to_word(request: Request):
temp_html = None
output_file = None
try:
# Parse request data
try:
data = await request.json()
html_content = data.get('html_content', '')
if not html_content:
raise ValueError("Empty HTML content")
except Exception as e:
logger.error(f"Request parsing failed: {str(e)}")
raise HTTPException(status_code=400, detail=f"Invalid request: {str(e)}")
# 创建临时HTML文件
temp_html = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + ".html")
with open(temp_html, "w", encoding="utf-8") as f:
f.write(html_content)
# 使用pandoc转换
output_file = os.path.join(tempfile.gettempdir(), "理想大模型问答.docx")
subprocess.run(['pandoc', temp_html, '-o', output_file], check=True)
# 读取生成的Word文件
with open(output_file, "rb") as f:
stream = BytesIO(f.read())
# 返回响应
encoded_filename = urllib.parse.quote("理想大模型问答.docx")
return StreamingResponse(
stream,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"})
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
# 清理临时文件
try:
if temp_html and os.path.exists(temp_html):
os.remove(temp_html)
if output_file and os.path.exists(output_file):
os.remove(output_file)
except Exception as e:
logger.warning(f"Failed to clean up temp files: {str(e)}")
@app.post("/api/rag")
async def rag_stream(request: Request):
try:
data = await request.json()
query_request = QueryRequest(**data)
except ValidationError as e:
logger.error(f"请求体验证失败: {e.errors()}")
raise HTTPException(status_code=422, detail=e.errors())
except Exception as e:
logger.error(f"请求解析失败: {str(e)}")
raise HTTPException(status_code=400, detail="无效的请求格式")
"""RAG+ALiYun接口"""
async for chunk in generate_stream(
request.app.state.aliyun_util,
request.app.state.es_pool,
request.app.state.collection_manager,
query_request.query,
query_request.documents
):
return chunk
if __name__ == "__main__":
logger.info("启动Elasticsearch混合搜索服务")
uvicorn.run(app, host="0.0.0.0", port=8000)
Loading…
Cancel
Save