Files
dsProject/dsLightRag/Volcengine/T2_CreateIndex.py
2025-09-07 09:13:16 +08:00

111 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import logging
import sys
from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY
from VikingDBMemoryService import VikingDBMemoryService, VikingDBMemoryException
# Logging setup: configure the root logger at INFO level and create the
# named logger that every message in this script is written through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("CollectionMemory")
# Substrings of a service error message that mean "the index is still being
# built" rather than a real failure; any one of them triggers another poll.
_INDEX_PENDING_MARKERS = (
    "index not ready",
    "index not exist",
    "need to add messages",
)


def _is_index_pending(error):
    """Return True when the text of *error* contains an index-pending marker."""
    message = str(error)
    return any(marker in message for marker in _INDEX_PENDING_MARKERS)


def _ensure_collection(memory_service, collection_name):
    """Look up *collection_name* and create it when the lookup says it is missing.

    Args:
        memory_service: Service object providing ``get_collection`` and
            ``create_collection``.
        collection_name: Name of the collection to look up or create.

    Returns:
        True when the collection already exists or was created here; False
        when the lookup failed with any other error (logged before returning).

    Raises:
        Exception: Whatever ``create_collection`` raises is propagated.
    """
    logger.info(f"检查集合 '{collection_name}' 是否存在...")
    try:
        memory_service.get_collection(collection_name)
    except Exception as e:
        # A missing collection is recognised by its message text; every other
        # lookup failure is reported to the caller as fatal.
        if "collection not exist" not in str(e):
            logger.error(f"检查集合时出错: {e}")
            return False
    else:
        logger.info(f"集合 '{collection_name}' 已存在")
        return True

    logger.info(f"集合 '{collection_name}' 不存在,正在创建...")
    memory_service.create_collection(
        collection_name=collection_name,
        description="中文情感陪伴场景测试",
        builtin_event_types=["sys_event_v1", "sys_profile_collect_v1"],
        builtin_entity_types=["sys_profile_v1"],
    )
    logger.info(f"集合 '{collection_name}' 创建成功")
    return True


def _add_init_session(memory_service, collection_name, user_id, assistant_id):
    """Add a single-message session to the collection via ``add_session``.

    The session id, the message timestamp and the metadata time are all
    derived from one clock reading so they agree with each other.
    """
    logger.info("添加测试数据以初始化索引...")
    now = time.time()
    now_ms = int(now * 1000)  # epoch milliseconds
    memory_service.add_session(
        collection_name=collection_name,
        session_id=f"init_session_{int(now)}",
        messages=[{
            "role": "user",
            "content": "初始化测试消息",
            "timestamp": now_ms,
        }],
        metadata={
            "default_user_id": user_id,
            "default_assistant_id": assistant_id,
            "time": now_ms,
        },
    )


def _wait_for_index(memory_service, collection_name, user_id,
                    max_retries=30, retry_interval=10):
    """Poll ``search_memory`` until it succeeds or the attempts run out.

    Args:
        memory_service: Service object providing ``search_memory``.
        collection_name: Collection to query.
        user_id: Value placed in the ``user_id`` search filter.
        max_retries: Maximum number of search attempts.
        retry_interval: Seconds to sleep between two attempts.

    Returns:
        True as soon as one search call completes without raising; False when
        a non-retryable error occurs or every attempt reported a pending index.
    """
    logger.info("开始等待索引构建完成...")
    filter_params = {"user_id": [user_id], "memory_type": ["sys_event_v1"]}

    for attempt in range(1, max_retries + 1):
        try:
            memory_service.search_memory(
                collection_name=collection_name,
                query="测试",
                filter=filter_params,
                limit=1,
            )
        except Exception as e:
            if not _is_index_pending(e):
                # Keep distinct messages for SDK errors and anything else.
                if isinstance(e, VikingDBMemoryException):
                    logger.error(f"检查索引状态时发生错误: {e}")
                else:
                    logger.error(f"检查索引状态时发生未知错误: {e}")
                return False

            remaining = max_retries - attempt
            logger.info(f"索引尚未就绪,将重试({attempt}/{max_retries}),剩余{remaining}次...")
            # Sleeping after the last attempt would only delay the failure exit.
            if remaining > 0:
                time.sleep(retry_interval)
        else:
            logger.info(f"集合 '{collection_name}' 索引构建完成,已就绪")
            return True

    logger.error(f"达到最大重试次数({max_retries}),索引仍未就绪")
    return False


def main():
    """Create the collection if needed, add a test session, and wait for the index.

    Steps, in order: construct the memory service, ensure the collection
    ``dsideal_collection`` exists, add one test session, then poll a search
    until it stops reporting a pending index.

    The function always terminates the process through ``sys.exit``: status 0
    when the search succeeds, status 1 on any error or when the retries are
    exhausted.
    """
    logger.info("开始创建并等待集合就绪...")
    collection_name = "dsideal_collection"
    user_id = "system"
    assistant_id = "system"

    try:
        logger.info("初始化记忆库服务...")
        memory_service = VikingDBMemoryService(ak=VOLC_ACCESSKEY, sk=VOLC_SECRETKEY)

        ready = _ensure_collection(memory_service, collection_name)
        if ready:
            _add_init_session(memory_service, collection_name, user_id, assistant_id)
            ready = _wait_for_index(memory_service, collection_name, user_id)
    except Exception as e:
        # Top-level boundary: log with traceback so the failure can be diagnosed.
        logger.exception(f"系统异常: {e}")
        sys.exit(1)

    sys.exit(0 if ready else 1)
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()