import datetime from elasticsearch import Elasticsearch from Config.Config import ES_CONFIG from T2_Txt2Vec import text_to_embedding from Util.EsMappingUtil import create_vector_index # 导入工具函数 # 初始化ES连接 es = Elasticsearch( hosts=ES_CONFIG["hosts"], basic_auth=ES_CONFIG["basic_auth"], verify_certs=ES_CONFIG["verify_certs"], ssl_show_warn=ES_CONFIG["ssl_show_warn"] ) def save_to_es(text, index_name="knowledge_base"): """将文本向量化后保存到ES""" # 检查是否已存在相同文本 query = { "query": { "term": { "text.keyword": { "value": text } } } } exists = es.search(index=index_name, body=query) if exists["hits"]["total"]["value"] > 0: print(f"文档已存在,跳过保存: {text}") return exists["hits"]["hits"][0]["_id"] # 返回现有文档ID # 保存新文档 vector = text_to_embedding(text) doc = { "text": text, "vector": vector, "timestamp": datetime.datetime.now().isoformat() } try: res = es.index(index=index_name, document=doc) print(f"文档已保存,ID: {res['_id']}") return res["_id"] except Exception as e: print(f"保存到ES失败: {str(e)}") raise # 使用示例 if __name__ == "__main__": create_vector_index(dims=200) # 使用工具函数创建索引 save_to_es("如何更换支付宝绑定银行卡")