|
|
|
@ -3,6 +3,7 @@ import subprocess
|
|
|
|
|
import tempfile
|
|
|
|
|
import urllib.parse
|
|
|
|
|
import uuid
|
|
|
|
|
import logging
|
|
|
|
|
from contextlib import asynccontextmanager
|
|
|
|
|
from io import BytesIO
|
|
|
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
@ -16,16 +17,16 @@ from gensim.models import KeyedVectors
|
|
|
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
|
|
|
from starlette.responses import StreamingResponse
|
|
|
|
|
|
|
|
|
|
from Config.Config import MS_MODEL_PATH, MS_MODEL_LIMIT, MS_HOST, MS_PORT, MS_MAX_CONNECTIONS, MS_NPROBE, \
|
|
|
|
|
MS_COLLECTION_NAME
|
|
|
|
|
from Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
|
|
|
|
|
from Milvus.Utils.MilvusConnectionPool import *
|
|
|
|
|
from Milvus.Utils.MilvusConnectionPool import MilvusConnectionPool
|
|
|
|
|
from Config.Config import ES_CONFIG, MS_MODEL_PATH, MS_MODEL_LIMIT
|
|
|
|
|
from ElasticSearch.Utils.ElasticsearchCollectionManager import ElasticsearchCollectionManager
|
|
|
|
|
from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool
|
|
|
|
|
from Util.ALiYunUtil import ALiYunUtil
|
|
|
|
|
|
|
|
|
|
# Logging setup: an INFO-level module logger writing to a rotating file under Logs/.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Make sure the log directory exists before attaching the file handler.
os.makedirs('Logs', exist_ok=True)

