"""Persist sentences to Elasticsearch as raw text and as embedding vectors.

The module opens one Elasticsearch connection at import time (configured from
``ES_CONFIG``) and exposes helpers to look up an existing text, store the raw
text, store its embedding, and a combined "check then save" flow.
"""
import logging
from datetime import datetime
from typing import Dict, Optional, Union

from elasticsearch import Elasticsearch

# Configure logging: every record goes both to a log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('../sentence_save.log'),
        logging.StreamHandler()
    ]
)

# NOTE(review): the project imports deliberately stay below basicConfig(), as
# in the original file -- presumably so that logging done while these modules
# are imported uses the handlers configured above. Confirm before reordering.
from Config.Config import ES_CONFIG  # noqa: E402
from Test.T2_Txt2Vec import text_to_embedding  # noqa: E402

logger = logging.getLogger(__name__)

# Longest text (in characters) that check_text_exists() sends in its query;
# longer input is cut down to this length before searching.
_MAX_QUERY_TEXT_LENGTH = 256

# Initialise the Elasticsearch connection.
es = Elasticsearch(
    hosts=ES_CONFIG["hosts"],
    basic_auth=ES_CONFIG["basic_auth"],
    verify_certs=ES_CONFIG["verify_certs"],
    ssl_show_warn=ES_CONFIG["ssl_show_warn"]
)


def check_text_exists(text: str, index_name: str = "raw_texts") -> Optional[str]:
    """Return the ID of a stored document matching *text*, if there is one.

    The lookup requires both an exact ``term`` match on ``text.keyword`` and a
    ``match`` on ``text``; at most one hit is requested.

    Args:
        text: Text to look for. Input longer than 256 characters is truncated
            to its first 256 characters before the query is built.
        index_name: Index to search.

    Returns:
        The ``_id`` of the first hit, or ``None`` when nothing matches **or**
        when the lookup itself fails. Failures are logged, never raised, so
        callers cannot distinguish "not found" from "search failed".
    """
    try:
        # Length guard kept from the original implementation.
        # NOTE(review): the stored document holds the full text, so an exact
        # term match on a truncated value can only hit if `text.keyword`
        # equals the truncated string -- TODO confirm against the index
        # mapping whether texts over 256 characters can ever be found here.
        if len(text) > _MAX_QUERY_TEXT_LENGTH:
            text = text[:_MAX_QUERY_TEXT_LENGTH]

        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"text.keyword": text}},
                        {"match": {"text": text}}
                    ]
                }
            }
        }

        res = es.search(index=index_name, body=query, size=1)
        hits = res['hits']['hits']
        return hits[0]['_id'] if hits else None
    except Exception as e:
        # Best-effort lookup: report through logging (so the failure reaches
        # the log file too) and treat the text as not found.
        logger.error("检查失败: %s", e, exc_info=True)
        return None


def save_vector(text: str, index_name: str = "knowledge_base") -> str:
    """Embed *text* and store the text, its vector and a timestamp.

    Args:
        text: Text to embed and store.
        index_name: Index that receives the document.

    Returns:
        The ``_id`` reported by Elasticsearch for the new document.

    Raises:
        Exception: Any error from embedding or indexing is logged with its
            traceback and re-raised unchanged.
    """
    try:
        logger.info("开始向量化文本: %s...", text[:50])
        # presumably returns the embedding in a form the ES client can
        # serialise (e.g. a list of floats) -- verify against text_to_embedding
        vector = text_to_embedding(text)

        doc = {
            "text": text,
            "vector": vector,
            "timestamp": datetime.now()
        }

        res = es.index(index=index_name, document=doc)
        logger.info(
            "成功保存向量化文档到%s, ID: %s, 文本长度: %d",
            index_name, res['_id'], len(text)
        )
        return res['_id']
    except Exception as e:
        logger.error("向量保存失败: %s", e, exc_info=True)
        raise


def save_raw_text(text: str, index_name: str = "raw_texts") -> str:
    """Store *text* together with a timestamp.

    Args:
        text: Text to store.
        index_name: Index that receives the document.

    Returns:
        The ``_id`` reported by Elasticsearch for the new document.

    Raises:
        Exception: Any indexing error is logged with its traceback and
            re-raised unchanged.
    """
    try:
        # Build the document.
        doc = {
            "text": text,
            "timestamp": datetime.now()
        }

        # Write it to Elasticsearch.
        res = es.index(index=index_name, document=doc)
        logger.info("原始文本已保存,ID: %s", res['_id'])
        return res['_id']
    except Exception as e:
        logger.error("保存失败: %s", e, exc_info=True)
        raise


def save_text_with_check(text: str) -> Union[str, Dict[str, str]]:
    """Save *text* unless check_text_exists() already finds it.

    Args:
        text: Text to store.

    Returns:
        The existing document ID (a string) when the text is already stored;
        otherwise a dict ``{"vector_id": ..., "raw_id": ...}`` with the IDs of
        the two newly created documents. Callers must handle both shapes.

    Raises:
        Exception: Propagated from save_vector() or save_raw_text().
    """
    # Skip the save when the text is already present (default raw-text index).
    existing_id = check_text_exists(text)
    if existing_id:
        logger.info("文本已存在,ID: %s", existing_id)
        return existing_id

    # Save the vector first, then the raw text.
    # NOTE(review): if save_raw_text() fails, the vector document written just
    # above is left in place; a retry will not find the raw text and will
    # write a second vector document.
    vector_id = save_vector(text)
    raw_id = save_raw_text(text)

    return {"vector_id": vector_id, "raw_id": raw_id}


# Usage example
if __name__ == "__main__":
    # Sample text
    sample_text = "如何更换支付宝绑定银行卡"

    # Save with the existence check
    save_text_with_check(sample_text)