from elasticsearch import Elasticsearch import threading import logging logger = logging.getLogger(__name__) class ElasticsearchConnectionPool: def __init__(self, hosts, basic_auth, verify_certs=False, max_connections=50): """ 初始化Elasticsearch连接池 :param hosts: Elasticsearch服务器地址 :param basic_auth: 认证信息(username, password) :param verify_certs: 是否验证SSL证书 :param max_connections: 最大连接数 """ self.hosts = hosts self.basic_auth = basic_auth self.verify_certs = verify_certs self.max_connections = max_connections self._connections = [] self._lock = threading.Lock() self._initialize_pool() def _initialize_pool(self): """初始化连接池""" for _ in range(self.max_connections): self._connections.append(self._create_connection()) def _create_connection(self): """创建新的Elasticsearch连接""" return Elasticsearch( hosts=self.hosts, basic_auth=self.basic_auth, verify_certs=self.verify_certs ) def get_connection(self): """从连接池获取一个连接""" with self._lock: if not self._connections: logger.warning("Connection pool exhausted, creating new connection") return self._create_connection() return self._connections.pop() def release_connection(self, connection): """释放连接回连接池""" with self._lock: if len(self._connections) < self.max_connections: self._connections.append(connection) else: try: connection.close() except Exception as e: logger.warning(f"Failed to close connection: {str(e)}") def close(self): """关闭所有连接""" with self._lock: for conn in self._connections: try: conn.close() except Exception as e: logger.warning(f"Failed to close connection: {str(e)}") self._connections.clear()