Files
dsProject/dsLightRag/JiMeng/Kit/JmImg2VideoUtil.py

182 lines
7.1 KiB
Python
Raw Normal View History

2025-08-21 09:57:14 +08:00
import json
import time
import logging
from JiMeng.Kit.JmCommon import JmCommon
from JiMeng.Kit.JmErrorCode import JmErrorCode
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('JmImg2Video')
class JmImg2Video(JmCommon):
req_key = "jimeng_vgfm_i2v_l20"
action = "CVSync2AsyncSubmitTask"
default_max_retries = 10000 # 最大查询次数
default_retry_interval = 3000 # 查询间隔(毫秒)
def __init__(self, max_retries=None, retry_interval=None):
"""
初始化JmImg2Video实例
:param max_retries: 最大查询次数默认使用类的默认值
:param retry_interval: 查询间隔毫秒默认使用类的默认值
"""
self.max_retries = max_retries if max_retries is not None else self.default_max_retries
self.retry_interval = retry_interval if retry_interval is not None else self.default_retry_interval
def create_video_task(self, image_urls, prompt):
"""
根据首帧图片和分镜头脚本生成视频任务
:param image_urls: 图片URL列表
:param prompt: 提示词分镜头脚本
:return: 任务ID
:raises Exception: 任务创建失败时抛出异常
"""
# 创建请求体
req = {
"req_key": self.req_key,
"image_urls": image_urls,
"prompt": prompt
}
try:
response_body = self.do_request(
method="POST",
query_list={},
body=json.dumps(req).encode('utf-8'),
action=self.action
)
result = json.loads(response_body)
code = result.get("code")
if not JmErrorCode.is_success(code):
error_msg = f"提交任务失败: 错误码={code}, 错误信息={JmErrorCode.get_message_by_code(code)}"
logger.error(error_msg)
raise Exception(error_msg)
# 获取任务ID
task_id = result.get("data", {}).get("task_id")
if not task_id:
error_msg = "提交任务成功但未返回任务ID"
logger.error(error_msg)
raise Exception(error_msg)
logger.info(f"任务创建成功任务ID: {task_id}")
return task_id
except Exception as e:
logger.error(f"创建视频任务异常: {str(e)}", exc_info=True)
raise
def query_task_status(self, task_id):
"""
根据任务ID查询视频生成状态
:param task_id: 视频生成任务ID
:return: 字典包含任务状态信息
- status: 'completed' 表示完成, 'processing' 表示处理中, 'failed' 表示失败
- video_url: 视频URL如果任务完成
- error_msg: 错误信息如果任务失败
"""
try:
result = self.query_task_result(task_id)
code = result.get("code")
if not JmErrorCode.is_success(code):
error_msg = f"查询任务失败: 错误码={code}, 错误信息={JmErrorCode.get_message_by_code(code)}"
logger.error(error_msg)
return {
"status": "failed",
"error_msg": error_msg
}
data = result.get("data", {})
if data and data.get("video_url"):
video_url = data.get("video_url")
logger.info(f"任务已完成,视频地址: {video_url}")
return {
"status": "completed",
"video_url": video_url
}
else:
logger.info(f"任务处理中任务ID: {task_id}")
return {
"status": "processing"
}
except Exception as e:
logger.error(f"查询任务状态异常: {str(e)}", exc_info=True)
return {
"status": "failed",
"error_msg": str(e)
}
def wait_for_task_completion(self, task_id, download_path=None):
"""
等待任务完成并可选地下载视频
:param task_id: 视频生成任务ID
:param download_path: 视频下载路径如果为None则不下载
:return: 视频URL如果任务成功完成
:raises Exception: 任务失败或超时
"""
query_retry_count = 0
while query_retry_count < self.max_retries:
status_info = self.query_task_status(task_id)
if status_info["status"] == "completed":
video_url = status_info["video_url"]
if download_path:
logger.info(f"开始下载视频到: {download_path}")
self.download_file(video_url, download_path)
logger.info(f"视频已下载完成")
return video_url
elif status_info["status"] == "failed":
raise Exception(f"任务失败: {status_info.get('error_msg', '未知错误')}")
else:
logger.info(f"任务处理中,等待{self.retry_interval}毫秒后重试...")
time.sleep(self.retry_interval / 1000)
query_retry_count += 1
error_msg = f"任务查询超时,已达到最大查询次数: {self.max_retries}"
logger.error(error_msg)
raise Exception(error_msg)
def main():
try:
# 示例用法
image_urls = [
"https://dsideal.obs.cn-north-1.myhuaweicloud.com/HuangHai/Backup/Text2Img.jpg"
]
prompt = """
|镜号|运镜|画面内容|
| :---: | :---: | --- |
|1|固定镜头|穿着黄色安全帽反光背心的工人背着小女孩左手抱着小熊玩偶两人脸上洋溢着幸福的笑容背景是老旧居民楼和湿漉漉的路面|
|2|推镜头|镜头从两人全身慢慢推近至两人上半身聚焦两人亲密温馨的神态|
|3|平移镜头|镜头从两人正面平移至侧面展现工人的朴实与小女孩的可爱|
|4|拉镜头|镜头逐渐拉远展现两人在居民楼间道路上的整体场景|
|5|固定镜头|定格在两人互动的画面突出亲情的温暖氛围|
"""
# 创建实例
img2video = JmImg2Video(max_retries=100, retry_interval=2000)
# 创建视频任务
task_id = img2video.create_video_task(image_urls, prompt)
print(f"任务ID: {task_id}")
# 等待任务完成并下载视频
mp4_file_name = "image2video.mp4"
save_video_path = fr"D:\dsWork\dsProject\dsLightRag\JiMeng\{mp4_file_name}"
video_url = img2video.wait_for_task_completion(task_id, save_video_path)
print(f"视频生成完成,地址: {video_url}")
except Exception as e:
logger.error(f"程序执行异常: {str(e)}", exc_info=True)
if __name__ == "__main__":
main()