from http import HTTPStatus from urllib.parse import urlparse, unquote from pathlib import PurePosixPath import requests from dashscope import ImageSynthesis import os from Config.Config import ALY_LLM_API_KEY prompt = "一副典雅庄重的对联悬挂于厅堂之中,房间是个安静古典的中式布置,桌子上放着一些青花瓷,对联上左书“义本生知人机同道善思新”,右书“通云赋智乾坤启数高志远”, 横批“智启通义”,字体飘逸,中间挂在一着一副中国风的画作,内容是岳阳楼。" api_key = ALY_LLM_API_KEY def async_call(): print('----创建任务----') task_info = create_async_task() print('----等待任务结束返回图像url----') wait_async_task(task_info) # 创建异步任务 def create_async_task(): rsp = ImageSynthesis.async_call(api_key=api_key, model="qwen-image", prompt=prompt, n=1, size='1328*1328') print(rsp) if rsp.status_code == HTTPStatus.OK: print(rsp.output) else: print('创建任务失败, status_code: %s, code: %s, message: %s' % (rsp.status_code, rsp.code, rsp.message)) return rsp # 等待异步任务结束 def wait_async_task(task): rsp = ImageSynthesis.wait(task) print(rsp) if rsp.status_code == HTTPStatus.OK: print(rsp.output) for result in rsp.output.results: file_name = PurePosixPath(unquote(urlparse(result.url).path)).parts[-1] with open('./%s' % file_name, 'wb+') as f: f.write(requests.get(result.url).content) else: print('任务执行失败, status_code: %s, code: %s, message: %s' % (rsp.status_code, rsp.code, rsp.message)) # 获取异步任务信息 def fetch_task_status(task): status = ImageSynthesis.fetch(task) print(status) if status.status_code == HTTPStatus.OK: print(status.output.task_status) else: print('获取任务失败, status_code: %s, code: %s, message: %s' % (status.status_code, status.code, status.message)) # 取消异步任务,只有处于PENDING状态的任务才可以取消 def cancel_task(task): rsp = ImageSynthesis.cancel(task) print(rsp) if rsp.status_code == HTTPStatus.OK: print(rsp.output.task_status) else: print('取消任务失败, status_code: %s, code: %s, message: %s' % (rsp.status_code, rsp.code, rsp.message)) if __name__ == '__main__': async_call()