This commit is contained in:
2025-08-20 07:38:08 +08:00
parent 4ba9171718
commit a503acdae2
7 changed files with 989 additions and 0 deletions

View File

@@ -0,0 +1,9 @@
# 可灵AI使用指南图片生成
https://docs.qingque.cn/d/home/eZQApd0EZqQHWXBEi7lL16_lD?identityId=26L1FFNIZ7r#section=h.60mmflz96wnl
# 可灵控制台
https://console-cn.klingai.com/console/expense-center/resource-pack-manage
# 登录
18686619970
获取验证码登录

View File

@@ -0,0 +1,123 @@
import os
import json
import logging
import time
from datetime import datetime, timedelta
import requests
import jwt
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
class KlCommon:
# 获取项目根目录路径
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../../..')).replace('\\', '/') + '/dsAi'
# 拼接相对路径
base_path = project_root + '/src/main/java/com/dsideal/aiSupport/Util/KeLing/Example/'
ak = 'c992fd02624d4900a93ca3b6da03d9e9' # 填写access key
sk = 'b37f67a00eb44f9bb57e4d530c328e1d' # 填写secret key
@staticmethod
def get_jwt():
try:
# 有效时间,此处示例代表当前时间+1800s(30min)
expired_at = datetime.utcnow() + timedelta(seconds=1800)
# 开始生效的时间,此处示例代表当前时间-5秒
not_before = datetime.utcnow() - timedelta(seconds=5)
# 构建JWT令牌
payload = {
'iss': KlCommon.ak,
'exp': expired_at,
'nbf': not_before
}
# 使用HS256算法签名
jwt_token = jwt.encode(payload, KlCommon.sk, algorithm='HS256', headers={'alg': 'HS256'})
return jwt_token
except Exception as e:
log.error(f"获取JWT令牌失败: {str(e)}")
return None
@staticmethod
def download_file(file_url, save_file_path):
"""
从URL下载文件到指定路径
@param file_url: 文件URL
@param save_file_path: 保存路径
@throws Exception: 下载过程中的异常
"""
try:
# 发送GET请求下载文件
response = requests.get(file_url, stream=True)
response.raise_for_status() # 检查响应状态码
# 保存文件
with open(save_file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
file_size = os.path.getsize(save_file_path)
log.info(f"文件下载成功,保存路径: {save_file_path}, 文件大小: {file_size}字节")
except Exception as e:
log.error(f"文件下载失败: {str(e)}")
raise Exception(f"文件下载失败: {str(e)}") from e
@staticmethod
def query_task_status(task_id, query_path, log_prefix):
"""
通用查询任务状态方法
@param task_id: 任务ID
@param query_path: 查询路径
@param log_prefix: 日志前缀,用于区分不同类型的查询
@return: 任务结果
@throws Exception: 异常信息
"""
# 获取JWT令牌
jwt_token = KlCommon.get_jwt()
if not jwt_token:
raise Exception("获取JWT令牌失败")
# 构建请求URL
url = f"https://api.klingai.com{query_path}{task_id}"
# 设置请求头
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {jwt_token}'
}
# 发送GET请求
try:
response = requests.get(url, headers=headers)
response.raise_for_status() # 检查响应状态码
# 解析响应
response_body = response.text
response_json = json.loads(response_body)
log.info(f"{log_prefix}查询任务状态响应:{response_body}")
# 检查响应状态
code = response_json.get('code')
if code != 0:
message = response_json.get('message', '')
from .KlErrorCode import KlErrorCode
solution = KlErrorCode.get_solution_by_code(code)
error_msg = f"{log_prefix}查询任务状态失败:[{code}] {message} - {solution}"
raise Exception(error_msg)
return response_json
except requests.exceptions.RequestException as e:
log.error(f"{log_prefix}查询任务状态请求异常: {str(e)}")
raise Exception(f"{log_prefix}查询任务状态请求异常: {str(e)}") from e
except json.JSONDecodeError as e:
log.error(f"{log_prefix}解析响应JSON失败: {str(e)}")
raise Exception(f"{log_prefix}解析响应JSON失败: {str(e)}") from e
except Exception as e:
log.error(f"{log_prefix}查询任务状态发生未知异常: {str(e)}")
raise

