# D:\anaconda3\envs\py310\Scripts\pip.exe install pymilvus from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility # 1. 连接 Milvus connections.connect("default", host="10.10.14.101", port="19530") # 2. 定义集合的字段和模式 fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128) ] schema = CollectionSchema(fields, description="Simple demo collection") # 3. 创建集合 collection_name = "demo_collection" if utility.has_collection(collection_name): utility.drop_collection(collection_name) # 如果集合已存在,先删除 collection = Collection(name=collection_name, schema=schema) # 4. 插入数据 import random data = [ [random.random() for _ in range(128)], # 第一个向量 [random.random() for _ in range(128)], # 第二个向量 [random.random() for _ in range(128)] # 第三个向量 ] entities = [data] # 插入的数据 collection.insert(entities) # 5. 创建索引 index_params = { "index_type": "IVF_FLAT", # 索引类型 "metric_type": "L2", # 距离度量方式 "params": {"nlist": 128} # 索引参数 } collection.create_index("embedding", index_params) # 6. 加载集合到内存 collection.load() # 7. 查询数据 query_vector = [random.random() for _ in range(128)] # 查询向量 search_params = { "metric_type": "L2", "params": {"nprobe": 10} # 查询参数 } results = collection.search( data=[query_vector], # 查询向量 anns_field="embedding", # 查询字段 param=search_params, limit=2 # 返回最相似的 2 个结果 ) # 8. 输出查询结果 for hits in results: for hit in hits: print(f"ID: {hit.id}, Distance: {hit.distance}") # 9. 关闭连接 connections.disconnect("default")