import oss2 from WxMini.Milvus.Config.MulvusConfig import * # 初始化 OSS Bucket auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET) bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME) def upload_mp3_to_oss(file_path, oss_object_name): """ 上传本地 MP3 文件到 OSS :param file_path: 本地 MP3 文件路径 :param oss_object_name: OSS 中存储的文件名 """ try: # 上传文件 with open(file_path, 'rb') as file: bucket.put_object(oss_object_name, file) print(f"文件 {file_path} 已成功上传到 OSS,存储为 {oss_object_name}") except Exception as e: print(f"上传失败: {e}") def upload_mp3_to_oss_from_memory(oss_object_name, audio_data): """ 从内存上传 MP3 数据到 OSS :param oss_object_name: OSS 中存储的文件名 :param audio_data: 音频数据(bytes 类型) """ try: # 上传音频数据 bucket.put_object(oss_object_name, audio_data) print(f"音频数据已成功上传到 OSS,存储为 {oss_object_name}") except Exception as e: print(f"上传失败: {e}")