From 1fd96fbc4e90ed234b40dcfd777244b25bdf88f9 Mon Sep 17 00:00:00 2001
From: HuangHai <10402852@qq.com>
Date: Tue, 19 Aug 2025 09:23:36 +0800
Subject: [PATCH] 'commit'

---
 .../ElasticSearch/Utils/EsSearchUtil.py | 207 +++++++++---------
 1 file changed, 100 insertions(+), 107 deletions(-)

diff --git a/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py b/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
index 7d7bc4c8..dcb15e6b 100644
--- a/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
+++ b/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
@@ -1,18 +1,15 @@
 import logging
 import warnings
-import hashlib  # import hash library
+import hashlib
 import time
 from Config.Config import ES_CONFIG
 from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool
-from elasticsearch import Elasticsearch
 from langchain_core.documents import Document
 from langchain_text_splitters import RecursiveCharacterTextSplitter
 from langchain_openai import OpenAIEmbeddings
 from pydantic import SecretStr
-
 from Config import Config
 
-
 # Initialize logging
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
@@ -111,130 +108,126 @@ class EsSearchUtil:
         # Release the connection back to the pool
         self.es_pool.release_connection(conn)
 
+    def split_text_into_chunks(self,text: str, chunk_size: int = 200, chunk_overlap: int = 0) -> list:
+        """
+        Split text into chunks
+        Parameters:
+            text: the text to split
+            chunk_size: size of each chunk
+            chunk_overlap: overlap size between chunks
 
-def split_text_into_chunks(text: str, chunk_size: int = 200, chunk_overlap: int = 0) -> list:
-    """
-    Split text into chunks
+        Returns:
+            list: list of text chunks
+        """
+        # Create a document object
+        docs = [Document(page_content=text, metadata={"source": "simulated_document"})]
 
-    Parameters:
-        text: the text to split
-        chunk_size: size of each chunk
-        chunk_overlap: overlap size between chunks
+        # Split the document
+        text_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=chunk_size, chunk_overlap=chunk_overlap, add_start_index=True
+        )
+        all_splits = text_splitter.split_documents(docs)
+        print(f"Number of chunks after splitting: {len(all_splits)}")
 
-    Returns:
-        list: list of text chunks
-    """
-    # Create a document object
-    docs = [Document(page_content=text, metadata={"source": "simulated_document"})]
+        return [split.page_content for split in all_splits]
 
-    # Split the document
-    text_splitter = RecursiveCharacterTextSplitter(
-        chunk_size=chunk_size, chunk_overlap=chunk_overlap, add_start_index=True
-    )
-    all_splits = text_splitter.split_documents(docs)
-    print(f"Number of chunks after splitting: {len(all_splits)}")
+    def insert_long_text_to_es(self,long_text: str, tags: list = None) -> bool:
+        """
+        Split a long text, vectorize the chunks and insert them into Elasticsearch, deduplicating by content hash
 
-    return [split.page_content for split in all_splits]
+        Parameters:
+            long_text: the long text to insert
+            tags: optional list of tags
+        Returns:
+            bool: whether the insertion succeeded
+        """
+        try:
+            # 1. Create an EsSearchUtil instance to use the connection pool
+            search_util = EsSearchUtil(Config.ES_CONFIG)
 
