import logging import threading from queue import Queue from pymilvus import connections # 配置日志 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # 1. 手动实现 Milvus 连接池 class MilvusConnectionPool: def __init__(self, host, port, max_connections=5): """ 初始化 Milvus 连接池 :param host: Milvus 主机地址 :param port: Milvus 端口 :param max_connections: 最大连接数 """ self.host = host self.port = port self.max_connections = max_connections self._pool = Queue(max_connections) self._lock = threading.Lock() # 初始化连接池 for _ in range(max_connections): self._pool.put(self._create_connection()) def _create_connection(self): """ 创建一个新的 Milvus 连接 :return: Milvus 连接对象 """ return connections.connect(host=self.host, port=self.port, alias="default") def get_connection(self): logger.info(f"获取连接,当前可用连接数: {self._pool.qsize()}") """ 从连接池中获取一个连接 :return: Milvus 连接对象 """ with self._lock: if not self._pool.empty(): return self._pool.get() else: raise Exception("连接池已满,无法获取连接") def release_connection(self, connection): """ 释放连接,将其放回连接池 :param connection: Milvus 连接对象 """ with self._lock: if self._pool.qsize() < self.max_connections: self._pool.put(connection) else: connections.disconnect("default") logger.info(f"释放连接,当前可用连接数: {self._pool.qsize()}") def close(self): """ 关闭连接池,释放所有连接 """ with self._lock: while not self._pool.empty(): connection = self._pool.get() connections.disconnect("default")