import hashlib # 导入哈希库 import time import warnings from elasticsearch import Elasticsearch from langchain_openai import OpenAIEmbeddings # 直接导入嵌入模型 from pydantic import SecretStr # 用于包装API密钥 from Config import Config # 抑制HTTPS相关警告 warnings.filterwarnings('ignore', message='Connecting to .* using TLS with verify_certs=False is insecure') warnings.filterwarnings('ignore', message='Unverified HTTPS request is being made to host') def insert_long_text_to_es(long_text: str, tags: list = None) -> bool: """ 将长文本向量化并插入到Elasticsearch,基于文本内容哈希实现去重 参数: long_text: 要插入的长文本 tags: 可选的标签列表 返回: bool: 插入是否成功 """ try: # 1. 初始化Elasticsearch连接 es = Elasticsearch( hosts=Config.ES_CONFIG['hosts'], basic_auth=Config.ES_CONFIG['basic_auth'], verify_certs=False ) # 2. 检查索引是否存在,不存在则创建 index_name = Config.ES_CONFIG['index_name'] if not es.indices.exists(index=index_name): # 定义mapping结构 mapping = { "mappings": { "properties": { "embedding": { "type": "dense_vector", "dims": 1024, # 根据实际embedding维度调整 "index": True, "similarity": "l2_norm" }, "user_input": {"type": "text"}, "tags": { "type": "object", "properties": { "tags": {"type": "keyword"}, "full_content": {"type": "text"} } }, "timestamp": {"type": "date"} } } } es.indices.create(index=index_name, body=mapping) print(f"索引 '{index_name}' 创建成功") # 3. 生成文本内容的哈希值作为文档ID(实现去重) doc_id = hashlib.md5(long_text.encode('utf-8')).hexdigest() print(f"文本哈希值: {doc_id}") # 4. 检查文档是否已存在 if es.exists(index=index_name, id=doc_id): print(f"文档已存在,跳过插入: {doc_id}") return True # 5. 准备标签 if tags is None: tags = ["general_text"] tags_dict = {"tags": tags, "full_content": long_text} # 6. 获取当前时间 timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 7. 直接创建嵌入模型并生成向量 embeddings = OpenAIEmbeddings( model=Config.EMBED_MODEL_NAME, base_url=Config.EMBED_BASE_URL, api_key=SecretStr(Config.EMBED_API_KEY) ) # 8. 生成文本嵌入向量 embedding = embeddings.embed_documents([long_text])[0] # 9. 准备文档数据 doc = { 'tags': tags_dict, 'user_input': long_text[:500], # 取前500个字符作为摘要 'timestamp': timestamp, 'embedding': embedding } # 10. 插入数据到Elasticsearch(使用哈希值作为ID) es.index(index=index_name, id=doc_id, document=doc) print(f"长文本数据插入成功: {doc_id}") return True except Exception as e: print(f"插入数据失败: {e}") return False def main(): # 示例1:插入单个长文本 long_text = """混凝土是一种广泛使用的建筑材料,由水泥、砂、石子和水混合而成。它具有高强度、耐久性和良好的可塑性,被广泛应用于建筑、桥梁、道路等土木工程领域。 混凝土的历史可以追溯到古罗马时期,当时人们使用火山灰、石灰和碎石混合制成类似混凝土的材料。现代混凝土技术始于19世纪,随着波特兰水泥的发明而得到快速发展。 混凝土的性能取决于其配合比,包括水灰比、砂率等参数。水灰比是影响混凝土强度的关键因素,较小的水灰比通常会产生更高强度的混凝土。 为了改善混凝土的性能,常常会添加各种外加剂,如减水剂、早强剂、缓凝剂等。此外,还可以使用纤维增强、聚合物改性等技术来提高混凝土的韧性和耐久性。 在施工过程中,混凝土需要适当的养护,以确保其强度正常发展。养护措施包括浇水、覆盖保湿、蒸汽养护等。 随着建筑技术的发展,高性能混凝土、自密实混凝土、再生骨料混凝土等新型混凝土不断涌现,为土木工程领域提供了更多的选择。""" tags = ["test", "hash_deduplication"] insert_long_text_to_es(long_text, tags) if __name__ == "__main__": main()