Files
dsProject/dsLightRag/Routes/VideoRetalkRoute.py
2025-09-02 08:58:30 +08:00

169 lines
5.5 KiB
Python

import datetime
import logging
import uuid
import os
import time
import requests
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from Util.ObsUtil import ObsUploader
from Config.Config import OBS_BUCKET, OBS_SERVER
from Config import Config
from Util.VideoRetalk import VideoRetalk
# Router for the video generation endpoints; every route below is mounted under /api/video.
router = APIRouter(prefix="/api/video", tags=["视频生成"])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Only the video-related request/response models are defined below.
class VideoRetalkRequest(BaseModel):
    """
    Request body for the video generation endpoint.

    Each field is passed to ``VideoRetalk.generate_video`` as the keyword
    argument of the same name; no extra validation is applied here.
    """

    # Required inputs: where to fetch the portrait image and the speech audio.
    image_url: str
    audio_url: str

    # Optional tuning knobs and their defaults. Exact semantics and valid
    # ranges are defined by the VideoRetalk backend, not by this model.
    template_id: Optional[str] = "normal"
    eye_move_freq: Optional[float] = 0.5
    video_fps: Optional[int] = 30
    mouth_move_strength: Optional[float] = 1.0
    paste_back: Optional[bool] = True
    head_move_strength: Optional[float] = 0.7
class VideoRetalkResponse(BaseModel):
    """
    Response body returned by the video generation endpoint.

    Only ``success`` and ``message`` are always present; the remaining
    fields default to ``None`` and are filled in on a successful generation.
    """

    # Outcome flag and a human-readable message.
    success: bool
    message: str

    # Details of the generated video; left as None when generation fails.
    task_id: Optional[str] = None
    video_url: Optional[str] = None
    video_duration: Optional[float] = None
    video_ratio: Optional[str] = None
    request_id: Optional[str] = None
@router.post("/generate", response_model=VideoRetalkResponse)
async def generate_video(request: VideoRetalkRequest, background_tasks: BackgroundTasks):
    """
    Generate a lip-synced talking video from a portrait image and an audio clip.

    The video is produced by ``VideoRetalk.generate_video``, downloaded to a
    temporary file under ``static/video``, uploaded to OBS under
    ``HuangHai/video/`` and the resulting OBS URL is returned.

    Args:
        request: Image/audio URLs plus the generation options.
        background_tasks: Injected by FastAPI. Kept so the endpoint signature
            stays unchanged; the temporary file is now removed synchronously.

    Returns:
        A ``VideoRetalkResponse``. ``success`` is False when the generator
        yields no video URL; otherwise it carries the OBS URL plus the
        duration and ratio reported by the generator.

    Raises:
        HTTPException: status 500 when the download fails, the OBS upload
            fails, or any unexpected error occurs.
    """
    # NOTE(review): the generator call, the download and the upload below are
    # all blocking calls made inside an ``async def`` handler, so they hold the
    # event loop while they run. Consider offloading them to a worker thread.
    output_path = None
    try:
        # Build the VideoRetalk client and request the video.
        video_retalk = VideoRetalk(Config.ALY_LLM_API_KEY)
        video_result = video_retalk.generate_video(
            image_url=request.image_url,
            audio_url=request.audio_url,
            template_id=request.template_id,
            eye_move_freq=request.eye_move_freq,
            video_fps=request.video_fps,
            mouth_move_strength=request.mouth_move_strength,
            paste_back=request.paste_back,
            head_move_strength=request.head_move_strength,
        )
        if not video_result or not video_result['video_url']:
            return VideoRetalkResponse(
                success=False,
                message="视频生成失败"
            )

        # Pick a unique local file name for the temporary download.
        timestamp = int(time.time())
        filename = f"video_{uuid.uuid4().hex[:8]}_{timestamp}.mp4"
        output_dir = "static/video"
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, filename)

        # Stream the generated video to disk. The timeout (connect, read) keeps
        # a stalled remote from hanging the request forever, and the context
        # manager releases the connection even when an error occurs.
        with requests.get(video_result['video_url'], stream=True, timeout=(10, 60)) as response:
            if response.status_code != 200:
                raise HTTPException(status_code=500, detail="视频文件下载失败")
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Upload the downloaded file to OBS.
        obs_uploader = ObsUploader()
        obs_object_key = f"HuangHai/video/{filename}"
        upload_success, upload_result = obs_uploader.upload_file(obs_object_key, output_path)
        if not upload_success:
            raise HTTPException(status_code=500, detail=f"OBS上传失败: {upload_result.get('errorMessage', '未知错误')}")

        # Public URL of the uploaded object.
        obs_url = f"https://{OBS_BUCKET}.{OBS_SERVER}/{obs_object_key}"

        # NOTE(review): task_id and request_id are freshly generated UUIDs, not
        # identifiers taken from video_result, so /task/status may not be able
        # to resolve them. Confirm whether the backend IDs should be returned.
        return VideoRetalkResponse(
            success=True,
            message="视频生成成功",
            video_url=obs_url,
            task_id=str(uuid.uuid4()),
            video_duration=video_result['video_duration'],
            video_ratio=video_result['video_ratio'],
            request_id=str(uuid.uuid4())
        )
    except HTTPException:
        # Already a well-formed HTTP error; re-raise untouched so its detail is
        # not wrapped a second time by the generic handler below.
        raise
    except Exception as e:
        logger.exception("视频生成接口错误: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"视频生成失败: {str(e)}"
        ) from e
    finally:
        # Remove the temporary file on every exit path. Background tasks are
        # attached only to the response the endpoint returns, so scheduling the
        # cleanup right before raising would leave the file behind.
        if output_path is not None:
            try:
                os.remove(output_path)
            except FileNotFoundError:
                # The download never got as far as creating the file.
                pass
            except OSError as cleanup_err:
                logger.warning("临时视频文件清理失败: %s", cleanup_err)
