diff --git a/dsSchoolBuddy/ElasticSearch/T3_InsertData.py b/dsSchoolBuddy/ElasticSearch/T3_InsertData.py
index 7bf50549..ecd207bd 100644
--- a/dsSchoolBuddy/ElasticSearch/T3_InsertData.py
+++ b/dsSchoolBuddy/ElasticSearch/T3_InsertData.py
@@ -1,140 +1,12 @@
-import hashlib  # hash library
-import time
 import warnings
-from elasticsearch import Elasticsearch
-from langchain_core.documents import Document
-from langchain_text_splitters import RecursiveCharacterTextSplitter
-from langchain_openai import OpenAIEmbeddings
-from pydantic import SecretStr
-
-from Config import Config
+from ElasticSearch.Utils.EsSearchUtil import insert_long_text_to_es
 
 # Suppress HTTPS-related warnings
 warnings.filterwarnings('ignore', message='Connecting to .* using TLS with verify_certs=False is insecure')
 warnings.filterwarnings('ignore', message='Unverified HTTPS request is being made to host')
 
 
-def split_text_into_chunks(text: str, chunk_size: int = 200, chunk_overlap: int = 0) -> list:
-    """
-    Split text into chunks.
-
-    Args:
-        text: the text to split
-        chunk_size: size of each chunk
-        chunk_overlap: overlap between adjacent chunks
-
-    Returns:
-        list: list of text chunks
-    """
-    # Create a document object
-    docs = [Document(page_content=text, metadata={"source": "simulated_document"})]
-
-    # Split the document
-    text_splitter = RecursiveCharacterTextSplitter(
-        chunk_size=chunk_size, chunk_overlap=chunk_overlap, add_start_index=True
-    )
-    all_splits = text_splitter.split_documents(docs)
-    print(f"Number of chunks after splitting: {len(all_splits)}")
-
-    return [split.page_content for split in all_splits]
-
-
-def insert_long_text_to_es(long_text: str, tags: list = None) -> bool:
-    """
-    Split a long text, embed the chunks and insert them into Elasticsearch,
-    deduplicating by a hash of each chunk's content.
-
-    Args:
-        long_text: the long text to insert
-        tags: optional list of tags
-
-    Returns:
-        bool: whether the insertion succeeded
-    """
-    try:
-        # 1. Initialize the Elasticsearch connection
-        es = Elasticsearch(
-            hosts=Config.ES_CONFIG['hosts'],
-            basic_auth=Config.ES_CONFIG['basic_auth'],
-            verify_certs=False
-        )
-
-        # 2. Check whether the index exists; create it if not
-        index_name = Config.ES_CONFIG['index_name']
-        if not es.indices.exists(index=index_name):
-            # Define the mapping
-            mapping = {
-                "mappings": {
-                    "properties": {
-                        "embedding": {
-                            "type": "dense_vector",
-                            "dims": 1024,  # adjust to the actual embedding dimension
-                            "index": True,
-                            "similarity": "l2_norm"
-                        },
-                        "user_input": {"type": "text"},
-                        "tags": {
-                            "type": "object",
-                            "properties": {
-                                "tags": {"type": "keyword"},
-                                "full_content": {"type": "text"}
-                            }
-                        },
-                        "timestamp": {"type": "date"}
-                    }
-                }
-            }
-            es.indices.create(index=index_name, body=mapping)
-            print(f"Index '{index_name}' created successfully")
-
-        # 3. Split the text
-        text_chunks = split_text_into_chunks(long_text)
-
-        # 4. Prepare the tags
-        if tags is None:
-            tags = ["general_text"]
-
-        # 5. Get the current time
-        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-
-        # 6. Create the embedding model
-        embeddings = OpenAIEmbeddings(
-            model=Config.EMBED_MODEL_NAME,
-            base_url=Config.EMBED_BASE_URL,
-            api_key=SecretStr(Config.EMBED_API_KEY)
-        )
-
-        # 7. Generate an embedding for each chunk and insert it
-        for i, chunk in enumerate(text_chunks):
-            # Use the MD5 hash of the chunk content as the document ID
-            doc_id = hashlib.md5(chunk.encode('utf-8')).hexdigest()
-
-            # Check whether the document already exists
-            if es.exists(index=index_name, id=doc_id):
-                print(f"Chunk {i+1} already exists, skipping insert: {doc_id}")
-                continue
-
-            # Generate the embedding vector for the chunk
-            embedding = embeddings.embed_documents([chunk])[0]
-
-            # Prepare the document payload
-            doc = {
-                'tags': {"tags": tags, "full_content": long_text},
-                'user_input': chunk,
-                'timestamp': timestamp,
-                'embedding': embedding
-            }
-
-            # Index the document into Elasticsearch
-            es.index(index=index_name, id=doc_id, document=doc)
-            print(f"Chunk {i+1} inserted successfully: {doc_id}")
-
-        return True
-    except Exception as e:
-        print(f"Failed to insert data: {e}")
-        return False
-
-
 def main():
     # Example 1: insert a single long text
     long_text = """Concrete is a widely used building material made from cement, sand, aggregate and water. It has high strength, durability and good plasticity, and is widely used in civil engineering such as buildings, bridges and roads.
diff --git a/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py b/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
index 4e885056..033858d9 100644
--- a/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
+++ b/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
@@ -1,9 +1,16 @@
 import logging
 import warnings
-
+import hashlib  # hash library
+import time
 from Config.Config import ES_CONFIG
 from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool
+from elasticsearch import Elasticsearch
+from langchain_core.documents import Document
+from langchain_text_splitters import RecursiveCharacterTextSplitter
+from langchain_openai import OpenAIEmbeddings
+from pydantic import SecretStr
+from Config import Config
 
 # Suppress HTTPS-related warnings
 warnings.filterwarnings('ignore', message='Connecting to .* using TLS with verify_certs=False is insecure')
 warnings.filterwarnings('ignore', message='Unverified HTTPS request is being made to host')
@@ -50,6 +57,128 @@ class EsSearchUtil:
                 self.es_pool.release_connection(conn)
 
 
+
+def split_text_into_chunks(text: str, chunk_size: int = 200, chunk_overlap: int = 0) -> list:
+    """
+    Split text into chunks.
+
+    Args:
+        text: the text to split
+        chunk_size: size of each chunk
+        chunk_overlap: overlap between adjacent chunks
+
+    Returns:
+        list: list of text chunks
+    """
+    # Create a document object
+    docs = [Document(page_content=text, metadata={"source": "simulated_document"})]
+
+    # Split the document
+    text_splitter = RecursiveCharacterTextSplitter(
+        chunk_size=chunk_size, chunk_overlap=chunk_overlap, add_start_index=True
+    )
+    all_splits = text_splitter.split_documents(docs)
+    print(f"Number of chunks after splitting: {len(all_splits)}")
+
+    return [split.page_content for split in all_splits]
+
+
+def insert_long_text_to_es(long_text: str, tags: list = None) -> bool:
+    """
+    Split a long text, embed the chunks and insert them into Elasticsearch,
+    deduplicating by a hash of each chunk's content.
+
+    Args:
+        long_text: the long text to insert
+        tags: optional list of tags
+
+    Returns:
+        bool: whether the insertion succeeded
+    """
+    try:
+        # 1. Initialize the Elasticsearch connection
+        es = Elasticsearch(
+            hosts=Config.ES_CONFIG['hosts'],
+            basic_auth=Config.ES_CONFIG['basic_auth'],
+            verify_certs=False
+        )
+
+        # 2. Check whether the index exists; create it if not
+        index_name = Config.ES_CONFIG['index_name']
+        if not es.indices.exists(index=index_name):
+            # Define the mapping
+            mapping = {
+                "mappings": {
+                    "properties": {
+                        "embedding": {
+                            "type": "dense_vector",
+                            "dims": 1024,  # adjust to the actual embedding dimension
+                            "index": True,
+                            "similarity": "l2_norm"
+                        },
+                        "user_input": {"type": "text"},
+                        "tags": {
+                            "type": "object",
+                            "properties": {
+                                "tags": {"type": "keyword"},
+                                "full_content": {"type": "text"}
+                            }
+                        },
+                        "timestamp": {"type": "date"}
+                    }
+                }
+            }
+            es.indices.create(index=index_name, body=mapping)
+            print(f"Index '{index_name}' created successfully")
+
+        # 3. Split the text
+        text_chunks = split_text_into_chunks(long_text)
+
+        # 4. Prepare the tags
+        if tags is None:
+            tags = ["general_text"]
+
+        # 5. Get the current time
+        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+
+        # 6. Create the embedding model
+        embeddings = OpenAIEmbeddings(
+            model=Config.EMBED_MODEL_NAME,
+            base_url=Config.EMBED_BASE_URL,
+            api_key=SecretStr(Config.EMBED_API_KEY)
+        )
+
+        # 7. Generate an embedding for each chunk and insert it
+        for i, chunk in enumerate(text_chunks):
+            # Use the MD5 hash of the chunk content as the document ID
+            doc_id = hashlib.md5(chunk.encode('utf-8')).hexdigest()
+
+            # Check whether the document already exists
+            if es.exists(index=index_name, id=doc_id):
+                print(f"Chunk {i+1} already exists, skipping insert: {doc_id}")
+                continue
+
+            # Generate the embedding vector for the chunk
+            embedding = embeddings.embed_documents([chunk])[0]
+
+            # Prepare the document payload
+            doc = {
+                'tags': {"tags": tags, "full_content": long_text},
+                'user_input': chunk,
+                'timestamp': timestamp,
+                'embedding': embedding
+            }
+
+            # Index the document into Elasticsearch
+            es.index(index=index_name, id=doc_id, document=doc)
+            print(f"Chunk {i+1} inserted successfully: {doc_id}")
+
+        return True
+    except Exception as e:
+        print(f"Failed to insert data: {e}")
+        return False
+
+
+
 # Add a main function for testing
 if __name__ == "__main__":
     try:
diff --git a/dsSchoolBuddy/ElasticSearch/Utils/__pycache__/EsSearchUtil.cpython-310.pyc b/dsSchoolBuddy/ElasticSearch/Utils/__pycache__/EsSearchUtil.cpython-310.pyc
new file mode 100644
index 00000000..8f081a8f
Binary files /dev/null and b/dsSchoolBuddy/ElasticSearch/Utils/__pycache__/EsSearchUtil.cpython-310.pyc differ
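For context, a minimal caller-side sketch of the relocated helper (not part of the patch): it mirrors the import that the slimmed-down T3_InsertData.py now uses, and assumes Config.ES_CONFIG plus the embedding settings (EMBED_MODEL_NAME, EMBED_BASE_URL, EMBED_API_KEY) are configured as shown in the diff; the sample text and tag are placeholders.

# Hypothetical usage sketch of the moved helper; names other than the import are illustrative.
from ElasticSearch.Utils.EsSearchUtil import insert_long_text_to_es

sample_text = "Concrete is a widely used building material made from cement, sand, aggregate and water."
if insert_long_text_to_es(sample_text, tags=["building_materials"]):
    # Chunks are split, embedded and indexed; chunks whose MD5-based ID already exists are skipped.
    print("Sample text indexed (duplicates skipped).")
else:
    print("Insertion failed; check the Elasticsearch and embedding configuration.")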