You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
65 lines
2.2 KiB
65 lines
2.2 KiB
from elasticsearch import Elasticsearch
|
|
import threading
|
|
import logging
|
|
|
|
# Module-level logger named after this module; the connection pool below
# uses it to emit warnings.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ElasticsearchConnectionPool:
    """Lock-guarded pool of reusable ``Elasticsearch`` client objects.

    The pool is filled eagerly with ``max_connections`` clients when it is
    constructed. ``get_connection`` hands one out (creating an extra client
    when the pool is empty) and ``release_connection`` puts it back; a
    released client that does not fit into the pool is closed instead.

    The pool can also be used as a context manager, in which case ``close``
    is called when the ``with`` block exits.
    """

    def __init__(self, hosts, basic_auth, verify_certs=False, max_connections=50):
        """Initialize the Elasticsearch connection pool.

        :param hosts: Elasticsearch server address(es).
        :param basic_auth: Credentials as ``(username, password)``.
        :param verify_certs: Whether to verify SSL certificates.
        :param max_connections: Maximum number of connections kept in the pool.
        """
        self.hosts = hosts
        self.basic_auth = basic_auth
        self.verify_certs = verify_certs
        self.max_connections = max_connections
        self._connections = []
        self._lock = threading.Lock()
        self._initialize_pool()

    def __enter__(self):
        """Return the pool itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close every pooled connection when the ``with`` block exits."""
        self.close()

    def _initialize_pool(self):
        """Fill the pool with ``max_connections`` freshly created connections."""
        for _ in range(self.max_connections):
            self._connections.append(self._create_connection())

    def _create_connection(self):
        """Create a new ``Elasticsearch`` client from the stored settings."""
        return Elasticsearch(
            hosts=self.hosts,
            basic_auth=self.basic_auth,
            verify_certs=self.verify_certs,
        )

    @staticmethod
    def _close_quietly(connection):
        """Close ``connection``, logging (not raising) any failure."""
        try:
            connection.close()
        except Exception as e:
            logger.warning(f"Failed to close connection: {str(e)}")

    def get_connection(self):
        """Take a connection from the pool.

        :return: A pooled connection, or a newly created one when the pool
            is empty (a warning is logged in that case).
        """
        with self._lock:
            if self._connections:
                return self._connections.pop()
        # Build the overflow connection outside the lock so other threads are
        # not blocked while the client is being constructed.
        logger.warning("Connection pool exhausted, creating new connection")
        return self._create_connection()

    def release_connection(self, connection):
        """Return a connection to the pool.

        ``None`` is ignored. A connection that is already in the pool is left
        untouched. When the pool already holds ``max_connections``
        connections, the released connection is closed instead of pooled.

        :param connection: The connection previously obtained from
            ``get_connection``.
        """
        if connection is None:
            # Nothing to release; pooling None would later hand it to a caller.
            return
        with self._lock:
            if any(pooled is connection for pooled in self._connections):
                # Double release: pooling it again would hand the same client
                # to two callers, and closing it would leave a closed client
                # in the pool.
                return
            if len(self._connections) < self.max_connections:
                self._connections.append(connection)
                return
        # Pool is full: close the surplus connection outside the lock.
        self._close_quietly(connection)

    def close(self):
        """Close every connection currently held in the pool and empty it.

        Connections that are checked out at the time of the call are not
        touched. Failures to close an individual connection are logged and
        do not stop the remaining connections from being closed.
        """
        with self._lock:
            # Detach the pooled connections under the lock, then close them
            # without holding it.
            pooled = list(self._connections)
            self._connections.clear()
        for conn in pooled:
            self._close_quietly(conn)