From d2243aa45724fdb63ac24eb20d641128ed56a57b Mon Sep 17 00:00:00 2001
From: HuangHai <10402852@qq.com>
Date: Mon, 24 Mar 2025 11:26:20 +0800
Subject: [PATCH] 'commit'

---
Reviewer note: line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch. Source comments were translated
to English, a blank line was added after query_by_id, and X5 now checks
for a None record before printing it. Hunk sizes and the diffstat were
updated accordingly; the "index" blob hashes below are from the original
commit and no longer describe the patched content.

 .../Milvus/Utils/MilvusCollectionManager.py   | 20 +++++
 AI/WxMini/Milvus/X1_create_collection.py      | 42 ++++++++++
 AI/WxMini/Milvus/X2_create_index.py           | 28 +++++++
 AI/WxMini/Milvus/X3_insert_data.py            | 61 ++++++++++++++
 AI/WxMini/Milvus/X4_select_all_data.py        | 56 +++++++++++++
 AI/WxMini/Milvus/X5_search_near_data.py       | 87 +++++++++++++++++++
 6 files changed, 294 insertions(+)
 create mode 100644 AI/WxMini/Milvus/X1_create_collection.py
 create mode 100644 AI/WxMini/Milvus/X2_create_index.py
 create mode 100644 AI/WxMini/Milvus/X3_insert_data.py
 create mode 100644 AI/WxMini/Milvus/X4_select_all_data.py
 create mode 100644 AI/WxMini/Milvus/X5_search_near_data.py

diff --git a/AI/WxMini/Milvus/Utils/MilvusCollectionManager.py b/AI/WxMini/Milvus/Utils/MilvusCollectionManager.py
index 8894b477..eefad47e 100644
--- a/AI/WxMini/Milvus/Utils/MilvusCollectionManager.py
+++ b/AI/WxMini/Milvus/Utils/MilvusCollectionManager.py
@@ -62,6 +62,26 @@ class MilvusCollectionManager:
         self.collection.load()
         print("集合已加载到内存。")
 