View File

@@ -0,0 +1,90 @@
from enum import Enum
class KlErrorCode(Enum):
"""可灵AI API错误码枚举"""
SUCCESS = (0, "请求成功", "请求成功")
# 身份验证错误 (401)
AUTH_FAILED = (1000, "身份验证失败", "检查Authorization是否正确")
AUTH_EMPTY = (1001, "Authorization为空", "在RequestHeader中填写正确的Authorization")
AUTH_INVALID = (1002, "Authorization值非法", "在RequestHeader中填写正确的Authorization")
AUTH_NOT_EFFECTIVE = (1003, "Authorization未到有效时间", "检查token的开始生效时间等待生效或重新签发")
AUTH_EXPIRED = (1004, "Authorization已失效", "检查token的有效期重新签发")
# 账户异常 (429)
ACCOUNT_EXCEPTION = (1100, "账户异常", "检查账户配置信息")
ACCOUNT_ARREARS = (1101, "账户欠费 (后付费场景)", "进行账户充值,确保余额充足")
RESOURCE_EXHAUSTED = (1102, "资源包已用完/已过期(预付费场景)", "购买额外的资源包,或开通后付费服务(如有)")
# 权限错误 (403)
NO_PERMISSION = (1103, "请求的资源无权限,如接口/模型", "检查账户权限")
# 请求参数非法 (400/404)
PARAM_INVALID = (1200, "请求参数非法", "检查请求参数是否正确")
PARAM_ERROR = (1201, "参数非法如key写错或value非法", "参考返回体中message字段的具体信息修改请求参数")
METHOD_INVALID = (1202, "请求的method无效", "查看接口文档使用正确的requestmethod")
RESOURCE_NOT_EXIST = (1203, "请求的资源不存在,如模型", "参考返回体中message字段的具体信息修改请求参数")
# 触发策略 (400/429)
POLICY_TRIGGERED = (1300, "触发平台策略", "检查是否触发平台策略")
CONTENT_SECURITY = (1301, "触发平台的内容安全策略", "检查输入内容,修改后重新发起请求")
RATE_LIMIT = (1302, "API请求过快超过平台速率限制", "降低请求频率、稍后重试,或联系客服增加限额")
CONCURRENT_LIMIT = (1303, "并发或QPS超出预付费资源包限制", "降低请求频率、稍后重试,或联系客服增加限额")
IP_WHITELIST = (1304, "触发平台的IP白名单策略", "联系客服")
# 内部错误 (500/503/504)
INTERNAL_ERROR = (5000, "服务器内部错误", "稍后重试,或联系客服")
SERVICE_UNAVAILABLE = (5001, "服务器暂时不可用,通常是在维护", "稍后重试,或联系客服")
INTERNAL_TIMEOUT = (5002, "服务器内部超时,通常是发生积压", "稍后重试,或联系客服")
def __init__(self, code, message, solution):
self.code = code
self.message = message
self.solution = solution
@staticmethod
def get_by_code(code):
"""根据错误码获取枚举实例"""
for error_code in KlErrorCode:
if error_code.code == code:
return error_code
return None
@staticmethod
def get_message_by_code(code):
"""获取错误消息"""
error_code = KlErrorCode.get_by_code(code)
return error_code.message if error_code else "未知错误"
@staticmethod
def get_solution_by_code(code):
"""获取解决方案"""
error_code = KlErrorCode.get_by_code(code)
return error_code.solution if error_code else "请联系客服"
@staticmethod
def is_success(code):
"""检查是否成功"""
return code == KlErrorCode.SUCCESS.code
@staticmethod
def is_retryable(code):
"""检查是否需要重试"""
return code in [
KlErrorCode.RATE_LIMIT.code,
KlErrorCode.CONCURRENT_LIMIT.code,
KlErrorCode.INTERNAL_ERROR.code,
KlErrorCode.SERVICE_UNAVAILABLE.code,
KlErrorCode.INTERNAL_TIMEOUT.code
]
@staticmethod
def is_account_issue(code):
"""检查是否是账户问题"""
return code in [
KlErrorCode.ACCOUNT_EXCEPTION.code,
KlErrorCode.ACCOUNT_ARREARS.code,
KlErrorCode.RESOURCE_EXHAUSTED.code,
KlErrorCode.NO_PERMISSION.code
]

