from elasticsearch import Elasticsearch from Config.Config import ES_CONFIG # 初始化ES连接 es = Elasticsearch( hosts=ES_CONFIG["hosts"], basic_auth=ES_CONFIG["basic_auth"], verify_certs=ES_CONFIG["verify_certs"], ssl_show_warn=ES_CONFIG["ssl_show_warn"] ) def get_vector_mapping(dims=200): """获取向量索引的mapping结构""" return { "properties": { "content": { "type": "text", "analyzer": "ik_smart", "search_analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 8192 } } }, "vector": { "type": "dense_vector", "dims": dims, "index": True, "similarity": "cosine" }, "timestamp": { "type": "date", "format": "strict_date_optional_time||epoch_millis" } } } def get_text_mapping(): """获取文本索引的mapping结构""" return { "properties": { "raw_text": { "type": "text", "analyzer": "ik_smart", "fielddata": True }, "timestamp": { "type": "date", "format": "strict_date_optional_time||epoch_millis" } } } def manage_index(action, index_type="vector", index_name=None, dims=200): """管理Elasticsearch索引""" if index_name is None: index_name = "knowledge_base" if index_type == "vector" else "raw_texts" if action == "create": mapping = get_vector_mapping(dims) if index_type == "vector" else get_text_mapping() try: if es.indices.exists(index=index_name): print(f"索引 {index_name} 已存在") return False es.indices.create(index=index_name, body={"mappings": mapping}) print(f"索引 {index_name} 创建成功(使用ik_smart分词器)") return True except Exception as e: print(f"创建索引失败: {str(e)}") raise elif action == "delete": try: if not es.indices.exists(index=index_name): print(f"索引 {index_name} 不存在") return False es.indices.delete(index=index_name) print(f"索引 {index_name} 删除成功") return True except Exception as e: print(f"删除索引失败: {str(e)}") raise else: raise ValueError("action参数必须是'create'或'delete'") # 使用示例 if __name__ == "__main__": # 先删除现有索引(如果存在) manage_index("delete", "vector") manage_index("delete", "text") # 创建新的向量索引 manage_index("create", "vector", dims=200) # 创建新的原始文本索引 manage_index("create", "text")