import argparse import alibabacloud_oss_v2 as oss ALY_ACCESS_KEY_ID = 'LTAI5t5jxkgJtRK8wew8fnbq' ALY_ACCESS_KEY_SECRET = 'b8HXNGz7IkI3Dhv7BZx9BNBEZy1uku' ALY_REGION = 'cn-hangzhou' ALY_ENDPOINT = 'https://oss-cn-hangzhou.aliyuncs.com' ALY_BUCKET_NAME = 'ylt' # 目标键值 key = "HuangHai/Test/SourceWithPhoto.jpg" def main(): # 使用硬编码参数替换命令行参数 args = argparse.Namespace( region=ALY_REGION, bucket=ALY_BUCKET_NAME, endpoint=ALY_ENDPOINT, key=key ) # 使用硬编码凭证替代环境变量 credentials_provider = oss.credentials.StaticCredentialsProvider( ALY_ACCESS_KEY_ID, ALY_ACCESS_KEY_SECRET ) # 加载SDK的默认配置,并设置凭证提供者 cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider # 设置配置中的区域信息 cfg.region = args.region # 如果提供了endpoint参数,则设置配置中的endpoint if args.endpoint is not None: cfg.endpoint = args.endpoint # 使用配置好的信息创建OSS客户端 client = oss.Client(cfg) # 定义一个字典变量 progress_state 用于保存上传进度状态,初始值为 0 progress_state = {'saved': 0} def _progress_fn(n, written, total): # 使用字典存储累计写入的字节数,避免使用 global 变量 progress_state['saved'] += n # 计算当前上传百分比,将已写入字节数与总字节数进行除法运算后取整 rate = int(100 * (float(written) / float(total))) # 打印当前上传进度,\r 表示回到行首,实现命令行中实时刷新效果 # end='' 表示不换行,使下一次打印覆盖当前行 print(f'\r上传进度:{rate}% ', end='') # 执行上传对象的请求,指定存储空间名称、对象名称和数据内容 result = client.put_object_from_file(oss.PutObjectRequest( bucket=args.bucket, key=args.key, progress_fn=_progress_fn, ), r"D:\dsWork\dsProject\dsLightRag\ShiTi\Docx\SourceWithPhoto.jpg", # 指定本地文件路径 ) # 输出请求的结果状态码、请求ID、内容MD5、ETag、CRC64校验码和版本ID,用于检查请求是否成功 print(f'status code: {result.status_code},' f' request id: {result.request_id},' f' content md5: {result.content_md5},' f' etag: {result.etag},' f' hash crc64: {result.hash_crc64},' f' version id: {result.version_id},' ) # 新增:输出完整路径 print(f'文件已上传至:https://{args.bucket}.{args.endpoint.replace("https://","")}/{args.key}') if __name__ == "__main__": main() # 脚本入口,当文件被直接运行时调用main函数