View File

@@ -0,0 +1,200 @@
import time
import logging
import requests
import json
from datetime import datetime
from KeLing.Kit.KlCommon import KlCommon
from KeLing.Kit.KlErrorCode import KlErrorCode
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
class KlAccount(KlCommon):
BASE_URL = "https://api.klingai.com"
ACCOUNT_COSTS_PATH = "/account/costs"
@staticmethod
def query_resource_packs(start_time, end_time, resource_pack_name=None):
"""
查询账号下资源包列表及余量
:param start_time: 查询的开始时间Unix时间戳、单位ms
:param end_time: 查询的结束时间Unix时间戳、单位ms
:param resource_pack_name: 资源包名称用于精准指定查询某个资源包可为None
:return: 资源包信息列表
:raises Exception: 异常信息
"""
# 获取JWT令牌
jwt = KlAccount.get_jwt()
# 构建URL
url = f"{KlAccount.BASE_URL}{KlAccount.ACCOUNT_COSTS_PATH}?start_time={start_time}&end_time={end_time}"
# 如果提供了资源包名称则添加到URL中
if resource_pack_name:
url += f"&resource_pack_name={resource_pack_name}"
# 发送GET请求
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {jwt}"
}
log.info(f"查询账户资源包请求URL{url}")
response = requests.get(url, headers=headers)
# 检查响应状态码
if response.status_code != 200:
raise Exception(f"请求失败,状态码:{response.status_code}")
# 解析响应
response_body = response.text
try:
response_json = json.loads(response_body)
except json.JSONDecodeError as e:
raise Exception(f"响应解析失败:{str(e)}")
log.info(f"查询账户资源包响应:{response_body}")
# 检查响应状态
code = response_json.get("code")
if code != 0:
message = response_json.get("message", "未知错误")
solution = KlErrorCode.get_solution_by_code(code)
error_msg = f"查询账户资源包失败:[{code}] {message} - {solution}"
raise Exception(error_msg)
# 解析资源包信息
resource_pack_info_list = []
data = response_json.get("data", {})
# 检查data中的code
data_code = data.get("code")
if data_code != 0:
data_msg = data.get("msg", "未知错误")
raise Exception(f"查询账户资源包失败:{data_msg}")
# 获取资源包列表
resource_pack_array = data.get("resource_pack_subscribe_infos", [])
for resource_pack_json in resource_pack_array:
resource_pack_info = ResourcePackInfo()
resource_pack_info.resource_pack_name = resource_pack_json.get("resource_pack_name")
resource_pack_info.resource_pack_id = resource_pack_json.get("resource_pack_id")
resource_pack_info.resource_pack_type = resource_pack_json.get("resource_pack_type")
resource_pack_info.total_quantity = resource_pack_json.get("total_quantity", 0.0)
resource_pack_info.remaining_quantity = resource_pack_json.get("remaining_quantity", 0.0)
resource_pack_info.purchase_time = resource_pack_json.get("purchase_time", 0)
resource_pack_info.effective_time = resource_pack_json.get("effective_time", 0)
resource_pack_info.invalid_time = resource_pack_json.get("invalid_time", 0)
resource_pack_info.status = resource_pack_json.get("status")
resource_pack_info_list.append(resource_pack_info)
return resource_pack_info_list
@staticmethod
def print_resource_pack_info(resource_pack_info_list):
"""
打印资源包信息
:param resource_pack_info_list: 资源包信息列表
"""
if not resource_pack_info_list:
log.info("未查询到资源包信息")
return
log.info("资源包信息列表:")
for info in resource_pack_info_list:
log.info("----------------------------------------")
log.info(f"资源包名称: {info.resource_pack_name}")
log.info(f"资源包ID: {info.resource_pack_id}")
log.info(f"资源包类型: {KlAccount.get_resource_pack_type_desc(info.resource_pack_type)}")
log.info(f"总量: {info.total_quantity}")
log.info(f"余量: {info.remaining_quantity}")
log.info(f"购买时间: {KlAccount.format_timestamp(info.purchase_time)}")
log.info(f"生效时间: {KlAccount.format_timestamp(info.effective_time)}")
log.info(f"失效时间: {KlAccount.format_timestamp(info.invalid_time)}")
log.info(f"状态: {KlAccount.get_status_desc(info.status)}")
log.info("----------------------------------------")
@staticmethod
def get_resource_pack_type_desc(type_str):
"""
获取资源包类型描述
:param type_str: 资源包类型
:return: 资源包类型描述
"""
if type_str == "decreasing_total":
return "总量递减型"
elif type_str == "constant_period":
return "周期恒定型"
else:
return type_str
@staticmethod
def get_status_desc(status):
"""
获取资源包状态描述
:param status: 资源包状态
:return: 资源包状态描述
"""
status_map = {
"toBeOnline": "待生效",
"online": "生效中",
"expired": "已到期",
"runOut": "已用完"
}
return status_map.get(status, status)
@staticmethod
def format_timestamp(timestamp):
"""
格式化时间戳
:param timestamp: 时间戳(毫秒)
:return: 格式化后的时间字符串
"""
if timestamp == 0:
return "N/A"
# 转换毫秒时间戳为秒
seconds = timestamp / 1000
return datetime.fromtimestamp(seconds).strftime("%Y-%m-%d %H:%M:%S")
class ResourcePackInfo:
"""资源包信息类"""
def __init__(self):
self.resource_pack_name = None # 资源包名称
self.resource_pack_id = None # 资源包ID
self.resource_pack_type = None # 资源包类型
self.total_quantity = 0.0 # 总量
self.remaining_quantity = 0.0 # 余量
self.purchase_time = 0 # 购买时间
self.effective_time = 0 # 生效时间
self.invalid_time = 0 # 失效时间
self.status = None # 资源包状态
if __name__ == "__main__":
try:
# 获取当前时间
current_time = int(time.time() * 1000) # 毫秒时间戳
# 设置查询时间范围为过去30天到当前
startTime = current_time - 30 * 24 * 60 * 60 * 1000
endTime = current_time
# 查询所有资源包
resource_pack_info_list = KlAccount.query_resource_packs(startTime, endTime)
# 打印资源包信息
KlAccount.print_resource_pack_info(resource_pack_info_list)
# 如果需要查询特定资源包,可以使用以下代码
# specific_pack_name = "视频生成-10000条"
# specific_resource_pack_info_list = KlAccount.query_resource_packs(startTime, endTime, specific_pack_name)
# KlAccount.print_resource_pack_info(specific_resource_pack_info_list)
except Exception as e:
log.error(f"查询账户资源包异常: {str(e)}", exc_info=True)

