You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

108 lines
3.1 KiB

1 month ago
import logging
1 month ago
from datetime import datetime
1 month ago
from elasticsearch import Elasticsearch
1 month ago
# Configure root logging: INFO and above goes both to a log file one directory
# above the working directory and to the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler('../sentence_save.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
1 month ago
from Config.Config import ES_CONFIG
1 month ago
from Test.T2_Txt2Vec import text_to_embedding
1 month ago
# Initialize the Elasticsearch connection; every option is read from ES_CONFIG
# (a missing key raises KeyError at import time).
es = Elasticsearch(
    **{
        option: ES_CONFIG[option]
        for option in ("hosts", "basic_auth", "verify_certs", "ssl_show_warn")
    }
)
1 month ago
def check_text_exists(text, index_name="raw_texts", max_length=256):
    """Check whether a text is already stored and return its document ID.

    The text is truncated to ``max_length`` characters and looked up with a
    ``bool``/``must`` query that combines an exact ``term`` match on
    ``text.keyword`` with a full-text ``match`` on ``text``.

    Args:
        text: The text to look for.
        index_name: Index to search. Defaults to ``"raw_texts"``.
        max_length: Maximum number of characters used for the lookup; longer
            input is truncated before querying. Defaults to 256.

    Returns:
        The ``_id`` of the first hit, or ``None`` when nothing matched or the
        lookup itself failed.
    """
    try:
        # Length guard: only the first ``max_length`` characters are queried.
        # NOTE(review): 256 presumably mirrors the default ``ignore_above`` of
        # dynamically mapped keyword sub-fields -- confirm against the mapping.
        if len(text) > max_length:
            text = text[:max_length]
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"text.keyword": text}},
                        {"match": {"text": text}},
                    ]
                }
            }
        }
        res = es.search(index=index_name, body=query, size=1)
        hits = res['hits']['hits']
        return hits[0]['_id'] if hits else None
    except Exception as e:
        # Best-effort lookup: a failure is reported as "not found" rather than
        # raised, but it is recorded through the configured logging handlers
        # (with traceback) instead of being printed to stdout only.
        logging.error(f"检查失败: {str(e)}", exc_info=True)
        return None
1 month ago
def save_vector(text, index_name="knowledge_base"):
    """Embed a text and index it, together with its vector, into Elasticsearch.

    Args:
        text: The text to embed and store.
        index_name: Target index. Defaults to ``"knowledge_base"``.

    Returns:
        The ``_id`` assigned to the indexed document.

    Raises:
        Exception: Any error raised while embedding or indexing is logged with
            its traceback and then re-raised unchanged.
    """
    try:
        logging.info(f"开始向量化文本: {text[:50]}...")
        payload = {
            "text": text,
            "vector": text_to_embedding(text),
            "timestamp": datetime.now(),
        }
        response = es.index(index=index_name, document=payload)
        doc_id = response['_id']
        logging.info(f"成功保存向量化文档到{index_name}, ID: {doc_id}, 文本长度: {len(text)}")
        return doc_id
    except Exception as err:
        logging.error(f"向量保存失败: {str(err)}", exc_info=True)
        raise
def save_raw_text(text, index_name="raw_texts"):
    """Index the raw text into Elasticsearch.

    Args:
        text: The text to store.
        index_name: Target index. Defaults to ``"raw_texts"``.

    Returns:
        The ``_id`` assigned to the indexed document.

    Raises:
        Exception: Any error raised while indexing is logged with its
            traceback and then re-raised unchanged.
    """
    try:
        # Build the document: the text plus the time it was stored.
        doc = {
            "text": text,
            "timestamp": datetime.now(),
        }
        res = es.index(index=index_name, document=doc)
        # Report through logging (as save_vector does) so the outcome also
        # reaches the configured log file, not just stdout.
        logging.info(f"原始文本已保存ID: {res['_id']}")
        return res['_id']
    except Exception as e:
        logging.error(f"保存失败: {str(e)}", exc_info=True)
        raise
1 month ago
1 month ago
def save_text_with_check(text):
    """Save a text (vector and raw form) unless it is already stored.

    Args:
        text: The text to store.

    Returns:
        The existing document ID (as returned by ``check_text_exists``) when
        the text is already present; otherwise a dict
        ``{"vector_id": ..., "raw_id": ...}`` with the IDs of the two newly
        indexed documents. Callers must handle both shapes.

    Raises:
        Exception: Whatever ``save_vector`` or ``save_raw_text`` re-raises.
    """
    # Skip saving when the text is already stored.
    existing_id = check_text_exists(text)
    if existing_id:
        # Logged rather than printed so the message reaches the configured
        # log file like the other save messages.
        logging.info(f"文本已存在ID: {existing_id}")
        return existing_id
    # NOTE(review): the vector is saved before the raw text, while the
    # duplicate check above calls check_text_exists with its default index.
    # If save_raw_text fails, the vector document stays behind and a retry
    # will not be detected as a duplicate -- confirm this is acceptable.
    vector_id = save_vector(text)
    raw_id = save_raw_text(text)
    return {"vector_id": vector_id, "raw_id": raw_id}
1 month ago
# Usage example
if __name__ == "__main__":
    # Run one sample text through the duplicate-checked save flow.
    sample_text = "如何更换支付宝绑定银行卡"
    save_text_with_check(sample_text)