Files
dsProject/dsLightRag/Test/QWenImage/T2_QWenImageAsync.py
2025-08-27 08:15:34 +08:00

75 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from http import HTTPStatus
from urllib.parse import urlparse, unquote
from pathlib import PurePosixPath
import requests
from dashscope import ImageSynthesis
import os
from Config.Config import ALY_LLM_API_KEY
prompt = "一副典雅庄重的对联悬挂于厅堂之中,房间是个安静古典的中式布置,桌子上放着一些青花瓷,对联上左书“义本生知人机同道善思新”,右书“通云赋智乾坤启数高志远”, 横批“智启通义”,字体飘逸,中间挂在一着一副中国风的画作,内容是岳阳楼。"
api_key = ALY_LLM_API_KEY
def async_call():
print('----创建任务----')
task_info = create_async_task()
print('----等待任务结束返回图像url----')
wait_async_task(task_info)
# 创建异步任务
def create_async_task():
rsp = ImageSynthesis.async_call(api_key=api_key,
model="qwen-image",
prompt=prompt,
n=1,
size='1328*1328')
print(rsp)
if rsp.status_code == HTTPStatus.OK:
print(rsp.output)
else:
print('创建任务失败, status_code: %s, code: %s, message: %s' %
(rsp.status_code, rsp.code, rsp.message))
return rsp
# 等待异步任务结束
def wait_async_task(task):
rsp = ImageSynthesis.wait(task)
print(rsp)
if rsp.status_code == HTTPStatus.OK:
print(rsp.output)
for result in rsp.output.results:
file_name = PurePosixPath(unquote(urlparse(result.url).path)).parts[-1]
with open('./%s' % file_name, 'wb+') as f:
f.write(requests.get(result.url).content)
else:
print('任务执行失败, status_code: %s, code: %s, message: %s' %
(rsp.status_code, rsp.code, rsp.message))
# 获取异步任务信息
def fetch_task_status(task):
status = ImageSynthesis.fetch(task)
print(status)
if status.status_code == HTTPStatus.OK:
print(status.output.task_status)
else:
print('获取任务失败, status_code: %s, code: %s, message: %s' %
(status.status_code, status.code, status.message))
# 取消异步任务只有处于PENDING状态的任务才可以取消
def cancel_task(task):
rsp = ImageSynthesis.cancel(task)
print(rsp)
if rsp.status_code == HTTPStatus.OK:
print(rsp.output.task_status)
else:
print('取消任务失败, status_code: %s, code: %s, message: %s' %
(rsp.status_code, rsp.code, rsp.message))
if __name__ == '__main__':
async_call()