main
HuangHai 4 months ago
parent cecc04859e
commit 07e69b31aa

@ -60,6 +60,7 @@ ACCESS_KEY_SECRET = 'oizcTOZ8izbGUouboC00RcmGE8vBQ1'
BUCKET_NAME = 'hzkc'
ENDPOINT = 'https://oss-cn-beijing.aliyuncs.com'
OSS_PREFIX = "https://hzkc.oss-cn-beijing.aliyuncs.com/"
REGION_ID = 'cn-beijing'
# 阿里云中用来调用 deepseek v3 的密钥
# https://bailian.console.aliyun.com/?spm=a2c4g.11186623.0.0.77b17980FcMVYv&apiKey=1#/api-key

@ -15,6 +15,7 @@ from WxMini.Utils.MySQLUtil import init_mysql_pool, save_chat_to_mysql, get_chat
get_last_chat_log_id, get_user_by_login_name, get_chat_logs_by_risk_flag
from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory
from WxMini.Utils.TtsUtil import TTS
from WxMini.Utils.OssUtil import get_sts_token, upload_to_oss
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
@ -309,12 +310,13 @@ async def get_chat_log(
result = await get_chat_log_by_session(app.state.mysql_pool, person_id, page, page_size)
return result
# 获取风险聊天记录接口
@app.get("/aichat/get_risk_chat_logs")
async def get_risk_chat_logs(
risk_flag: int = Query(..., description="风险标志1 表示有风险0 表示无风险)"),
page: int = Query(default=1, ge=1, description="当前页码(默认值为 1"),
page_size: int = Query(default=10, ge=1, le=100, description="每页记录数(默认值为 10最大值为 100")
risk_flag: int = Query(..., description="风险标志1 表示有风险0 表示无风险)"),
page: int = Query(default=1, ge=1, description="当前页码(默认值为 1"),
page_size: int = Query(default=10, ge=1, le=100, description="每页记录数(默认值为 10最大值为 100")
):
"""
获取聊天记录支持分页和风险标志过滤
@ -344,6 +346,24 @@ async def get_risk_chat_logs(
}
# 获取上传OSS的授权Token
@app.get("/aichat/get_oss_upload_token")
async def get_oss_upload_token():
# 获取 STS 临时凭证
sts_token = get_sts_token()
return {
"code": 200,
"message": "获取上传凭证成功",
"data": {
"access_key_id": sts_token['AccessKeyId'],
"access_key_secret": sts_token['AccessKeySecret'],
"security_token": sts_token['SecurityToken'],
"bucket_name": BUCKET_NAME,
"endpoint": ENDPOINT
}
}
# 运行 FastAPI 应用
if __name__ == "__main__":
import uvicorn

@ -0,0 +1,17 @@
from WxMini.Utils.OssUtil import get_sts_token, upload_to_oss
# 测试调用
if __name__ == "__main__":
# 获取 STS 临时凭证
sts_token = get_sts_token()
# print(f"AccessKeyId: {sts_token['AccessKeyId']}")
# print(f"AccessKeySecret: {sts_token['AccessKeySecret']}")
# print(f"SecurityToken: {sts_token['SecurityToken']}")
# 上传文件到 OSS
result = upload_to_oss(
sts_token['AccessKeyId'],
sts_token['AccessKeySecret'],
sts_token['SecurityToken']
)
print(result)

@ -1,52 +0,0 @@
import json
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_tea_openapi.models import Config
# https://help.aliyun.com/zh/oss/use-cases/add-signatures-on-the-client-by-using-javascript-and-upload-data-to-oss?spm=a2c4g.11186623.help-menu-31815.d_6_1_0_1.2dd15d03SrLg4Q#4f036801celh7
# 配置阿里云 AccessKey 和 RAM 角色 ARN
access_key_id = "LTAI5tJrhwuBzF2X9USrzubX"
access_key_secret = "I6ezLuYhk9z9MRjXD2q99STSpTONwW"
role_arn_for_oss_upload = "acs:ram::1546399445482588:role/huanghai-create-role" # RAM 角色 ARN
# 替换为实际的 bucket 名称和 region-id
BUCKET_NAME = 'hzkc'
REGION_ID = 'cn-beijing'
ENDPOINT = 'https://oss-cn-beijing.aliyuncs.com'
OSS_PREFIX = "https://hzkc.oss-cn-beijing.aliyuncs.com/"
host = OSS_PREFIX
expire_time = 3600 # 指定过期时间,单位为秒
upload_dir = 'dir' # 指定上传到 OSS 的文件前缀
role_session_name = 'role_session_name' # 自定义会话名称
# 直接使用 access_key_id 和 access_key_secret 初始化 Config
config = Config(
region_id=REGION_ID,
access_key_id=access_key_id,
access_key_secret=access_key_secret
)
# 创建 STS 客户端并获取临时凭证
sts_client = Sts20150401Client(config=config)
assume_role_request = sts_20150401_models.AssumeRoleRequest(
role_arn=role_arn_for_oss_upload,
role_session_name=role_session_name
)
response = sts_client.assume_role(assume_role_request)
token = json.dumps(response.body.credentials.to_map())
# 将 JSON 字符串解析为 Python 字典
token_dict = json.loads(token)
# 提取字段
access_key_id = token_dict['AccessKeyId']
access_key_secret = token_dict['AccessKeySecret']
expiration = token_dict['Expiration']
security_token = token_dict['SecurityToken']
# 打印提取的字段
print(f"AccessKeyId: {access_key_id}")
print(f"AccessKeySecret: {access_key_secret}")
print(f"Expiration: {expiration}")
print(f"SecurityToken: {security_token}")

@ -1,4 +1,10 @@
# https://help.aliyun.com/zh/oss/use-cases/add-signatures-on-the-client-by-using-javascript-and-upload-data-to-oss?spm=a2c4g.11186623.help-menu-31815.d_6_1_0_1.2dd15d03SrLg4Q#4f036801celh7
import json
import oss2
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_tea_openapi.models import Config
from WxMini.Milvus.Config.MulvusConfig import *
# 初始化 OSS Bucket
@ -32,4 +38,61 @@ def upload_mp3_to_oss_from_memory(oss_object_name, audio_data):
bucket.put_object(oss_object_name, audio_data)
print(f"音频数据已成功上传到 OSS存储为 {oss_object_name}")
except Exception as e:
print(f"上传失败: {e}")
print(f"上传失败: {e}")
def get_sts_token(access_key_id="LTAI5tJrhwuBzF2X9USrzubX", access_key_secret="I6ezLuYhk9z9MRjXD2q99STSpTONwW", role_arn="acs:ram::1546399445482588:role/huanghai-create-role", role_session_name="role_session_name"):
"""
获取 STS 临时凭证
:param access_key_id: 阿里云 AccessKey ID (默认值: "LTAI5tJrhwuBzF2X9USrzubX")
:param access_key_secret: 阿里云 AccessKey Secret (默认值: "I6ezLuYhk9z9MRjXD2q99STSpTONwW")
:param role_arn: RAM 角色 ARN (默认值: "acs:ram::1546399445482588:role/huanghai-create-role")
:param role_session_name: 自定义会话名称 (默认值: "role_session_name")
:return: 包含 AccessKeyIdAccessKeySecret SecurityToken 的字典
"""
# 初始化 Config
config = Config(
region_id=REGION_ID,
access_key_id=access_key_id,
access_key_secret=access_key_secret
)
# 创建 STS 客户端并获取临时凭证
sts_client = Sts20150401Client(config=config)
assume_role_request = sts_20150401_models.AssumeRoleRequest(
role_arn=role_arn,
role_session_name=role_session_name
)
response = sts_client.assume_role(assume_role_request)
token = json.dumps(response.body.credentials.to_map())
# 解析 token
token_dict = json.loads(token)
return {
'AccessKeyId': token_dict['AccessKeyId'],
'AccessKeySecret': token_dict['AccessKeySecret'],
'SecurityToken': token_dict['SecurityToken']
}
def upload_to_oss(access_key_id, access_key_secret, security_token, endpoint=ENDPOINT, bucket_name=BUCKET_NAME, file_key="Upload/example.txt", file_content="Hello, OSS!"):
"""
上传文件到 OSS
:param access_key_id: 临时 AccessKey ID
:param access_key_secret: 临时 AccessKey Secret
:param security_token: 临时 SecurityToken
:param endpoint: OSS Endpoint (默认值: ENDPOINT)
:param bucket_name: OSS Bucket 名称 (默认值: BUCKET_NAME)
:param file_key: 文件在 OSS 中的 Key (默认值: "Upload/example.txt")
:param file_content: 文件内容 (默认值: "Hello, OSS!")
:return: 上传结果成功或失败
"""
# 使用临时凭证初始化 OSS 客户端
auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
# 上传文件
try:
bucket.put_object(file_key, file_content)
return "文件上传成功"
except oss2.exceptions.AccessDenied as e:
return f"文件上传失败:{e}"
Loading…
Cancel
Save