@router.get("/task/status")
async def get_task_status(task_id: str = Query(..., description="任务ID")):
    """
    Look up the status of a video generation task.

    Args:
        task_id: Identifier of the task, taken from the query string.

    Returns:
        A dict with ``success`` set to True and the value returned by
        ``VideoRetalk.get_task_status`` under ``data``.

    Raises:
        HTTPException: status 500 when the lookup raises any exception.
    """
    try:
        client = VideoRetalk(Config.ALY_LLM_API_KEY)
        status_payload = client.get_task_status(task_id)
    except Exception as err:
        # Log first, then surface the failure to the caller as a 500.
        logger.error(f"查询任务状态错误: {err}")
        failure_detail = f"查询任务状态失败: {str(err)}"
        raise HTTPException(status_code=500, detail=failure_detail)
    return {"success": True, "data": status_payload}
@router.get("/health")
async def health_check():
    """
    Health check endpoint.

    Returns:
        A dict with a fixed ``status`` of "healthy", the current local time
        in ISO 8601 format, and the service name.
    """
    now_iso = datetime.datetime.now().isoformat()
    report = {"status": "healthy", "timestamp": now_iso, "service": "VideoRetalk API"}
    return report
# Global exception handler, kept in this module.
def global_exception_handler(request: Request, exc: Exception):
    """
    Turn an unhandled exception into a JSON 500 response.

    Args:
        request: The incoming request; not used by the handler body.
        exc: The exception to report.

    Returns:
        A ``JSONResponse`` with status 500 whose body has ``success`` set to
        False and a message containing the exception text.
    """
    # NOTE(review): nothing in the visible source registers this handler on
    # an application; confirm it is wired up elsewhere.
    logger.error(f"全局异常: {exc}")
    error_body = {"success": False, "message": f"服务器内部错误: {str(exc)}"}
    return JSONResponse(status_code=500, content=error_body)