View File

@@ -0,0 +1,190 @@
import time
import logging
import requests
import json
from KeLing.Kit.KlCommon import KlCommon
from KeLing.Kit.KlErrorCode import KlErrorCode
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
class KlImg2Video(KlCommon):
BASE_URL = "https://api.klingai.com"
GENERATION_PATH = "/v1/videos/image2video"
QUERY_PATH = "/v1/videos/image2video/"
@staticmethod
def generate_video(image_url, model_name):
"""
生成视频使用图片URL
:param image_url: 图片URL
:param model_name: 模型名称枚举值kling-v1, kling-v1-5, kling-v1-6
:return: 任务ID
:raises Exception: 异常信息
"""
# 获取JWT令牌
jwt = KlCommon.get_jwt()
# 创建请求体
request_body = {
"model_name": model_name,
"image": image_url
}
# 发送POST请求
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {jwt}"
}
url = f"{KlImg2Video.BASE_URL}{KlImg2Video.GENERATION_PATH}"
log.info(f"生成视频请求体:{json.dumps(request_body)}")
try:
response = requests.post(url, headers=headers, json=request_body)
# 检查响应状态码
if response.status_code != 200:
raise Exception(f"请求失败,状态码:{response.status_code}")
# 解析响应
response_body = response.text
try:
response_json = json.loads(response_body)
except json.JSONDecodeError as e:
raise Exception(f"响应解析失败:{str(e)}")
log.info(f"生成视频响应:{response_body}")
# 检查响应状态
code = response_json.get("code")
if code != 0:
message = response_json.get("message", "未知错误")
solution = KlErrorCode.get_solution_by_code(code)
error_msg = f"生成视频失败:[{code}] {message} - {solution}"
# 特殊处理资源包耗尽的情况
if code == KlErrorCode.RESOURCE_EXHAUSTED.get_code():
log.error("可灵AI资源包已耗尽请充值后再试")
raise Exception("可灵AI资源包已耗尽请充值后再试")
raise Exception(error_msg)
# 获取任务ID
task_id = response_json.get("data", {}).get("task_id")
if not task_id:
raise Exception("未找到任务ID")
log.info(f"生成视频任务ID{task_id}")
return task_id
except requests.RequestException as e:
raise Exception(f"网络请求异常:{str(e)}")
@staticmethod
def query_task_status(task_id):
"""
查询任务状态
:param task_id: 任务ID
:return: 任务结果
:raises Exception: 异常信息
"""
return KlCommon.query_task_status(task_id, KlImg2Video.QUERY_PATH, "图生视频")
if __name__ == "__main__":
try:
# 图片URL和模型名称
image_url = "https://dsideal.obs.cn-north-1.myhuaweicloud.com/HuangHai/BlogImages/202505131058596.png" # 替换为实际可访问的图片URL
model_name = "kling-v1" # 可选kling-v1, kling-v1-5, kling-v1-6
# 添加重试逻辑
generate_retry_count = 0
max_generate_retries = 1000 # 最大重试次数
generate_retry_interval = 5000 # 重试间隔(毫秒)
task_id = None
account_issue = False
while not account_issue:
try:
task_id = KlImg2Video.generate_video(image_url, model_name)
break
except Exception as e:
log.error(f"生成视频异常: {str(e)}", exc_info=True)
# 检查是否是账户问题
error_msg = str(e)
if "资源包已耗尽" in error_msg or
"账户欠费" in error_msg or
"无权限" in error_msg:
log.error("账户问题,停止重试")
account_issue = True
else:
generate_retry_count += 1
if generate_retry_count < max_generate_retries:
log.warn(f"等待{generate_retry_interval}毫秒后重试...")
time.sleep(generate_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if task_id is None:
if account_issue:
log.error("账户问题,请检查账户状态或充值后再试")
else:
log.error(f"生成视频失败,已达到最大重试次数: {max_generate_retries}")
exit(1)
# 查询任务状态
query_retry_count = 0
max_query_retries = 1000 # 最大查询次数
query_retry_interval = 5000 # 查询间隔(毫秒)
while query_retry_count < max_query_retries:
try:
result = KlImg2Video.query_task_status(task_id)
data = result.get("data", {})
task_status = data.get("task_status")
if task_status == "failed":
task_status_msg = data.get("task_status_msg", "未知错误")
log.error(f"任务失败: {task_status_msg}")
break
elif task_status == "succeed":
# 获取视频URL
task_result = data.get("task_result", {})
videos = task_result.get("videos", [])
for video in videos:
video_id = video.get("id")
video_url = video.get("url")
duration = video.get("duration")
log.info(f"视频ID: {video_id}, 时长: {duration}")
# 下载视频
save_video_path = f"{KlCommon.base_path}image2video_{video_id}.mp4"
log.info("开始下载视频...")
KlCommon.download_file(video_url, save_video_path)
log.info(f"视频已下载到: {save_video_path}")
break
else:
log.info(f"任务状态: {task_status}, 等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
query_retry_count += 1
except Exception as e:
log.error(f"查询任务状态异常: {str(e)}", exc_info=True)
query_retry_count += 1
if query_retry_count < max_query_retries:
log.warn(f"等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if query_retry_count >= max_query_retries:
log.error(f"任务查询超时,已达到最大查询次数: {max_query_retries}")
except Exception as e:
log.error(f"程序执行异常: {str(e)}", exc_info=True)

View File

@@ -0,0 +1,187 @@
import time
import logging
import requests
import json
from KeLing.Kit.KlCommon import KlCommon
from KeLing.Kit.KlErrorCode import KlErrorCode
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
class KlTxt2Img(KlCommon):
BASE_URL = "https://api.klingai.com"
GENERATION_PATH = "/v1/images/generations"
QUERY_PATH = "/v1/images/generations/"
@staticmethod
def generate_image(prompt, model_name):
"""
生成图片
:param prompt: 提示词
:param model_name: 模型名称枚举值kling-v1, kling-v1-5, kling-v2
:return: 任务ID
:raises Exception: 异常信息
"""
# 获取JWT令牌
jwt = KlCommon.get_jwt()
# 创建请求体
request_body = {
"model_name": model_name,
"prompt": prompt
}
# 发送POST请求
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {jwt}"
}
url = f"{KlTxt2Img.BASE_URL}{KlTxt2Img.GENERATION_PATH}"
log.info(f"生成图片请求体:{json.dumps(request_body)}")
try:
response = requests.post(url, headers=headers, json=request_body)
# 检查响应状态码
if response.status_code != 200:
raise Exception(f"请求失败,状态码:{response.status_code}")
# 解析响应
response_body = response.text
try:
response_json = json.loads(response_body)
except json.JSONDecodeError as e:
raise Exception(f"响应解析失败:{str(e)}")
log.info(f"生成图片响应:{response_body}")
# 检查响应状态
code = response_json.get("code")
if code != 0:
message = response_json.get("message", "未知错误")
solution = KlErrorCode.get_solution_by_code(code)
error_msg = f"生成图片失败:[{code}] {message} - {solution}"
# 特殊处理资源包耗尽的情况
if code == KlErrorCode.RESOURCE_EXHAUSTED.get_code():
log.error("可灵AI资源包已耗尽请充值后再试")
raise Exception("可灵AI资源包已耗尽请充值后再试")
raise Exception(error_msg)
# 获取任务ID
task_id = response_json.get("data", {}).get("task_id")
if not task_id:
raise Exception("未找到任务ID")
log.info(f"生成图片任务ID{task_id}")
return task_id
except requests.RequestException as e:
raise Exception(f"网络请求异常:{str(e)}")
@staticmethod
def query_task_status(task_id):
"""
查询任务状态
:param task_id: 任务ID
:return: 任务结果
:raises Exception: 异常信息
"""
return KlCommon.query_task_status(task_id, KlTxt2Img.QUERY_PATH, "文生图")
if __name__ == "__main__":
try:
# 提示词和模型名称
prompt = "一只可爱的小猫咪在草地上玩耍,阳光明媚"
model_name = "kling-v1" # 可选kling-v1, kling-v1-5, kling-v2
save_image_path = f"{KlCommon.base_path}KeLing_Txt_2_Image.png"
# 添加重试逻辑
generate_retry_count = 0
max_generate_retries = 5 # 最大重试次数
generate_retry_interval = 5000 # 重试间隔(毫秒)
task_id = None
account_issue = False
while not account_issue:
try:
task_id = KlTxt2Img.generate_image(prompt, model_name)
break
except Exception as e:
log.error(f"生成图片异常: {str(e)}", exc_info=True)
# 检查是否是账户问题
error_msg = str(e)
if "资源包已耗尽" in error_msg or
"账户欠费" in error_msg or
"无权限" in error_msg:
log.error("账户问题,停止重试")
account_issue = True
else:
generate_retry_count += 1
if generate_retry_count < max_generate_retries:
log.warn(f"等待{generate_retry_interval}毫秒后重试...")
time.sleep(generate_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if task_id is None:
if account_issue:
log.error("账户问题,请检查账户状态或充值后再试")
else:
log.error(f"生成图片失败,已达到最大重试次数: {max_generate_retries}")
exit(1)
# 查询任务状态
query_retry_count = 0
max_query_retries = 1000 # 最大查询次数
query_retry_interval = 3000 # 查询间隔(毫秒)
while query_retry_count < max_query_retries:
try:
result = KlTxt2Img.query_task_status(task_id)
data = result.get("data", {})
task_status = data.get("task_status")
if task_status == "failed":
task_status_msg = data.get("task_status_msg", "未知错误")
log.error(f"任务失败: {task_status_msg}")
break
elif task_status == "succeed":
# 获取图片URL
task_result = data.get("task_result", {})
images = task_result.get("images", [])
for image in images:
index = image.get("index")
image_url = image.get("url")
# 下载图片
log.info("开始下载图片...")
KlCommon.download_file(image_url, save_image_path)
log.info(f"图片已下载到: {save_image_path}")
break
else:
log.info(f"任务状态: {task_status}, 等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
query_retry_count += 1
except Exception as e:
log.error(f"查询任务状态异常: {str(e)}", exc_info=True)
query_retry_count += 1
if query_retry_count < max_query_retries:
log.warn(f"等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if query_retry_count >= max_query_retries:
log.error(f"任务查询超时,已达到最大查询次数: {max_query_retries}")
except Exception as e:
log.error(f"程序执行异常: {str(e)}", exc_info=True)

View File

@@ -0,0 +1,190 @@
import time
import logging
import requests
import json
from KeLing.Kit.KlCommon import KlCommon
from KeLing.Kit.KlErrorCode import KlErrorCode
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
class KlTxt2Video(KlCommon):
BASE_URL = "https://api.klingai.com"
GENERATION_PATH = "/v1/videos/text2video"
QUERY_PATH = "/v1/videos/text2video/"
@staticmethod
def generate_video(prompt, model_name):
"""
生成视频
:param prompt: 提示词
:param model_name: 模型名称枚举值kling-v1, kling-v1-6
:return: 任务ID
:raises Exception: 异常信息
"""
# 获取JWT令牌
jwt = KlCommon.get_jwt()
# 创建请求体
request_body = {
"model_name": model_name,
"prompt": prompt
}
# 发送POST请求
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {jwt}"
}
url = f"{KlTxt2Video.BASE_URL}{KlTxt2Video.GENERATION_PATH}"
log.info(f"生成视频请求体:{json.dumps(request_body)}")
try:
response = requests.post(url, headers=headers, json=request_body)
# 检查响应状态码
if response.status_code != 200:
raise Exception(f"请求失败,状态码:{response.status_code}")
# 解析响应
response_body = response.text
try:
response_json = json.loads(response_body)
except json.JSONDecodeError as e:
raise Exception(f"响应解析失败:{str(e)}")
log.info(f"生成视频响应:{response_body}")
# 检查响应状态
code = response_json.get("code")
if code != 0:
message = response_json.get("message", "未知错误")
solution = KlErrorCode.get_solution_by_code(code)
error_msg = f"生成视频失败:[{code}] {message} - {solution}"
# 特殊处理资源包耗尽的情况
if code == KlErrorCode.RESOURCE_EXHAUSTED.get_code():
log.error("可灵AI资源包已耗尽请充值后再试")
raise Exception("可灵AI资源包已耗尽请充值后再试")
raise Exception(error_msg)
# 获取任务ID
task_id = response_json.get("data", {}).get("task_id")
if not task_id:
raise Exception("未找到任务ID")
log.info(f"生成视频任务ID{task_id}")
return task_id
except requests.RequestException as e:
raise Exception(f"网络请求异常:{str(e)}")
@staticmethod
def query_task_status(task_id):
"""
查询任务状态
:param task_id: 任务ID
:return: 任务结果
:raises Exception: 异常信息
"""
return KlCommon.query_task_status(task_id, KlTxt2Video.QUERY_PATH, "文生视频")
if __name__ == "__main__":
try:
# 提示词和模型名称
prompt = "一只可爱的小猫咪在草地上奔跑,阳光明媚"
model_name = "kling-v1" # 可选kling-v1, kling-v1-6
# 添加重试逻辑
generate_retry_count = 0
max_generate_retries = 1000 # 最大重试次数
generate_retry_interval = 5000 # 重试间隔(毫秒)
task_id = None
account_issue = False
while not account_issue:
try:
task_id = KlTxt2Video.generate_video(prompt, model_name)
break
except Exception as e:
log.error(f"生成视频异常: {str(e)}", exc_info=True)
# 检查是否是账户问题
error_msg = str(e)
if "资源包已耗尽" in error_msg or
"账户欠费" in error_msg or
"无权限" in error_msg:
log.error("账户问题,停止重试")
account_issue = True
else:
generate_retry_count += 1
if generate_retry_count < max_generate_retries:
log.warn(f"等待{generate_retry_interval}毫秒后重试...")
time.sleep(generate_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if task_id is None:
if account_issue:
log.error("账户问题,请检查账户状态或充值后再试")
else:
log.error(f"生成视频失败,已达到最大重试次数: {max_generate_retries}")
exit(1)
# 查询任务状态
query_retry_count = 0
max_query_retries = 1000 # 最大查询次数
query_retry_interval = 5000 # 查询间隔(毫秒)
while query_retry_count < max_query_retries:
try:
result = KlTxt2Video.query_task_status(task_id)
data = result.get("data", {})
task_status = data.get("task_status")
if task_status == "failed":
task_status_msg = data.get("task_status_msg", "未知错误")
log.error(f"任务失败: {task_status_msg}")
break
elif task_status == "succeed":
# 获取视频URL
task_result = data.get("task_result", {})
videos = task_result.get("videos", [])
for video in videos:
video_id = video.get("id")
video_url = video.get("url")
duration = video.get("duration")
log.info(f"视频ID: {video_id}, 时长: {duration}")
# 下载视频
save_video_path = f"{KlCommon.base_path}video_{video_id}.mp4"
log.info("开始下载视频...")
KlCommon.download_file(video_url, save_video_path)
log.info(f"视频已下载到: {save_video_path}")
break
else:
log.info(f"任务状态: {task_status}, 等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
query_retry_count += 1
except Exception as e:
log.error(f"查询任务状态异常: {str(e)}", exc_info=True)
query_retry_count += 1
if query_retry_count < max_query_retries:
log.warn(f"等待{query_retry_interval}毫秒后重试...")
time.sleep(query_retry_interval / 1000) # 转换为秒
else:
raise e # 达到最大重试次数,抛出异常
if query_retry_count >= max_query_retries:
log.error(f"任务查询超时,已达到最大查询次数: {max_query_retries}")
except Exception as e:
log.error(f"程序执行异常: {str(e)}", exc_info=True)