main
HuangHai 4 months ago
parent 7a3f8b630c
commit c7d357504d

@ -11,7 +11,22 @@ from jose import JWTError, jwt
from openai import AsyncOpenAI
from passlib.context import CryptContext
from starlette.responses import StreamingResponse
import json
import oss2
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_openapi.models import Config
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_credentials.client import Client as CredentialClient
import os
import json
import base64
import hmac
import datetime
import time
import hashlib
from WxMini.Milvus.Config.MulvusConfig import *
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
@ -20,7 +35,7 @@ from WxMini.Utils.ImageUtil import *
from WxMini.Utils.MySQLUtil import init_mysql_pool, get_chat_log_by_session, get_user_by_login_name, \
get_chat_logs_by_risk_flag, get_chat_logs_summary
from WxMini.Utils.MySQLUtil import update_risk, get_last_chat_log_id
from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory, get_sts_token
from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory, hmacsha256
from WxMini.Utils.TtsUtil import TTS
# 配置日志
@ -498,23 +513,94 @@ async def chat_logs_summary(
# 获取上传OSS的授权Token
@app.get("/aichat/get_oss_upload_token")
async def get_oss_upload_token(current_user: dict = Depends(get_current_user)):
# 获取 STS 临时凭证
sts_token = get_sts_token()
return {
"success": True,
"message": "获取上传凭证成功",
"data": {
"access_key_id": sts_token['AccessKeyId'],
"access_key_secret": sts_token['AccessKeySecret'],
"security_token": sts_token['SecurityToken'],
"bucket_name": BUCKET_NAME,
"endpoint": ENDPOINT,
"login_name": current_user["login_name"],
"person_name": current_user["person_name"]
}
@app.get("/aichat/get_post_signature_for_oss_upload")
async def generate_upload_params(current_user: dict = Depends(get_current_user)):
logger.info(f"current_user={current_user['login_name']}")
# 子账号的AK,SK,ARN
access_key_id = "LTAI5tJrhwuBzF2X9USrzubX"
access_key_secret = "I6ezLuYhk9z9MRjXD2q99STSpTONwW"
role_arn_for_oss_upload = "acs:ram::1546399445482588:role/huanghai-create-role"
# 桶名
oss_bucket = 'hzkc'
# 区域
region_id = 'cn-beijing'
host = f'http://{oss_bucket}.oss-cn-beijing.aliyuncs.com'
upload_dir = 'Upload' # 指定上传到OSS的文件前缀。
role_session_name = 'role_session_name' # 自定义会话名称
# 初始化配置,直接传递凭据
config = Config(
region_id=region_id,
access_key_id=access_key_id,
access_key_secret=access_key_secret
)
# 创建 STS 客户端并获取临时凭证
sts_client = Sts20150401Client(config=config)
assume_role_request = sts_20150401_models.AssumeRoleRequest(
role_arn=role_arn_for_oss_upload,
role_session_name=role_session_name
)
response = sts_client.assume_role(assume_role_request)
token_data = response.body.credentials.to_map()
# 使用 STS 返回的临时凭据
sts_access_key_id = token_data['AccessKeyId']
sts_access_key_secret = token_data['AccessKeySecret']
security_token = token_data['SecurityToken']
now = int(time.time())
# 将时间戳转换为datetime对象
dt_obj = datetime.datetime.utcfromtimestamp(now)
# 在当前时间增加3小时设置为请求的过期时间
dt_obj_plus_3h = dt_obj + datetime.timedelta(hours=1)
# 请求时间
dt_obj_1 = dt_obj.strftime('%Y%m%dT%H%M%S') + 'Z'
# 请求日期
dt_obj_2 = dt_obj.strftime('%Y%m%d')
# 请求过期时间
expiration_time = dt_obj_plus_3h.strftime('%Y-%m-%dT%H:%M:%S.000Z')
# 构建 Policy 并生成签名
policy = {
"expiration": expiration_time,
"conditions": [
["eq", "$success_action_status", "200"],
{"x-oss-signature-version": "OSS4-HMAC-SHA256"},
{"x-oss-credential": f"{sts_access_key_id}/{dt_obj_2}/{region_id}/oss/aliyun_v4_request"},
{"x-oss-security-token": security_token},
{"x-oss-date": dt_obj_1},
]
}
policy_str = json.dumps(policy).strip()
# 步骤2构造待签名字符串StringToSign
stringToSign = base64.b64encode(policy_str.encode()).decode()
# 步骤3计算SigningKey
dateKey = hmacsha256(("aliyun_v4" + sts_access_key_secret).encode(), dt_obj_2)
dateRegionKey = hmacsha256(dateKey, region_id)
dateRegionServiceKey = hmacsha256(dateRegionKey, "oss")
signingKey = hmacsha256(dateRegionServiceKey, "aliyun_v4_request")
# 步骤4计算Signature
result = hmacsha256(signingKey, stringToSign)
signature = result.hex()
# 组织返回数据
response_data = {
'policy': stringToSign, # 表单域
'x_oss_signature_version': "OSS4-HMAC-SHA256", # 指定签名的版本和算法固定值为OSS4-HMAC-SHA256
'x_oss_credential': f"{sts_access_key_id}/{dt_obj_2}/{region_id}/oss/aliyun_v4_request", # 指明派生密钥的参数集
'x_oss_date': dt_obj_1, # 请求的时间
'signature': signature, # 签名认证描述信息
'host': host,
'dir': upload_dir,
'security_token': security_token # 安全令牌
}
return response_data
@app.get("/aichat/process_image")

@ -1,9 +1,8 @@
# https://help.aliyun.com/zh/oss/use-cases/add-signatures-on-the-client-by-using-javascript-and-upload-data-to-oss?spm=a2c4g.11186623.help-menu-31815.d_6_1_0_1.2dd15d03SrLg4Q#4f036801celh7
import json
import hashlib
import hmac
import oss2
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_tea_openapi.models import Config
from WxMini.Milvus.Config.MulvusConfig import *
@ -11,7 +10,6 @@ from WxMini.Milvus.Config.MulvusConfig import *
auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)
def upload_mp3_to_oss(file_path, oss_object_name):
"""
上传本地 MP3 文件到 OSS
@ -41,38 +39,20 @@ def upload_mp3_to_oss_from_memory(oss_object_name, audio_data):
print(f"上传失败: {e}")
def get_sts_token(access_key_id="LTAI5tJrhwuBzF2X9USrzubX", access_key_secret="I6ezLuYhk9z9MRjXD2q99STSpTONwW", role_arn="acs:ram::1546399445482588:role/huanghai-create-role", role_session_name="role_session_name"):
def hmacsha256(key, data):
"""
获取 STS 临时凭证
:param access_key_id: 阿里云 AccessKey ID (默认值: "LTAI5tJrhwuBzF2X9USrzubX")
:param access_key_secret: 阿里云 AccessKey Secret (默认值: "I6ezLuYhk9z9MRjXD2q99STSpTONwW")
:param role_arn: RAM 角色 ARN (默认值: "acs:ram::1546399445482588:role/huanghai-create-role")
:param role_session_name: 自定义会话名称 (默认值: "role_session_name")
:return: 包含 AccessKeyIdAccessKeySecret SecurityToken 的字典
"""
# 初始化 Config
config = Config(
region_id=REGION_ID,
access_key_id=access_key_id,
access_key_secret=access_key_secret
)
# 创建 STS 客户端并获取临时凭证
sts_client = Sts20150401Client(config=config)
assume_role_request = sts_20150401_models.AssumeRoleRequest(
role_arn=role_arn,
role_session_name=role_session_name
)
response = sts_client.assume_role(assume_role_request)
token = json.dumps(response.body.credentials.to_map())
计算HMAC-SHA256哈希值的函数
# 解析 token
token_dict = json.loads(token)
return {
'AccessKeyId': token_dict['AccessKeyId'],
'AccessKeySecret': token_dict['AccessKeySecret'],
'SecurityToken': token_dict['SecurityToken']
}
:param key: 用于计算哈希的密钥字节类型
:param data: 要进行哈希计算的数据字符串类型
:return: 计算得到的HMAC-SHA256哈希值字节类型
"""
try:
mac = hmac.new(key, data.encode(), hashlib.sha256)
hmacBytes = mac.digest()
return hmacBytes
except Exception as e:
raise RuntimeError(f"Failed to calculate HMAC-SHA256 due to {e}")
def upload_to_oss(access_key_id, access_key_secret, security_token, endpoint=ENDPOINT, bucket_name=BUCKET_NAME, file_key="Upload/example.txt", file_content="Hello, OSS!"):
"""

Loading…
Cancel
Save