Files
dsProject/dsLightRag/Util/TTSService.py

150 lines
5.3 KiB
Python
Raw Normal View History

2025-08-22 09:55:18 +08:00
import base64
import uuid
import requests
import logging
from Config.Config import HS_APP_ID, HS_ACCESS_TOKEN, HS_CLUSTER_ID, HS_VOICE_TYPE
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('TTSService')
class TTSService:
"""
字节跳动TTS服务封装类
"""
def __init__(self, appid=None, access_token=None, cluster=None, voice_type=None):
"""
初始化TTS服务
@param appid: 应用ID默认使用配置文件中的HS_APP_ID
@param access_token: 访问令牌默认使用配置文件中的HS_ACCESS_TOKEN
@param cluster: 集群ID默认使用配置文件中的HS_CLUSTER_ID
@param voice_type: 语音类型默认使用配置文件中的HS_VOICE_TYPE
"""
self.appid = appid or HS_APP_ID
self.access_token = access_token or HS_ACCESS_TOKEN
self.cluster = cluster or HS_CLUSTER_ID
self.voice_type = voice_type or HS_VOICE_TYPE
self.host = "openspeech.bytedance.com"
self.api_url = f"https://{self.host}/api/v1/tts"
self.header = {"Authorization": f"Bearer;{self.access_token}"}
logger.info(f"TTS服务初始化完成: appid={self.appid}, cluster={self.cluster}")
def _prepare_request(self, text, encoding="mp3", speed_ratio=1.0, volume_ratio=1.0, pitch_ratio=1.0):
"""
准备TTS请求参数
@param text: 要转换的文本
@param encoding: 音频编码格式
@param speed_ratio: 语速比例
@param volume_ratio: 音量比例
@param pitch_ratio: 音调比例
@return: 请求参数字典
"""
return {
"app": {
"appid": self.appid,
"token": self.access_token,
"cluster": self.cluster
},
"user": {
"uid": "388808087185088"
},
"audio": {
"voice_type": self.voice_type,
"encoding": encoding,
"speed_ratio": speed_ratio,
"volume_ratio": volume_ratio,
"pitch_ratio": pitch_ratio,
},
"request": {
"reqid": str(uuid.uuid4()),
"text": text,
"text_type": "plain",
"operation": "query",
"with_frontend": 1,
"frontend_type": "unitTson"
}
}
def synthesize(self, text, output_file="tts_output.mp3", **kwargs):
"""
合成语音并保存到文件
@param text: 要转换的文本
@param output_file: 输出文件路径
@param kwargs: 其他可选参数encoding, speed_ratio等
@return: 成功返回True失败返回False
"""
try:
# 准备请求参数
request_json = self._prepare_request(text, **kwargs)
logger.info(f"准备发送TTS请求: text='{text[:20]}...'")
# 发送请求
resp = requests.post(self.api_url, json=request_json, headers=self.header)
logger.info(f"TTS请求响应状态码: {resp.status_code}")
# 检查响应状态
if resp.status_code == 200:
resp_data = resp.json()
logger.info(f"TTS API返回码: {resp_data.get('code')}, 消息: {resp_data.get('message')}")
if resp_data.get("code") == 3000 and "data" in resp_data:
data = resp_data["data"]
# 检查data是否为空
if data:
try:
# 解码Base64数据
audio_data = base64.b64decode(data)
# 保存为文件
with open(output_file, "wb") as f:
f.write(audio_data)
logger.info(f"音频文件已成功保存为: {output_file}")
return True
except Exception as decode_error:
logger.error(f"Base64解码或文件保存失败: {str(decode_error)}")
logger.error(f"数据前100个字符: {data[:100]}")
else:
logger.error("错误: 返回的data字段为空")
else:
logger.error(f"API返回错误: code={resp_data.get('code')}, message={resp_data.get('message')}")
else:
logger.error(f"HTTP请求失败: 状态码={resp.status_code}")
except Exception as e:
logger.error(f"发生异常: {str(e)}")
import traceback
traceback.print_exc()
return False
def main():
"""
测试TTS服务的主函数
"""
# 初始化TTS服务
tts_service = TTSService()
# 测试文本
test_text = "我是小何啊,这是什么鬼~"
print(f"正在合成语音: {test_text}")
# 调用合成方法
success = tts_service.synthesize(
text=test_text,
output_file="test_tts_output.mp3",
speed_ratio=1.0,
volume_ratio=1.0,
pitch_ratio=1.0
)
if success:
print("语音合成成功!")
else:
print("语音合成失败!")
if __name__ == '__main__':
main()