from Config.Config import ES_CONFIG from elasticsearch import Elasticsearch from T2_Txt2Vec import text_to_embedding import datetime import warnings # 初始化ES连接 es = Elasticsearch( hosts=ES_CONFIG["hosts"], basic_auth=ES_CONFIG["basic_auth"], verify_certs=ES_CONFIG["verify_certs"], ssl_show_warn=ES_CONFIG["ssl_show_warn"] ) # 修改create_vector_index和save_to_es函数中使用ES_CONFIG["default_index"] def create_vector_index(index_name="knowledge_base"): """创建带有向量字段的索引""" mapping = { "mappings": { "properties": { "text": {"type": "text", "analyzer": "ik_max_word"}, "vector": { "type": "dense_vector", "dims": 768, # 需与text2vec模型维度一致 "index": True, "similarity": "cosine" }, "timestamp": {"type": "date"} } } } try: if es.indices.exists(index=index_name): es.indices.delete(index=index_name) es.indices.create(index=index_name, body=mapping) print(f"索引 {index_name} 创建成功") except Exception as e: print(f"创建索引失败: {str(e)}") raise def save_to_es(text, index_name="knowledge_base"): """将文本向量化后保存到ES""" vector = text_to_embedding(text) doc = { "text": text, "vector": vector, "timestamp": datetime.datetime.now().isoformat() } try: res = es.index(index=index_name, document=doc) print(f"文档已保存,ID: {res['_id']}") return res except Exception as e: print(f"保存到ES失败: {str(e)}") raise # 使用示例 if __name__ == "__main__": create_vector_index() # 首次运行前执行 save_to_es("如何更换支付宝绑定银行卡")