main
HuangHai 4 weeks ago
commit f3cc47c012

@ -17,7 +17,7 @@ class MilvusCollectionManager:
""" """
if utility.has_collection(self.collection_name): if utility.has_collection(self.collection_name):
self.collection = Collection(name=self.collection_name) self.collection = Collection(name=self.collection_name)
#print(f"集合 '{self.collection_name}' 已加载。") # print(f"集合 '{self.collection_name}' 已加载。")
else: else:
print(f"集合 '{self.collection_name}' 不存在。") print(f"集合 '{self.collection_name}' 不存在。")
@ -52,6 +52,7 @@ class MilvusCollectionManager:
if self.collection is None: if self.collection is None:
raise Exception("集合未加载,请检查集合是否存在。") raise Exception("集合未加载,请检查集合是否存在。")
self.collection.insert(entities) self.collection.insert(entities)
def load_collection(self): def load_collection(self):
""" """
加载集合到内存 加载集合到内存
@ -70,7 +71,7 @@ class MilvusCollectionManager:
# 使用 Milvus 的 query 方法查询指定 ID 的记录 # 使用 Milvus 的 query 方法查询指定 ID 的记录
results = self.collection.query( results = self.collection.query(
expr=f"id == {id}", # 查询条件 expr=f"id == {id}", # 查询条件
output_fields=["id", "person_id", "user_input", "model_response", "timestamp"] # 返回的字段 output_fields=["id", "document_id", "user_input", "timestamp"] # 返回的字段
) )
if results: if results:
return results[0] # 返回第一条记录 return results[0] # 返回第一条记录
@ -79,6 +80,7 @@ class MilvusCollectionManager:
except Exception as e: except Exception as e:
print(f"查询失败: {e}") print(f"查询失败: {e}")
return None return None
def search(self, data, search_params, expr=None, limit=5): def search(self, data, search_params, expr=None, limit=5):
""" """
在集合中搜索与输入向量最相似的数据 在集合中搜索与输入向量最相似的数据
@ -121,4 +123,4 @@ class MilvusCollectionManager:
if result: if result:
return result[0]["text"] return result[0]["text"]
else: else:
return None return None

@ -27,13 +27,12 @@ if utility.has_collection(collection_name):
# 5. 定义集合的字段和模式 # 5. 定义集合的字段和模式
fields = [ fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), # 主键字段,自动生成 ID FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), # 主键字段,自动生成 ID
FieldSchema(name="person_id", dtype=DataType.VARCHAR, max_length=64), # 会话 ID FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=64), # 文档 ID
FieldSchema(name="user_input", dtype=DataType.VARCHAR, max_length=65535), # 用户问题 FieldSchema(name="user_input", dtype=DataType.VARCHAR, max_length=65535), # 用户问题
FieldSchema(name="model_response", dtype=DataType.VARCHAR, max_length=65535), # 大模型反馈结果
FieldSchema(name="timestamp", dtype=DataType.VARCHAR, max_length=32), # 时间 FieldSchema(name="timestamp", dtype=DataType.VARCHAR, max_length=32), # 时间
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=MS_DIMENSION) # 向量字段,维度为 200 FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=MS_DIMENSION) # 向量字段,维度为 200
] ]
schema_description = "Chat records collection with person_id , user input, model response, and timestamp" schema_description = "Chat records collection with document_id , user_input, and timestamp"
# 6. 创建集合 # 6. 创建集合
print(f"正在创建集合 '{collection_name}'...") print(f"正在创建集合 '{collection_name}'...")

@ -46,16 +46,15 @@ for filename in os.listdir(txt_dir):
# 5. 获取当前时间和会话ID # 5. 获取当前时间和会话ID
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
person_id = "MATH_DATA_" + str(hash(filename)) document_id = "MATH_DATA_1" # 史校长的这本书定义为 MATH_DATA_1
# 6. 将文本转换为嵌入向量 # 6. 将文本转换为嵌入向量
embedding = text_to_embedding(content) embedding = text_to_embedding(content)
# 7. 插入数据 # 7. 插入数据
entities = [ entities = [
[person_id], # person_id [document_id], # document_id
[content], # user_input [content], # user_input
[""], # model_response (留空)
[timestamp], # timestamp [timestamp], # timestamp
[embedding] # embedding [embedding] # embedding
] ]

