main
HuangHai 4 months ago
parent 17066c1b27
commit d2243aa457

@ -62,6 +62,25 @@ class MilvusCollectionManager:
self.collection.load()
print("集合已加载到内存。")
def query_by_id(self, id):
"""
根据 ID 查询非向量字段
:param id: 记录的 ID
:return: 包含非向量字段的字典
"""
try:
# 使用 Milvus 的 query 方法查询指定 ID 的记录
results = self.collection.query(
expr=f"id == {id}", # 查询条件
output_fields=["id", "session_id", "user_input", "model_response", "timestamp"] # 返回的字段
)
if results:
return results[0] # 返回第一条记录
else:
return None
except Exception as e:
print(f"查询失败: {e}")
return None
def search(self, query_embedding, search_params, limit=2):
"""
查询数据

@ -0,0 +1,42 @@
from pymilvus import FieldSchema, DataType, utility
from WxMini.Milvus.Config.MulvusConfig import *
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
# 1. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 2. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 3. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 4. 判断集合是否存在,存在则删除
if utility.has_collection(collection_name):
print(f"集合 '{collection_name}' 已存在,正在删除...")
utility.drop_collection(collection_name)
print(f"集合 '{collection_name}' 已删除。")
# 5. 定义集合的字段和模式
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), # 主键字段,自动生成 ID
FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=64), # 会话 ID
FieldSchema(name="user_input", dtype=DataType.VARCHAR, max_length=500), # 用户问题
FieldSchema(name="model_response", dtype=DataType.VARCHAR, max_length=500), # 大模型反馈结果
FieldSchema(name="timestamp", dtype=DataType.VARCHAR, max_length=32), # 时间
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=MS_DIMENSION) # 向量字段,维度为 200
]
schema_description = "Chat records collection with session ID, user input, model response, and timestamp"
# 6. 创建集合
print(f"正在创建集合 '{collection_name}'...")
collection_manager.create_collection(fields, schema_description)
print(f"集合 '{collection_name}' 创建成功。")
# 7. 释放连接
milvus_pool.release_connection(connection)
# 8. 关闭连接池
milvus_pool.close()

@ -0,0 +1,28 @@
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Milvus.Config.MulvusConfig import *
# 1. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 2. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 3. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 4. 创建索引
index_params = {
"index_type": "IVF_FLAT", # 使用 IVF_FLAT 索引类型
"metric_type": "L2", # 使用 L2 距离度量方式
"params": {"nlist": 128} # 设置 IVF_FLAT 的 nlist 参数
}
collection_manager.create_index("embedding", index_params) # 为 embedding 字段创建索引
print(f"集合 '{collection_name}''embedding' 字段索引创建成功。")
# 5. 释放连接
milvus_pool.release_connection(connection)
# 6. 关闭连接池
milvus_pool.close()

@ -0,0 +1,61 @@
from WxMini.Milvus.Config.MulvusConfig import *
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from gensim.models import KeyedVectors
import jieba
import time
# 1. 加载预训练的 Word2Vec 模型
model_path = MS_MODEL_PATH
model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
print(f"模型加载成功,词向量维度: {model.vector_size}")
# 功能:将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词
print(f"文本: {text}, 分词结果: {words}")
embeddings = [model[word] for word in words if word in model]
print(f"有效词向量数量: {len(embeddings)}")
if embeddings:
avg_embedding = sum(embeddings) / len(embeddings)
print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
return avg_embedding
else:
print("未找到有效词,返回零向量")
return [0.0] * model.vector_size
# 2. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 3. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 4. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 5. 输入一个用户问题
user_input = input("请输入一句话:") # 例如:“我今天心情不太好”
model_response = "我没听懂,能再说一遍吗?" # 大模型的固定回复
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 当前时间
session_id = "session_001" # 会话 ID可以根据需要动态生成
# 6. 将用户问题转换为嵌入向量
user_embedding = text_to_embedding(user_input)
# 7. 插入数据,确保字段顺序与集合定义一致
entities = [
[session_id], # session_id
[user_input], # user_input
[model_response], # model_response
[timestamp], # timestamp
[user_embedding] # embedding
]
collection_manager.insert_data(entities)
print("数据插入成功。")
# 8. 释放连接
milvus_pool.release_connection(connection)
# 9. 关闭连接池
milvus_pool.close()

@ -0,0 +1,56 @@
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Milvus.Config.MulvusConfig import *
# 1. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 2. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 3. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 4. 加载集合到内存
collection_manager.load_collection()
print(f"集合 '{collection_name}' 已加载到内存。")
# 5. 查询所有数据
try:
# 使用 Milvus 的 query 方法查询所有数据
results = collection_manager.collection.query(
expr="", # 空表达式表示查询所有数据
output_fields=["id", "session_id", "user_input", "model_response", "timestamp", "embedding"], # 指定返回的字段
limit=1000 # 设置最大返回记录数
)
print("查询结果:")
if results:
for result in results:
try:
# 获取字段值
session_id = result["session_id"]
user_input = result["user_input"]
model_response = result["model_response"]
timestamp = result["timestamp"]
embedding = result["embedding"]
# 打印结果
print(f"ID: {result['id']}")
print(f"会话 ID: {session_id}")
print(f"用户问题: {user_input}")
print(f"大模型回复: {model_response}")
print(f"时间: {timestamp}")
print(f"向量: {embedding[:5]}...") # 只打印前 5 维向量
print("-" * 40) # 分隔线
except Exception as e:
print(f"查询失败: {e}")
else:
print("未找到相关数据,请检查查询参数或数据。")
except Exception as e:
print(f"查询失败: {e}")
# 6. 释放连接
milvus_pool.release_connection(connection)
# 7. 关闭连接池
milvus_pool.close()

@ -0,0 +1,83 @@
import time
import jieba # 导入 jieba 分词库
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Milvus.Config.MulvusConfig import *
from gensim.models import KeyedVectors
# 1. 加载预训练的 Word2Vec 模型
model_path = MS_MODEL_PATH # 替换为你的 Word2Vec 模型路径
model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
print(f"模型加载成功,词向量维度: {model.vector_size}")
# 将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词
print(f"文本: {text}, 分词结果: {words}")
embeddings = [model[word] for word in words if word in model]
print(f"有效词向量数量: {len(embeddings)}")
if embeddings:
avg_embedding = sum(embeddings) / len(embeddings)
print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
return avg_embedding
else:
print("未找到有效词,返回零向量")
return [0.0] * model.vector_size
# 2. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 3. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 4. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 5. 加载集合到内存
collection_manager.load_collection()
print(f"集合 '{collection_name}' 已加载到内存。")
# 6. 输入一句话
input_text = input("请输入一句话:") # 例如:“我今天心情不太好”
# 7. 将文本转换为嵌入向量
current_embedding = text_to_embedding(input_text)
# 8. 查询与当前对话最相关的历史对话
search_params = {
"metric_type": "L2", # 使用 L2 距离度量方式
"params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数
}
start_time = time.time()
results = collection_manager.search(current_embedding, search_params, limit=2) # 返回 2 条结果
end_time = time.time()
# 9. 输出查询结果
print("最相关的历史对话:")
if results:
for hits in results:
for hit in hits:
try:
# 查询非向量字段
record = collection_manager.query_by_id(hit.id)
print(f"ID: {hit.id}")
print(f"会话 ID: {record['session_id']}")
print(f"用户问题: {record['user_input']}")
print(f"大模型回复: {record['model_response']}")
print(f"时间: {record['timestamp']}")
print(f"距离: {hit.distance}")
print("-" * 40) # 分隔线
except Exception as e:
print(f"查询失败: {e}")
else:
print("未找到相关历史对话,请检查查询参数或数据。")
# 10. 输出查询耗时
print(f"查询耗时: {end_time - start_time:.4f}")
# 11. 释放连接
milvus_pool.release_connection(connection)
# 12. 关闭连接池
milvus_pool.close()
Loading…
Cancel
Save