import random from WxMini.Milvus.Config.MulvusConfig import * from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager from WxMini.Milvus.Utils.MilvusConnectionPool import * # 1. 使用连接池管理 Milvus 连接 milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) # 2. 从连接池中获取一个连接 connection = milvus_pool.get_connection() # 3. 初始化集合管理器 collection_name = MS_COLLECTION_NAME collection_manager = MilvusCollectionManager(collection_name) # 4. 插入数据 texts = [ "我今天心情不太好,因为工作压力很大。", # 第一个对话文本 "我最近在学习 Python,感觉很有趣。", # 第二个对话文本 "我打算周末去爬山,放松一下。" # 第三个对话文本 ] embeddings = [ [random.random() for _ in range(128)], # 第一个 128 维向量 [random.random() for _ in range(128)], # 第二个 128 维向量 [random.random() for _ in range(128)] # 第三个 128 维向量 ] # 插入数据,确保字段顺序与集合定义一致 entities = [texts, embeddings] # 第一个列表是 text 字段,第二个列表是 embedding 字段 collection_manager.insert_data(entities) print("数据插入成功。") # 5. 加载集合到内存 collection_manager.load_collection() # 6. 查询数据,验证插入是否成功 query_vector = [random.random() for _ in range(128)] # 随机生成一个查询向量 search_params = { "metric_type": "L2", # 使用 L2 距离度量方式 "params": {"nprobe": 10} # 设置 IVF_FLAT 的 nprobe 参数 } results = collection_manager.search(query_vector, search_params, limit=2) print("查询结果:") if results: for hits in results: for hit in hits: text = collection_manager.query_text_by_id(hit.id) print(f"ID: {hit.id}, Text: {text}, Distance: {hit.distance}") else: print("未找到相关数据,请检查查询参数或数据。") # 7. 释放连接 milvus_pool.release_connection(connection) # 8. 关闭连接池 milvus_pool.close()