diff --git a/dsLightRag/Volcengine/T2_CreateIndex.py b/dsLightRag/Volcengine/T2_CreateIndex.py index 9dae830f..4148e9c4 100644 --- a/dsLightRag/Volcengine/T2_CreateIndex.py +++ b/dsLightRag/Volcengine/T2_CreateIndex.py @@ -3,7 +3,6 @@ import logging from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY from VikingDBMemoryService import VikingDBMemoryService, MEMORY_COLLECTION_NAME -from Volcengine.chat import wait_for_collection_ready # 控制日志输出 logger = logging.getLogger('CollectionMemory') logger.setLevel(logging.INFO) @@ -47,7 +46,8 @@ def create_memory_collection(collection_name, description="情感陪伴记忆库 # 等待集合就绪 logger.info("等待集合初始化完成...") - if wait_for_collection_ready(memory_service, collection_name): + # 将独立函数调用改为实例方法调用 + if memory_service.wait_for_collection_ready(): logger.info(f"集合 '{collection_name}' 已就绪,可以开始使用") return True else: diff --git a/dsLightRag/Volcengine/VikingDBMemoryService.py b/dsLightRag/Volcengine/VikingDBMemoryService.py index 3e38b461..9dd09ca1 100644 --- a/dsLightRag/Volcengine/VikingDBMemoryService.py +++ b/dsLightRag/Volcengine/VikingDBMemoryService.py @@ -3,6 +3,7 @@ pip install volcengine pip install --upgrade "volcengine-python-sdk[ark]" """ import json +import logging import os import threading import time @@ -17,6 +18,13 @@ from volcenginesdkarkruntime import Ark from Config.Config import VOLC_SECRETKEY, VOLC_ACCESSKEY, VOLC_API_KEY +# 配置日志 +logger = logging.getLogger('CollectionMemory') +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) +logger.addHandler(handler) + # 记忆体集合名称 MEMORY_COLLECTION_NAME="dsideal_collection" @@ -294,4 +302,162 @@ class VikingDBMemoryService(Service): if entities is not None: params["entities"] = entities res = self.json("AddSession", {}, json.dumps(params)) - return json.loads(res) \ No newline at end of file + return json.loads(res) + + def handle_conversation_turn(self, llm_client, user_id, user_message, conversation_history): + """处理一轮对话,包括记忆搜索和LLM响应。""" + logger.info("\n" + "=" * 60) + logger.info(f"用户: {user_message}") + + relevant_memories = self.search_relevant_memories(user_id, user_message) + + system_prompt = "你是一个富有同情心、善于倾听的AI伙伴,拥有长期记忆能力。你的目标是为用户提供情感支持和温暖的陪伴。" + if relevant_memories: + memory_context = "\n".join( + [f"- {json.dumps(mem['memory_info'], ensure_ascii=False)}" for mem in relevant_memories]) + system_prompt += f"\n\n这是我们过去的一些对话记忆,请参考:\n{memory_context}\n\n请利用这些信息来更好地理解和回应用户。" + + logger.info("AI正在思考...") + + try: + messages = [{"role": "system", "content": system_prompt}] + conversation_history + [ + {"role": "user", "content": user_message}] + completion = llm_client.chat.completions.create( + model="doubao-seed-1-6-flash-250715", + messages=messages + ) + assistant_reply = completion.choices[0].message.content + except Exception as e: + logger.info(f"LLM调用失败: {e}") + assistant_reply = "抱歉,我现在有点混乱,无法回应。我们可以稍后再聊吗?" + + logger.info(f"伙伴: {assistant_reply}") + + conversation_history.extend([ + {"role": "user", "content": user_message}, + {"role": "assistant", "content": assistant_reply} + ]) + return assistant_reply + + def archive_conversation(self, user_id, assistant_id, conversation_history, topic_name): + """将对话历史归档到记忆数据库。""" + if not conversation_history: + logger.info("没有对话可以归档。") + return False + + logger.info(f"\n正在归档关于 '{topic_name}' 的对话...") + session_id = f"{topic_name}_{int(time.time())}" + metadata = { + "default_user_id": user_id, + "default_assistant_id": assistant_id, + "time": int(time.time() * 1000) + } + + try: + self.add_session( + collection_name=MEMORY_COLLECTION_NAME, + session_id=session_id, + messages=conversation_history, + metadata=metadata + ) + logger.info(f"对话已成功归档,会话ID: {session_id}") + logger.info("正在等待记忆索引更新...") + return True + except Exception as e: + logger.info(f"归档对话失败: {e}") + return False + + def wait_for_collection_ready(self, timeout=300, interval=10): + """ + 等待集合准备就绪 + :param timeout: 超时时间(秒) + :param interval: 检查间隔(秒) + :return: True if ready, False if timeout + """ + start_time = time.time() + while time.time() - start_time < timeout: + try: + # 使用类中定义的集合名称常量 + collection_info = self.get_collection(MEMORY_COLLECTION_NAME) + status = collection_info.get("Status", "UNKNOWN") + logger.info(f"集合 '{MEMORY_COLLECTION_NAME}' 当前状态: {status}") + if status == "READY": + return True + time.sleep(interval) + except Exception as e: + logger.info(f"检查集合状态失败: {e}") + time.sleep(interval) + logger.info(f"集合 '{MEMORY_COLLECTION_NAME}' 在{timeout}秒内未就绪") + return False + + def setup_memory_collection(self): + """独立封装记忆体创建逻辑,返回memory_service供测试使用""" + try: + ensure_collection_exists(self, MEMORY_COLLECTION_NAME) + logger.info(f"记忆体 '{MEMORY_COLLECTION_NAME}' 创建/验证成功") + + # 添加集合就绪等待 + logger.info("等待集合准备就绪...") + if self.wait_for_collection_ready(): + logger.info(f"集合 '{MEMORY_COLLECTION_NAME}' 已就绪") + return self + else: + logger.info(f"集合 '{MEMORY_COLLECTION_NAME}' 未能就绪") + return None + except Exception as e: + logger.info(f"记忆体创建失败: {e}") + return None + + def run_end_to_end_test(self): + """端到端记忆测试的主函数""" + logger.info("开始端到端记忆测试...") + + try: + # 调用封装的记忆体创建函数 + memory_service = self.setup_memory_collection() + if not memory_service: + return + llm_client = Ark( + base_url="https://ark.cn-beijing.volces.com/api/v3", + api_key=VOLC_API_KEY + ) + user_id = "xiaoming" # 用户ID:小明 + assistant_id = "assistant1" # 助手ID:助手1 + except Exception as e: + logger.info(f"初始化失败: {e}") + return + + logger.info("\n--- 阶段 1: 初始对话 ---") + initial_conversation_history = [] + self.handle_conversation_turn( + llm_client, user_id, + "你好,我是小明,今年18岁,但压力好大。", + initial_conversation_history + ) + self.handle_conversation_turn( + llm_client, user_id, + "马上就要高考了,家里人的期待好高。", + initial_conversation_history + ) + + logger.info("\n--- 阶段 2: 归档记忆 ---") + self.archive_conversation( + user_id, assistant_id, + initial_conversation_history, "study_stress_discussion" + ) + + logger.info("\n--- 阶段 3: 验证记忆 ---") + verification_conversation_history = [] + self.handle_conversation_turn( + llm_client, user_id, + "我最近很焦虑,不知道该怎么办。", + verification_conversation_history + ) + + logger.info("\n端到端记忆测试完成!") + +if __name__ == "__main__": + # 初始化服务 + memory_service, _ = initialize_services() + # 运行端到端测试 + memory_service.run_end_to_end_test() \ No newline at end of file diff --git a/dsLightRag/Volcengine/__pycache__/VikingDBMemoryService.cpython-310.pyc b/dsLightRag/Volcengine/__pycache__/VikingDBMemoryService.cpython-310.pyc index 2ed7c6fe..1fb44e15 100644 Binary files a/dsLightRag/Volcengine/__pycache__/VikingDBMemoryService.cpython-310.pyc and b/dsLightRag/Volcengine/__pycache__/VikingDBMemoryService.cpython-310.pyc differ diff --git a/dsLightRag/Volcengine/__pycache__/chat.cpython-310.pyc b/dsLightRag/Volcengine/__pycache__/chat.cpython-310.pyc deleted file mode 100644 index c55090d3..00000000 Binary files a/dsLightRag/Volcengine/__pycache__/chat.cpython-310.pyc and /dev/null differ diff --git a/dsLightRag/Volcengine/chat.py b/dsLightRag/Volcengine/chat.py deleted file mode 100644 index a750856f..00000000 --- a/dsLightRag/Volcengine/chat.py +++ /dev/null @@ -1,186 +0,0 @@ -import logging -import os -import time - -from volcenginesdkarkruntime import Ark - -from Config.Config import VOLC_API_KEY - -""" -在记忆库准备好后,我们先模拟一段包含两轮的完整对话。 -对话结束后,把这段对话历史消息写入记忆库。然后再开启一个新话题,提出和刚才相关的问题, -AI 就能用刚写入的记忆来回答。 -注意:首次写入需要 3–5 分钟建立索引,这段时间内检索会报错。 -""" -import json -import time -from VikingDBMemoryService import initialize_services, ensure_collection_exists, search_relevant_memories -# 控制日志输出 -logger = logging.getLogger('CollectionMemory') -logger.setLevel(logging.INFO) -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) -logger.addHandler(handler) -def handle_conversation_turn(memory_service, llm_client, collection_name, user_id, user_message, conversation_history): - """处理一轮对话,包括记忆搜索和LLM响应。""" - logger.info("\n" + "=" * 60) - logger.info(f"用户: {user_message}") - - relevant_memories = search_relevant_memories(memory_service, collection_name, user_id, user_message) - - system_prompt = "你是一个富有同情心、善于倾听的AI伙伴,拥有长期记忆能力。你的目标是为用户提供情感支持和温暖的陪伴。" - if relevant_memories: - memory_context = "\n".join( - [f"- {json.dumps(mem['memory_info'], ensure_ascii=False)}" for mem in relevant_memories]) - system_prompt += f"\n\n这是我们过去的一些对话记忆,请参考:\n{memory_context}\n\n请利用这些信息来更好地理解和回应用户。" - - logger.info("AI正在思考...") - - try: - messages = [{"role": "system", "content": system_prompt}] + conversation_history + [ - {"role": "user", "content": user_message}] - completion = llm_client.chat.completions.create( - model="doubao-seed-1-6-flash-250715", - messages=messages - ) - assistant_reply = completion.choices[0].message.content - except Exception as e: - logger.info(f"LLM调用失败: {e}") - assistant_reply = "抱歉,我现在有点混乱,无法回应。我们可以稍后再聊吗?" - - logger.info(f"伙伴: {assistant_reply}") - - conversation_history.extend([ - {"role": "user", "content": user_message}, - {"role": "assistant", "content": assistant_reply} - ]) - return assistant_reply - -def archive_conversation(memory_service, collection_name, user_id, assistant_id, conversation_history, topic_name): - """将对话历史归档到记忆数据库。""" - if not conversation_history: - logger.info("没有对话可以归档。") - return False - - logger.info(f"\n正在归档关于 '{topic_name}' 的对话...") - session_id = f"{topic_name}_{int(time.time())}" - metadata = { - "default_user_id": user_id, - "default_assistant_id": assistant_id, - "time": int(time.time() * 1000) - } - - try: - memory_service.add_session( - collection_name=collection_name, - session_id=session_id, - messages=conversation_history, - metadata=metadata - ) - logger.info(f"对话已成功归档,会话ID: {session_id}") - logger.info("正在等待记忆索引更新...") - return True - except Exception as e: - logger.info(f"归档对话失败: {e}") - return False - -def setup_memory_collection(collection_name="emotional_support"): - """独立封装记忆体创建逻辑,返回memory_service供测试使用""" - try: - memory_service, _ = initialize_services() - ensure_collection_exists(memory_service, collection_name) - logger.info(f"记忆体 '{collection_name}' 创建/验证成功") - - # 添加集合就绪等待 - logger.info("等待集合准备就绪...") - if wait_for_collection_ready(memory_service, collection_name): - logger.info(f"集合 '{collection_name}' 已就绪") - return memory_service - else: - logger.info(f"集合 '{collection_name}' 未能就绪") - return None - except Exception as e: - logger.info(f"记忆体创建失败: {e}") - return None - -def wait_for_collection_ready(memory_service, collection_name, timeout=300, interval=10): - """ - 等待集合准备就绪 - :param memory_service: 记忆库服务实例 - :param collection_name: 集合名称 - :param timeout: 超时时间(秒) - :param interval: 检查间隔(秒) - :return: True if ready, False if timeout - """ - start_time = time.time() - while time.time() - start_time < timeout: - try: - collection_info = memory_service.get_collection(collection_name) - # 根据Volcengine API文档,状态可能在Status字段中,值可能为"READY"、"CREATING"等 - status = collection_info.get("Status", "UNKNOWN") - logger.info(f"集合 '{collection_name}' 当前状态: {status}") - if status == "READY": - return True - time.sleep(interval) - except Exception as e: - logger.info(f"检查集合状态失败: {e}") - time.sleep(interval) - logger.info(f"集合 '{collection_name}' 在{timeout}秒内未就绪") - return False - -def main(): - logger.info("开始端到端记忆测试...") - collection_name="emotional_support" - - try: - # 调用封装的记忆体创建函数 - memory_service = setup_memory_collection() - if not memory_service: - return - llm_client = Ark( - base_url="https://ark.cn-beijing.volces.com/api/v3", - api_key=VOLC_API_KEY - ) - user_id = "xiaoming" # 用户ID:小明 - assistant_id = "assistant1" # 助手ID:助手1 - except Exception as e: - logger.info(f"初始化失败: {e}") - return - - logger.info("\n--- 阶段 1: 初始对话 ---") - initial_conversation_history = [] - handle_conversation_turn( - memory_service, llm_client, collection_name, user_id, - "你好,我是小明,今年18岁,但压力好大。", - initial_conversation_history - ) - handle_conversation_turn( - memory_service, llm_client, collection_name, user_id, - "马上就要高考了,家里人的期待好高。", - initial_conversation_history - ) - - logger.info("\n--- 阶段 2: 归档记忆 ---") - archive_conversation( - memory_service, collection_name, user_id, assistant_id, - initial_conversation_history, "study_stress_discussion" - ) - - logger.info("\n--- 阶段 3: 验证记忆 ---") - verification_conversation_history = [] - handle_conversation_turn( - memory_service, llm_client, collection_name, user_id, - "我最近很焦虑,不知道该怎么办。", - verification_conversation_history - ) - - logger.info("\n端到端记忆测试完成!") - -if __name__ == "__main__": - setup_memory_collection() -# main() -""" -memory_service = setup_memory_collection() -if memory_service: - is_ready = wait_for_collection_ready(memory_service, "emotional_support") -""" \ No newline at end of file