Files
dsProject/dsLightRag/KeLing/KlTxt2Img.py
2025-08-20 07:38:08 +08:00

187 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import logging
import requests
import json
from KeLing.Kit.KlCommon import KlCommon
from KeLing.Kit.KlErrorCode import KlErrorCode
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
class KlTxt2Img(KlCommon):
BASE_URL = "https://api.klingai.com"
GENERATION_PATH = "/v1/images/generations"
QUERY_PATH = "/v1/images/generations/"
@staticmethod
def generate_image(prompt, model_name):
"""
生成图片
:param prompt: 提示词
:param model_name: 模型名称枚举值kling-v1, kling-v1-5, kling-v2
:return: 任务ID
:raises Exception: 异常信息
"""
# 获取JWT令牌
jwt = KlCommon.get_jwt()
# 创建请求体
request_body = {
"model_name": model_name,
"prompt": prompt
}
# 发送POST请求
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {jwt}"
}
url = f"{KlTxt2Img.BASE_URL}{KlTxt2Img.GENERATION_PATH}"
log.info(f"生成图片请求体:{json.dumps(request_body)}")
try:
response = requests.post(url, headers=headers, json=request_body)
# 检查响应状态码
if response.status_code != 200:
raise Exception(f"请求失败,状态码:{response.status_code}")
# 解析响应
response_body = response.text
try:
response_json = json.loads(response_body)
except json.JSONDecodeError as e:
raise Exception(f"响应解析失败:{str(e)}")
log.info(f"生成图片响应:{response_body}")
# 检查响应状态
code = response_json.get("code")
if code != 0:
message = response_json.get("message", "未知错误")
solution = KlErrorCode.get_solution_by_code(code)
error_msg = f"生成图片失败:[{code}] {message} - {solution}"
# 特殊处理资源包耗尽的情况
if code == KlErrorCode.RESOURCE_EXHAUSTED.get_code():
log.error("可灵AI资源包已耗尽请充值后再试")
raise Exception("可灵AI资源包已耗尽请充值后再试")
raise Exception(error_msg)
# 获取任务ID
task_id = response_json.get("data", {}).get("task_id")
if not task_id:
raise Exception("未找到任务ID")
log.info(f"生成图片任务ID{task_id}")
return task_id
except requests.RequestException as e:
raise Exception(f"网络请求异常:{str(e)}")
@staticmethod
def query_task_status(task_id):
"""
查询任务状态
:param task_id: 任务ID
:return: 任务结果
:raises Exception: 异常信息
"""
return KlCommon.query_task_status(task_id, KlTxt2Img.QUERY_PATH, "文生图")
if __name__ == "__main__":
try:
# 提示词和模型名称
prompt = "一只可爱的小猫咪在草地上玩耍,阳光明媚"
model_name = "kling-v1" # 可选kling-v1, kling-v1-5, kling-v2
save_image_path = f"{KlCommon.base_path}KeLing_Txt_2_Image.png"
# 添加重试逻辑
generate_retry_count = 0
max_generate_retries = 5 # 最大重试次数
generate_retry_interval = 5000 # 重试间隔(毫秒)
task_id = None
account_issue = False
while not account_issue:
try:
task_id = KlTxt2Img.generate_image(prompt, model_name)
break
except Exception as e:
log.error(f"生成图片异常: {str(e)}", exc_info=True)
# 检查是否是账户问题
error_msg = str(e)
if "资源包已耗尽" in error_msg or
"账户欠费" in error_msg or
"无权限" in error_msg:
log.error("账户问题,停止重试")
account_issue = True
else:
generate_retry_count += 1
if generate_retry_count < max_generate_retries:
log.warn(f"等待{generate_retry_interval}毫秒后重试...")
time.sleep(generate_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if task_id is None:
if account_issue:
log.error("账户问题,请检查账户状态或充值后再试")
else:
log.error(f"生成图片失败,已达到最大重试次数: {max_generate_retries}")
exit(1)
# 查询任务状态
query_retry_count = 0
max_query_retries = 1000 # 最大查询次数
query_retry_interval = 3000 # 查询间隔(毫秒)
while query_retry_count < max_query_retries:
try:
result = KlTxt2Img.query_task_status(task_id)
data = result.get("data", {})
task_status = data.get("task_status")
if task_status == "failed":
task_status_msg = data.get("task_status_msg", "未知错误")
log.error(f"任务失败: {task_status_msg}")
break
elif task_status == "succeed":
# 获取图片URL
task_result = data.get("task_result", {})
images = task_result.get("images", [])
for image in images:
index = image.get("index")
image_url = image.get("url")
# 下载图片
log.info("开始下载图片...")
KlCommon.download_file(image_url, save_image_path)
log.info(f"图片已下载到: {save_image_path}")
break
else:
log.info(f"任务状态: {task_status}, 等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
query_retry_count += 1
except Exception as e:
log.error(f"查询任务状态异常: {str(e)}", exc_info=True)
query_retry_count += 1
if query_retry_count < max_query_retries:
log.warn(f"等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if query_retry_count >= max_query_retries:
log.error(f"任务查询超时,已达到最大查询次数: {max_query_retries}")
except Exception as e:
log.error(f"程序执行异常: {str(e)}", exc_info=True)