"""Thin text-search helper built on the project's Elasticsearch connection pool."""

import logging
import warnings

from Config.Config import ES_CONFIG
from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool

# Suppress the HTTPS-related warnings emitted when certificate verification
# is disabled (verify_certs defaults to False below).
warnings.filterwarnings(
    'ignore',
    message='Connecting to .* using TLS with verify_certs=False is insecure',
)
warnings.filterwarnings(
    'ignore',
    message='Unverified HTTPS request is being made to host',
)

# Module-level logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class EsSearchUtil:
    """Run text searches against a single Elasticsearch index via a connection pool."""

    def __init__(self, es_config):
        """
        Initialise the Elasticsearch search utility.

        :param es_config: Elasticsearch configuration dict. Keys read here:
            ``hosts``, ``basic_auth``, ``index_name`` (all required) and
            ``verify_certs`` (optional, defaults to ``False``).
        :raises KeyError: if a required key is missing from ``es_config``.
        """
        self.es_config = es_config

        # Initialise the connection pool.
        self.es_pool = ElasticsearchConnectionPool(
            hosts=es_config['hosts'],
            basic_auth=es_config['basic_auth'],
            verify_certs=es_config.get('verify_certs', False),
            max_connections=50,
        )

        self.index_name = es_config['index_name']
        logger.info(f"EsSearchUtil初始化成功,索引名称: {self.index_name}")

    def text_search(self, query, size=10):
        """
        Run a ``match`` query on the ``user_input`` field of the configured index.

        :param query: text to match against ``user_input``.
        :param size: maximum number of hits requested (default 10).
        :return: the result object returned by the pooled connection's
            ``search`` call, unmodified.
        :raises Exception: any error raised by the search is logged and re-raised.
        """
        # Borrow a connection from the pool.
        conn = self.es_pool.get_connection()
        try:
            # Use the index name captured at construction time so the index
            # that was logged in __init__ is the one actually queried.
            return conn.search(
                index=self.index_name,
                query={"match": {"user_input": query}},
                size=size,
            )
        except Exception as e:
            logger.error("文本搜索失败: %s", e)
            raise
        finally:
            # Always hand the connection back to the pool.
            self.es_pool.release_connection(conn)


# Manual smoke test.
if __name__ == "__main__":
    # Without a configured handler, the INFO-level result lines below would be
    # dropped (the last-resort handler only emits WARNING and above).
    logging.basicConfig(level=logging.INFO)

    try:
        # Create the search utility.
        search_util = EsSearchUtil(ES_CONFIG)

        # Search for the sample keyword.
        query = "混凝土"
        logger.info(f"开始查询关键词: {query}")
        results = search_util.text_search(query, size=5)

        # Report the hits.
        hits = results['hits']['hits']
        logger.info(f"查询到 {len(hits)} 条结果")
        for i, hit in enumerate(hits, 1):
            logger.info(f"结果 {i}:")
            logger.info(f"得分: {hit['_score']}")
            logger.info(f"内容: {hit['_source'].get('user_input', '无内容')}")
            logger.info("-" * 50)

        print(f"查询 '{query}' 完成,共找到 {len(hits)} 条结果")
    except Exception as e:
        logger.error(f"测试失败: {str(e)}")
        print(f"测试失败: {str(e)}")