+    def query_by_id(self, id):
+        """
+        Look up the non-vector fields of one record by its primary-key ID.
+        :param id: ID of the record to fetch
+        :return: dict of the record's non-vector fields, or None if no row matches or the query fails
+        """
+        try:
+            # Use the collection's query method to fetch the row with this ID
+            results = self.collection.query(
+                expr=f"id == {id}",  # filter expression
+                output_fields=["id", "session_id", "user_input", "model_response", "timestamp"]  # fields to return
+            )
+            if results:
+                return results[0]  # first matching record
+            else:
+                return None
+        except Exception as e:
+            print(f"查询失败: {e}")
+            return None
+
     def search(self, query_embedding, search_params, limit=2):
         """
         查询数据
diff --git a/AI/WxMini/Milvus/X1_create_collection.py b/AI/WxMini/Milvus/X1_create_collection.py
new file mode 100644
index 00000000..48de3b46
--- /dev/null
+++ b/AI/WxMini/Milvus/X1_create_collection.py
@@ -0,0 +1,42 @@
+from pymilvus import FieldSchema, DataType, utility
+from WxMini.Milvus.Config.MulvusConfig import *
+from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
+from WxMini.Milvus.Utils.MilvusConnectionPool import *
+
+# 1. Manage Milvus connections through the connection pool
+milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
+
+# 2. Take one connection from the pool
+connection = milvus_pool.get_connection()
+
+# 3. Initialise the collection manager
+collection_name = MS_COLLECTION_NAME
+collection_manager = MilvusCollectionManager(collection_name)
+
+# 4. If the collection already exists, drop it first
+if utility.has_collection(collection_name):
+    print(f"集合 '{collection_name}' 已存在,正在删除...")
+    utility.drop_collection(collection_name)
+    print(f"集合 '{collection_name}' 已删除。")
+
+# 5. Define the collection's fields and schema
+fields = [
+    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),  # primary key, ID generated automatically
+    FieldSchema(name="session_id", dtype=DataType.VARCHAR, max_length=64),  # session ID
+    FieldSchema(name="user_input", dtype=DataType.VARCHAR, max_length=500),  # user question
+    FieldSchema(name="model_response", dtype=DataType.VARCHAR, max_length=500),  # model reply
+    FieldSchema(name="timestamp", dtype=DataType.VARCHAR, max_length=32),  # time of the exchange
+    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=MS_DIMENSION)  # vector field; dimension comes from MS_DIMENSION
+]
+schema_description = "Chat records collection with session ID, user input, model response, and timestamp"
+
+# 6. Create the collection
+print(f"正在创建集合 '{collection_name}'...")
+collection_manager.create_collection(fields, schema_description)
+print(f"集合 '{collection_name}' 创建成功。")
+
+# 7. Release the connection
+milvus_pool.release_connection(connection)
+
+# 8. Close the connection pool
+milvus_pool.close()
\ No newline at end of file
diff --git a/AI/WxMini/Milvus/X2_create_index.py b/AI/WxMini/Milvus/X2_create_index.py
new file mode 100644
index 00000000..3d06a886
--- /dev/null
+++ b/AI/WxMini/Milvus/X2_create_index.py
@@ -0,0 +1,28 @@
+from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
+from WxMini.Milvus.Utils.MilvusConnectionPool import *
+from WxMini.Milvus.Config.MulvusConfig import *
+
+# 1. Manage Milvus connections through the connection pool
+milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
+
+# 2. Take one connection from the pool
+connection = milvus_pool.get_connection()
+
+# 3. Initialise the collection manager
+collection_name = MS_COLLECTION_NAME
+collection_manager = MilvusCollectionManager(collection_name)
+
+# 4. Create the index
+index_params = {
+    "index_type": "IVF_FLAT",  # use the IVF_FLAT index type
+    "metric_type": "L2",  # use L2 distance as the metric
+    "params": {"nlist": 128}  # nlist parameter for IVF_FLAT
+}
+collection_manager.create_index("embedding", index_params)  # build the index on the embedding field
+print(f"集合 '{collection_name}' 的 'embedding' 字段索引创建成功。")
+
+# 5. Release the connection
+milvus_pool.release_connection(connection)
+
+# 6. Close the connection pool
+milvus_pool.close()
\ No newline at end of file
diff --git a/AI/WxMini/Milvus/X3_insert_data.py b/AI/WxMini/Milvus/X3_insert_data.py
new file mode 100644
index 00000000..d6665500
--- /dev/null
+++ b/AI/WxMini/Milvus/X3_insert_data.py
@@ -0,0 +1,61 @@
+from WxMini.Milvus.Config.MulvusConfig import *
+from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
+from WxMini.Milvus.Utils.MilvusConnectionPool import *
+from gensim.models import KeyedVectors
+import jieba
+import time
+
+# 1. Load the pre-trained Word2Vec model
+model_path = MS_MODEL_PATH
+model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
+print(f"模型加载成功,词向量维度: {model.vector_size}")
+
+# Convert a piece of text into an embedding vector (mean of its word vectors)
+def text_to_embedding(text):
+    words = jieba.lcut(text)  # tokenise with jieba
+    print(f"文本: {text}, 分词结果: {words}")
+    embeddings = [model[word] for word in words if word in model]
+    print(f"有效词向量数量: {len(embeddings)}")
+    if embeddings:
+        avg_embedding = sum(embeddings) / len(embeddings)
+        print(f"生成的平均向量: {avg_embedding[:5]}...")  # show the first 5 dimensions
+        return avg_embedding
+    else:
+        print("未找到有效词,返回零向量")
+        return [0.0] * model.vector_size
+
+# 2. Manage Milvus connections through the connection pool
+milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
+
+# 3. Take one connection from the pool
+connection = milvus_pool.get_connection()
+
+# 4. Initialise the collection manager
+collection_name = MS_COLLECTION_NAME
+collection_manager = MilvusCollectionManager(collection_name)
+
+# 5. Read one user question
+user_input = input("请输入一句话:")  # e.g. "I'm not in a good mood today" (in Chinese)
+model_response = "我没听懂,能再说一遍吗?"  # fixed reply standing in for the model's answer
+timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())  # current local time
+session_id = "session_001"  # session ID (could be generated dynamically if needed)
+
+# 6. Convert the user question into an embedding vector
+user_embedding = text_to_embedding(user_input)
+
+# 7. Insert the data; column order must match the collection definition
+entities = [
+    [session_id],  # session_id
+    [user_input],  # user_input
+    [model_response],  # model_response
+    [timestamp],  # timestamp
+    [user_embedding]  # embedding
+]
+collection_manager.insert_data(entities)
+print("数据插入成功。")
+
+# 8. Release the connection
+milvus_pool.release_connection(connection)
+
+# 9. Close the connection pool
+milvus_pool.close()
\ No newline at end of file
diff --git a/AI/WxMini/Milvus/X4_select_all_data.py b/AI/WxMini/Milvus/X4_select_all_data.py
new file mode 100644
index 00000000..290c89ec
--- /dev/null
+++ b/AI/WxMini/Milvus/X4_select_all_data.py
@@ -0,0 +1,56 @@
+from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
+from WxMini.Milvus.Utils.MilvusConnectionPool import *
+from WxMini.Milvus.Config.MulvusConfig import *
+
+# 1. Manage Milvus connections through the connection pool
+milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
+
+# 2. Take one connection from the pool
+connection = milvus_pool.get_connection()
+
+# 3. Initialise the collection manager
+collection_name = MS_COLLECTION_NAME
+collection_manager = MilvusCollectionManager(collection_name)
+
+# 4. Load the collection into memory
+collection_manager.load_collection()
+print(f"集合 '{collection_name}' 已加载到内存。")
+
+# 5. Query all data
+try:
+    # Use the collection's query method to fetch every row
+    results = collection_manager.collection.query(
+        expr="",  # empty expression: no filter, match all rows
+        output_fields=["id", "session_id", "user_input", "model_response", "timestamp", "embedding"],  # fields to return
+        limit=1000  # maximum number of rows returned
+    )
+    print("查询结果:")
+    if results:
+        for result in results:
+            try:
+                # Pull out the field values
+                session_id = result["session_id"]
+                user_input = result["user_input"]
+                model_response = result["model_response"]
+                timestamp = result["timestamp"]
+                embedding = result["embedding"]
+                # Print the record
+                print(f"ID: {result['id']}")
+                print(f"会话 ID: {session_id}")
+                print(f"用户问题: {user_input}")
+                print(f"大模型回复: {model_response}")
+                print(f"时间: {timestamp}")
+                print(f"向量: {embedding[:5]}...")  # show only the first 5 dimensions
+                print("-" * 40)  # separator line
+            except Exception as e:
+                print(f"查询失败: {e}")
+    else:
+        print("未找到相关数据,请检查查询参数或数据。")
+except Exception as e:
+    print(f"查询失败: {e}")
+
+# 6. Release the connection
+milvus_pool.release_connection(connection)
+
+# 7. Close the connection pool
+milvus_pool.close()
\ No newline at end of file
diff --git a/AI/WxMini/Milvus/X5_search_near_data.py b/AI/WxMini/Milvus/X5_search_near_data.py
new file mode 100644
index 00000000..2f4e5233
--- /dev/null
+++ b/AI/WxMini/Milvus/X5_search_near_data.py
@@ -0,0 +1,87 @@
+import time
+import jieba  # jieba word-segmentation library
+from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
+from WxMini.Milvus.Utils.MilvusConnectionPool import *
+from WxMini.Milvus.Config.MulvusConfig import *
+from gensim.models import KeyedVectors
+
+# 1. Load the pre-trained Word2Vec model
+model_path = MS_MODEL_PATH  # replace with the path to your Word2Vec model
+model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
+print(f"模型加载成功,词向量维度: {model.vector_size}")
+
+# Convert a piece of text into an embedding vector (mean of its word vectors)
+def text_to_embedding(text):
+    words = jieba.lcut(text)  # tokenise with jieba
+    print(f"文本: {text}, 分词结果: {words}")
+    embeddings = [model[word] for word in words if word in model]
+    print(f"有效词向量数量: {len(embeddings)}")
+    if embeddings:
+        avg_embedding = sum(embeddings) / len(embeddings)
+        print(f"生成的平均向量: {avg_embedding[:5]}...")  # show the first 5 dimensions
+        return avg_embedding
+    else:
+        print("未找到有效词,返回零向量")
+        return [0.0] * model.vector_size
+
+# 2. Manage Milvus connections through the connection pool
+milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
+
+# 3. Take one connection from the pool
+connection = milvus_pool.get_connection()
+
+# 4. Initialise the collection manager
+collection_name = MS_COLLECTION_NAME
+collection_manager = MilvusCollectionManager(collection_name)
+
+# 5. Load the collection into memory
+collection_manager.load_collection()
+print(f"集合 '{collection_name}' 已加载到内存。")
+
+# 6. Read one sentence
+input_text = input("请输入一句话:")  # e.g. "I'm not in a good mood today" (in Chinese)
+
+# 7. Convert the text into an embedding vector
+current_embedding = text_to_embedding(input_text)
+
+# 8. Search for the stored conversations closest to the current input
+search_params = {
+    "metric_type": "L2",  # use L2 distance as the metric
+    "params": {"nprobe": MS_NPROBE}  # nprobe parameter for IVF_FLAT
+}
+start_time = time.time()
+results = collection_manager.search(current_embedding, search_params, limit=2)  # return 2 results
+end_time = time.time()
+
+# 9. Print the search results
+print("最相关的历史对话:")
+if results:
+    for hits in results:
+        for hit in hits:
+            try:
+                # Fetch the non-vector fields for this hit
+                record = collection_manager.query_by_id(hit.id)
+                if record is None:
+                    # query_by_id returns None when no row matches or its query fails
+                    print(f"未找到 ID 为 {hit.id} 的记录,已跳过。")
+                    continue
+                print(f"ID: {hit.id}")
+                print(f"会话 ID: {record['session_id']}")
+                print(f"用户问题: {record['user_input']}")
+                print(f"大模型回复: {record['model_response']}")
+                print(f"时间: {record['timestamp']}")
+                print(f"距离: {hit.distance}")
+                print("-" * 40)  # separator line
+            except Exception as e:
+                print(f"查询失败: {e}")
+else:
+    print("未找到相关历史对话,请检查查询参数或数据。")
+
+# 10. Print how long the search took
+print(f"查询耗时: {end_time - start_time:.4f} 秒")
+
+# 11. Release the connection
+milvus_pool.release_connection(connection)
+
+# 12. Close the connection pool
+milvus_pool.close()
\ No newline at end of file