"""Create the VikingDB memory collection if needed and wait for its index.

Run as a script. The process exits with status 0 once a memory search against
the collection succeeds, and with status 1 on any error or when the retry
budget is exhausted.
"""
import json
import logging
import sys
import time

from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY
from VikingDBMemoryService import VikingDBMemoryService, VikingDBMemoryException

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('CollectionMemory')

# Substrings of service error messages that mean "the index is still being
# built" -- these are retried instead of being treated as failures.
_INDEX_PENDING_MARKERS = (
    "index not ready",
    "index not exist",
    "need to add messages",
)


def _is_index_pending(error_msg):
    """Return True if *error_msg* contains one of the index-pending markers."""
    return any(marker in error_msg for marker in _INDEX_PENDING_MARKERS)


def _ensure_collection(memory_service, collection_name):
    """Create *collection_name* when the service reports it does not exist.

    Returns:
        True when the collection exists (already present or just created),
        False when the existence check failed for any other reason.

    Raises:
        Whatever ``create_collection`` raises; the caller handles it.
    """
    logger.info(f"检查集合 '{collection_name}' 是否存在...")
    try:
        memory_service.get_collection(collection_name)
    except Exception as e:
        # The service signals a missing collection through the error text.
        if "collection not exist" not in str(e):
            logger.error(f"检查集合时出错: {e}")
            return False
        logger.info(f"集合 '{collection_name}' 不存在,正在创建...")
        memory_service.create_collection(
            collection_name=collection_name,
            description="中文情感陪伴场景测试",
            builtin_event_types=["sys_event_v1", "sys_profile_collect_v1"],
            builtin_entity_types=["sys_profile_v1"],
        )
        logger.info(f"集合 '{collection_name}' 创建成功")
    else:
        logger.info(f"集合 '{collection_name}' 已存在")
    return True


def _add_init_session(memory_service, collection_name, user_id, assistant_id):
    """Add one seed session so the service has data to build an index from."""
    logger.info("添加测试数据以初始化索引...")
    # Sample the clock once so the session id and both timestamps agree.
    now = time.time()
    now_ms = int(now * 1000)
    session_id = f"init_session_{int(now)}"
    test_messages = [{
        "role": "user",
        "content": "初始化测试消息",
        "timestamp": now_ms,
    }]
    test_metadata = {
        "default_user_id": user_id,
        "default_assistant_id": assistant_id,
        "time": now_ms,
    }
    memory_service.add_session(
        collection_name=collection_name,
        session_id=session_id,
        # NOTE(review): this argument sits in the gap between the two diff
        # hunks and is not visible in the patch; it is inferred because
        # test_messages is otherwise unused -- confirm against the real file.
        messages=test_messages,
        metadata=test_metadata,
    )


def _wait_for_index(memory_service, collection_name, user_id,
                    max_retries, retry_interval):
    """Poll ``search_memory`` until it stops reporting a pending index.

    Returns:
        0 when a search succeeds, 1 on a non-retryable error or when
        *max_retries* attempts have all reported a pending index.
    """
    logger.info("开始等待索引构建完成...")
    filter_params = {"user_id": [user_id], "memory_type": ["sys_event_v1"]}

    for retry_count in range(1, max_retries + 1):
        try:
            memory_service.search_memory(
                collection_name=collection_name,
                query="测试",
                filter=filter_params,
                limit=1,
            )
        except Exception as e:
            error_msg = str(e)
            if not _is_index_pending(error_msg):
                # Keep the two distinct messages for service vs. other errors.
                if isinstance(e, VikingDBMemoryException):
                    logger.error(f"检查索引状态时发生错误: {error_msg}")
                else:
                    logger.error(f"检查索引状态时发生未知错误: {error_msg}")
                return 1
            remaining = max_retries - retry_count
            logger.info(f"索引尚未就绪,将重试({retry_count}/{max_retries}),剩余{remaining}次...")
            # No point sleeping after the last attempt; report the timeout now.
            if remaining:
                time.sleep(retry_interval)
        else:
            logger.info(f"集合 '{collection_name}' 索引构建完成,已就绪")
            return 0

    logger.error(f"达到最大重试次数({max_retries}),索引仍未就绪")
    return 1


def _run(collection_name, user_id, assistant_id, max_retries, retry_interval):
    """Run the whole workflow and return the process exit code (0 or 1)."""
    logger.info("开始创建并等待集合就绪...")
    try:
        # Initialise the memory service client
        logger.info("初始化记忆库服务...")
        memory_service = VikingDBMemoryService(ak=VOLC_ACCESSKEY, sk=VOLC_SECRETKEY)

        if not _ensure_collection(memory_service, collection_name):
            return 1
        _add_init_session(memory_service, collection_name, user_id, assistant_id)
        return _wait_for_index(
            memory_service, collection_name, user_id, max_retries, retry_interval
        )
    except Exception as e:
        # Top-level boundary: log with traceback and turn it into exit code 1.
        logger.exception(f"系统异常: {str(e)}")
        return 1


def main(collection_name="dsideal_collection", user_id="system",
         assistant_id="system", max_retries=30, retry_interval=10):
    """Ensure the collection exists, seed it, and wait for its index.

    Args:
        collection_name: Name of the memory collection to check or create.
        user_id: Used as ``default_user_id`` of the seed session and as the
            ``user_id`` filter of the readiness search.
        assistant_id: Used as ``default_assistant_id`` of the seed session.
        max_retries: Maximum number of readiness searches to attempt.
        retry_interval: Seconds to sleep between readiness searches.

    Raises:
        SystemExit: Always. Code 0 when the index is ready, 1 otherwise.
    """
    sys.exit(_run(collection_name, user_id, assistant_id, max_retries, retry_interval))


if __name__ == "__main__":
    main()