@ -21,7 +21,7 @@ try:
# 使用 Milvus 的 query 方法查询所有数据 # 使用 Milvus 的 query 方法查询所有数据
results = collection_manager.collection.query( results = collection_manager.collection.query(
expr="", # 空表达式表示查询所有数据 expr="", # 空表达式表示查询所有数据
output_fields=["id", "person_id", "user_input", "model_response", "timestamp", "embedding"], # 指定返回的字段 output_fields=["id", "document_id", "user_input", "timestamp", "embedding"], # 指定返回的字段
limit=1000 # 设置最大返回记录数 limit=1000 # 设置最大返回记录数
) )
print("查询结果:") print("查询结果:")
@ -29,16 +29,14 @@ try:
for result in results: for result in results:
try: try:
# 获取字段值 # 获取字段值
person_id = result["person_id"] document_id = result["document_id"]
user_input = result["user_input"] user_input = result["user_input"]
model_response = result["model_response"]
timestamp = result["timestamp"] timestamp = result["timestamp"]
embedding = result["embedding"] embedding = result["embedding"]
# 打印结果 # 打印结果
print(f"ID: {result['id']}") print(f"ID: {result['id']}")
print(f"会话 ID: {person_id}") print(f"文档 ID: {document_id}")
print(f"用户问题: {user_input}") print(f"用户问题: {user_input}")
print(f"大模型回复: {model_response}")
print(f"时间: {timestamp}") print(f"时间: {timestamp}")
print(f"向量: {embedding[:5]}...") # 只打印前 5 维向量 print(f"向量: {embedding[:5]}...") # 只打印前 5 维向量
print("-" * 40) # 分隔线 print("-" * 40) # 分隔线

@ -62,9 +62,8 @@ if results:
# 查询非向量字段 # 查询非向量字段
record = collection_manager.query_by_id(hit.id) record = collection_manager.query_by_id(hit.id)
print(f"ID: {hit.id}") print(f"ID: {hit.id}")
print(f"会话 ID: {record['person_id']}") print(f"文档 ID: {record['document_id']}")
print(f"用户问题: {record['user_input']}") print(f"用户问题: {record['user_input']}")
print(f"大模型回复: {record['model_response']}")
print(f"时间: {record['timestamp']}") print(f"时间: {record['timestamp']}")
print(f"距离: {hit.distance}") print(f"距离: {hit.distance}")
print("-" * 40) # 分隔线 print("-" * 40) # 分隔线

