Files
dsProject/dsLightRag/Volcengine/VikingDBMemoryService.py
2025-09-07 08:02:23 +08:00

297 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
pip install volcengine
pip install --upgrade "volcengine-python-sdk[ark]"
"""
import json
import os
import threading
import time
from dotenv import load_dotenv
from volcengine.ApiInfo import ApiInfo
from volcengine.Credentials import Credentials
from volcengine.ServiceInfo import ServiceInfo
from volcengine.auth.SignerV4 import SignerV4
from volcengine.base.Service import Service
from volcenginesdkarkruntime import Ark
from Config.Config import VOLC_SECRETKEY, VOLC_ACCESSKEY, VOLC_API_KEY
# Name of the memory collection.
MEMORY_COLLECTION_NAME="dsideal_collection"
def initialize_services(ak=None, sk=None, ark_api_key=None):
    """Build the memory-database service and the LLM client.

    Any credential that is omitted (or otherwise falsy) is replaced by the
    matching ``VOLC_*`` constant imported from ``Config.Config``.

    Args:
        ak: Access key for the memory service.
        sk: Secret key for the memory service.
        ark_api_key: API key for the Ark LLM client.

    Returns:
        A ``(memory_service, llm_client)`` tuple.

    Raises:
        ValueError: If any of the three credentials is still empty after the
            fallback to the configured constants.
    """
    load_dotenv()

    # Fall back to the configured constants for anything the caller left out.
    ak = ak or VOLC_ACCESSKEY
    sk = sk or VOLC_SECRETKEY
    ark_api_key = ark_api_key or VOLC_API_KEY

    if not (ak and sk and ark_api_key):
        raise ValueError("必须提供 VOLC_ACCESSKEY, VOLC_SECRETKEY, 和 VOLC_API_KEY。")

    # Tuple elements are evaluated left to right, so the memory service is
    # still constructed before the Ark client.
    return (
        VikingDBMemoryService(ak=ak, sk=sk),
        Ark(
            base_url="https://ark.cn-beijing.volces.com/api/v3",
            api_key=ark_api_key,
        ),
    )
def ensure_collection_exists(memory_service, collection_name, description="",
                             builtin_event_types=None,
                             builtin_entity_types=None):
    """Make sure the memory collection exists, creating it when it does not.

    The collection is looked up with ``memory_service.get_collection``. If that
    raises an error whose text contains ``"collection not exist"``, the
    collection is created with ``memory_service.create_collection``.

    Args:
        memory_service: Object exposing ``get_collection`` and
            ``create_collection``.
        collection_name: Name of the collection to check or create.
        description: Description used when the collection is created.
        builtin_event_types: Built-in event types for a new collection.
            ``None`` selects ``["sys_event_v1", "sys_profile_collect_v1"]``.
        builtin_entity_types: Built-in entity types for a new collection.
            ``None`` selects ``["sys_profile_v1"]``.

    Raises:
        Exception: Re-raises the lookup error when it is not a "collection not
            exist" error, and re-raises any error from the creation call.
    """
    # Build the defaults per call: list defaults in the signature would be
    # shared between calls and could be altered by the service they are
    # handed to.
    if builtin_event_types is None:
        builtin_event_types = ["sys_event_v1", "sys_profile_collect_v1"]
    if builtin_entity_types is None:
        builtin_entity_types = ["sys_profile_v1"]

    try:
        memory_service.get_collection(collection_name)
    except Exception as e:
        if "collection not exist" not in str(e):
            print(f"检查集合时出错: {e}")
            raise

        print(f"记忆集合 '{collection_name}' 未找到,正在创建...")
        try:
            memory_service.create_collection(
                collection_name=collection_name,
                description=description,
                builtin_event_types=builtin_event_types,
                builtin_entity_types=builtin_entity_types,
            )
            print(f"记忆集合 '{collection_name}' 创建成功。")
            # NOTE(review): this only prints the message; nothing here
            # actually waits for the collection to become ready.
            print("等待集合准备就绪...")
        except Exception as create_e:
            print(f"创建集合失败: {create_e}")
            raise
    else:
        print(f"记忆集合 '{collection_name}' 已存在。")
def search_relevant_memories(memory_service, collection_name, user_id, query, limit=3,
                             max_retries=None, retry_delay=60):
    """Search a collection for memories relevant to ``query``.

    The search is retried while the service reports an error whose text
    contains ``"1000023"`` (reported to the user as the memory index still
    being built). Any other error ends the search immediately.

    Args:
        memory_service: Object exposing ``search_memory``.
        collection_name: Collection to search.
        user_id: Value placed in the ``user_id`` search filter.
        query: Query text.
        limit: Maximum number of results requested from the service.
        max_retries: Maximum number of retries after a ``1000023`` error.
            ``None`` (the default) retries without limit.
        retry_delay: Seconds to sleep before each retry.

    Returns:
        A list of ``{"memory_info": ..., "score": ...}`` dicts, one per result
        that carries a non-empty ``memory_info``. The list is empty when
        nothing matched, when a non-retryable error occurred, or when
        ``max_retries`` was exhausted.
    """
    print(f"正在搜索与 '{query}' 相关的记忆...")
    retry_attempt = 0
    while True:
        try:
            filter_params = {
                "user_id": user_id,  # Original note: corrected to be a string.
                "memory_type": ["sys_event_v1", "sys_profile_v1"],
            }
            response = memory_service.search_memory(
                collection_name=collection_name,
                query=query,
                filter=filter_params,
                limit=limit,
            )

            # Treat a missing or null "data"/"result_list" as "no results".
            data = response.get('data') or {}
            memories = []
            if data.get('count', 0) > 0:
                memories = [
                    {'memory_info': result['memory_info'], 'score': result['score']}
                    for result in (data.get('result_list') or [])
                    if result.get('memory_info')
                ]

            if memories:
                if retry_attempt > 0:
                    print("重试后搜索成功。")
                print(f"找到 {len(memories)} 条相关记忆:")
                for i, memory in enumerate(memories, 1):
                    print(
                        f" {i}. (相关度: {memory['score']:.3f}): {json.dumps(memory['memory_info'], ensure_ascii=False, indent=2)}")
            else:
                print("未找到相关记忆。")
            return memories
        except Exception as e:
            if "1000023" not in str(e):
                print(f"搜索记忆时出错 (不可重试): {e}")
                return []
            if max_retries is not None and retry_attempt >= max_retries:
                print(f"记忆索引仍在构建中,已达到最大重试次数 ({max_retries}),放弃搜索。")
                return []
            retry_attempt += 1
            print(f"记忆索引正在构建中。将在{retry_delay}秒后重试... (尝试次数 {retry_attempt})")
            time.sleep(retry_delay)
class VikingDBMemoryException(Exception):
    """Error raised for failed memory-service requests.

    Attributes set by the constructor:
        code: Error code supplied by the caller.
        request_id: Request identifier supplied by the caller.
        message: Preformatted text combining the message, code and request id;
            this is also what ``str()`` returns.
    """

    def __init__(self, code, request_id, message=None):
        self.code, self.request_id = code, request_id
        # The layout (including the missing separator before "request_id") is
        # kept exactly as it was, since callers match on this text.
        self.message = f"{message}, code:{code}request_id:{request_id}"

    def __str__(self):
        return self.message
class VikingDBMemoryService(Service):
    """Client for the memory HTTP API, built on the volcengine ``Service`` base.

    ``__new__`` always returns the same shared object, so the class behaves as
    a process-wide singleton. ``__init__`` is still invoked on every
    construction, which re-applies the constructor arguments to that shared
    object and sends the ping request again.
    """

    _instance_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Check for the shared instance both before and after taking the lock
        # so that concurrent first callers create it only once.
        if not hasattr(VikingDBMemoryService, "_instance"):
            with VikingDBMemoryService._instance_lock:
                if not hasattr(VikingDBMemoryService, "_instance"):
                    VikingDBMemoryService._instance = object.__new__(cls)
        return VikingDBMemoryService._instance

    def __init__(self, host="api-knowledgebase.mlp.cn-beijing.volces.com", region="cn-beijing", ak="", sk="",
                 sts_token="", scheme='https',
                 connection_timeout=30, socket_timeout=30):
        """Configure the service and verify connectivity with a ping request.

        Args:
            host: API host name.
            region: Region used for the request credentials.
            ak: Access key; applied only when non-empty.
            sk: Secret key; applied only when non-empty.
            sts_token: Session token; applied only when non-empty.
            scheme: URL scheme.
            connection_timeout: Connection timeout passed to ``ServiceInfo``.
            socket_timeout: Socket timeout passed to ``ServiceInfo``.

        Raises:
            VikingDBMemoryException: With code ``1000028`` when the ping
                request fails for any reason.
        """
        self.service_info = VikingDBMemoryService.get_service_info(
            host, region, scheme, connection_timeout, socket_timeout)
        self.api_info = VikingDBMemoryService.get_api_info()
        super(VikingDBMemoryService, self).__init__(self.service_info, self.api_info)

        if ak:
            self.set_ak(ak)
        if sk:
            self.set_sk(sk)
        if sts_token:
            self.set_session_token(session_token=sts_token)

        try:
            self.get_body("Ping", {}, json.dumps({}))
        except Exception as e:
            # Include the underlying error text; the previous message had no
            # placeholder, so the detail was silently discarded.
            raise VikingDBMemoryException(
                1000028, "missed", "host or region is incorrect: {}".format(str(e))
            ) from None

    def setHeader(self, header):
        """Rebuild the API table and add ``header`` entries to every API's headers.

        Args:
            header: Mapping of header name to value.
        """
        api_info = VikingDBMemoryService.get_api_info()
        for key in api_info:
            api_info[key].header.update(header)
        self.api_info = api_info

    @staticmethod
    def get_service_info(host, region, scheme, connection_timeout, socket_timeout):
        """Return a ``ServiceInfo`` for ``host`` with empty credentials for service ``'air'``."""
        return ServiceInfo(host, {"Host": host},
                           Credentials('', '', 'air', region), connection_timeout, socket_timeout,
                           scheme=scheme)

    @staticmethod
    def get_api_info():
        """Return a fresh mapping of API name to ``ApiInfo``.

        Every entry gets its own header dict, so header changes made for one
        API (see ``setHeader``) never leak into another.
        """
        endpoints = (
            ("CreateCollection", "POST", "/api/memory/collection/create"),
            ("GetCollection", "POST", "/api/memory/collection/info"),
            ("DropCollection", "POST", "/api/memory/collection/delete"),
            ("UpdateCollection", "POST", "/api/memory/collection/update"),
            ("SearchMemory", "POST", "/api/memory/search"),
            ("AddSession", "POST", "/api/memory/session/add"),
            ("Ping", "GET", "/api/memory/ping"),
        )
        return {
            name: ApiInfo(method, path, {}, {},
                          {'Accept': 'application/json', 'Content-Type': 'application/json'})
            for name, method, path in endpoints
        }

    def get_body(self, api, params, body):
        """Send a signed GET request carrying ``body`` and return the JSON reply as a string.

        Args:
            api: Key into ``self.api_info``.
            params: Query parameters passed to ``prepare_request``.
            body: Request body (already serialized).

        Returns:
            The response JSON, re-serialized with ``json.dumps``.

        Raises:
            Exception: ``"no such api"`` for an unknown ``api``; otherwise the
                UTF-8 encoded response text when the status code is not 200.
        """
        if api not in self.api_info:
            raise Exception("no such api")
        api_info = self.api_info[api]
        r = self.prepare_request(api_info, params)
        r.headers['Content-Type'] = 'application/json'
        r.headers['Traffic-Source'] = 'SDK'
        r.body = body
        SignerV4.sign(r, self.service_info.credentials)
        url = r.build()
        # NOTE(review): the request is always sent with GET, whatever method
        # the ApiInfo entry declares.
        resp = self.session.get(url, headers=r.headers, data=r.body,
                                timeout=(self.service_info.connection_timeout,
                                         self.service_info.socket_timeout))
        if resp.status_code == 200:
            return json.dumps(resp.json())
        raise Exception(resp.text.encode("utf-8"))

    @staticmethod
    def _raise_memory_exception(error):
        """Translate a failed-request error into ``VikingDBMemoryException`` and raise it.

        ``error.args[0]`` is expected to be the UTF-8 encoded JSON error body.
        When it cannot be decoded or parsed, a generic ``1000028`` exception
        carrying the original error text is raised instead.
        """
        try:
            res_json = json.loads(error.args[0].decode("utf-8"))
        except Exception:
            raise VikingDBMemoryException(
                1000028, "missed", "json load res error, res:{}".format(str(error))) from None
        code = res_json.get("code", 1000028)
        request_id = res_json.get("request_id", 1000028)
        message = res_json.get("message", None)
        raise VikingDBMemoryException(code, request_id, message)

    def get_body_exception(self, api, params, body):
        """Call ``get_body`` and convert any failure into ``VikingDBMemoryException``.

        Raises:
            VikingDBMemoryException: When the request fails or the response is
                an empty string.
        """
        try:
            res = self.get_body(api, params, body)
        except Exception as e:
            self._raise_memory_exception(e)  # always raises
        if res == '':
            raise VikingDBMemoryException(
                1000028, "missed",
                "empty response due to unknown error, please contact customer service")
        return res

    def get_exception(self, api, params):
        """Call the inherited ``get`` and convert any failure into ``VikingDBMemoryException``.

        Raises:
            VikingDBMemoryException: When the request fails or the response is
                an empty string.
        """
        try:
            res = self.get(api, params)
        except Exception as e:
            self._raise_memory_exception(e)  # always raises
        if res == '':
            raise VikingDBMemoryException(
                1000028, "missed",
                "empty response due to unknown error, please contact customer service")
        return res

    def create_collection(self, collection_name, description="", custom_event_type_schemas=None,
                          custom_entity_type_schemas=None, builtin_event_types=None,
                          builtin_entity_types=None):
        """Create a memory collection and return the parsed JSON response.

        Each schema/type argument left as ``None`` is sent as an empty list.
        """
        params = {
            "CollectionName": collection_name,
            "Description": description,
            "CustomEventTypeSchemas": [] if custom_event_type_schemas is None else custom_event_type_schemas,
            "CustomEntityTypeSchemas": [] if custom_entity_type_schemas is None else custom_entity_type_schemas,
            "BuiltinEventTypes": [] if builtin_event_types is None else builtin_event_types,
            "BuiltinEntityTypes": [] if builtin_entity_types is None else builtin_entity_types,
        }
        res = self.json("CreateCollection", {}, json.dumps(params))
        return json.loads(res)

    def get_collection(self, collection_name):
        """Fetch information about a collection and return the parsed JSON response."""
        params = {"CollectionName": collection_name}
        res = self.json("GetCollection", {}, json.dumps(params))
        return json.loads(res)

    def drop_collection(self, collection_name):
        """Delete a collection and return the parsed JSON response."""
        params = {"CollectionName": collection_name}
        res = self.json("DropCollection", {}, json.dumps(params))
        return json.loads(res)

    def update_collection(self, collection_name, custom_event_type_schemas=None, custom_entity_type_schemas=None,
                          builtin_event_types=None, builtin_entity_types=None):
        """Update a collection's schemas/types and return the parsed JSON response.

        Each schema/type argument left as ``None`` is sent as an empty list.
        """
        params = {
            "CollectionName": collection_name,
            "CustomEventTypeSchemas": [] if custom_event_type_schemas is None else custom_event_type_schemas,
            "CustomEntityTypeSchemas": [] if custom_entity_type_schemas is None else custom_entity_type_schemas,
            "BuiltinEventTypes": [] if builtin_event_types is None else builtin_event_types,
            "BuiltinEntityTypes": [] if builtin_entity_types is None else builtin_entity_types,
        }
        res = self.json("UpdateCollection", {}, json.dumps(params))
        return json.loads(res)

    def search_memory(self, collection_name, query, filter, limit=10):
        """Search memories in a collection and return the parsed JSON response.

        Args:
            collection_name: Collection to search.
            query: Query text.
            filter: Filter object sent as-is under the ``"filter"`` key.
            limit: Maximum number of results requested.
        """
        params = {
            "collection_name": collection_name,
            "query": query,
            "limit": limit,
            "filter": filter,
        }
        res = self.json("SearchMemory", {}, json.dumps(params))
        return json.loads(res)

    def add_session(self, collection_name, session_id, messages, metadata, entities=None):
        """Add a session to a collection and return the parsed JSON response.

        ``entities`` is included in the request only when it is not ``None``.
        """
        params = {
            "collection_name": collection_name,
            "session_id": session_id,
            "messages": messages,
            "metadata": metadata,
        }
        if entities is not None:
            params["entities"] = entities
        res = self.json("AddSession", {}, json.dumps(params))
        return json.loads(res)