|
|
import argparse
|
|
|
import alibabacloud_oss_v2 as oss
|
|
|
|
|
|
ALY_ACCESS_KEY_ID = 'LTAI5t5jxkgJtRK8wew8fnbq'
|
|
|
ALY_ACCESS_KEY_SECRET = 'b8HXNGz7IkI3Dhv7BZx9BNBEZy1uku'
|
|
|
ALY_REGION = 'cn-hangzhou'
|
|
|
ALY_ENDPOINT = 'https://oss-cn-hangzhou.aliyuncs.com'
|
|
|
ALY_BUCKET_NAME = 'ylt'
|
|
|
|
|
|
# 目标键值
|
|
|
key = "HuangHai/Test/SourceWithPhoto.jpg"
|
|
|
|
|
|
def main():
|
|
|
# 使用硬编码参数替换命令行参数
|
|
|
args = argparse.Namespace(
|
|
|
region=ALY_REGION,
|
|
|
bucket=ALY_BUCKET_NAME,
|
|
|
endpoint=ALY_ENDPOINT,
|
|
|
key=key
|
|
|
)
|
|
|
|
|
|
# 使用硬编码凭证替代环境变量
|
|
|
credentials_provider = oss.credentials.StaticCredentialsProvider(
|
|
|
ALY_ACCESS_KEY_ID,
|
|
|
ALY_ACCESS_KEY_SECRET
|
|
|
)
|
|
|
|
|
|
# 加载SDK的默认配置,并设置凭证提供者
|
|
|
cfg = oss.config.load_default()
|
|
|
cfg.credentials_provider = credentials_provider
|
|
|
# 设置配置中的区域信息
|
|
|
cfg.region = args.region
|
|
|
# 如果提供了endpoint参数,则设置配置中的endpoint
|
|
|
if args.endpoint is not None:
|
|
|
cfg.endpoint = args.endpoint
|
|
|
|
|
|
# 使用配置好的信息创建OSS客户端
|
|
|
client = oss.Client(cfg)
|
|
|
|
|
|
# 定义一个字典变量 progress_state 用于保存上传进度状态,初始值为 0
|
|
|
progress_state = {'saved': 0}
|
|
|
|
|
|
def _progress_fn(n, written, total):
|
|
|
# 使用字典存储累计写入的字节数,避免使用 global 变量
|
|
|
progress_state['saved'] += n
|
|
|
|
|
|
# 计算当前上传百分比,将已写入字节数与总字节数进行除法运算后取整
|
|
|
rate = int(100 * (float(written) / float(total)))
|
|
|
|
|
|
# 打印当前上传进度,\r 表示回到行首,实现命令行中实时刷新效果
|
|
|
# end='' 表示不换行,使下一次打印覆盖当前行
|
|
|
print(f'\r上传进度:{rate}% ', end='')
|
|
|
|
|
|
# 执行上传对象的请求,指定存储空间名称、对象名称和数据内容
|
|
|
result = client.put_object_from_file(oss.PutObjectRequest(
|
|
|
bucket=args.bucket,
|
|
|
key=args.key,
|
|
|
progress_fn=_progress_fn,
|
|
|
),
|
|
|
r"D:\dsWork\dsProject\dsLightRag\ShiTi\Docx\SourceWithPhoto.jpg", # 指定本地文件路径
|
|
|
)
|
|
|
|
|
|
# 输出请求的结果状态码、请求ID、内容MD5、ETag、CRC64校验码和版本ID,用于检查请求是否成功
|
|
|
print(f'status code: {result.status_code},'
|
|
|
f' request id: {result.request_id},'
|
|
|
f' content md5: {result.content_md5},'
|
|
|
f' etag: {result.etag},'
|
|
|
f' hash crc64: {result.hash_crc64},'
|
|
|
f' version id: {result.version_id},'
|
|
|
)
|
|
|
|
|
|
# 新增:输出完整路径
|
|
|
print(f'文件已上传至:https://{args.bucket}.{args.endpoint.replace("https://","")}/{args.key}')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main() # 脚本入口,当文件被直接运行时调用main函数
|