Files
dsProject/dsLightRag/Volcengine/T2_CreateConection.py
2025-09-07 13:18:19 +08:00

114 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import sys
import time
from volcenginesdkarkruntime import Ark
from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY, VOLC_API_KEY
from Volcengine.Kit.VikingDBMemoryService import VikingDBMemoryService, MEMORY_COLLECTION_NAME
# 控制日志输出
logger = logging.getLogger('CollectionMemory')
logger.setLevel(logging.INFO)
# 只添加一次处理器,避免重复日志
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
def main():
logger.info("开始创建索引...")
# 初始化记忆库服务
memory_service = VikingDBMemoryService(
ak=VOLC_ACCESSKEY,
sk=VOLC_SECRETKEY,
host="api-knowledgebase.mlp.cn-beijing.volces.com",
region="cn-beijing"
)
# 初始化LLM客户端
llm_client = Ark(
base_url="https://ark.cn-beijing.volces.com/api/v3",
api_key=VOLC_API_KEY
)
try:
collection_name = MEMORY_COLLECTION_NAME
user_id = "system"
assistant_id = "assistant"
# 确保集合存在
logger.info("检查/创建集合...")
memory_service.ensure_collection_exists(collection_name)
# 添加测试数据以触发索引构建
logger.info("添加测试数据...")
test_messages = [
{"role": "user", "content": "你好,我是测试用户"},
{"role": "assistant", "content": "你好,我是测试助手"}
]
test_metadata = {
"default_user_id": user_id,
"default_assistant_id": assistant_id,
"time": int(time.time() * 1000)
}
session_id = f"test_session_{int(time.time())}"
memory_service.add_session(
collection_name=collection_name,
session_id=session_id,
messages=test_messages,
metadata=test_metadata
)
logger.info("测试数据添加成功,等待索引构建...")
max_retries = 30
retry_interval = 60 # 秒
for retry in range(max_retries):
try:
# 尝试搜索以验证索引是否就绪
filter_params = {
"user_id": user_id,
"memory_type": ["sys_event_v1", "sys_profile_v1"]
}
response = memory_service.search_memory(
collection_name=collection_name,
query="测试查询",
filter=filter_params,
limit=1
)
# 如果搜索成功,说明索引已就绪
logger.info(f"索引已就绪,找到 {response.get('data', {}).get('count', 0)} 条记录")
break
except Exception as e:
error_message = str(e)
if "1000023" in error_message: # 与chat.py.backup中的错误码一致
retry_attempt = retry + 1
logger.info(f"记忆索引正在构建中。将在{retry_interval}秒后重试... (尝试次数 {retry_attempt})")
time.sleep(retry_interval)
else:
logger.error(f"搜索时发生错误: {error_message}")
raise
else:
# 如果循环正常结束未break说明超时
logger.error(f"索引构建超时,已尝试 {max_retries}")
sys.exit(1)
logger.info("索引创建和测试完成!")
except Exception as e:
logger.error(f"操作失败: {e}")
sys.exit(1)
if __name__ == "__main__":
main()