# File: dsProject/dsLightRag/Volcengine/T2_CreateIndex.py
import json
import logging
import time

from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY
from VikingDBMemoryService import VikingDBMemoryService, MEMORY_COLLECTION_NAME, VikingDBMemoryException
# Dedicated logger for this script; INFO level so progress messages are shown.
logger = logging.getLogger('CollectionMemory')
logger.setLevel(logging.INFO)

# Attach the console handler only if none is present yet, so executing this
# setup more than once (e.g. a module reload) does not stack duplicate
# handlers and print every message twice.
if not logger.handlers:
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)


def _collection_exists(memory_service, collection_name):
    """Return whether ``collection_name`` already exists on ``memory_service``.

    A missing collection is recognised by the text "collection not exist" in
    the error raised by ``get_collection``; that case yields False. Any other
    error is logged and re-raised.
    """
    logger.info(f"正在检查集合 '{collection_name}'...")
    try:
        memory_service.get_collection(collection_name)
    except Exception as e:
        if "collection not exist" in str(e):
            return False
        # Unexpected failure: report it as an error rather than as progress output.
        logger.error(f"检查集合时发生错误: {str(e)}")
        raise
    return True


def _add_test_session(memory_service, collection_name):
    """Add one throw-away session so the service starts building the index.

    The message is typed ``sys_event_v1``. The metadata carries the fields the
    original code marked as API-required (``default_user_id``,
    ``default_assistant_id``) plus a millisecond ``time`` stamp.
    """
    logger.info("添加测试消息以初始化索引...")
    now = time.time()
    # Second-resolution suffix keeps the session id unique across runs.
    session_id = f"test_session_{int(now)}"
    test_messages = [{
        "role": "user",
        "content": "initial test message",
        "memory_type": "sys_event_v1",
        "user_id": "test_user",
    }]
    test_metadata = {
        "default_user_id": "test_user",  # API-required field
        "default_assistant_id": "test_assistant",  # API-required field
        "user_id": "test_user",
        "session_id": session_id,
        "time": int(now * 1000),  # millisecond timestamp
    }
    memory_service.add_session(
        collection_name=collection_name,
        session_id=session_id,
        messages=test_messages,
        metadata=test_metadata,
    )


def _wait_for_index_ready(memory_service, collection_name, max_retries, retry_interval, initial_delay):
    """Probe the collection with ``search_memory`` until the index answers.

    Returns True as soon as a search succeeds. Returns False on timeout
    (``max_retries`` probes all reported "not ready"), on a
    ``VikingDBMemoryException`` that is not a known "not ready" message, or
    on any other exception raised by the probe.
    """
    logger.info("等待集合初始化完成...")
    # Probing right after creation only yields "not ready" errors; give the
    # service a head start first.
    logger.info(f"初始延迟{initial_delay}秒等待索引构建...")
    time.sleep(initial_delay)

    for attempt in range(1, max_retries + 1):
        try:
            # A search requires the index, so a successful call means it is ready.
            memory_service.search_memory(
                collection_name=collection_name,
                query="test",
                filter={
                    "memory_type": ["sys_event_v1"],
                    "user_id": "test_user",  # the search requires a user id
                },
                limit=1,
            )
        except VikingDBMemoryException as e:
            error_msg = str(e)
            if "index not exist" not in error_msg and "need to add messages" not in error_msg:
                logger.error(f"检查索引就绪状态时发生错误: {error_msg}")
                return False
            # Only wait when another probe follows; after the last one,
            # report the timeout immediately instead of sleeping for nothing.
            if attempt < max_retries:
                logger.info(f"索引尚未就绪,等待中... (重试 {attempt}/{max_retries})")
                time.sleep(retry_interval)
            continue
        except Exception as e:
            logger.error(f"检查过程发生意外错误: {str(e)}")
            return False
        logger.info(f"集合 '{collection_name}' 索引构建完成,已就绪")
        return True

    logger.error(f"集合 '{collection_name}' 索引构建超时")
    return False


def create_memory_collection(
    collection_name,
    description="情感陪伴记忆库",
    *,
    host="api-knowledgebase.mlp.cn-beijing.volces.com",
    region="cn-beijing",
    max_retries=30,
    retry_interval=10,
    initial_delay=30,
):
    """Create a VikingDB memory collection and wait until its index is usable.

    Args:
        collection_name: Name of the collection to create.
        description: Description stored with the collection.
        host: Memory API endpoint handed to ``VikingDBMemoryService``.
        region: Region handed to ``VikingDBMemoryService``.
        max_retries: Maximum number of readiness probes after the initial delay.
        retry_interval: Seconds to wait between readiness probes.
        initial_delay: Seconds to wait before the first readiness probe.

    Returns:
        True if the collection was created and a search against it succeeded.
        False if the collection already existed, if a readiness probe failed
        with an unexpected error, or if all probes reported "not ready".

    Raises:
        Exception: re-raised from the existence check (other than the
        "collection not exist" case), from ``create_collection`` or the
        logging of its response, or from adding the index-seeding test session.
    """
    memory_service = VikingDBMemoryService(
        ak=VOLC_ACCESSKEY,
        sk=VOLC_SECRETKEY,
        host=host,
        region=region,
    )

    if _collection_exists(memory_service, collection_name):
        logger.info(f"集合 '{collection_name}' 已存在,无需重复创建")
        return False

    logger.info(f"开始创建集合 '{collection_name}'...")
    try:
        response = memory_service.create_collection(
            collection_name=collection_name,
            description=description,
            builtin_event_types=["sys_event_v1", "sys_profile_collect_v1"],
            builtin_entity_types=["sys_profile_v1"],
        )
        logger.info(f"创建响应: {json.dumps(response, ensure_ascii=False, indent=2)}")
        logger.info(f"集合 '{collection_name}' 创建成功")
    except Exception as e:
        logger.error(f"创建集合失败: {str(e)}")
        raise

    # The collection now exists. Failures from here on are not creation
    # failures, so they are reported with their own message.
    try:
        _add_test_session(memory_service, collection_name)
    except Exception as e:
        logger.error(f"集合 '{collection_name}' 已创建,但添加测试消息失败: {str(e)}")
        raise

    return _wait_for_index_ready(
        memory_service,
        collection_name,
        max_retries=max_retries,
        retry_interval=retry_interval,
        initial_delay=initial_delay,
    )


if __name__ == "__main__":
    # Script entry point: create the project's default memory collection
    # (MEMORY_COLLECTION_NAME) unless it already exists.
    create_memory_collection(collection_name=MEMORY_COLLECTION_NAME)