diff --git a/AI/WxMini/Milvus/MulvusConfig.py b/AI/WxMini/Milvus/Config/MulvusConfig.py similarity index 100% rename from AI/WxMini/Milvus/MulvusConfig.py rename to AI/WxMini/Milvus/Config/MulvusConfig.py diff --git a/AI/WxMini/Milvus/Config/__init__.py b/AI/WxMini/Milvus/Config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/AI/WxMini/Milvus/Config/__pycache__/MulvusConfig.cpython-310.pyc b/AI/WxMini/Milvus/Config/__pycache__/MulvusConfig.cpython-310.pyc new file mode 100644 index 00000000..65b8dae5 Binary files /dev/null and b/AI/WxMini/Milvus/Config/__pycache__/MulvusConfig.cpython-310.pyc differ diff --git a/AI/WxMini/Milvus/Config/__pycache__/__init__.cpython-310.pyc b/AI/WxMini/Milvus/Config/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 00000000..51629354 Binary files /dev/null and b/AI/WxMini/Milvus/Config/__pycache__/__init__.cpython-310.pyc differ diff --git a/AI/WxMini/Milvus/T1.py b/AI/WxMini/Milvus/T1.py deleted file mode 100644 index e5160ceb..00000000 --- a/AI/WxMini/Milvus/T1.py +++ /dev/null @@ -1,148 +0,0 @@ -# 安装 pymilvus 库(如果尚未安装) -# cd D:\anaconda3\envs\py310\Scripts -# pip install pymilvus DBUtils - -# 导入必要的模块 -from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility -from queue import Queue -import threading -import random -from MulvusConfig import * - -# 1. 手动实现 Milvus 连接池 -class MilvusConnectionPool: - def __init__(self, host, port, max_connections=5): - """ - 初始化 Milvus 连接池 - :param host: Milvus 主机地址 - :param port: Milvus 端口 - :param max_connections: 最大连接数 - """ - self.host = host - self.port = port - self.max_connections = max_connections - self._pool = Queue(max_connections) - self._lock = threading.Lock() - - # 初始化连接池 - for _ in range(max_connections): - self._pool.put(self._create_connection()) - - def _create_connection(self): - """ - 创建一个新的 Milvus 连接 - :return: Milvus 连接对象 - """ - return connections.connect(host=self.host, port=self.port, alias="default") - - def get_connection(self): - """ - 从连接池中获取一个连接 - :return: Milvus 连接对象 - """ - with self._lock: - if not self._pool.empty(): - return self._pool.get() - else: - raise Exception("连接池已满,无法获取连接") - - def release_connection(self, connection): - """ - 释放连接,将其放回连接池 - :param connection: Milvus 连接对象 - """ - with self._lock: - if self._pool.qsize() < self.max_connections: - self._pool.put(connection) - else: - connections.disconnect("default") - - def close(self): - """ - 关闭连接池,释放所有连接 - """ - with self._lock: - while not self._pool.empty(): - connection = self._pool.get() - connections.disconnect("default") - -# 2. 使用连接池管理 Milvus 连接 -# 创建 Milvus 连接池,指定主机地址和端口 -milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) - -# 从连接池中获取一个连接 -connection = milvus_pool.get_connection() - -# 3. 定义集合的字段和模式 -# 创建字段列表,定义集合的结构 -fields = [ - FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), # 主键字段,自动生成 ID - FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128) # 向量字段,维度为 128 -] -# 创建集合模式,包含字段列表和描述 -schema = CollectionSchema(fields, description="Simple demo collection") - -# 4. 创建集合 -# 定义集合名称 -collection_name = "demo_collection" -# 检查集合是否已存在,如果存在则删除 -if utility.has_collection(collection_name): - utility.drop_collection(collection_name) # 删除已存在的集合 -# 使用集合模式和名称创建新集合 -collection = Collection(name=collection_name, schema=schema) - -# 5. 插入数据 -# 生成随机向量数据,模拟插入的向量 -data = [ - [random.random() for _ in range(128)], # 第一个 128 维向量 - [random.random() for _ in range(128)], # 第二个 128 维向量 - [random.random() for _ in range(128)] # 第三个 128 维向量 -] -# 将数据包装为实体列表 -entities = [data] # 插入的数据 -# 将数据插入集合 -collection.insert(entities) - -# 6. 创建索引 -# 定义索引参数 -index_params = { - "index_type": "IVF_FLAT", # 使用 IVF_FLAT 索引类型 - "metric_type": "L2", # 使用 L2 距离度量方式 - "params": {"nlist": 128} # 设置 IVF_FLAT 的 nlist 参数 -} -# 在向量字段上创建索引 -collection.create_index("embedding", index_params) - -# 7. 加载集合到内存 -# 将集合加载到内存中,以便进行查询 -collection.load() - -# 8. 查询数据 -# 生成一个随机查询向量 -query_vector = [random.random() for _ in range(128)] # 查询向量 -# 定义查询参数 -search_params = { - "metric_type": "L2", # 使用 L2 距离度量方式 - "params": {"nprobe": 10} # 设置 IVF_FLAT 的 nprobe 参数 -} -# 执行查询 -results = collection.search( - data=[query_vector], # 查询向量 - anns_field="embedding", # 查询的向量字段 - param=search_params, # 查询参数 - limit=2 # 返回最相似的 2 个结果 -) - -# 9. 输出查询结果 -# 遍历查询结果,输出每个结果的 ID 和距离 -for hits in results: - for hit in hits: - print(f"ID: {hit.id}, Distance: {hit.distance}") - -# 10. 释放连接 -# 将连接放回连接池 -milvus_pool.release_connection(connection) - -# 11. 关闭连接池 -# 关闭 Milvus 连接池 -milvus_pool.close() \ No newline at end of file diff --git a/AI/WxMini/Milvus/T1_create_collection.py b/AI/WxMini/Milvus/T1_create_collection.py new file mode 100644 index 00000000..99918cb5 --- /dev/null +++ b/AI/WxMini/Milvus/T1_create_collection.py @@ -0,0 +1,38 @@ +from pymilvus import FieldSchema, DataType, utility +from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager +from WxMini.Milvus.Utils.MilvusConnectionPool import * +from WxMini.Milvus.Config.MulvusConfig import * + +# 1. 使用连接池管理 Milvus 连接 +milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) + +# 2. 从连接池中获取一个连接 +connection = milvus_pool.get_connection() + +# 3. 初始化集合管理器 +collection_name = MS_COLLECTION_NAME +collection_manager = MilvusCollectionManager(collection_name) + +# 4. 判断集合是否存在,存在则删除 +if utility.has_collection(collection_name): + print(f"集合 '{collection_name}' 已存在,正在删除...") + utility.drop_collection(collection_name) + print(f"集合 '{collection_name}' 已删除。") + +# 5. 定义集合的字段和模式 +fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), # 主键字段,自动生成 ID + FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128) # 向量字段,维度为 128 +] +schema_description = "Simple demo collection" + +# 6. 创建集合 +print(f"正在创建集合 '{collection_name}'...") +collection_manager.create_collection(fields, schema_description) +print(f"集合 '{collection_name}' 创建成功。") + +# 7. 释放连接 +milvus_pool.release_connection(connection) + +# 8. 关闭连接池 +milvus_pool.close() \ No newline at end of file diff --git a/AI/WxMini/Milvus/T2_insert_data.py b/AI/WxMini/Milvus/T2_insert_data.py new file mode 100644 index 00000000..2afea069 --- /dev/null +++ b/AI/WxMini/Milvus/T2_insert_data.py @@ -0,0 +1,56 @@ +# insert_data.py +import random + +from pymilvus import FieldSchema, DataType + +from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager +from WxMini.Milvus.Utils.MilvusConnectionPool import * +from WxMini.Milvus.Config.MulvusConfig import * + +# 1. 使用连接池管理 Milvus 连接 +milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) + +# 2. 从连接池中获取一个连接 +connection = milvus_pool.get_connection() + +# 3. 初始化集合管理器 +collection_name = MS_COLLECTION_NAME +collection_manager = MilvusCollectionManager(collection_name) + +# 4. 插入数据 +data = [ + [random.random() for _ in range(128)], # 第一个 128 维向量 + [random.random() for _ in range(128)], # 第二个 128 维向量 + [random.random() for _ in range(128)] # 第三个 128 维向量 +] +entities = [data] # 插入的数据 +collection_manager.insert_data(entities) + +# 5. 创建索引 +index_params = { + "index_type": "IVF_FLAT", # 使用 IVF_FLAT 索引类型 + "metric_type": "L2", # 使用 L2 距离度量方式 + "params": {"nlist": 128} # 设置 IVF_FLAT 的 nlist 参数 +} +collection_manager.create_index("embedding", index_params) + +# 6. 加载集合到内存 +collection_manager.load_collection() + +# 7. 查询数据,验证插入是否成功 +query_vector = [random.random() for _ in range(128)] # 随机生成一个查询向量 +search_params = { + "metric_type": "L2", # 使用 L2 距离度量方式 + "params": {"nprobe": 10} # 设置 IVF_FLAT 的 nprobe 参数 +} +results = collection_manager.search(query_vector, search_params, limit=2) +print("查询结果:") +for hits in results: + for hit in hits: + print(f"ID: {hit.id}, Distance: {hit.distance}") + +# 8. 释放连接 +milvus_pool.release_connection(connection) + +# 9. 关闭连接池 +milvus_pool.close() \ No newline at end of file diff --git a/AI/WxMini/Milvus/T3_search_data.py b/AI/WxMini/Milvus/T3_search_data.py new file mode 100644 index 00000000..08a10984 --- /dev/null +++ b/AI/WxMini/Milvus/T3_search_data.py @@ -0,0 +1,37 @@ +import random + +from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager +from WxMini.Milvus.Utils.MilvusConnectionPool import * +from WxMini.Milvus.Config.MulvusConfig import * + +# 1. 使用连接池管理 Milvus 连接 +milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS) + +# 2. 从连接池中获取一个连接 +connection = milvus_pool.get_connection() + +# 3. 初始化集合管理器 +collection_name = MS_COLLECTION_NAME +collection_manager = MilvusCollectionManager(collection_name) + +# 4. 加载集合到内存 +collection_manager.load_collection() +print(f"集合 '{collection_name}' 已加载到内存。") + +# 5. 查询数据 +query_vector = [random.random() for _ in range(128)] # 随机生成一个查询向量 +search_params = { + "metric_type": "L2", # 使用 L2 距离度量方式 + "params": {"nprobe": 10} # 设置 IVF_FLAT 的 nprobe 参数 +} +results = collection_manager.search(query_vector, search_params, limit=2) +print("查询结果:") +for hits in results: + for hit in hits: + print(f"ID: {hit.id}, Distance: {hit.distance}") + +# 6. 释放连接 +milvus_pool.release_connection(connection) + +# 7. 关闭连接池 +milvus_pool.close() \ No newline at end of file diff --git a/AI/WxMini/Milvus/Utils/MilvusCollectionManager.py b/AI/WxMini/Milvus/Utils/MilvusCollectionManager.py new file mode 100644 index 00000000..8fd36f40 --- /dev/null +++ b/AI/WxMini/Milvus/Utils/MilvusCollectionManager.py @@ -0,0 +1,82 @@ +# MilvusCollectionManager.py +from pymilvus import Collection, utility, CollectionSchema + + +class MilvusCollectionManager: + def __init__(self, collection_name): + """ + 初始化集合管理器 + :param collection_name: 集合名称 + """ + self.collection_name = collection_name + self.collection = None + self._load_collection_if_exists() + + def _load_collection_if_exists(self): + """ + 如果集合存在,则加载集合 + """ + if utility.has_collection(self.collection_name): + self.collection = Collection(name=self.collection_name) + print(f"集合 '{self.collection_name}' 已加载。") + else: + print(f"集合 '{self.collection_name}' 不存在。") + + def create_collection(self, fields, schema_description): + """ + 创建集合 + :param fields: 字段列表 + :param schema_description: 集合描述 + """ + if utility.has_collection(self.collection_name): + utility.drop_collection(self.collection_name) # 如果集合已存在,先删除 + schema = CollectionSchema(fields, description=schema_description) + self.collection = Collection(name=self.collection_name, schema=schema) + print(f"集合 '{self.collection_name}' 创建成功。") + + def insert_data(self, entities): + """ + 插入数据 + :param entities: 要插入的数据 + """ + if self.collection is None: + raise Exception("集合未创建,请先调用 create_collection 方法") + self.collection.insert(entities) + print("数据插入成功。") + + def create_index(self, field_name, index_params): + """ + 创建索引 + :param field_name: 字段名称 + :param index_params: 索引参数 + """ + if self.collection is None: + raise Exception("集合未创建,请先调用 create_collection 方法") + self.collection.create_index(field_name, index_params) + print("索引创建成功。") + + def load_collection(self): + """ + 加载集合到内存 + """ + if self.collection is None: + raise Exception("集合未创建,请先调用 create_collection 方法") + self.collection.load() + print("集合已加载到内存。") + + def search(self, query_vector, search_params, limit=2): + """ + 查询数据 + :param query_vector: 查询向量 + :param search_params: 查询参数 + :param limit: 返回结果数量 + :return: 查询结果 + """ + if self.collection is None: + raise Exception("集合未创建,请先调用 create_collection 方法") + return self.collection.search( + data=[query_vector], + anns_field="embedding", + param=search_params, + limit=limit + ) \ No newline at end of file diff --git a/AI/WxMini/Milvus/Utils/MilvusConnectionPool.py b/AI/WxMini/Milvus/Utils/MilvusConnectionPool.py new file mode 100644 index 00000000..a8416104 --- /dev/null +++ b/AI/WxMini/Milvus/Utils/MilvusConnectionPool.py @@ -0,0 +1,62 @@ +import threading +from queue import Queue + +from pymilvus import connections + + +# 1. 手动实现 Milvus 连接池 +class MilvusConnectionPool: + def __init__(self, host, port, max_connections=5): + """ + 初始化 Milvus 连接池 + :param host: Milvus 主机地址 + :param port: Milvus 端口 + :param max_connections: 最大连接数 + """ + self.host = host + self.port = port + self.max_connections = max_connections + self._pool = Queue(max_connections) + self._lock = threading.Lock() + + # 初始化连接池 + for _ in range(max_connections): + self._pool.put(self._create_connection()) + + def _create_connection(self): + """ + 创建一个新的 Milvus 连接 + :return: Milvus 连接对象 + """ + return connections.connect(host=self.host, port=self.port, alias="default") + + def get_connection(self): + """ + 从连接池中获取一个连接 + :return: Milvus 连接对象 + """ + with self._lock: + if not self._pool.empty(): + return self._pool.get() + else: + raise Exception("连接池已满,无法获取连接") + + def release_connection(self, connection): + """ + 释放连接,将其放回连接池 + :param connection: Milvus 连接对象 + """ + with self._lock: + if self._pool.qsize() < self.max_connections: + self._pool.put(connection) + else: + connections.disconnect("default") + + def close(self): + """ + 关闭连接池,释放所有连接 + """ + with self._lock: + while not self._pool.empty(): + connection = self._pool.get() + connections.disconnect("default") \ No newline at end of file diff --git a/AI/WxMini/Milvus/Utils/__init__.py b/AI/WxMini/Milvus/Utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/AI/WxMini/Milvus/Utils/__pycache__/MilvusCollectionManager.cpython-310.pyc b/AI/WxMini/Milvus/Utils/__pycache__/MilvusCollectionManager.cpython-310.pyc new file mode 100644 index 00000000..36fe8f91 Binary files /dev/null and b/AI/WxMini/Milvus/Utils/__pycache__/MilvusCollectionManager.cpython-310.pyc differ diff --git a/AI/WxMini/Milvus/Utils/__pycache__/MilvusConnectionPool.cpython-310.pyc b/AI/WxMini/Milvus/Utils/__pycache__/MilvusConnectionPool.cpython-310.pyc new file mode 100644 index 00000000..eaaf82d1 Binary files /dev/null and b/AI/WxMini/Milvus/Utils/__pycache__/MilvusConnectionPool.cpython-310.pyc differ diff --git a/AI/WxMini/Milvus/Utils/__pycache__/__init__.cpython-310.pyc b/AI/WxMini/Milvus/Utils/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 00000000..7b39722c Binary files /dev/null and b/AI/WxMini/Milvus/Utils/__pycache__/__init__.cpython-310.pyc differ diff --git a/AI/WxMini/Milvus/__init__.py b/AI/WxMini/Milvus/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/AI/WxMini/Milvus/__pycache__/MulvusConfig.cpython-310.pyc b/AI/WxMini/Milvus/__pycache__/MulvusConfig.cpython-310.pyc deleted file mode 100644 index 1badb2ae..00000000 Binary files a/AI/WxMini/Milvus/__pycache__/MulvusConfig.cpython-310.pyc and /dev/null differ diff --git a/AI/WxMini/Milvus/__pycache__/__init__.cpython-310.pyc b/AI/WxMini/Milvus/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 00000000..5b2492ec Binary files /dev/null and b/AI/WxMini/Milvus/__pycache__/__init__.cpython-310.pyc differ diff --git a/AI/WxMini/__init__.py b/AI/WxMini/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/AI/WxMini/__pycache__/__init__.cpython-310.pyc b/AI/WxMini/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 00000000..443b2279 Binary files /dev/null and b/AI/WxMini/__pycache__/__init__.cpython-310.pyc differ