-def insert_long_text_to_es(long_text: str, tags: list = None) -> bool:
-    """
-    Split a long text, vectorize the chunks and insert them into Elasticsearch, deduplicating by content hash
+            # 2. Get a connection from the pool
+            conn = search_util.es_pool.get_connection()
 
-    Parameters:
-        long_text: the long text to insert
-        tags: optional list of tags
-
-    Returns:
-        bool: whether the insertion succeeded
-    """
-    try:
-        # 1. Create an EsSearchUtil instance to use the connection pool
-        search_util = EsSearchUtil(Config.ES_CONFIG)
-
-        # 2. Get a connection from the pool
-        conn = search_util.es_pool.get_connection()
-
-        # 3. Check whether the index exists; create it if not
-        index_name = Config.ES_CONFIG['index_name']
-        if not conn.indices.exists(index=index_name):
-            # Define the mapping structure
-            mapping = {
-                "mappings": {
-                    "properties": {
-                        "embedding": {
-                            "type": "dense_vector",
-                            "dims": 1024,  # adjust to the actual embedding dimension
-                            "index": True,
-                            "similarity": "l2_norm"
-                        },
-                        "user_input": {"type": "text"},
-                        "tags": {
-                            "type": "object",
-                            "properties": {
-                                "tags": {"type": "keyword"},
-                                "full_content": {"type": "text"}
-                            }
-                        },
-                        "timestamp": {"type": "date"}
+            # 3. Check whether the index exists; create it if not
+            index_name = Config.ES_CONFIG['index_name']
+            if not conn.indices.exists(index=index_name):
+                # Define the mapping structure
+                mapping = {
+                    "mappings": {
+                        "properties": {
+                            "embedding": {
+                                "type": "dense_vector",
+                                "dims": 1024,  # adjust to the actual embedding dimension
+                                "index": True,
+                                "similarity": "l2_norm"
+                            },
+                            "user_input": {"type": "text"},
+                            "tags": {
+                                "type": "object",
+                                "properties": {
+                                    "tags": {"type": "keyword"},
+                                    "full_content": {"type": "text"}
+                                }
+                            },
+                            "timestamp": {"type": "date"}
+                        }
                     }
                 }
-            }
-            conn.indices.create(index=index_name, body=mapping)
-            print(f"Index '{index_name}' created successfully")
+                conn.indices.create(index=index_name, body=mapping)
+                print(f"Index '{index_name}' created successfully")
 
-        # 4. Split the text
-        text_chunks = split_text_into_chunks(long_text)
+            # 4. Split the text
+            text_chunks = self.split_text_into_chunks(long_text)
 
-        # 5. Prepare tags
-        if tags is None:
-            tags = ["general_text"]
+            # 5. Prepare tags
+            if tags is None:
+                tags = ["general_text"]
 
-        # 6. Get the current time
-        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+            # 6. Get the current time
+            timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
 
-        # 7. Create the embedding model
-        embeddings = OpenAIEmbeddings(
-            model=Config.EMBED_MODEL_NAME,
-            base_url=Config.EMBED_BASE_URL,
-            api_key=SecretStr(Config.EMBED_API_KEY)
-        )
+            # 7. Create the embedding model
+            embeddings = OpenAIEmbeddings(
+                model=Config.EMBED_MODEL_NAME,
+                base_url=Config.EMBED_BASE_URL,
+                api_key=SecretStr(Config.EMBED_API_KEY)
+            )
 
-        # 8. Generate an embedding for each chunk and insert it
-        for i, chunk in enumerate(text_chunks):
-            # Use the chunk's content hash as the document ID
-            doc_id = hashlib.md5(chunk.encode('utf-8')).hexdigest()
+            # 8. Generate an embedding for each chunk and insert it
+            for i, chunk in enumerate(text_chunks):
+                # Use the chunk's content hash as the document ID
+                doc_id = hashlib.md5(chunk.encode('utf-8')).hexdigest()
 
-            # Check whether the document already exists
-            if conn.exists(index=index_name, id=doc_id):
-                print(f"Chunk {i+1} already exists, skipping insert: {doc_id}")
-                continue
+                # Check whether the document already exists
+                if conn.exists(index=index_name, id=doc_id):
+                    print(f"Chunk {i+1} already exists, skipping insert: {doc_id}")
+                    continue
 
-            # Generate the embedding vector for the chunk
-            embedding = embeddings.embed_documents([chunk])[0]
+                # Generate the embedding vector for the chunk
+                embedding = embeddings.embed_documents([chunk])[0]
 
-            # Prepare the document data
-            doc = {
-                'tags': {"tags": tags, "full_content": long_text},
-                'user_input': chunk,
-                'timestamp': timestamp,
-                'embedding': embedding
-            }
+                # Prepare the document data
+                doc = {
+                    'tags': {"tags": tags, "full_content": long_text},
+                    'user_input': chunk,
+                    'timestamp': timestamp,
+                    'embedding': embedding
+                }
 
-            # Insert the data into Elasticsearch
-            conn.index(index=index_name, id=doc_id, document=doc)
-            print(f"Chunk {i+1} inserted successfully: {doc_id}")
-
-        return True
-    except Exception as e:
-        print(f"Failed to insert data: {e}")
-        return False
-    finally:
-        # Make sure the connection is released back to the pool
-        if 'conn' in locals() and 'search_util' in locals():
-            search_util.es_pool.release_connection(conn)
+                # Insert the data into Elasticsearch
+                conn.index(index=index_name, id=doc_id, document=doc)
+                print(f"Chunk {i+1} inserted successfully: {doc_id}")
+            return True
+        except Exception as e:
+            print(f"Failed to insert data: {e}")
+            return False
+        finally:
+            # Make sure the connection is released back to the pool
+            if 'conn' in locals() and 'search_util' in locals():
+                search_util.es_pool.release_connection(conn)
 
 
 # Add a main function for testing