# https://help.aliyun.com/zh/oss/use-cases/add-signatures-on-the-client-by-using-javascript-and-upload-data-to-oss?spm=a2c4g.11186623.help-menu-31815.d_6_1_0_1.2dd15d03SrLg4Q#4f036801celh7 import hashlib import requests import time import hmac import base64 import urllib.parse from hashlib import sha1 import oss2 from WxMini.Milvus.Config.MulvusConfig import * # 初始化 OSS Bucket auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME) def upload_mp3_to_oss(file_path, oss_object_name): """ 上传本地 MP3 文件到 OSS :param file_path: 本地 MP3 文件路径 :param oss_object_name: OSS 中存储的文件名 """ try: # 上传文件 with open(file_path, 'rb') as file: bucket.put_object(oss_object_name, file) print(f"文件 {file_path} 已成功上传到 OSS,存储为 {oss_object_name}") except Exception as e: print(f"上传失败: {e}") def upload_mp3_to_oss_from_memory(oss_object_name, audio_data): """ 从内存上传 MP3 数据到 OSS :param oss_object_name: OSS 中存储的文件名 :param audio_data: 音频数据(bytes 类型) """ try: # 上传音频数据 bucket.put_object(oss_object_name, audio_data) print(f"音频数据已成功上传到 OSS,存储为 {oss_object_name}") except Exception as e: print(f"上传失败: {e}") def hmacsha256(key, data): """ 计算HMAC-SHA256哈希值的函数 :param key: 用于计算哈希的密钥,字节类型 :param data: 要进行哈希计算的数据,字符串类型 :return: 计算得到的HMAC-SHA256哈希值,字节类型 """ try: mac = hmac.new(key, data.encode(), hashlib.sha256) hmacBytes = mac.digest() return hmacBytes except Exception as e: raise RuntimeError(f"Failed to calculate HMAC-SHA256 due to {e}") def upload_to_oss(access_key_id, access_key_secret, security_token, endpoint=ENDPOINT, bucket_name=BUCKET_NAME, file_key="Upload/example.txt", file_content="Hello, OSS!"): """ 上传文件到 OSS :param access_key_id: 临时 AccessKey ID :param access_key_secret: 临时 AccessKey Secret :param security_token: 临时 SecurityToken :param endpoint: OSS Endpoint (默认值: ENDPOINT) :param bucket_name: OSS Bucket 名称 (默认值: BUCKET_NAME) :param file_key: 文件在 OSS 中的 Key (默认值: "Upload/example.txt") :param file_content: 文件内容 (默认值: "Hello, OSS!") :return: 上传结果(成功或失败) """ # 使用临时凭证初始化 OSS 客户端 auth = oss2.StsAuth(access_key_id, access_key_secret, security_token) bucket = oss2.Bucket(auth, endpoint, bucket_name) # 上传文件 try: bucket.put_object(file_key, file_content) return "文件上传成功" except oss2.exceptions.AccessDenied as e: return f"文件上传失败:{e}"