# dsProject/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
import logging
import warnings
import hashlib  # hashing, used for content-based deduplication
import time

from Config.Config import ES_CONFIG
from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool
from elasticsearch import Elasticsearch
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from pydantic import SecretStr
from Config import Config


def disableWarning():
    # Suppress the HTTPS warnings raised when verify_certs=False
    warnings.filterwarnings('ignore', message='Connecting to .* using TLS with verify_certs=False is insecure')
    warnings.filterwarnings('ignore', message='Unverified HTTPS request is being made to host')


# Apply the warning filters at import time
disableWarning()

# Initialize logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

class EsSearchUtil:
    def __init__(self, es_config):
        """
        Initialize the Elasticsearch search utility.

        :param es_config: Elasticsearch config dict containing hosts,
                          basic_auth, index_name, etc.
        """
        self.es_config = es_config

        # Initialize the connection pool
        self.es_pool = ElasticsearchConnectionPool(
            hosts=es_config['hosts'],
            basic_auth=es_config['basic_auth'],
            verify_certs=es_config.get('verify_certs', False),
            max_connections=50
        )
        self.index_name = es_config['index_name']
        logger.info(f"EsSearchUtil initialized successfully, index name: {self.index_name}")
    def rebuild_mapping(self):
        """
        Rebuild the Elasticsearch index and its mapping structure.

        Returns:
            bool: whether the operation succeeded
        """
        conn = None
        try:
            # Get a connection from the pool
            conn = self.es_pool.get_connection()

            # Define the mapping structure
            mapping = {
                "mappings": {
                    "properties": {
                        "embedding": {
                            "type": "dense_vector",
                            "dims": 1024,  # embedding dimension is 1024
                            "index": True,
                            "similarity": "l2_norm"  # use L2 distance
                        },
                        "user_input": {"type": "text"},
                        "tags": {
                            "type": "object",
                            "properties": {
                                "tags": {"type": "keyword"},
                                "full_content": {"type": "text"}
                            }
                        }
                    }
                }
            }

            # Delete the index if it already exists
            if conn.indices.exists(index=self.index_name):
                conn.indices.delete(index=self.index_name)
                logger.info(f"Deleted existing index '{self.index_name}'")
                print(f"Deleted existing index '{self.index_name}'")

            # Create the index with the mapping
            conn.indices.create(index=self.index_name, body=mapping)
            logger.info(f"Index '{self.index_name}' created successfully, mapping applied")
            print(f"Index '{self.index_name}' created successfully, mapping applied.")
            return True
        except Exception as e:
            logger.error(f"Failed to rebuild mapping: {str(e)}")
            print(f"Failed to rebuild mapping: {e}")
            return False
        finally:
            # Release the connection back to the pool; conn may still be None
            # if acquiring it failed, so guard before releasing
            if conn is not None:
                self.es_pool.release_connection(conn)
    def text_search(self, query, size=10):
        """
        Full-text match query against the user_input field.

        :param query: query string
        :param size: maximum number of hits to return
        :return: the raw Elasticsearch response
        """
        # Get a connection from the pool
        conn = self.es_pool.get_connection()
        try:
            result = conn.search(
                index=self.es_config['index_name'],
                query={"match": {"user_input": query}},
                size=size
            )
            return result
        except Exception as e:
            logger.error(f"Text search failed: {str(e)}")
            raise
        finally:
            # Release the connection back to the pool
            self.es_pool.release_connection(conn)
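
    def vector_search(self, query_vector, size=10):
        # A minimal kNN-search sketch, not part of the original file: the
        # mapping above indexes `embedding` as a dense_vector with l2_norm
        # similarity, so an approximate kNN query is the natural counterpart
        # to text_search. Assumes the caller supplies a 1024-dim embedding
        # and an Elasticsearch 8.x client (top-level `knn` search option).
        conn = self.es_pool.get_connection()
        try:
            result = conn.search(
                index=self.index_name,
                knn={
                    "field": "embedding",
                    "query_vector": query_vector,
                    "k": size,
                    "num_candidates": max(50, size * 10)
                },
                size=size
            )
            return result
        except Exception as e:
            logger.error(f"Vector search failed: {str(e)}")
            raise
        finally:
            self.es_pool.release_connection(conn)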

def split_text_into_chunks(text: str, chunk_size: int = 200, chunk_overlap: int = 0) -> list:
    """
    Split text into chunks.

    Args:
        text: the text to split
        chunk_size: size of each chunk
        chunk_overlap: overlap between adjacent chunks

    Returns:
        list: list of text chunks
    """
    # Wrap the text in a Document object
    docs = [Document(page_content=text, metadata={"source": "simulated_document"})]

    # Split the document
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap, add_start_index=True
    )
    all_splits = text_splitter.split_documents(docs)
    print(f"Number of chunks after splitting: {len(all_splits)}")
    return [split.page_content for split in all_splits]
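
# Illustrative usage (not from the original file):
#   chunks = split_text_into_chunks("some long passage ...", chunk_size=200)
#   # -> a list of strings, each at most ~200 characters, no overlap by default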

def insert_long_text_to_es(long_text: str, tags: list = None) -> bool:
    """
    Split a long text into chunks, embed them, and insert them into
    Elasticsearch, deduplicating by a hash of each chunk's content.

    Args:
        long_text: the long text to insert
        tags: optional list of tags

    Returns:
        bool: whether the insert succeeded
    """
    try:
        # 1. Initialize the Elasticsearch connection
        es = Elasticsearch(
            hosts=Config.ES_CONFIG['hosts'],
            basic_auth=Config.ES_CONFIG['basic_auth'],
            verify_certs=False
        )

        # 2. Create the index if it does not exist
        index_name = Config.ES_CONFIG['index_name']
        if not es.indices.exists(index=index_name):
            # Define the mapping structure
            mapping = {
                "mappings": {
                    "properties": {
                        "embedding": {
                            "type": "dense_vector",
                            "dims": 1024,  # adjust to the actual embedding dimension
                            "index": True,
                            "similarity": "l2_norm"
                        },
                        "user_input": {"type": "text"},
                        "tags": {
                            "type": "object",
                            "properties": {
                                "tags": {"type": "keyword"},
                                "full_content": {"type": "text"}
                            }
                        },
                        "timestamp": {"type": "date"}
                    }
                }
            }
            es.indices.create(index=index_name, body=mapping)
            print(f"Index '{index_name}' created successfully")

        # 3. Split the text
        text_chunks = split_text_into_chunks(long_text)

        # 4. Prepare tags
        if tags is None:
            tags = ["general_text"]

        # 5. Get the current time
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        # 6. Create the embedding model
        embeddings = OpenAIEmbeddings(
            model=Config.EMBED_MODEL_NAME,
            base_url=Config.EMBED_BASE_URL,
            api_key=SecretStr(Config.EMBED_API_KEY)
        )

        # 7. Embed each chunk and insert it
        for i, chunk in enumerate(text_chunks):
            # Use the MD5 hash of the chunk as the document ID
            doc_id = hashlib.md5(chunk.encode('utf-8')).hexdigest()

            # Skip chunks that already exist
            if es.exists(index=index_name, id=doc_id):
                print(f"Chunk {i+1} already exists, skipping insert: {doc_id}")
                continue

            # Generate the embedding vector for the chunk
            embedding = embeddings.embed_documents([chunk])[0]

            # Prepare the document payload
            doc = {
                'tags': {"tags": tags, "full_content": long_text},
                'user_input': chunk,
                'timestamp': timestamp,
                'embedding': embedding
            }

            # Insert the document into Elasticsearch
            es.index(index=index_name, id=doc_id, document=doc)
            print(f"Chunk {i+1} inserted successfully: {doc_id}")
        return True
    except Exception as e:
        print(f"Failed to insert data: {e}")
        return False

# main block for a quick test
if __name__ == "__main__":
    try:
        # Create an EsSearchUtil instance
        search_util = EsSearchUtil(ES_CONFIG)

        # Query for "混凝土" ("concrete")
        query = "混凝土"
        logger.info(f"Starting query for keyword: {query}")
        results = search_util.text_search(query, size=5)
        print(f"Query '{query}' finished, found {len(results['hits']['hits'])} results")

        # Print each hit
        for i, hit in enumerate(results['hits']['hits'], 1):
            print(f"Result {i}:")
            print(f"Score: {hit['_score']}")
            print(f"Content: {hit['_source'].get('user_input', 'no content')}")
            print("-" * 50)
    except Exception as e:
        logger.error(f"Test failed: {str(e)}")
        print(f"Test failed: {str(e)}")