# Marker found in the exception text while the memory index is still being
# built; the search is retried only for this condition.
_INDEX_BUILDING_ERROR_CODE = "1000023"

# Reply used when the LLM call fails or yields no content.
_FALLBACK_REPLY = "抱歉,我现在有点混乱,无法回应。我们可以稍后再聊吗?"


def initialize_services():
    """Create the VikingDB memory service and the Ark LLM client.

    Credentials are taken from the module-level names ``VOLC_ACCESSKEY``,
    ``VOLC_SECRETKEY`` and ``VOLC_API_KEY``.

    Returns:
        A ``(memory_service, llm_client)`` tuple.

    Raises:
        ValueError: If any credential is empty. The message lists exactly the
            names that are missing.
    """
    credentials = {
        "VOLC_ACCESSKEY": VOLC_ACCESSKEY,
        "VOLC_SECRETKEY": VOLC_SECRETKEY,
        "VOLC_API_KEY": VOLC_API_KEY,
    }
    missing = [name for name, value in credentials.items() if not value]
    if missing:
        # Report the real setting names so the user knows what to fix.
        raise ValueError(f"必须在环境变量中设置 {', '.join(missing)}。")

    memory_service = VikingDBMemoryService(
        ak=credentials["VOLC_ACCESSKEY"],
        sk=credentials["VOLC_SECRETKEY"],
        host="api-knowledgebase.mlp.cn-beijing.volces.com",
        region="cn-beijing",
    )
    llm_client = Ark(
        base_url="https://ark.cn-beijing.volces.com/api/v3",
        api_key=credentials["VOLC_API_KEY"],
    )
    return memory_service, llm_client


def _extract_memories(response):
    """Return the usable hits of a search response as a list of dicts.

    Each returned item has the keys ``memory_info`` and ``score``. Hits with
    an empty ``memory_info`` are skipped; a hit without ``score`` gets ``0.0``.
    A response without a usable ``data`` mapping yields an empty list.
    """
    data = response.get("data") if isinstance(response, dict) else None
    if not isinstance(data, dict) or (data.get("count") or 0) <= 0:
        return []
    return [
        {"memory_info": hit["memory_info"], "score": hit.get("score", 0.0)}
        for hit in data.get("result_list") or []
        if isinstance(hit, dict) and hit.get("memory_info")
    ]


def search_relevant_memories(memory_service, collection_name, user_id, query,
                             max_retries=None, retry_delay=60):
    """Search the memories related to ``query`` for one user.

    The search is retried while the service reports that the index is still
    being built (error text containing ``1000023``). Any other error ends the
    search immediately.

    Args:
        memory_service: Object exposing ``search_memory(collection_name=...,
            query=..., filter=..., limit=...)``.
        collection_name: Name of the memory collection to search.
        user_id: Id used for the ``user_id`` filter.
        query: Text to search for.
        max_retries: Maximum number of retries while the index is building.
            ``None`` (the default) retries without limit.
        retry_delay: Non-negative number of seconds to wait between retries.

    Returns:
        A list of ``{"memory_info": ..., "score": ...}`` dicts; empty when
        nothing was found, on a non-retryable error, or when ``max_retries``
        is exhausted.
    """
    logger.info(f"正在搜索与 '{query}' 相关的记忆...")
    # The filter does not change between attempts, so build it once.
    filter_params = {
        "user_id": [user_id],
        "memory_type": ["sys_event_v1", "sys_profile_v1"],
    }
    retry_attempt = 0
    while True:
        try:
            response = memory_service.search_memory(
                collection_name=collection_name,
                query=query,
                filter=filter_params,
                limit=3,
            )
            memories = _extract_memories(response)

            if memories:
                if retry_attempt > 0:
                    logger.info("重试后搜索成功。")
                logger.info(f"找到 {len(memories)} 条相关记忆:")
                for i, memory in enumerate(memories, 1):
                    logger.info(
                        f"  {i}. (相关度: {memory['score']:.3f}): "
                        f"{json.dumps(memory['memory_info'], ensure_ascii=False, indent=2)}")
            else:
                logger.info("未找到相关记忆。")
            return memories

        except Exception as e:
            # Deliberately broad: this function never raises to its caller.
            if _INDEX_BUILDING_ERROR_CODE not in str(e):
                logger.info(f"搜索记忆时出错 (不可重试): {e}")
                return []
            retry_attempt += 1
            if max_retries is not None and retry_attempt > max_retries:
                logger.info(f"记忆索引仍在构建中,已达到最大重试次数 {max_retries},放弃搜索。")
                return []
            logger.info(f"记忆索引正在构建中。将在{retry_delay}秒后重试... (尝试次数 {retry_attempt})")
            time.sleep(retry_delay)


