import time import logging import requests import json from KeLing.Kit.KlCommon import KlCommon from KeLing.Kit.KlErrorCode import KlErrorCode # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') log = logging.getLogger(__name__) class KlTxt2Img(KlCommon): BASE_URL = "https://api.klingai.com" GENERATION_PATH = "/v1/images/generations" QUERY_PATH = "/v1/images/generations/" @staticmethod def generate_image(prompt, model_name): """ 生成图片 :param prompt: 提示词 :param model_name: 模型名称,枚举值:kling-v1, kling-v1-5, kling-v2 :return: 任务ID :raises Exception: 异常信息 """ # 获取JWT令牌 jwt = KlCommon.get_jwt() # 创建请求体 request_body = { "model_name": model_name, "prompt": prompt } # 发送POST请求 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {jwt}" } url = f"{KlTxt2Img.BASE_URL}{KlTxt2Img.GENERATION_PATH}" log.info(f"生成图片请求体:{json.dumps(request_body)}") try: # 添加超时设置,避免请求长时间挂起 response = requests.post(url, headers=headers, json=request_body, timeout=30) # 检查响应状态码 if response.status_code != 200: raise Exception(f"请求失败,状态码:{response.status_code}") # 解析响应 response_body = response.text try: response_json = json.loads(response_body) except json.JSONDecodeError as e: raise Exception(f"响应解析失败:{str(e)}") log.info(f"生成图片响应:{response_body}") # 检查响应状态 code = response_json.get("code") if code != 0: message = response_json.get("message", "未知错误") solution = KlErrorCode.get_solution_by_code(code) error_msg = f"生成图片失败:[{code}] {message} - {solution}" # 特殊处理资源包耗尽的情况 if code == KlErrorCode.RESOURCE_EXHAUSTED.get_code(): log.error("可灵AI资源包已耗尽,请充值后再试") raise Exception("可灵AI资源包已耗尽,请充值后再试") raise Exception(error_msg) # 获取任务ID task_id = response_json.get("data", {}).get("task_id") if not task_id: raise Exception("未找到任务ID") log.info(f"生成图片任务ID:{task_id}") return task_id except requests.RequestException as e: raise Exception(f"网络请求异常:{str(e)}") @staticmethod def query_task_status(task_id): """ 查询任务状态 :param task_id: 任务ID :return: 任务结果 :raises Exception: 异常信息 """ return KlCommon.query_task_status(task_id, KlTxt2Img.QUERY_PATH, "文生图") if __name__ == "__main__": try: # 提示词和模型名称 prompt = "一只可爱的小猫咪在草地上玩耍,阳光明媚" model_name = "kling-v1" # 可选:kling-v1, kling-v1-5, kling-v2 save_image_path = f"{KlCommon.base_path}\KeLing_Txt_2_Image.png" # 添加重试逻辑 generate_retry_count = 0 max_generate_retries = 5 # 最大重试次数 initial_retry_interval = 2000 # 初始重试间隔(毫秒) max_retry_interval = 30000 # 最大重试间隔(毫秒) task_id = None account_issue = False while generate_retry_count < max_generate_retries and not account_issue: try: task_id = KlTxt2Img.generate_image(prompt, model_name) break except Exception as e: log.error(f"生成图片异常: {str(e)}", exc_info=True) # 检查是否是账户问题 error_msg = str(e) if "资源包已耗尽" in error_msg or "账户欠费" in error_msg or "无权限" in error_msg: log.error("账户问题,停止重试") account_issue = True else: generate_retry_count += 1 if generate_retry_count < max_generate_retries: # 指数退避策略: 基础延迟 * 2^(重试次数-1),但不超过最大延迟 current_interval = min(initial_retry_interval * (2 ** (generate_retry_count - 1)), max_retry_interval) log.warning(f"等待{current_interval}毫秒后重试...") try: time.sleep(current_interval / 1000) # 转换为秒 except KeyboardInterrupt: log.error("用户中断了程序执行") exit(1) else: log.error(f"生成图片失败,已达到最大重试次数: {max_generate_retries}") if task_id is None: if account_issue: log.error("账户问题,请检查账户状态或充值后再试") else: log.error(f"生成图片失败,已达到最大重试次数: {max_generate_retries}") exit(1) # 查询任务状态 query_retry_count = 0 max_query_retries = 1000 # 最大查询次数 query_retry_interval = 3000 # 查询间隔(毫秒) while query_retry_count < max_query_retries: try: result = KlTxt2Img.query_task_status(task_id) data = result.get("data", {}) task_status = data.get("task_status") if task_status == "failed": task_status_msg = data.get("task_status_msg", "未知错误") log.error(f"任务失败: {task_status_msg}") break elif task_status == "succeed": # 获取图片URL task_result = data.get("task_result", {}) images = task_result.get("images", []) for image in images: index = image.get("index") image_url = image.get("url") # 下载图片 log.info("开始下载图片...") KlCommon.download_file(image_url, save_image_path) log.info(f"图片已下载到: {save_image_path}") break else: log.info(f"任务状态: {task_status}, 等待{query_retry_interval}毫秒后重试...") time.sleep(query_retry_interval / 1000) # 转换为秒 query_retry_count += 1 except Exception as e: log.error(f"查询任务状态异常: {str(e)}", exc_info=True) query_retry_count += 1 if query_retry_count < max_query_retries: log.warn(f"等待{query_retry_interval}毫秒后重试...") time.sleep(query_retry_interval / 1000) # 转换为秒 else: raise e # 达到最大重试次数,抛出异常 if query_retry_count >= max_query_retries: log.error(f"任务查询超时,已达到最大查询次数: {max_query_retries}") except Exception as e: log.error(f"程序执行异常: {str(e)}", exc_info=True)