74 lines
2.7 KiB
Python
74 lines
2.7 KiB
Python
|
from http import HTTPStatus
|
|||
|
from urllib.parse import urlparse, unquote
|
|||
|
from pathlib import PurePosixPath
|
|||
|
import requests
|
|||
|
from dashscope import ImageSynthesis
|
|||
|
import os
|
|||
|
|
|||
|
prompt = "一副典雅庄重的对联悬挂于厅堂之中,房间是个安静古典的中式布置,桌子上放着一些青花瓷,对联上左书“义本生知人机同道善思新”,右书“通云赋智乾坤启数高志远”, 横批“智启通义”,字体飘逸,中间挂在一着一副中国风的画作,内容是岳阳楼。"
|
|||
|
|
|||
|
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx"
|
|||
|
api_key = os.getenv("DASHSCOPE_API_KEY")
|
|||
|
|
|||
|
def async_call():
|
|||
|
print('----创建任务----')
|
|||
|
task_info = create_async_task()
|
|||
|
print('----等待任务结束返回图像url----')
|
|||
|
wait_async_task(task_info)
|
|||
|
|
|||
|
|
|||
|
# 创建异步任务
|
|||
|
def create_async_task():
|
|||
|
rsp = ImageSynthesis.async_call(api_key=api_key,
|
|||
|
model="qwen-image",
|
|||
|
prompt=prompt,
|
|||
|
n=1,
|
|||
|
size='1328*1328')
|
|||
|
print(rsp)
|
|||
|
if rsp.status_code == HTTPStatus.OK:
|
|||
|
print(rsp.output)
|
|||
|
else:
|
|||
|
print('创建任务失败, status_code: %s, code: %s, message: %s' %
|
|||
|
(rsp.status_code, rsp.code, rsp.message))
|
|||
|
return rsp
|
|||
|
|
|||
|
|
|||
|
# 等待异步任务结束
|
|||
|
def wait_async_task(task):
|
|||
|
rsp = ImageSynthesis.wait(task)
|
|||
|
print(rsp)
|
|||
|
if rsp.status_code == HTTPStatus.OK:
|
|||
|
print(rsp.output)
|
|||
|
for result in rsp.output.results:
|
|||
|
file_name = PurePosixPath(unquote(urlparse(result.url).path)).parts[-1]
|
|||
|
with open('./%s' % file_name, 'wb+') as f:
|
|||
|
f.write(requests.get(result.url).content)
|
|||
|
else:
|
|||
|
print('任务执行失败, status_code: %s, code: %s, message: %s' %
|
|||
|
(rsp.status_code, rsp.code, rsp.message))
|
|||
|
|
|||
|
|
|||
|
# 获取异步任务信息
|
|||
|
def fetch_task_status(task):
|
|||
|
status = ImageSynthesis.fetch(task)
|
|||
|
print(status)
|
|||
|
if status.status_code == HTTPStatus.OK:
|
|||
|
print(status.output.task_status)
|
|||
|
else:
|
|||
|
print('获取任务失败, status_code: %s, code: %s, message: %s' %
|
|||
|
(status.status_code, status.code, status.message))
|
|||
|
|
|||
|
|
|||
|
# 取消异步任务,只有处于PENDING状态的任务才可以取消
|
|||
|
def cancel_task(task):
|
|||
|
rsp = ImageSynthesis.cancel(task)
|
|||
|
print(rsp)
|
|||
|
if rsp.status_code == HTTPStatus.OK:
|
|||
|
print(rsp.output.task_status)
|
|||
|
else:
|
|||
|
print('取消任务失败, status_code: %s, code: %s, message: %s' %
|
|||
|
(rsp.status_code, rsp.code, rsp.message))
|
|||
|
|
|||
|
|
|||
|
if __name__ == '__main__':
|
|||
|
async_call()
|