Files
dsProject/dsSchoolBuddy/ElasticSearch/T3_InsertData.py
2025-08-19 08:26:09 +08:00

117 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib # 导入哈希库
import time
import warnings
from elasticsearch import Elasticsearch
from langchain_openai import OpenAIEmbeddings # 直接导入嵌入模型
from pydantic import SecretStr # 用于包装API密钥
from Config import Config
# Silence the insecure-TLS warnings whose messages match the patterns below
# (the Elasticsearch client in this file is created with verify_certs=False).
warnings.filterwarnings(
    action='ignore',
    message='Connecting to .* using TLS with verify_certs=False is insecure',
)
warnings.filterwarnings(
    action='ignore',
    message='Unverified HTTPS request is being made to host',
)
def _build_index_mappings() -> dict:
    """Return the field mappings used when the target index has to be created."""
    return {
        "properties": {
            "embedding": {
                "type": "dense_vector",
                # Must equal the length of the vectors produced by the
                # embedding model; adjust if the model changes.
                "dims": 1024,
                "index": True,
                "similarity": "l2_norm",
            },
            "user_input": {"type": "text"},
            "tags": {
                "type": "object",
                "properties": {
                    "tags": {"type": "keyword"},
                    "full_content": {"type": "text"},
                },
            },
            "timestamp": {
                "type": "date",
                # The documents written below carry "yyyy-MM-dd HH:mm:ss"
                # (space separated), which the default date format rejects.
                # Accept that pattern explicitly and keep the defaults too.
                "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis",
            },
        }
    }


def insert_long_text_to_es(long_text: str, tags: list = None) -> bool:
    """
    Embed a long text and store it in Elasticsearch, de-duplicated by content hash.

    The MD5 hex digest of the UTF-8 encoded text is used as the document ID, so
    a text that is already stored is skipped. The index named by
    ``Config.ES_CONFIG['index_name']`` is created first if it does not exist.

    Args:
        long_text: Text to embed and store. Empty, whitespace-only or
            non-string values are rejected.
        tags: Optional list of tags; ``["general_text"]`` is used when ``None``.

    Returns:
        bool: ``True`` when the document was indexed or already existed,
        ``False`` when the input was rejected or any step raised.
    """
    # Reject unusable input before opening a connection or requesting an embedding.
    if not isinstance(long_text, str) or not long_text.strip():
        print("插入数据失败: 文本内容为空")
        return False

    es = None
    try:
        # 1. Open the Elasticsearch connection.
        es = Elasticsearch(
            hosts=Config.ES_CONFIG['hosts'],
            basic_auth=Config.ES_CONFIG['basic_auth'],
            verify_certs=False,
        )

        # 2. Create the index on first use.
        index_name = Config.ES_CONFIG['index_name']
        if not es.indices.exists(index=index_name):
            es.indices.create(index=index_name, mappings=_build_index_mappings())
            print(f"索引 '{index_name}' 创建成功")

        # 3. The content hash doubles as the document ID (used for
        #    de-duplication only, not for anything security related).
        doc_id = hashlib.md5(long_text.encode('utf-8')).hexdigest()
        print(f"文本哈希值: {doc_id}")

        # 4. Skip texts that are already stored; this runs before the
        #    embedding call so duplicates never trigger one.
        if es.exists(index=index_name, id=doc_id):
            print(f"文档已存在,跳过插入: {doc_id}")
            return True

        # 5. Prepare the tag payload (the caller's list is not modified).
        tag_list = ["general_text"] if tags is None else tags
        tags_dict = {"tags": tag_list, "full_content": long_text}

        # 6. Current time.
        # NOTE(review): this is local wall-clock time without a UTC offset;
        # Elasticsearch presumably reads it as UTC - confirm that is intended.
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        # 7. Build the embedding client and 8. embed the text.
        embeddings = OpenAIEmbeddings(
            model=Config.EMBED_MODEL_NAME,
            base_url=Config.EMBED_BASE_URL,
            api_key=SecretStr(Config.EMBED_API_KEY),
        )
        embedding = embeddings.embed_documents([long_text])[0]

        # 9. Assemble the document.
        doc = {
            'tags': tags_dict,
            'user_input': long_text[:500],  # first 500 characters as a summary
            'timestamp': timestamp,
            'embedding': embedding,
        }

        # 10. Index it under the content hash.
        es.index(index=index_name, id=doc_id, document=doc)
        print(f"长文本数据插入成功: {doc_id}")
        return True
    except Exception as e:
        # Boundary handler: callers only get a success flag, never an exception.
        print(f"插入数据失败: {e}")
        return False
    finally:
        # Release the client's connection pool. A failure to close must not
        # replace the result computed above, so it is reported, not raised.
        if es is not None:
            try:
                es.close()
            except Exception as close_err:
                print(f"关闭Elasticsearch连接失败: {close_err}")
def main():
    """Demo entry point: insert one sample long text together with two tags."""
    # Example 1: insert a single long text.
    demo_tags = ["test", "hash_deduplication"]
    demo_text = "这是一个测试长文本,用于演示基于内容哈希的去重机制。"
    insert_long_text_to_es(long_text=demo_text, tags=demo_tags)


if __name__ == "__main__":
    main()