diff --git a/dsSchoolBuddy/ElasticSearch/T3_InsertData.py b/dsSchoolBuddy/ElasticSearch/T3_InsertData.py
index a0c23e81..c98291ed 100644
--- a/dsSchoolBuddy/ElasticSearch/T3_InsertData.py
+++ b/dsSchoolBuddy/ElasticSearch/T3_InsertData.py
@@ -3,8 +3,10 @@ import time
 import warnings
 
 from elasticsearch import Elasticsearch
-from langchain_openai import OpenAIEmbeddings  # import the embedding model directly
-from pydantic import SecretStr  # used to wrap the API key
+from langchain_core.documents import Document
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+from langchain_openai import OpenAIEmbeddings
+from pydantic import SecretStr
 
 from Config import Config
 
@@ -13,14 +15,39 @@ warnings.filterwarnings('ignore', message='Connecting to .* using TLS with verif
 warnings.filterwarnings('ignore', message='Unverified HTTPS request is being made to host')
 
 
+def split_text_into_chunks(text: str, chunk_size: int = 200, chunk_overlap: int = 100) -> list:
+    """
+    Split a text into chunks.
+
+    Args:
+        text: the text to split
+        chunk_size: size of each chunk
+        chunk_overlap: overlap between adjacent chunks
+
+    Returns:
+        list: the list of text chunks
+    """
+    # Wrap the text in a Document object
+    docs = [Document(page_content=text, metadata={"source": "simulated_document"})]
+
+    # Split the document
+    text_splitter = RecursiveCharacterTextSplitter(
+        chunk_size=chunk_size, chunk_overlap=chunk_overlap, add_start_index=True
+    )
+    all_splits = text_splitter.split_documents(docs)
+    print(f"Number of chunks after splitting: {len(all_splits)}")
+
+    return [split.page_content for split in all_splits]
+
+
 def insert_long_text_to_es(long_text: str, tags: list = None) -> bool:
     """
-    Vectorize a long text and insert it into Elasticsearch, deduplicated by a hash of the text content
-
+    Split a long text into chunks, vectorize them, and insert them into Elasticsearch, deduplicated by a hash of each chunk's content
+
     Args:
         long_text: the long text to insert
         tags: optional list of tags
-
+
     Returns:
         bool: whether the insert succeeded
     """
@@ -60,44 +87,48 @@ def insert_long_text_to_es(long_text: str, tags: list = None) -> bool:
             es.indices.create(index=index_name, body=mapping)
             print(f"Index '{index_name}' created successfully")
 
-        # 3. Use the MD5 hash of the text content as the document ID (for deduplication)
-        doc_id = hashlib.md5(long_text.encode('utf-8')).hexdigest()
-        print(f"Text hash: {doc_id}")
+        # 3. Split the text into chunks
+        text_chunks = split_text_into_chunks(long_text)
 
-        # 4. Check whether the document already exists
-        if es.exists(index=index_name, id=doc_id):
-            print(f"Document already exists, skipping insert: {doc_id}")
-            return True
-
-        # 5. Prepare the tags
+        # 4. Prepare the tags
         if tags is None:
            tags = ["general_text"]
-        tags_dict = {"tags": tags, "full_content": long_text}
 
-        # 6. Get the current time
+        # 5. Get the current time
         timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
 
-        # 7. Create the embedding model directly and generate the vector
+        # 6. Create the embedding model
         embeddings = OpenAIEmbeddings(
             model=Config.EMBED_MODEL_NAME,
             base_url=Config.EMBED_BASE_URL,
             api_key=SecretStr(Config.EMBED_API_KEY)
         )
 
-        # 8. Generate the text embedding vector
-        embedding = embeddings.embed_documents([long_text])[0]
+        # 7. Generate a vector for each chunk and insert it
+        for i, chunk in enumerate(text_chunks):
+            # Use the MD5 hash of the chunk as the document ID
+            doc_id = hashlib.md5(chunk.encode('utf-8')).hexdigest()
 
-        # 9. Prepare the document data
-        doc = {
-            'tags': tags_dict,
-            'user_input': long_text[:500],  # keep the first 500 characters as a summary
-            'timestamp': timestamp,
-            'embedding': embedding
-        }
+            # Check whether the chunk document already exists
+            if es.exists(index=index_name, id=doc_id):
+                print(f"Chunk {i+1} already exists, skipping insert: {doc_id}")
+                continue
+
+            # Generate the embedding vector for the chunk
+            embedding = embeddings.embed_documents([chunk])[0]
+
+            # Prepare the document data
+            doc = {
+                'tags': {"tags": tags, "full_content": long_text},
+                'user_input': chunk,
+                'timestamp': timestamp,
+                'embedding': embedding
+            }
+
+            # Insert the data into Elasticsearch
+            es.index(index=index_name, id=doc_id, document=doc)
+            print(f"Chunk {i+1} inserted successfully: {doc_id}")
 
-        # 10. Insert the data into Elasticsearch (using the hash as the ID)
-        es.index(index=index_name, id=doc_id, document=doc)
-        print(f"Long text inserted successfully: {doc_id}")
         return True
     except Exception as e:
         print(f"Failed to insert data: {e}")
@@ -121,6 +152,5 @@ def main():
 
     insert_long_text_to_es(long_text, tags)
 
-
 if __name__ == "__main__":
     main()
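Usage sketch (illustrative, not part of the patch): one way to exercise the new chunk-and-insert flow, assuming the module is importable as ElasticSearch.T3_InsertData from the dsSchoolBuddy package root and that Config already provides the Elasticsearch and EMBED_* settings the module reads; the sample text and tags below are hypothetical placeholders.

    from ElasticSearch.T3_InsertData import split_text_into_chunks, insert_long_text_to_es

    # A long text that will be split into ~200-character chunks with 100-character overlap.
    long_text = "Elasticsearch stores each chunk as a separate document keyed by its MD5 hash. " * 30

    # Inspect the chunking on its own.
    chunks = split_text_into_chunks(long_text)
    print(f"{len(chunks)} chunks, first chunk has {len(chunks[0])} characters")

    # Vectorize and insert every chunk; chunks whose hash already exists are skipped.
    ok = insert_long_text_to_es(long_text, tags=["demo", "chunked"])
    print(f"insert succeeded: {ok}")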