Files
dsProject/dsLightRag/Test/Test_TTS.py

101 lines
2.9 KiB
Python
Raw Normal View History

2025-08-31 09:20:47 +08:00
#!/usr/bin/env python3
import json
import logging
import uuid
# pip install websockets
import websockets
from Config import Config
from Util.TTS_Protocols import EventType, MsgType, full_client_request, receive_message
# Module-level logger used throughout this script.
logger = logging.getLogger(__name__)
# Emit INFO-and-above records via the root logger's default handler.
logging.basicConfig(level=logging.INFO)
def get_resource_id(voice: str) -> str:
    """Return the ``X-Api-Resource-Id`` header value to use for *voice*.

    Voice IDs that start with ``"S_"`` map to ``"volc.megatts.default"``;
    every other voice ID maps to ``"volc.service_type.10029"``.

    NOTE(review): the ``S_`` prefix presumably marks custom/cloned voices --
    confirm against the service documentation.
    """
    if voice.startswith("S_"):
        return "volc.megatts.default"
    return "volc.service_type.10029"
# Text to synthesize.
text = "你好,我是火山引擎的语音合成服务。这是一个美好的旅程。"
# Speaker/voice ID sent to the service (the "Wanwan Xiaohe" voice).
voice_type = "zh_female_wanwanxiaohe_moon_bigtts"
# Output audio encoding; also used as the saved file's extension.
encoding = "wav"
# WebSocket endpoint URL.
endpoint = "wss://openspeech.bytedance.com/api/v3/tts/unidirectional/stream"
async def main():
    """Synthesize the module-level ``text`` and save the audio to disk.

    Connects to ``endpoint`` over WebSocket, sends one synthesis request for
    ``voice_type``, collects the audio payloads until the server reports
    ``EventType.SessionFinished``, and writes them to
    ``f"{voice_type}.{encoding}"`` in the current working directory.

    Raises:
        RuntimeError: If the server sends a message that is neither a
            ``FullServerResponse`` nor an ``AudioOnlyServer`` message, or if
            the session finishes without any audio data.
    """
    # Connect to server
    headers = {
        "X-Api-App-Key": Config.HS_APP_ID,
        "X-Api-Access-Key": Config.HS_ACCESS_TOKEN,
        "X-Api-Resource-Id": get_resource_id(voice_type),
        "X-Api-Connect-Id": str(uuid.uuid4()),
    }
    # Never write the access token to the log; mask it in the logged copy only.
    loggable_headers = {
        name: "***" if name == "X-Api-Access-Key" else value
        for name, value in headers.items()
    }
    logger.info(f"Connecting to {endpoint} with headers: {loggable_headers}")
    websocket = await websockets.connect(
        endpoint, additional_headers=headers, max_size=10 * 1024 * 1024
    )
    try:
        # Kept inside the try block so the socket is closed even if logging
        # fails; .get() tolerates a response without the x-tt-logid header.
        log_id = websocket.response.headers.get("x-tt-logid")
        logger.info(f"Connected to WebSocket server, Logid: {log_id}")

        # Prepare request payload
        request = {
            "user": {
                "uid": str(uuid.uuid4()),
            },
            "req_params": {
                "speaker": voice_type,
                "audio_params": {
                    "format": encoding,
                    "sample_rate": 24000,
                    "enable_timestamp": True,
                },
                "text": text,
                # "additions" is sent as a JSON-encoded string, not a nested object.
                "additions": json.dumps(
                    {
                        "disable_markdown_filter": False,
                    }
                ),
            },
        }

        # Send request
        await full_client_request(websocket, json.dumps(request).encode())

        # Receive audio data
        audio_data = bytearray()
        while True:
            msg = await receive_message(websocket)
            if msg.type == MsgType.FullServerResponse:
                # Other full-server events are ignored; only the end-of-session
                # event terminates the loop.
                if msg.event == EventType.SessionFinished:
                    break
            elif msg.type == MsgType.AudioOnlyServer:
                audio_data.extend(msg.payload)
            else:
                raise RuntimeError(f"TTS conversion failed: {msg}")

        # Check if we received any audio data
        if not audio_data:
            raise RuntimeError("No audio data received")

        # Save audio file
        filename = f"{voice_type}.{encoding}"
        with open(filename, "wb") as f:
            f.write(audio_data)
        logger.info(f"Audio received: {len(audio_data)}, saved to {filename}")
    finally:
        await websocket.close()
        logger.info("Connection closed")
# Script entry point: run the synthesis coroutine when executed directly.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())