You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

108 lines
3.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
from datetime import datetime
from elasticsearch import Elasticsearch
# Configure logging: INFO level and above, timestamped records, delivered to
# both the ../sentence_save.log file and a default StreamHandler.
_log_handlers = [
    logging.FileHandler('../sentence_save.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
from Config.Config import ES_CONFIG
from Test.T2_Txt2Vec import text_to_embedding
# Initialise the shared Elasticsearch client from the ES_CONFIG settings.
_es_options = {
    key: ES_CONFIG[key]
    for key in ("hosts", "basic_auth", "verify_certs", "ssl_show_warn")
}
es = Elasticsearch(**_es_options)
def check_text_exists(text, index_name="raw_texts"):
    """Look up an already-stored copy of *text* and return its document ID.

    A document counts as a match only when it satisfies both an exact
    ``term`` clause on ``text.keyword`` and a ``match`` clause on ``text``.

    Args:
        text: The text to look for. Only its first 256 characters take part
            in the lookup.
        index_name: Index to search; defaults to ``"raw_texts"``.

    Returns:
        The ``_id`` of the first hit, or ``None`` when nothing matches, when
        *text* is empty, or when the lookup fails. Failures are logged and
        never raised, so callers treat them as "not found".
    """
    # Nothing to look up for an empty or missing text; skip the round trip.
    if not text:
        return None
    try:
        # Slicing is a no-op for texts of 256 characters or fewer.
        # NOTE(review): a truncated value cannot equal a longer stored
        # keyword, so texts over 256 characters are presumably never reported
        # as existing -- confirm against the index mapping.
        lookup_text = text[:256]
        response = es.search(
            index=index_name,
            query={
                "bool": {
                    "must": [
                        {"term": {"text.keyword": lookup_text}},
                        {"match": {"text": lookup_text}},
                    ]
                }
            },
            size=1,
        )
        hits = response['hits']['hits']
        return hits[0]['_id'] if hits else None
    except Exception as e:
        # Best-effort check: keep returning None on failure, but record the
        # traceback through the configured logging handlers.
        logging.error(f"检查失败: {str(e)}", exc_info=True)
        return None
def save_vector(text, index_name="knowledge_base"):
    """Embed *text* and index the text together with its vector.

    Args:
        text: The text to embed with ``text_to_embedding`` and store.
        index_name: Target index; defaults to ``"knowledge_base"``.

    Returns:
        The ``_id`` of the newly indexed document.

    Raises:
        Exception: Any error raised while embedding or indexing is logged
            with its traceback and then re-raised unchanged.
    """
    try:
        preview = text[:50]
        logging.info(f"开始向量化文本: {preview}...")
        document = {
            "text": text,
            "vector": text_to_embedding(text),
            "timestamp": datetime.now(),
        }
        response = es.index(index=index_name, document=document)
        doc_id = response['_id']
        logging.info(f"成功保存向量化文档到{index_name}, ID: {doc_id}, 文本长度: {len(text)}")
        return doc_id
    except Exception as e:
        # logging.exception logs at ERROR level with the traceback attached.
        logging.exception(f"向量保存失败: {str(e)}")
        raise
def save_raw_text(text, index_name="raw_texts"):
    """Index the raw *text* and return the new document ID.

    Args:
        text: The text to store.
        index_name: Target index; defaults to ``"raw_texts"``.

    Returns:
        The ``_id`` of the newly indexed document.

    Raises:
        Exception: Any error raised while indexing is logged with its
            traceback and then re-raised unchanged.
    """
    try:
        # The timestamp is the naive local time at the moment of saving.
        doc = {"text": text, "timestamp": datetime.now()}
        res = es.index(index=index_name, document=doc)
        doc_id = res['_id']
        # Report through logging (as save_vector does) so the outcome also
        # reaches the configured log file, not just the console.
        logging.info(f"原始文本已保存ID: {doc_id}")
        return doc_id
    except Exception as e:
        logging.error(f"保存失败: {str(e)}", exc_info=True)
        raise
def save_text_with_check(text):
    """Save *text* only when ``check_text_exists`` does not find it.

    Args:
        text: The text to check and, if new, to save.

    Returns:
        The existing document ID when the text is already stored; otherwise
        a dict ``{"vector_id": ..., "raw_id": ...}`` holding the IDs returned
        by ``save_vector`` and ``save_raw_text``.

    Raises:
        Exception: Whatever ``save_vector`` or ``save_raw_text`` raise.
    """
    duplicate_id = check_text_exists(text)
    if not duplicate_id:
        # Dict entries are evaluated in order: the vector is saved first,
        # then the raw text.
        return {"vector_id": save_vector(text), "raw_id": save_raw_text(text)}
    print(f"文本已存在ID: {duplicate_id}")
    return duplicate_id
# Usage example: run the check-then-save flow on one sample sentence.
if __name__ == "__main__":
    save_text_with_check("如何更换支付宝绑定银行卡")