@ -32,11 +32,11 @@ print(f"模型加载成功,词向量维度: {model.vector_size}")
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
# 初始化Milvus连接池 # 初始化Milvus连接池
app.state.milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) app.state.milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 初始化集合管理器 # 初始化集合管理器
app.state.collection_manager = MilvusCollectionManager(MS_COLLECTION_NAME) app.state.collection_manager = MilvusCollectionManager(MS_COLLECTION_NAME)
app.state.collection_manager.load_collection() app.state.collection_manager.load_collection()
# 初始化DeepSeek客户端 # 初始化DeepSeek客户端
app.state.deepseek_client = OpenAI( app.state.deepseek_client = OpenAI(
api_key=Config.DEEPSEEK_API_KEY, api_key=Config.DEEPSEEK_API_KEY,
@ -47,8 +47,10 @@ async def lifespan(app: FastAPI):
# 关闭Milvus连接池 # 关闭Milvus连接池
app.state.milvus_pool.close() app.state.milvus_pool.close()
app = FastAPI(lifespan=lifespan) app = FastAPI(lifespan=lifespan)
# 将文本转换为嵌入向量 # 将文本转换为嵌入向量
def text_to_embedding(text): def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词 words = jieba.lcut(text) # 使用 jieba 分词
@ -63,11 +65,12 @@ def text_to_embedding(text):
print("未找到有效词,返回零向量") print("未找到有效词,返回零向量")
return [0.0] * model.vector_size return [0.0] * model.vector_size
async def generate_stream(client, milvus_pool, collection_manager, query): async def generate_stream(client, milvus_pool, collection_manager, query):
"""生成SSE流""" """生成SSE流"""
# 从连接池获取连接 # 从连接池获取连接
connection = milvus_pool.get_connection() connection = milvus_pool.get_connection()
try: try:
# 1. 将查询文本转换为向量 # 1. 将查询文本转换为向量
current_embedding = text_to_embedding(query) current_embedding = text_to_embedding(query)
@ -79,10 +82,10 @@ async def generate_stream(client, milvus_pool, collection_manager, query):
} }
# 7. 将文本转换为嵌入向量 # 7. 将文本转换为嵌入向量
results = collection_manager.search(current_embedding, search_params, limit=5) # 返回 2 条结果 results = collection_manager.search(current_embedding, search_params, limit=5) # 返回 2 条结果
# 3. 处理搜索结果 # 3. 处理搜索结果
print("最相关的历史对话:") print("最相关的历史对话:")
context="" context = ""
if results: if results:
for hits in results: for hits in results:
for hit in hits: for hit in hits:
@ -90,10 +93,9 @@ async def generate_stream(client, milvus_pool, collection_manager, query):
# 查询非向量字段 # 查询非向量字段
record = collection_manager.query_by_id(hit.id) record = collection_manager.query_by_id(hit.id)
print(f"ID: {hit.id}") print(f"ID: {hit.id}")
print(f"会话 ID: {record['person_id']}") print(f"文档 ID: {record['document_id']}")
print(f"用户问题: {record['user_input']}") print(f"用户问题: {record['user_input']}")
context=context+record['user_input'] context = context + record['user_input']
print(f"大模型回复: {record['model_response']}")
print(f"时间: {record['timestamp']}") print(f"时间: {record['timestamp']}")
print(f"距离: {hit.distance}") print(f"距离: {hit.distance}")
print("-" * 40) # 分隔线 print("-" * 40) # 分隔线
@ -101,7 +103,7 @@ async def generate_stream(client, milvus_pool, collection_manager, query):
print(f"查询失败: {e}") print(f"查询失败: {e}")
else: else:
print("未找到相关历史对话,请检查查询参数或数据。") print("未找到相关历史对话,请检查查询参数或数据。")
prompt = f"""根据以下关于'{query}'的相关信息,# Role: 信息检索与回答助手 prompt = f"""根据以下关于'{query}'的相关信息,# Role: 信息检索与回答助手
## Profile ## Profile
@ -154,7 +156,7 @@ async def generate_stream(client, milvus_pool, collection_manager, query):
相关信息 相关信息
{context}""" {context}"""
response = client.chat.completions.create( response = client.chat.completions.create(
model="deepseek-chat", model="deepseek-chat",
messages=[ messages=[
@ -164,7 +166,7 @@ async def generate_stream(client, milvus_pool, collection_manager, query):
temperature=0.3, temperature=0.3,
stream=True stream=True
) )
for chunk in response: for chunk in response:
if chunk.choices[0].delta.content: if chunk.choices[0].delta.content:
yield {"data": chunk.choices[0].delta.content} yield {"data": chunk.choices[0].delta.content}
@ -174,9 +176,13 @@ async def generate_stream(client, milvus_pool, collection_manager, query):
finally: finally:
# 释放连接 # 释放连接
milvus_pool.release_connection(connection) milvus_pool.release_connection(connection)
""" """
http://10.10.21.22:8000/api/rag?query=小学数学中有哪些模型 http://10.10.21.22:8000/api/rag?query=小学数学中有哪些模型
""" """
@app.post("/api/rag") @app.post("/api/rag")
async def rag_stream(request: Request, query: str = Body(...)): async def rag_stream(request: Request, query: str = Body(...)):
"""RAG+DeepSeek流式接口""" """RAG+DeepSeek流式接口"""
@ -189,7 +195,8 @@ async def rag_stream(request: Request, query: str = Body(...)):
) )
) )
app.mount("/static", StaticFiles(directory="Static"), name="static") app.mount("/static", StaticFiles(directory="Static"), name="static")
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000) uvicorn.run(app, host="0.0.0.0", port=8000)

Binary file not shown.
Loading…
Cancel
Save