Files
dsProject/dsLightRag/Volcengine/chat.py
2025-09-07 08:08:16 +08:00

186 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import time
from volcenginesdkarkruntime import Ark
from Config.Config import VOLC_API_KEY
"""
在记忆库准备好后,我们先模拟一段包含两轮的完整对话。
对话结束后,把这段对话历史消息写入记忆库。然后再开启一个新话题,提出和刚才相关的问题,
AI 就能用刚写入的记忆来回答。
注意:首次写入需要 35 分钟建立索引,这段时间内检索会报错。
"""
import json
import time
from VikingDBMemoryService import initialize_services, ensure_collection_exists, search_relevant_memories
# 控制日志输出
logger = logging.getLogger('CollectionMemory')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
def handle_conversation_turn(memory_service, llm_client, collection_name, user_id, user_message, conversation_history):
"""处理一轮对话包括记忆搜索和LLM响应。"""
logger.info("\n" + "=" * 60)
logger.info(f"用户: {user_message}")
relevant_memories = search_relevant_memories(memory_service, collection_name, user_id, user_message)
system_prompt = "你是一个富有同情心、善于倾听的AI伙伴拥有长期记忆能力。你的目标是为用户提供情感支持和温暖的陪伴。"
if relevant_memories:
memory_context = "\n".join(
[f"- {json.dumps(mem['memory_info'], ensure_ascii=False)}" for mem in relevant_memories])
system_prompt += f"\n\n这是我们过去的一些对话记忆,请参考:\n{memory_context}\n\n请利用这些信息来更好地理解和回应用户。"
logger.info("AI正在思考...")
try:
messages = [{"role": "system", "content": system_prompt}] + conversation_history + [
{"role": "user", "content": user_message}]
completion = llm_client.chat.completions.create(
model="doubao-seed-1-6-flash-250715",
messages=messages
)
assistant_reply = completion.choices[0].message.content
except Exception as e:
logger.info(f"LLM调用失败: {e}")
assistant_reply = "抱歉,我现在有点混乱,无法回应。我们可以稍后再聊吗?"
logger.info(f"伙伴: {assistant_reply}")
conversation_history.extend([
{"role": "user", "content": user_message},
{"role": "assistant", "content": assistant_reply}
])
return assistant_reply
def archive_conversation(memory_service, collection_name, user_id, assistant_id, conversation_history, topic_name):
"""将对话历史归档到记忆数据库。"""
if not conversation_history:
logger.info("没有对话可以归档。")
return False
logger.info(f"\n正在归档关于 '{topic_name}' 的对话...")
session_id = f"{topic_name}_{int(time.time())}"
metadata = {
"default_user_id": user_id,
"default_assistant_id": assistant_id,
"time": int(time.time() * 1000)
}
try:
memory_service.add_session(
collection_name=collection_name,
session_id=session_id,
messages=conversation_history,
metadata=metadata
)
logger.info(f"对话已成功归档会话ID: {session_id}")
logger.info("正在等待记忆索引更新...")
return True
except Exception as e:
logger.info(f"归档对话失败: {e}")
return False
def setup_memory_collection(collection_name="emotional_support"):
"""独立封装记忆体创建逻辑返回memory_service供测试使用"""
try:
memory_service, _ = initialize_services()
ensure_collection_exists(memory_service, collection_name)
logger.info(f"记忆体 '{collection_name}' 创建/验证成功")
# 添加集合就绪等待
logger.info("等待集合准备就绪...")
if wait_for_collection_ready(memory_service, collection_name):
logger.info(f"集合 '{collection_name}' 已就绪")
return memory_service
else:
logger.info(f"集合 '{collection_name}' 未能就绪")
return None
except Exception as e:
logger.info(f"记忆体创建失败: {e}")
return None
def wait_for_collection_ready(memory_service, collection_name, timeout=300, interval=10):
"""
等待集合准备就绪
:param memory_service: 记忆库服务实例
:param collection_name: 集合名称
:param timeout: 超时时间(秒)
:param interval: 检查间隔(秒)
:return: True if ready, False if timeout
"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
collection_info = memory_service.get_collection(collection_name)
# 根据Volcengine API文档状态可能在Status字段中值可能为"READY"、"CREATING"等
status = collection_info.get("Status", "UNKNOWN")
logger.info(f"集合 '{collection_name}' 当前状态: {status}")
if status == "READY":
return True
time.sleep(interval)
except Exception as e:
logger.info(f"检查集合状态失败: {e}")
time.sleep(interval)
logger.info(f"集合 '{collection_name}'{timeout}秒内未就绪")
return False
def main():
logger.info("开始端到端记忆测试...")
collection_name="emotional_support"
try:
# 调用封装的记忆体创建函数
memory_service = setup_memory_collection()
if not memory_service:
return
llm_client = Ark(
base_url="https://ark.cn-beijing.volces.com/api/v3",
api_key=VOLC_API_KEY
)
user_id = "xiaoming" # 用户ID:小明
assistant_id = "assistant1" # 助手ID:助手1
except Exception as e:
logger.info(f"初始化失败: {e}")
return
logger.info("\n--- 阶段 1: 初始对话 ---")
initial_conversation_history = []
handle_conversation_turn(
memory_service, llm_client, collection_name, user_id,
"你好我是小明今年18岁但压力好大。",
initial_conversation_history
)
handle_conversation_turn(
memory_service, llm_client, collection_name, user_id,
"马上就要高考了,家里人的期待好高。",
initial_conversation_history
)
logger.info("\n--- 阶段 2: 归档记忆 ---")
archive_conversation(
memory_service, collection_name, user_id, assistant_id,
initial_conversation_history, "study_stress_discussion"
)
logger.info("\n--- 阶段 3: 验证记忆 ---")
verification_conversation_history = []
handle_conversation_turn(
memory_service, llm_client, collection_name, user_id,
"我最近很焦虑,不知道该怎么办。",
verification_conversation_history
)
logger.info("\n端到端记忆测试完成!")
if __name__ == "__main__":
setup_memory_collection()
# main()
"""
memory_service = setup_memory_collection()
if memory_service:
is_ready = wait_for_collection_ready(memory_service, "emotional_support")
"""