From 6d3753176a9bf4002e91c2b6ff63ac0e2dafbf40 Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Sun, 7 Sep 2025 13:58:13 +0800 Subject: [PATCH] 'commit' --- dsLightRag/Volcengine/T3_ChatWithMemory.py | 39 +- dsLightRag/Volcengine/chat.py.backup | 399 --------------------- 2 files changed, 20 insertions(+), 418 deletions(-) delete mode 100644 dsLightRag/Volcengine/chat.py.backup diff --git a/dsLightRag/Volcengine/T3_ChatWithMemory.py b/dsLightRag/Volcengine/T3_ChatWithMemory.py index 5f5e51d3..1198fca4 100644 --- a/dsLightRag/Volcengine/T3_ChatWithMemory.py +++ b/dsLightRag/Volcengine/T3_ChatWithMemory.py @@ -42,25 +42,25 @@ def main(): # 使用正确的handle_conversation_turn方法参数 response = handle_conversation_turn( - memory_service=memory_service,# 内存记忆服务 - llm_client=llm_client,# 大模型客户端 - collection_name=collection_name,# 集合名称 - user_id=user_id,# 用户ID - assistant_id=assistant_id,# 助手ID - user_message=f"请记住以下用户信息:{user_info}",# 要记忆的信息 - conversation_history=conversation_history # 对话历史 + memory_service=memory_service, # 内存记忆服务 + llm_client=llm_client, # 大模型客户端 + collection_name=collection_name, # 集合名称 + user_id=user_id, # 用户ID + assistant_id=assistant_id, # 助手ID + user_message=f"请记住以下用户信息:{user_info}", # 要记忆的信息 + conversation_history=conversation_history # 对话历史 ) logger.info(f"模型回复: {response}") # 归档对话 archive_conversation( - memory_service=memory_service, - collection_name=collection_name, - user_id=user_id, - assistant_id=assistant_id, - conversation_history=conversation_history, - topic_name="user_info" + memory_service=memory_service, # 内存记忆服务 + collection_name=collection_name, # 集合名称 + user_id=user_id, # 用户ID + assistant_id=assistant_id, # 助手ID + conversation_history=conversation_history, # 对话历史 + topic_name="user_info" # 会话主题 ) # 等待一段时间确保索引更新 @@ -72,15 +72,16 @@ def main(): test_conversation_history = [] test_response = handle_conversation_turn( - memory_service=memory_service, + memory_service=memory_service, # 内存记忆服务 llm_client=llm_client, - collection_name=collection_name, - user_id=user_id, - assistant_id=assistant_id, # 添加这一行 - user_message="请告诉我李明的个人信息", - conversation_history=test_conversation_history + collection_name=collection_name,# 集合名称 + user_id=user_id, # 用户ID + assistant_id=assistant_id, # 助手ID + user_message="请告诉我李明的个人信息",# 交流的信息 + conversation_history=test_conversation_history # 对话历史 ) + # 这次交互就没有记录信息,看来需要根据业务来决定是不是记录到记忆体中 logger.info(f"测试回复: {test_response}") # 检查回复中是否包含关键信息 diff --git a/dsLightRag/Volcengine/chat.py.backup b/dsLightRag/Volcengine/chat.py.backup deleted file mode 100644 index 2726c558..00000000 --- a/dsLightRag/Volcengine/chat.py.backup +++ /dev/null @@ -1,399 +0,0 @@ -import json -import threading -import time -from dotenv import load_dotenv -from volcengine.ApiInfo import ApiInfo -from volcengine.Credentials import Credentials -from volcengine.ServiceInfo import ServiceInfo -from volcengine.auth.SignerV4 import SignerV4 -from volcengine.base.Service import Service -from volcenginesdkarkruntime import Ark - -from Config.Config import VOLC_SECRETKEY, VOLC_API_KEY, VOLC_ACCESSKEY - - -class VikingDBMemoryException(Exception): - def __init__(self, code, request_id, message=None): - self.code = code - self.request_id = request_id - self.message = "{}, code:{},request_id:{}".format(message, self.code, self.request_id) - - def __str__(self): - return self.message - - -class VikingDBMemoryService(Service): - _instance_lock = threading.Lock() - - def __new__(cls, *args, **kwargs): - if not hasattr(VikingDBMemoryService, "_instance"): - with VikingDBMemoryService._instance_lock: - if not hasattr(VikingDBMemoryService, "_instance"): - VikingDBMemoryService._instance = object.__new__(cls) - return VikingDBMemoryService._instance - - def __init__(self, host="api-knowledgebase.mlp.cn-beijing.volces.com", region="cn-beijing", ak="", sk="", - sts_token="", scheme='https', - connection_timeout=30, socket_timeout=30): - self.service_info = VikingDBMemoryService.get_service_info(host, region, scheme, connection_timeout, - socket_timeout) - self.api_info = VikingDBMemoryService.get_api_info() - super(VikingDBMemoryService, self).__init__(self.service_info, self.api_info) - if ak: - self.set_ak(ak) - if sk: - self.set_sk(sk) - if sts_token: - self.set_session_token(session_token=sts_token) - try: - self.get_body("Ping", {}, json.dumps({})) - except Exception as e: - raise VikingDBMemoryException(1000028, "missed", "host or region is incorrect".format(str(e))) from None - - def setHeader(self, header): - api_info = VikingDBMemoryService.get_api_info() - for key in api_info: - for item in header: - api_info[key].header[item] = header[item] - self.api_info = api_info - - @staticmethod - def get_service_info(host, region, scheme, connection_timeout, socket_timeout): - service_info = ServiceInfo(host, {"Host": host}, - Credentials('', '', 'air', region), connection_timeout, socket_timeout, - scheme=scheme) - return service_info - - @staticmethod - def get_api_info(): - api_info = { - "CreateCollection": ApiInfo("POST", "/api/memory/collection/create", {}, {}, - {'Accept': 'application/json', 'Content-Type': 'application/json'}), - "GetCollection": ApiInfo("POST", "/api/memory/collection/info", {}, {}, - {'Accept': 'application/json', 'Content-Type': 'application/json'}), - "DropCollection": ApiInfo("POST", "/api/memory/collection/delete", {}, {}, - {'Accept': 'application/json', 'Content-Type': 'application/json'}), - "UpdateCollection": ApiInfo("POST", "/api/memory/collection/update", {}, {}, - {'Accept': 'application/json', 'Content-Type': 'application/json'}), - - "SearchMemory": ApiInfo("POST", "/api/memory/search", {}, {}, - {'Accept': 'application/json', 'Content-Type': 'application/json'}), - "AddSession": ApiInfo("POST", "/api/memory/session/add", {}, {}, - {'Accept': 'application/json', 'Content-Type': 'application/json'}), - - "Ping": ApiInfo("GET", "/api/memory/ping", {}, {}, - {'Accept': 'application/json', 'Content-Type': 'application/json'}), - } - return api_info - - def get_body(self, api, params, body): - if not (api in self.api_info): - raise Exception("no such api") - api_info = self.api_info[api] - r = self.prepare_request(api_info, params) - r.headers['Content-Type'] = 'application/json' - r.headers['Traffic-Source'] = 'SDK' - r.body = body - - SignerV4.sign(r, self.service_info.credentials) - - url = r.build() - resp = self.session.get(url, headers=r.headers, data=r.body, - timeout=(self.service_info.connection_timeout, self.service_info.socket_timeout)) - if resp.status_code == 200: - return json.dumps(resp.json()) - else: - raise Exception(resp.text.encode("utf-8")) - - def get_body_exception(self, api, params, body): - try: - res = self.get_body(api, params, body) - except Exception as e: - try: - res_json = json.loads(e.args[0].decode("utf-8")) - except: - raise VikingDBMemoryException(1000028, "missed", "json load res error, res:{}".format(str(e))) from None - code = res_json.get("code", 1000028) - request_id = res_json.get("request_id", 1000028) - message = res_json.get("message", None) - - raise VikingDBMemoryException(code, request_id, message) - - if res == '': - raise VikingDBMemoryException(1000028, "missed", - "empty response due to unknown error, please contact customer service") from None - return res - - def get_exception(self, api, params): - try: - res = self.get(api, params) - except Exception as e: - try: - res_json = json.loads(e.args[0].decode("utf-8")) - except: - raise VikingDBMemoryException(1000028, "missed", "json load res error, res:{}".format(str(e))) from None - code = res_json.get("code", 1000028) - request_id = res_json.get("request_id", 1000028) - message = res_json.get("message", None) - raise VikingDBMemoryException(code, request_id, message) - if res == '': - raise VikingDBMemoryException(1000028, "missed", - "empty response due to unknown error, please contact customer service") from None - return res - - def create_collection(self, collection_name, description="", custom_event_type_schemas=[], - custom_entity_type_schemas=[], builtin_event_types=[], builtin_entity_types=[]): - params = { - "CollectionName": collection_name, "Description": description, - "CustomEventTypeSchemas": custom_event_type_schemas, "CustomEntityTypeSchemas": custom_entity_type_schemas, - "BuiltinEventTypes": builtin_event_types, "BuiltinEntityTypes": builtin_entity_types, - } - res = self.json("CreateCollection", {}, json.dumps(params)) - return json.loads(res) - - def get_collection(self, collection_name): - params = {"CollectionName": collection_name} - res = self.json("GetCollection", {}, json.dumps(params)) - return json.loads(res) - - def drop_collection(self, collection_name): - params = {"CollectionName": collection_name} - res = self.json("DropCollection", {}, json.dumps(params)) - return json.loads(res) - - def update_collection(self, collection_name, custom_event_type_schemas=[], custom_entity_type_schemas=[], - builtin_event_types=[], builtin_entity_types=[]): - params = { - "CollectionName": collection_name, - "CustomEventTypeSchemas": custom_event_type_schemas, "CustomEntityTypeSchemas": custom_entity_type_schemas, - "BuiltinEventTypes": builtin_event_types, "BuiltinEntityTypes": builtin_entity_types, - } - res = self.json("UpdateCollection", {}, json.dumps(params)) - return json.loads(res) - - def search_memory(self, collection_name, query, filter, limit=10): - params = { - "collection_name": collection_name, - "query": query, - "limit": limit, - "filter": filter, - } - res = self.json("SearchMemory", {}, json.dumps(params)) - return json.loads(res) - - def add_session(self, collection_name, session_id, messages, metadata, entities=None): - params = { - "collection_name": collection_name, - "session_id": session_id, - "messages": messages, - "metadata": metadata, - } - if entities is not None: - params["entities"] = entities - res = self.json("AddSession", {}, json.dumps(params)) - return json.loads(res) - - - - -def initialize_services(): - load_dotenv() - ak = VOLC_ACCESSKEY - sk = VOLC_SECRETKEY - ark_api_key = VOLC_API_KEY - - if not all([ak, sk, ark_api_key]): - raise ValueError("必须在环境变量中设置 VOLC_ACCESSKEY, VOLC_SECRETKEY, 和 ARK_API_KEY。") - - memory_service = VikingDBMemoryService(ak=ak, sk=sk) - llm_client = Ark( - base_url="https://ark.cn-beijing.volces.com/api/v3", - api_key=ark_api_key, - ) - return memory_service, llm_client - - -def ensure_collection_exists(memory_service, collection_name): - """检查记忆集合是否存在,如果不存在则创建。""" - try: - memory_service.get_collection(collection_name) - print(f"记忆集合 '{collection_name}' 已存在。") - except Exception as e: - error_message = str(e) - if "collection not exist" in error_message: - print(f"记忆集合 '{collection_name}' 未找到,正在创建...") - try: - memory_service.create_collection( - collection_name=collection_name, - description="中文情感陪伴场景测试", - builtin_event_types=["sys_event_v1", "sys_profile_collect_v1"], - builtin_entity_types=["sys_profile_v1"] - ) - print(f"记忆集合 '{collection_name}' 创建成功。") - print("等待集合准备就绪...") - except Exception as create_e: - print(f"创建集合失败: {create_e}") - raise - else: - print(f"检查集合时出错: {e}") - raise - - -def search_relevant_memories(memory_service, collection_name, user_id, query): - """搜索与用户查询相关的记忆,并在索引构建中时重试。""" - print(f"正在搜索与 '{query}' 相关的记忆...") - retry_attempt = 0 - while True: - try: - filter_params = { - "user_id": [user_id], - "memory_type": ["sys_event_v1", "sys_profile_v1"] - } - response = memory_service.search_memory( - collection_name=collection_name, - query=query, - filter=filter_params, - limit=3 - ) - - memories = [] - if response.get('data', {}).get('count', 0) > 0: - for result in response['data']['result_list']: - if 'memory_info' in result and result['memory_info']: - memories.append({ - 'memory_info': result['memory_info'], - 'score': result['score'] - }) - - if memories: - if retry_attempt > 0: - print("重试后搜索成功。") - print(f"找到 {len(memories)} 条相关记忆:") - for i, memory in enumerate(memories, 1): - print( - f" {i}. (相关度: {memory['score']:.3f}): {json.dumps(memory['memory_info'], ensure_ascii=False, indent=2)}") - else: - print("未找到相关记忆。") - return memories - - except Exception as e: - error_message = str(e) - if "1000023" in error_message: - retry_attempt += 1 - print(f"记忆索引正在构建中。将在60秒后重试... (尝试次数 {retry_attempt})") - time.sleep(60) - else: - print(f"搜索记忆时出错 (不可重试): {e}") - return [] - - -def handle_conversation_turn(memory_service, llm_client, collection_name, user_id, user_message, conversation_history): - """处理一轮对话,包括记忆搜索和LLM响应。""" - print("\n" + "=" * 60) - print(f"用户: {user_message}") - - relevant_memories = search_relevant_memories(memory_service, collection_name, user_id, user_message) - - system_prompt = "你是一个富有同情心、善于倾听的AI伙伴,拥有长期记忆能力。你的目标是为用户提供情感支持和温暖的陪伴。" - if relevant_memories: - memory_context = "\n".join( - [f"- {json.dumps(mem['memory_info'], ensure_ascii=False)}" for mem in relevant_memories]) - system_prompt += f"\n\n这是我们过去的一些对话记忆,请参考:\n{memory_context}\n\n请利用这些信息来更好地理解和回应用户。" - - print("AI正在思考...") - - try: - messages = [{"role": "system", "content": system_prompt}] + conversation_history + [ - {"role": "user", "content": user_message}] - completion = llm_client.chat.completions.create( - model="doubao-seed-1-6-flash-250715", - messages=messages - ) - assistant_reply = completion.choices[0].message.content - except Exception as e: - print(f"LLM调用失败: {e}") - assistant_reply = "抱歉,我现在有点混乱,无法回应。我们可以稍后再聊吗?" - - print(f"伙伴: {assistant_reply}") - - conversation_history.extend([ - {"role": "user", "content": user_message}, - {"role": "assistant", "content": assistant_reply} - ]) - return assistant_reply - - -def archive_conversation(memory_service, collection_name, user_id, assistant_id, conversation_history, topic_name): - """将对话历史归档到记忆数据库。""" - if not conversation_history: - print("没有对话可以归档。") - return False - - print(f"\n正在归档关于 '{topic_name}' 的对话...") - session_id = f"{topic_name}_{int(time.time())}" - metadata = { - "default_user_id": user_id, - "default_assistant_id": assistant_id, - "time": int(time.time() * 1000) - } - - try: - memory_service.add_session( - collection_name=collection_name, - session_id=session_id, - messages=conversation_history, - metadata=metadata - ) - print(f"对话已成功归档,会话ID: {session_id}") - print("正在等待记忆索引更新...") - return True - except Exception as e: - print(f"归档对话失败: {e}") - return False - - -def main(): - print("开始端到端记忆测试...") - - try: - memory_service, llm_client = initialize_services() - collection_name = "emotional_support" - user_id = "xiaoming" - assistant_id = "assistant" - ensure_collection_exists(memory_service, collection_name) - except Exception as e: - print(f"初始化失败: {e}") - return - - print("\n--- 阶段 1: 初始对话 ---") - initial_conversation_history = [] - handle_conversation_turn( - memory_service, llm_client, collection_name, user_id, - "你好,我是小明,今年18岁,但压力好大。", - initial_conversation_history - ) - handle_conversation_turn( - memory_service, llm_client, collection_name, user_id, - "马上就要高考了,家里人的期待好高。", - initial_conversation_history - ) - - print("\n--- 阶段 2: 归档记忆 ---") - archive_conversation( - memory_service, collection_name, user_id, assistant_id, - initial_conversation_history, "study_stress_discussion" - ) - - print("\n--- 阶段 3: 验证记忆 ---") - verification_conversation_history = [] - handle_conversation_turn( - memory_service, llm_client, collection_name, user_id, - "我最近很焦虑,不知道该怎么办。", - verification_conversation_history - ) - - print("\n端到端记忆测试完成!") - - -if __name__ == "__main__": - main() \ No newline at end of file