def handle_conversation_turn(memory_service, llm_client, collection_name, user_id, user_message,
                             conversation_history, model="doubao-seed-1-6-flash-250715"):
    """Run one conversation turn: memory lookup, LLM call, history update.

    Args:
        memory_service: Passed to ``search_relevant_memories``.
        llm_client: Object exposing ``chat.completions.create(model=...,
            messages=...)``.
        collection_name: Name of the memory collection to search.
        user_id: Id of the user whose memories are searched.
        user_message: The user's message for this turn.
        conversation_history: List of ``{"role", "content"}`` dicts. It is
            extended IN PLACE with this turn's user and assistant messages.
        model: Model id sent to the LLM.

    Returns:
        The assistant reply. When the LLM call fails or returns no content, a
        fixed apology is returned (and stored in the history) instead.
    """
    logger.info("\n" + "=" * 60)
    logger.info(f"用户: {user_message}")

    relevant_memories = search_relevant_memories(memory_service, collection_name, user_id, user_message)

    system_prompt = "你是一个富有同情心、善于倾听的AI伙伴,拥有长期记忆能力。你的目标是为用户提供情感支持和温暖的陪伴。"
    if relevant_memories:
        memory_context = "\n".join(
            f"- {json.dumps(mem['memory_info'], ensure_ascii=False)}" for mem in relevant_memories)
        system_prompt += f"\n\n这是我们过去的一些对话记忆,请参考:\n{memory_context}\n\n请利用这些信息来更好地理解和回应用户。"

    logger.info("AI正在思考...")

    user_turn = {"role": "user", "content": user_message}
    assistant_reply = None
    try:
        completion = llm_client.chat.completions.create(
            model=model,
            messages=[{"role": "system", "content": system_prompt}, *conversation_history, user_turn],
        )
        assistant_reply = completion.choices[0].message.content
    except Exception as e:
        # Deliberately broad: a failed LLM call must not end the conversation.
        logger.info(f"LLM调用失败: {e}")

    if assistant_reply is None:
        # A None content must not reach the history: it would be sent back to
        # the LLM and archived on later turns.
        assistant_reply = _FALLBACK_REPLY

    logger.info(f"伙伴: {assistant_reply}")

    conversation_history.extend([
        user_turn,
        {"role": "assistant", "content": assistant_reply},
    ])
    return assistant_reply


def archive_conversation(memory_service, collection_name, user_id, assistant_id, conversation_history, topic_name):
    """Archive the conversation history as one session in the memory store.

    Args:
        memory_service: Object exposing ``add_session(collection_name=...,
            session_id=..., messages=..., metadata=...)``.
        collection_name: Name of the target memory collection.
        user_id: Stored as ``default_user_id`` in the session metadata.
        assistant_id: Stored as ``default_assistant_id`` in the metadata.
        conversation_history: Messages to archive.
        topic_name: Prefix of the generated session id.

    Returns:
        ``True`` when the session was stored; ``False`` when the history is
        empty or the service call failed.
    """
    if not conversation_history:
        logger.info("没有对话可以归档。")
        return False

    logger.info(f"\n正在归档关于 '{topic_name}' 的对话...")
    # Sample the clock once so the session id (seconds) and the metadata
    # timestamp (milliseconds) always describe the same instant.
    now_ms = int(time.time() * 1000)
    session_id = f"{topic_name}_{now_ms // 1000}"
    metadata = {
        "default_user_id": user_id,
        "default_assistant_id": assistant_id,
        "time": now_ms,
    }

    try:
        memory_service.add_session(
            collection_name=collection_name,
            session_id=session_id,
            messages=conversation_history,
            metadata=metadata,
        )
    except Exception as e:
        # Deliberately broad: archiving is best effort and reports via bool.
        logger.info(f"归档对话失败: {e}")
        return False

    logger.info(f"对话已成功归档,会话ID: {session_id}")
    logger.info("正在等待记忆索引更新...")
    return True