diff --git a/dsSchoolBuddy/ElasticSearch/T3_InsertData.py b/dsSchoolBuddy/ElasticSearch/T3_InsertData.py index a2d8921b..64bd34fe 100644 --- a/dsSchoolBuddy/ElasticSearch/T3_InsertData.py +++ b/dsSchoolBuddy/ElasticSearch/T3_InsertData.py @@ -1,75 +1,144 @@ -import warnings - -from Config import Config -from Config.Config import * -from elasticsearch import Elasticsearch -from gensim.models import KeyedVectors -import jieba import os import time +import warnings + +from elasticsearch import Elasticsearch +from langchain_core.documents import Document + +from Config import Config +from Util.VectorUtil import text_to_vector_db # 导入向量化工具函数 # 抑制HTTPS相关警告 warnings.filterwarnings('ignore', message='Connecting to .* using TLS with verify_certs=False is insecure') warnings.filterwarnings('ignore', message='Unverified HTTPS request is being made to host') -# 1. 加载预训练的 Word2Vec 模型 -model_path = MODEL_PATH -model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MODEL_LIMIT) -print(f"模型加载成功,词向量维度: {model.vector_size}") +def insert_long_text_to_es(long_text: str, tags: list = None) -> bool: + """ + 将长文本向量化并插入到Elasticsearch + + 参数: + long_text: 要插入的长文本 + tags: 可选的标签列表 + + 返回: + bool: 插入是否成功 + """ + try: + # 1. 初始化Elasticsearch连接 + es = Elasticsearch( + hosts=Config.ES_CONFIG['hosts'], + basic_auth=Config.ES_CONFIG['basic_auth'], + verify_certs=False + ) -# 功能:将文本转换为嵌入向量 -def text_to_embedding(text): - words = jieba.lcut(text) - embeddings = [model[word] for word in words if word in model] - if embeddings: - return sum(embeddings) / len(embeddings) - return [0.0] * model.vector_size - - -# 2. 初始化Elasticsearch连接 -es = Elasticsearch( - hosts=Config.ES_CONFIG['hosts'], - basic_auth=Config.ES_CONFIG['basic_auth'], - verify_certs=False -) - -# 3. 处理processed_chunks目录下的所有文件 -txt_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'Txt') - -for filename in os.listdir(txt_dir): - if filename.endswith('.txt'): - filepath = os.path.join(txt_dir, filename) - with open(filepath, 'r', encoding='utf-8') as f: - # 只读取第一行作为向量计算 - first_line = f.readline().strip() - # 读取全部内容用于后续查询 - full_content = first_line + '\n' + f.read() - - if not first_line: - print(f"跳过空文件: {filename}") - continue - - print(f"正在处理文件: {filename}") - - # 4. 获取当前时间和会话ID - timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - # 需要进行标记的标签 - x = filename.split("_") - selectedTags = [x[0] + "_" + x[1]] - tags = {"tags": selectedTags, "full_content": full_content} # 添加完整内容 - - # 5. 将第一行文本转换为嵌入向量 - embedding = text_to_embedding(first_line) - - # 6. 插入数据到Elasticsearch - doc = { - 'tags': tags, - 'user_input': first_line, - 'timestamp': timestamp, - 'embedding': embedding + # 2. 检查索引是否存在,不存在则创建 + index_name = Config.ES_CONFIG['index_name'] + if not es.indices.exists(index=index_name): + # 定义mapping结构 + mapping = { + "mappings": { + "properties": { + "embedding": { + "type": "dense_vector", + "dims": 1024, # 根据实际embedding维度调整 + "index": True, + "similarity": "l2_norm" + }, + "user_input": {"type": "text"}, + "tags": { + "type": "object", + "properties": { + "tags": {"type": "keyword"}, + "full_content": {"type": "text"} + } + }, + "timestamp": {"type": "date"} + } + } } - es.index(index=ES_CONFIG['index_name'], document=doc) - print(f"文件 {filename} 数据插入成功") + es.indices.create(index=index_name, body=mapping) + print(f"索引 '{index_name}' 创建成功") -print("所有文件处理完成") + # 3. 使用VectorUtil处理文本 + # 这里我们创建一个临时的Document对象 + docs = [Document(page_content=long_text, metadata={"source": "user_provided_text"})] + + # 4. 获取当前时间 + timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + # 5. 准备标签 + if tags is None: + tags = ["general_text"] + tags_dict = {"tags": tags, "full_content": long_text} + + # 6. 使用VectorUtil中的功能获取嵌入向量 + # 注意:我们需要修改text_to_vector_db函数或创建新函数来获取单个文档的嵌入 + # 这里为了演示,我们直接调用text_to_vector_db并获取第一个文档的嵌入 + vector_store, _, _ = text_to_vector_db(long_text) + + # 7. 提取嵌入向量 + # 注意:这里假设我们只处理一个文档 + embedding = vector_store._embeddings.embed_documents([long_text])[0] + + # 8. 插入数据到Elasticsearch + doc = { + 'tags': tags_dict, + 'user_input': long_text[:500], # 取前500个字符作为摘要 + 'timestamp': timestamp, + 'embedding': embedding + } + es.index(index=index_name, document=doc) + print(f"长文本数据插入成功") + return True + except Exception as e: + print(f"插入数据失败: {e}") + return False + + +def process_text_directory(txt_dir: str) -> None: + """ + 处理指定目录下的所有文本文件,将其向量化并插入到Elasticsearch + + 参数: + txt_dir: 包含文本文件的目录路径 + """ + for filename in os.listdir(txt_dir): + if filename.endswith('.txt'): + filepath = os.path.join(txt_dir, filename) + with open(filepath, 'r', encoding='utf-8') as f: + full_content = f.read() + + if not full_content: + print(f"跳过空文件: {filename}") + continue + + print(f"正在处理文件: {filename}") + + # 提取标签 + x = filename.split("_") + if len(x) >= 2: + selected_tags = [x[0] + "_" + x[1]] + else: + selected_tags = ["uncategorized"] + + # 插入文本 + insert_long_text_to_es(full_content, selected_tags) + + +def main(): + # 示例1:插入单个长文本 + long_text = """混凝土是一种广泛使用的建筑材料,由水泥、砂、石子和水混合而成。它具有高强度、耐久性和良好的可塑性,被广泛应用于建筑、桥梁、道路等土木工程领域。 + +混凝土的历史可以追溯到古罗马时期,当时人们使用火山灰、石灰和碎石混合制成类似混凝土的材料。现代混凝土技术始于19世纪,随着波特兰水泥的发明而得到快速发展。 + +混凝土的性能取决于其配合比,包括水灰比、砂率等参数。水灰比是影响混凝土强度的关键因素,较小的水灰比通常会产生更高强度的混凝土。""" + insert_long_text_to_es(long_text, tags=["construction", "materials"]) + + # 示例2:处理目录中的所有文本文件 + # txt_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'Txt') + # process_text_directory(txt_dir) + + +if __name__ == "__main__": + main()