import logging import sys import os import time import json from volcenginesdkarkruntime import Ark from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY, VOLC_API_KEY from VikingDBMemoryService import VikingDBMemoryService, MEMORY_COLLECTION_NAME # 控制日志输出 logger = logging.getLogger('CollectionMemory') logger.setLevel(logging.INFO) # 只添加一次处理器,避免重复日志 if not logger.handlers: handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logger.addHandler(handler) def main(): logger.info("开始创建索引...") # 初始化记忆库服务 memory_service = VikingDBMemoryService( ak=VOLC_ACCESSKEY, sk=VOLC_SECRETKEY, host="api-knowledgebase.mlp.cn-beijing.volces.com", region="cn-beijing" ) # 初始化LLM客户端 llm_client = Ark( base_url="https://ark.cn-beijing.volces.com/api/v3", api_key=VOLC_API_KEY ) try: collection_name = MEMORY_COLLECTION_NAME user_id = "system" assistant_id = "assistant" # 确保集合存在 logger.info("检查/创建集合...") memory_service.ensure_collection_exists(collection_name) # 添加测试数据以触发索引构建 logger.info("添加测试数据...") test_messages = [ {"role": "user", "content": "你好,我是测试用户"}, {"role": "assistant", "content": "你好,我是测试助手"} ] test_metadata = { "default_user_id": user_id, "default_assistant_id": assistant_id, "time": int(time.time() * 1000) } session_id = f"test_session_{int(time.time())}" memory_service.add_session( collection_name=collection_name, session_id=session_id, messages=test_messages, metadata=test_metadata ) logger.info("测试数据添加成功,等待索引构建...") # 使用与chat.py.backup相同的等待索引就绪的逻辑 max_retries = 30 retry_interval = 60 # 秒,与chat.py.backup一致 for retry in range(max_retries): try: # 尝试搜索以验证索引是否就绪 filter_params = { "user_id": user_id, "memory_type": ["sys_event_v1", "sys_profile_v1"] } response = memory_service.search_memory( collection_name=collection_name, query="测试查询", filter=filter_params, limit=1 ) # 如果搜索成功,说明索引已就绪 logger.info(f"索引已就绪,找到 {response.get('data', {}).get('count', 0)} 条记录") break except Exception as e: error_message = str(e) if "1000023" in error_message: # 与chat.py.backup中的错误码一致 retry_attempt = retry + 1 logger.info(f"记忆索引正在构建中。将在{retry_interval}秒后重试... (尝试次数 {retry_attempt})") time.sleep(retry_interval) else: logger.error(f"搜索时发生错误: {error_message}") raise else: # 如果循环正常结束(未break),说明超时 logger.error(f"索引构建超时,已尝试 {max_retries} 次") sys.exit(1) logger.info("索引创建和测试完成!") except Exception as e: logger.error(f"操作失败: {e}") sys.exit(1) if __name__ == "__main__": main()