import json import time import logging from JiMeng.Kit.JmCommon import JmCommon from JiMeng.Kit.JmErrorCode import JmErrorCode # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('JmTxt2Video') class JmTxt2Video(JmCommon): action = "CVSync2AsyncSubmitTask" req_key = "jimeng_vgfm_t2v_l20" @staticmethod def generate_video(prompt): """ 生成视频 :param prompt: 提示词 :return: 任务结果 :raises Exception: 异常信息 """ req = { "req_key": JmTxt2Video.req_key, "prompt": prompt } response_body = JmCommon.do_request( method="POST", query_list={}, # 添加空字典作为query_list参数 headers={}, body=json.dumps(req).encode('utf-8'), action=JmTxt2Video.action ) return json.loads(response_body) def main(): try: # 示例提示词 # prompt = "蓝色毛绒玩具在超市里拖地,结果拖把洒出好多五颜六色的粉末,接着把粉末洒向镜头前,镜头随之穿过粉末" prompt = "第一个镜头-全景:身穿宇航服的狗狗左右环顾第二个镜头-近景:宇航员狗狗向操作台走去第三个镜头-特写:宇航员狗狗伸手去按操作界" # 保存的文件名称 mp4_file_name = "Text2Video.mp4" # 添加重试逻辑,处理API并发限制错误 jo = None generate_retry_count = 0 max_generate_retries = 1000 # 最大重试次数 generate_retry_interval = 5000 # 重试间隔(毫秒) while generate_retry_count < max_generate_retries: try: jo = JmTxt2Video.generate_video(prompt) logger.info(f"结果: {jo}") code = jo.get("code") if code == JmErrorCode.API_CONCURRENT_LIMIT[0]: logger.warning(f"API并发限制,等待{generate_retry_interval}毫秒后重试...") time.sleep(generate_retry_interval / 1000) generate_retry_count += 1 continue elif not JmErrorCode.is_success(code): logger.error(f"生成视频失败: 错误码={code}, 错误信息={JmErrorCode.get_message_by_code(code)}") return # 成功获取结果,跳出循环 break except Exception as e: logger.error(f"生成视频异常: {str(e)}", exc_info=True) generate_retry_count += 1 if generate_retry_count < max_generate_retries: logger.warning(f"等待{generate_retry_interval}毫秒后重试...") time.sleep(generate_retry_interval / 1000) else: raise # 达到最大重试次数,抛出异常 if generate_retry_count >= max_generate_retries: logger.error(f"生成视频失败,已达到最大重试次数: {max_generate_retries}") return task_id = jo.get("data", {}).get("task_id") logger.info(f"任务ID: {task_id}") # 检查任务是不是已经结束 retry_count = 0 max_retries = 1000 # 最大重试次数 retry_interval = 3000 # 重试间隔(毫秒) while retry_count < max_retries: result = JmCommon.query_task_result(task_id) logger.info(f"查询结果: {result}") code = result.get("code") if not JmErrorCode.is_success(code): logger.error(f"查询失败: 错误码={code}, 错误信息={JmErrorCode.get_message_by_code(code)}") break data = result.get("data") if data and data.get("video_url"): video_url = data.get("video_url") logger.info(f"视频地址:{video_url}") # 下载mp4 save_video_path = f"{JmCommon.base_path}{mp4_file_name}" logger.info("开始下载视频...") JmCommon.download_file(video_url, save_video_path) logger.info(f"视频已下载到: {save_video_path}") break else: logger.info(f"任务处理中,等待{retry_interval}毫秒后重试...") time.sleep(retry_interval / 1000) retry_count += 1 if retry_count >= max_retries: logger.error(f"任务查询超时,已达到最大重试次数: {max_retries}") except Exception as e: logger.error(f"程序执行异常: {str(e)}", exc_info=True) if __name__ == "__main__": main()