# Rotate at 1 MiB, keeping up to 5 backup files.
handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
|
|
|
|
@ -42,19 +43,30 @@ def html_to_word_pandoc(html_file, output_file):
|
|
|
|
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler.

    On startup: create the Elasticsearch connection pool, load the search
    index via the collection manager, and initialise the Aliyun LLM helper.
    On shutdown: close the connection pool.

    NOTE(review): the pre-migration Milvus initialisation that previously
    shadowed ``app.state.collection_manager`` has been removed — the service
    now uses Elasticsearch only, consistent with ``generate_stream`` and the
    startup log in ``__main__``.
    """
    # Initialise the Elasticsearch connection pool.
    app.state.es_pool = ElasticsearchConnectionPool(
        hosts=ES_CONFIG["hosts"],
        basic_auth=ES_CONFIG["basic_auth"],
        verify_certs=ES_CONFIG["verify_certs"],
        max_connections=50
    )
    logger.info("Elasticsearch连接池初始化完成")

    # Initialise the collection (index) manager.
    app.state.collection_manager = ElasticsearchCollectionManager(ES_CONFIG["index_name"])
    # Borrow a connection to load the index; always return it to the pool.
    es_conn = app.state.es_pool.get_connection()
    try:
        app.state.collection_manager.load_collection(es_conn)
    finally:
        app.state.es_pool.release_connection(es_conn)

    # Initialise the Aliyun large-model utility.
    app.state.aliyun_util = ALiYunUtil()

    yield

    # Close the Elasticsearch connection pool on shutdown.
    app.state.es_pool.close()


app = FastAPI(lifespan=lifespan)
|
|
|
|
@ -78,59 +90,61 @@ def text_to_embedding(text):
|
|
|
|
|
return [0.0] * model.vector_size
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def generate_stream(client, milvus_pool, collection_manager, query, documents):
|
|
|
|
|
async def generate_stream(client, es_pool, collection_manager, query, documents):
|
|
|
|
|
# 从连接池获取连接
|
|
|
|
|
connection = milvus_pool.get_connection()
|
|
|
|
|
es_conn = es_pool.get_connection()
|
|
|
|
|
try:
|
|
|
|
|
# 1. 将查询文本转换为向量
|
|
|
|
|
current_embedding = text_to_embedding(query)
|
|
|
|
|
|
|
|
|
|
# 2. 搜索相关数据
|
|
|
|
|
search_params = {
|
|
|
|
|
"metric_type": "L2", # 使用 L2 距离度量方式
|
|
|
|
|
"params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数
|
|
|
|
|
}
|
|
|
|
|
# 动态生成expr表达式
|
|
|
|
|
# 2. 构建搜索参数
|
|
|
|
|
if documents:
|
|
|
|
|
conditions = [f"array_contains(tags['tags'], '{doc}')" for doc in documents]
|
|
|
|
|
expr = " OR ".join(conditions)
|
|
|
|
|
expr = {
|
|
|
|
|
"bool": {
|
|
|
|
|
"should": [
|
|
|
|
|
{"terms": {"tags.tags": [doc]}} for doc in documents
|
|
|
|
|
],
|
|
|
|
|
"minimum_should_match": 1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else:
|
|
|
|
|
expr = "" # 如果没有选择文档,返回空字符串
|
|
|
|
|
# 7. 将文本转换为嵌入向量
|
|
|
|
|
results = collection_manager.search(current_embedding,
|
|
|
|
|
search_params,
|
|
|
|
|
expr=expr, # 使用in操作符
|
|
|
|
|
limit=5) # 返回 5 条结果
|
|
|
|
|
expr = None
|
|
|
|
|
|
|
|
|
|
# 3. 执行混合搜索
|
|
|
|
|
results = collection_manager.search(
|
|
|
|
|
es_conn,
|
|
|
|
|
current_embedding,
|
|
|
|
|
search_params={"k": 5}, # 返回5个结果
|
|
|
|
|
expr=expr,
|
|
|
|
|
limit=5
|
|
|
|
|
)
|
|
|
|
|
logger.info(f"搜索返回结果数量: {len(results) if results else 0}")
|
|
|
|
|
|
|
|
|
|
# 3. 处理搜索结果
|
|
|
|
|
logger.info("最相关的知识库内容:")
|
|
|
|
|
context = ""
|
|
|
|
|
if results:
|
|
|
|
|
for hits in results:
|
|
|
|
|
for hit in hits:
|
|
|
|
|
try:
|
|
|
|
|
# 查询非向量字段
|
|
|
|
|
record = collection_manager.query_by_id(hit.id)
|
|
|
|
|
if hit.distance < 0.88: # 设置距离阈值
|
|
|
|
|
logger.info(f"ID: {hit.id}")
|
|
|
|
|
logger.info(f"标签: {record['tags']}")
|
|
|
|
|
logger.info(f"用户问题: {record['user_input']}")
|
|
|
|
|
logger.info(f"时间: {record['timestamp']}")
|
|
|
|
|
logger.info(f"距离: {hit.distance}")
|
|
|
|
|
logger.info("-" * 40) # 分隔线
|
|
|
|
|
# 获取完整内容
|
|
|
|
|
full_content = record['tags'].get('full_content', record['user_input'])
|
|
|
|
|
context = context + full_content
|
|
|
|
|
else:
|
|
|
|
|
logger.warning(f"距离太远,忽略此结果: {hit.id}")
|
|
|
|
|
logger.info(f"标签: {record['tags']}")
|
|
|
|
|
logger.info(f"用户问题: {record['user_input']}")
|
|
|
|
|
logger.info(f"时间: {record['timestamp']}")
|
|
|
|
|
logger.info(f"距离: {hit.distance}")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"查询失败: {e}")
|
|
|
|
|
for hit in results:
|
|
|
|
|
try:
|
|
|
|
|
source = hit["_source"]
|
|
|
|
|
score = hit["_score"]
|
|
|
|
|
|
|
|
|
|
if score > 0.5: # 设置相似度阈值
|
|
|
|
|
logger.info(f"ID: {hit['_id']}")
|
|
|
|
|
logger.info(f"标签: {source['tags']}")
|
|
|
|
|
logger.info(f"用户问题: {source['user_input']}")
|
|
|
|
|
logger.info(f"时间: {source['timestamp']}")
|
|
|
|
|
logger.info(f"分数: {score}")
|
|
|
|
|
logger.info("-" * 40)
|
|
|
|
|
|
|
|
|
|
full_content = source['tags'].get('full_content', source['user_input'])
|
|
|
|
|
context = context + full_content
|
|
|
|
|
else:
|
|
|
|
|
logger.warning(f"分数太低,忽略此结果: {hit['_id']}")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"查询失败: {e}")
|
|
|
|
|
else:
|
|
|
|
|
logger.warning("未找到相关历史对话,请检查查询参数或数据。")
|
|
|
|
|
|
|
|
|
@ -161,10 +175,12 @@ async def generate_stream(client, milvus_pool, collection_manager, query, docume
|
|
|
|
|
else:
|
|
|
|
|
yield {"data": "没有在知识库中找到相关的信息,无法回答此问题。"}
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"生成报告时出错: {str(e)}")
|
|
|
|
|
yield {"data": f"生成报告时出错: {str(e)}"}
|
|
|
|
|
finally:
|
|
|
|
|
# 释放连接
|
|
|
|
|
milvus_pool.release_connection(connection)
|
|
|
|
|
es_pool.release_connection(es_conn)
|
|
|
|
|
logger.info("Elasticsearch连接已释放")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
@ -262,4 +278,5 @@ async def rag_stream(request: Request):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: announce startup, then serve on all interfaces.
    bind_host, bind_port = "0.0.0.0", 8000
    logger.info("启动Elasticsearch混合搜索服务")
    uvicorn.run(app, host=bind_host, port=bind_port)
|
|
|
|
|