171 lines
6.1 KiB
Python
171 lines
6.1 KiB
Python
import logging
|
||
|
||
from fastapi import APIRouter, HTTPException, BackgroundTasks, Query
|
||
from pydantic import BaseModel
|
||
from typing import Optional
|
||
import os
|
||
import uuid
|
||
import time
|
||
from Util.ObsUtil import ObsUploader
|
||
from Config.Config import OBS_BUCKET, OBS_SERVER
|
||
# 导入TTS生成类
|
||
from Util.GengerateAudio import ByteDanceTTS
|
||
from fastapi.responses import StreamingResponse
|
||
import requests
|
||
|
||
# 配置日志
|
||
logger = logging.getLogger(__name__)
|
||
router = APIRouter(prefix="/api/tts", tags=["文生音频"])
|
||
# 初始化TTS实例(全局单例,避免重复创建)
|
||
tts_instance = ByteDanceTTS()
|
||
|
||
class TextToSpeechRequest(BaseModel):
|
||
text: str
|
||
voice_type: Optional[str] = None
|
||
speed_ratio: Optional[float] = 1.0
|
||
volume_ratio: Optional[float] = 1.0
|
||
pitch_ratio: Optional[float] = 1.0
|
||
encoding: Optional[str] = "mp3"
|
||
|
||
|
||
@router.get("/voice-categories")
|
||
async def get_voice_categories():
|
||
try:
|
||
categories = tts_instance.get_all_categories()
|
||
return {
|
||
"success": True,
|
||
"data": categories, # 恢复为原始的 data 字段
|
||
"message": "获取音色分类成功"
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取音色分类错误: {e}")
|
||
raise HTTPException(
|
||
status_code=500,
|
||
detail=f"获取音色分类失败: {str(e)}"
|
||
)
|
||
|
||
|
||
# 恢复原始的音色列表接口路由
|
||
@router.get("/voices")
|
||
async def get_voices_by_category(category: str = Query(...)): # 现在Query已定义
|
||
try:
|
||
voices = tts_instance.get_voices_by_category(category)
|
||
if not voices:
|
||
return {
|
||
"success": False,
|
||
"message": f"未找到分类 '{category}' 下的音色"
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"data": voices,
|
||
"message": f"获取分类 '{category}' 下的音色成功"
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取分类 '{category}' 下的音色错误: {e}")
|
||
raise HTTPException(
|
||
status_code=500,
|
||
detail=f"获取分类 '{category}' 下的音色失败: {str(e)}"
|
||
)
|
||
|
||
|
||
@router.get("/all")
|
||
async def get_all_voices():
|
||
"""
|
||
获取所有音色分类和音色列表接口
|
||
返回所有音色分类和每个分类下的音色列表
|
||
"""
|
||
try:
|
||
all_voices = tts_instance.get_all_voices()
|
||
return {
|
||
"success": True,
|
||
"data": all_voices,
|
||
"message": "获取所有音色分类和列表成功"
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取所有音色分类和列表错误: {e}")
|
||
raise HTTPException(
|
||
status_code=500,
|
||
detail=f"获取所有音色分类和列表失败: {str(e)}")
|
||
|
||
|
||
@router.post("/generate")
|
||
async def generate_audio(request: TextToSpeechRequest, background_tasks: BackgroundTasks):
|
||
try:
|
||
# 1. 生成唯一文件名和路径
|
||
timestamp = int(time.time())
|
||
filename = f"tts_{uuid.uuid4().hex[:8]}_{timestamp}.mp3"
|
||
output_dir = "static/audio"
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
output_path = os.path.join(output_dir, filename)
|
||
|
||
# 2. 使用ByteDanceTTS生成音频文件
|
||
# ------------------- TTS生成逻辑开始 -------------------
|
||
audio_data = tts_instance.generate_audio(
|
||
text=request.text,
|
||
output_path=output_path,
|
||
voice_type=request.voice_type,
|
||
speed_ratio=request.speed_ratio,
|
||
volume_ratio=request.volume_ratio,
|
||
pitch_ratio=request.pitch_ratio,
|
||
encoding=request.encoding
|
||
)
|
||
|
||
if not audio_data:
|
||
raise HTTPException(status_code=500, detail="TTS音频生成失败")
|
||
# ------------------- TTS生成逻辑结束 -------------------
|
||
|
||
# 3. 上传到OBS
|
||
obs_uploader = ObsUploader()
|
||
obs_object_key = f"HuangHai/tts/{filename}"
|
||
upload_success, upload_result = obs_uploader.upload_file(obs_object_key, output_path)
|
||
|
||
if not upload_success:
|
||
# 上传失败,清理文件并抛出异常
|
||
background_tasks.add_task(os.remove, output_path) if os.path.exists(output_path) else None
|
||
raise HTTPException(status_code=500, detail=f"OBS上传失败: {upload_result.get('errorMessage', '未知错误')}")
|
||
|
||
# 4. 构造OBS访问URL
|
||
obs_url = f"https://{OBS_BUCKET}.{OBS_SERVER}/{obs_object_key}"
|
||
|
||
# 5. 安排后台任务删除本地文件
|
||
background_tasks.add_task(os.remove, output_path) if os.path.exists(output_path) else None
|
||
|
||
# 6. 返回OBS URL
|
||
return {
|
||
"status": "success",
|
||
"message": "音频生成并上传成功",
|
||
"audio_url": obs_url, # 确保此处没有添加任何引号或反引号
|
||
"filename": filename,
|
||
"audio_size": len(audio_data) / 1024 # 音频大小(KB)
|
||
}
|
||
|
||
except Exception as e:
|
||
# 捕获所有异常并清理文件
|
||
if 'output_path' in locals() and os.path.exists(output_path):
|
||
background_tasks.add_task(os.remove, output_path)
|
||
raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
|
||
|
||
|
||
@router.get("/download/{filename}")
|
||
async def download_audio(filename: str):
|
||
# 构建OBS文件URL
|
||
obs_url = f"https://{OBS_BUCKET}.{OBS_SERVER}/HuangHai/tts/{filename}"
|
||
|
||
try:
|
||
# 流式获取OBS文件
|
||
response = requests.get(obs_url, stream=True)
|
||
if response.status_code != 200:
|
||
raise HTTPException(status_code=404, detail="音频文件不存在")
|
||
|
||
# 设置下载响应头
|
||
headers = {
|
||
"Content-Disposition": f"attachment; filename={filename}",
|
||
"Content-Type": "audio/mpeg"
|
||
}
|
||
|
||
# 返回流式响应
|
||
return StreamingResponse(response.iter_content(chunk_size=8192), headers=headers)
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"下载失败: {str(e)}")
|