Files
dsProject/dsLightRag/Routes/VideoRetalkRoute.py

169 lines
5.5 KiB
Python
Raw Normal View History

2025-09-02 06:55:13 +08:00
import datetime
import logging
import uuid
2025-09-02 08:58:30 +08:00
import os
import time
import requests
2025-09-02 06:55:13 +08:00
from typing import Optional
2025-09-02 08:58:30 +08:00
from fastapi import APIRouter, HTTPException, Query, Request, BackgroundTasks
2025-09-02 06:55:13 +08:00
from fastapi.responses import JSONResponse
from pydantic import BaseModel
2025-09-02 08:58:30 +08:00
from Util.ObsUtil import ObsUploader
from Config.Config import OBS_BUCKET, OBS_SERVER
2025-09-02 06:55:13 +08:00
from Config import Config
from Util.VideoRetalk import VideoRetalk
# Router for the video generation endpoints; every route registered on it
# is served under the /api/video prefix.
router = APIRouter(
    prefix="/api/video",
    tags=["视频生成"],
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Only the video-related models are defined in this module.
class VideoRetalkRequest(BaseModel):
    """Request body for the video generation endpoint.

    Every field is forwarded unchanged, by keyword, to
    ``VideoRetalk.generate_video`` by the ``/generate`` route.
    """

    # URL of the person image the video is generated from.
    image_url: str
    # URL of the audio track the generated video should match.
    audio_url: str

    # Tuning options. Names suggest template choice, frame rate and
    # eye/mouth/head motion settings; the exact semantics and valid ranges
    # are defined by VideoRetalk, not here -- TODO confirm against that class.
    template_id: Optional[str] = "normal"
    eye_move_freq: Optional[float] = 0.5
    video_fps: Optional[int] = 30
    mouth_move_strength: Optional[float] = 1.0
    paste_back: Optional[bool] = True
    head_move_strength: Optional[float] = 0.7
class VideoRetalkResponse(BaseModel):
    """Response body of the video generation endpoint.

    ``success`` and ``message`` are always present; the remaining fields
    default to ``None`` and are only filled in when generation succeeded.
    """

    # Outcome flag and human-readable status text.
    success: bool
    message: str

    # Identifiers attached to a successful response.
    task_id: Optional[str] = None
    request_id: Optional[str] = None

    # Location and metadata of the generated video.
    video_url: Optional[str] = None
    video_duration: Optional[float] = None
    video_ratio: Optional[str] = None
# (connect, read) timeout in seconds for downloading the generated video.
# Without a timeout, requests waits indefinitely on a stalled connection.
_VIDEO_DOWNLOAD_TIMEOUT = (10, 300)


def _remove_local_file(path: str) -> None:
    """Delete *path*, logging instead of raising when removal fails."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # Nothing was written (e.g. the download failed before open()).
        pass
    except OSError as exc:
        logger.warning("Failed to remove temporary video file %s: %s", path, exc)


def _download_video(video_url: str, output_path: str) -> None:
    """Stream the file at *video_url* into *output_path*.

    Raises:
        HTTPException: status 500 when the server does not answer HTTP 200.
    """
    # The context manager releases the streamed connection on every path.
    with requests.get(video_url, stream=True, timeout=_VIDEO_DOWNLOAD_TIMEOUT) as response:
        if response.status_code != 200:
            raise HTTPException(status_code=500, detail="视频文件下载失败")
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)


@router.post("/generate", response_model=VideoRetalkResponse)
async def generate_video(request: VideoRetalkRequest, background_tasks: BackgroundTasks):
    """Generate a talking video from a person image and an audio track.

    Calls ``VideoRetalk.generate_video``, downloads the resulting video to
    ``static/video``, uploads it to OBS and returns the OBS URL.

    Args:
        request: Image/audio URLs plus the tuning options forwarded to
            ``VideoRetalk.generate_video``.
        background_tasks: Injected by FastAPI. Kept for interface
            compatibility; the temporary file is now removed synchronously.

    Returns:
        VideoRetalkResponse: ``success=False`` when the service returned no
        video URL, otherwise the OBS URL and video metadata. ``task_id`` and
        ``request_id`` are freshly generated UUIDs.

    Raises:
        HTTPException: status 500 when the download, the OBS upload or any
            other step fails.
    """
    # NOTE(review): the download below is a blocking requests call inside an
    # async endpoint; the VideoRetalk and ObsUploader calls look synchronous
    # too -- consider moving them to a thread pool. TODO confirm.
    output_path = None
    try:
        video_retalk = VideoRetalk(Config.ALY_LLM_API_KEY)
        video_result = video_retalk.generate_video(
            image_url=request.image_url,
            audio_url=request.audio_url,
            template_id=request.template_id,
            eye_move_freq=request.eye_move_freq,
            video_fps=request.video_fps,
            mouth_move_strength=request.mouth_move_strength,
            paste_back=request.paste_back,
            head_move_strength=request.head_move_strength
        )
        if not video_result or not video_result['video_url']:
            return VideoRetalkResponse(
                success=False,
                message="视频生成失败"
            )

        # Download the video to a uniquely named local temporary file.
        timestamp = int(time.time())
        filename = f"video_{uuid.uuid4().hex[:8]}_{timestamp}.mp4"
        output_dir = "static/video"
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, filename)
        _download_video(video_result['video_url'], output_path)

        # Upload to OBS.
        obs_uploader = ObsUploader()
        obs_object_key = f"HuangHai/video/{filename}"
        upload_success, upload_result = obs_uploader.upload_file(obs_object_key, output_path)
        if not upload_success:
            raise HTTPException(
                status_code=500,
                detail=f"OBS上传失败: {upload_result.get('errorMessage', '未知错误')}"
            )

        # Build the public OBS URL of the uploaded object.
        obs_url = f"https://{OBS_BUCKET}.{OBS_SERVER}/{obs_object_key}"
        return VideoRetalkResponse(
            success=True,
            message="视频生成成功",
            video_url=obs_url,
            task_id=str(uuid.uuid4()),
            video_duration=video_result['video_duration'],
            video_ratio=video_result['video_ratio'],
            request_id=str(uuid.uuid4())
        )
    except HTTPException as e:
        # Deliberate HTTP errors keep their own detail instead of being
        # re-wrapped by the generic handler below.
        logger.error("视频生成接口错误: %s", e.detail)
        raise
    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception("视频生成接口错误: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"视频生成失败: {str(e)}"
        ) from e
    finally:
        # Clean up here rather than via background_tasks: tasks queued there
        # are attached to the returned response and are not run when the
        # endpoint raises, which used to leak the file on failure.
        if output_path is not None:
            _remove_local_file(output_path)
@router.get("/task/status")
async def get_task_status(task_id: str = Query(..., description="任务ID")):
    """Look up the status of a video generation task.

    Args:
        task_id: Required query parameter identifying the task.

    Returns:
        dict: ``{"success": True, "data": <status>}`` where ``<status>`` is
        whatever ``VideoRetalk.get_task_status`` returns.

    Raises:
        HTTPException: status 500 when the lookup raises any exception.
    """
    try:
        client = VideoRetalk(Config.ALY_LLM_API_KEY)
        payload = {"success": True, "data": client.get_task_status(task_id)}
    except Exception as exc:
        logger.error(f"查询任务状态错误: {exc}")
        raise HTTPException(status_code=500, detail=f"查询任务状态失败: {str(exc)}")
    return payload
@router.get("/health")
async def health_check():
    """Health-check endpoint.

    Returns:
        dict: A fixed ``"healthy"`` status, the current local time in ISO
        format, and the service name.
    """
    now = datetime.datetime.now()
    return dict(
        status="healthy",
        timestamp=now.isoformat(),
        service="VideoRetalk API",
    )
# Global exception handler, kept for use by the application.
# NOTE(review): nothing in the visible part of this file registers it --
# confirm that the app wires it up elsewhere.
def global_exception_handler(request: Request, exc: Exception):
    """Log *exc* and turn it into a JSON 500 response.

    Args:
        request: The request being handled (not used in the body).
        exc: The exception that was raised.

    Returns:
        JSONResponse: status 500 with ``success=False`` and a message that
        includes ``str(exc)``.
    """
    logger.error(f"全局异常: {exc}")
    body = {"success": False, "message": f"服务器内部错误: {str(exc)}"}
    return JSONResponse(status_code=500, content=body)