Files
dsProject/dsLightRag/Volcengine/T2_CreateIndex.py
2025-09-07 08:00:15 +08:00

57 lines
2.0 KiB
Python

import json
from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY
from VikingDBMemoryService import VikingDBMemoryService
from Volcengine.chat import wait_for_collection_ready
def create_memory_collection(collection_name, description="情感陪伴记忆库"):
# 初始化记忆库服务
memory_service = VikingDBMemoryService(
ak=VOLC_ACCESSKEY,
sk=VOLC_SECRETKEY,
host="api-knowledgebase.mlp.cn-beijing.volces.com",
region="cn-beijing"
)
try:
# 检查集合是否已存在
print(f"正在检查集合 '{collection_name}'...")
memory_service.get_collection(collection_name)
print(f"集合 '{collection_name}' 已存在,无需重复创建")
return False
except Exception as e:
if "collection not exist" not in str(e):
print(f"检查集合时发生错误: {str(e)}")
raise
# 创建新集合
print(f"开始创建集合 '{collection_name}'...")
try:
response = memory_service.create_collection(
collection_name=collection_name,
description=description,
builtin_event_types=["sys_event_v1", "sys_profile_collect_v1"],
builtin_entity_types=["sys_profile_v1"]
)
print(f"创建响应: {json.dumps(response, ensure_ascii=False, indent=2)}")
print(f"集合 '{collection_name}' 创建成功")
# 等待集合就绪
print("等待集合初始化完成...")
if wait_for_collection_ready(memory_service, collection_name):
print(f"集合 '{collection_name}' 已就绪,可以开始使用")
return True
else:
print(f"集合 '{collection_name}' 初始化超时")
return False
except Exception as e:
print(f"创建集合失败: {str(e)}")
raise
if __name__ == "__main__":
# 可修改为目标集合名称
TARGET_COLLECTION = "emotional_support"
create_memory_collection(TARGET_COLLECTION)