"""Delete a Volcengine memory collection through the knowledge-base HTTP API.

Running this module signs a POST request with the configured access key /
secret key and sends it to the collection-delete endpoint, then prints the
outcome.
"""
import json

import requests
from volcengine.base.Request import Request
from volcengine.Credentials import Credentials
from volcengine.auth.SignerV4 import SignerV4

from Config.Config import VOLC_ACCESSKEY, VOLC_SECRETKEY

AK = VOLC_ACCESSKEY
SK = VOLC_SECRETKEY
Domain = "api-knowledgebase.mlp.cn-beijing.volces.com"


def prepare_request(method, path, ak, sk, data=None):
    """Build and sign a Volcengine request for ``Domain``.

    Args:
        method: HTTP method name, e.g. ``"POST"``.
        path: Request path on ``Domain``.
        ak: Access key used for signing.
        sk: Secret key used for signing.
        data: Optional JSON-serializable payload; when not ``None`` it is
            serialized with ``json.dumps`` and set as the request body.

    Returns:
        The signed ``volcengine.base.Request.Request`` object.
    """
    r = Request()
    # NOTE: the scheme is deliberately "http" here ("set_shema" is the
    # SDK's own spelling of the setter).
    r.set_shema("http")
    r.set_method(method)
    r.set_host(Domain)
    r.set_path(path)
    if data is not None:
        r.set_body(json.dumps(data))

    # Sign for the "air" service in the "cn-north-1" region.
    credentials = Credentials(ak, sk, 'air', 'cn-north-1')
    SignerV4.sign(r, credentials)
    return r


def internal_request(method, api, payload, params=None, timeout=30):
    """Sign a request with the module credentials and send it.

    Args:
        method: HTTP method name.
        api: API path on ``Domain``.
        payload: JSON-serializable request body (or ``None`` for no body).
        params: Optional query parameters forwarded to ``requests``.
        timeout: Seconds to wait for the server before ``requests`` raises
            a timeout error; prevents the call from blocking indefinitely.

    Returns:
        The ``requests.Response`` returned by the server.
    """
    req = prepare_request(
        method=method,
        path=api,
        ak=AK,
        sk=SK,
        data=payload,
    )
    url = "{}://{}{}".format(req.schema, req.host, req.path)
    return requests.request(
        method=req.method,
        url=url,
        headers=req.headers,
        data=req.body,
        params=params,
        timeout=timeout,
    )


# Delete the memory collection.
path = "/api/memory/collection/delete"
payload = {
    "CollectionName": "emotional_support"
}

rsp = internal_request("POST", path, payload)

# Parse the body once. An error response may not be JSON; in that case skip
# the raw dump instead of crashing before the status/text can be reported.
try:
    body = rsp.json()
except ValueError:
    body = None
else:
    print(body)

if rsp.status_code == 200:
    print("记忆库删除成功:", body)
else:
    print("错误:", rsp.status_code, rsp.text)