# https://help.aliyun.com/zh/oss/use-cases/add-signatures-on-the-client-by-using-javascript-and-upload-data-to-oss?spm=a2c4g.11186623.help-menu-31815.d_6_1_0_1.2dd15d03SrLg4Q#4f036801celh7
import json

import oss2
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_tea_openapi.models import Config

from WxMini.Milvus.Config.MulvusConfig import *

# Module-level OSS bucket handle shared by the upload_mp3_* helpers below.
# ACCESS_KEY_ID, ACCESS_KEY_SECRET, ENDPOINT, BUCKET_NAME (and REGION_ID used
# further down) are not defined in this file; they are expected to come from
# the star import above.
auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)


def upload_mp3_to_oss(file_path: str, oss_object_name: str) -> None:
    """Upload a local MP3 file to OSS through the module-level bucket.

    Any exception raised while opening or uploading the file is caught and
    reported on stdout; nothing is raised to the caller and nothing is
    returned.

    :param file_path: Path of the local MP3 file to read.
    :param oss_object_name: Object name (key) to store the file under in OSS.
    """
    try:
        # Binary mode: the audio bytes must reach OSS untouched.
        with open(file_path, 'rb') as file:
            bucket.put_object(oss_object_name, file)
        print(f"文件 {file_path} 已成功上传到 OSS,存储为 {oss_object_name}")
    except Exception as e:
        print(f"上传失败: {e}")


def upload_mp3_to_oss_from_memory(oss_object_name: str, audio_data: bytes) -> None:
    """Upload in-memory MP3 data to OSS through the module-level bucket.

    Any exception raised by the upload is caught and reported on stdout;
    nothing is raised to the caller and nothing is returned.

    :param oss_object_name: Object name (key) to store the data under in OSS.
    :param audio_data: Audio payload (bytes) passed straight to ``put_object``.
    """
    try:
        bucket.put_object(oss_object_name, audio_data)
        print(f"音频数据已成功上传到 OSS,存储为 {oss_object_name}")
    except Exception as e:
        print(f"上传失败: {e}")


# NOTE(review): SECURITY - the default arguments below embed what look like a
# real long-lived AccessKey ID/secret and a role ARN in source control. They
# are kept unchanged only so that existing zero-argument callers keep working.
# The key pair should be rotated and the defaults moved to configuration or
# environment variables; callers should pass credentials explicitly.
def get_sts_token(access_key_id="LTAI5tJrhwuBzF2X9USrzubX",
                  access_key_secret="I6ezLuYhk9z9MRjXD2q99STSpTONwW",
                  role_arn="acs:ram::1546399445482588:role/huanghai-create-role",
                  role_session_name="role_session_name") -> dict:
    """Request temporary STS credentials by assuming a RAM role.

    :param access_key_id: AccessKey ID used to sign the AssumeRole request.
    :param access_key_secret: AccessKey secret paired with ``access_key_id``.
    :param role_arn: ARN of the RAM role to assume.
    :param role_session_name: Caller-chosen name for the role session.
    :return: Dict with the keys ``AccessKeyId``, ``AccessKeySecret`` and
        ``SecurityToken`` taken from the AssumeRole response credentials.
    :raises KeyError: If the response credentials lack one of those keys.
        Errors raised by the STS client itself are not caught here.
    """
    config = Config(
        region_id=REGION_ID,
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
    )

    # Create the STS client and perform the AssumeRole call.
    sts_client = Sts20150401Client(config=config)
    assume_role_request = sts_20150401_models.AssumeRoleRequest(
        role_arn=role_arn,
        role_session_name=role_session_name,
    )
    response = sts_client.assume_role(assume_role_request)

    # to_map() already yields a plain dict, so the former
    # json.dumps()/json.loads() round trip was redundant and is skipped.
    credentials = response.body.credentials.to_map()
    return {
        'AccessKeyId': credentials['AccessKeyId'],
        'AccessKeySecret': credentials['AccessKeySecret'],
        'SecurityToken': credentials['SecurityToken'],
    }


def upload_to_oss(access_key_id, access_key_secret, security_token,
                  endpoint=ENDPOINT, bucket_name=BUCKET_NAME,
                  file_key="Upload/example.txt",
                  file_content="Hello, OSS!") -> str:
    """Upload content to OSS using temporary STS credentials.

    :param access_key_id: Temporary AccessKey ID.
    :param access_key_secret: Temporary AccessKey secret.
    :param security_token: Temporary security token.
    :param endpoint: OSS endpoint (defaults to the configured ``ENDPOINT``).
    :param bucket_name: OSS bucket name (defaults to ``BUCKET_NAME``).
    :param file_key: Object key to write in OSS.
    :param file_content: Content to store under ``file_key``.
    :return: A success message, or a failure message that embeds the OSS
        error when the upload is rejected or fails.
    """
    # Local names differ from the module-level ``auth``/``bucket`` on purpose
    # so the STS-scoped client does not shadow the shared one.
    sts_auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
    sts_bucket = oss2.Bucket(sts_auth, endpoint, bucket_name)

    try:
        sts_bucket.put_object(file_key, file_content)
    except oss2.exceptions.OssError as e:
        # OssError is the base class of AccessDenied and the other oss2
        # errors; catching it makes every OSS failure come back as the
        # documented failure message instead of only permission errors.
        return f"文件上传失败:{e}"
    return "文件上传成功"