"""HTTP client wrapper for the ByteDance text-to-speech (TTS) endpoint.

Reconstructed from the added side of commit 06bc343d, which creates
``dsLightRag/Util/TTSService.py`` and deletes the earlier one-off script
``dsLightRag/Test/tts_http_demo.py``.
"""
import base64
import logging
import uuid

import requests

from Config.Config import HS_APP_ID, HS_ACCESS_TOKEN, HS_CLUSTER_ID, HS_VOICE_TYPE

# Configure logging. This runs at import time, exactly as in the original
# module, so importing this file still installs the root handler/format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('TTSService')


class TTSService:
    """Wrapper class around the ByteDance TTS HTTP API."""

    # Value of the response's ``code`` field that this client treats as success.
    SUCCESS_CODE = 3000
    # User id sent in the ``user.uid`` field of every request.
    DEFAULT_UID = "388808087185088"

    def __init__(self, appid=None, access_token=None, cluster=None, voice_type=None):
        """
        Initialise the TTS service.

        @param appid: application id; falls back to HS_APP_ID from the config
        @param access_token: access token; falls back to HS_ACCESS_TOKEN
        @param cluster: cluster id; falls back to HS_CLUSTER_ID
        @param voice_type: voice type; falls back to HS_VOICE_TYPE
        """
        self.appid = appid or HS_APP_ID
        self.access_token = access_token or HS_ACCESS_TOKEN
        self.cluster = cluster or HS_CLUSTER_ID
        self.voice_type = voice_type or HS_VOICE_TYPE
        self.host = "openspeech.bytedance.com"
        self.api_url = f"https://{self.host}/api/v1/tts"
        # NOTE(review): the "Bearer;<token>" form (semicolon, no space) is kept
        # exactly as written in the original; presumably the service requires
        # it -- confirm against the API documentation before "fixing" it.
        self.header = {"Authorization": f"Bearer;{self.access_token}"}

        # Only non-secret identifiers are logged; the access token is not.
        logger.info(f"TTS服务初始化完成: appid={self.appid}, cluster={self.cluster}")

    def _prepare_request(self, text, encoding="mp3", speed_ratio=1.0, volume_ratio=1.0, pitch_ratio=1.0):
        """
        Build the JSON payload for one TTS request.

        @param text: text to convert to speech
        @param encoding: audio encoding format
        @param speed_ratio: speech speed ratio
        @param volume_ratio: volume ratio
        @param pitch_ratio: pitch ratio
        @return: request payload as a dict (a fresh ``reqid`` UUID per call)
        """
        return {
            "app": {
                "appid": self.appid,
                "token": self.access_token,
                "cluster": self.cluster,
            },
            "user": {
                "uid": self.DEFAULT_UID,
            },
            "audio": {
                "voice_type": self.voice_type,
                "encoding": encoding,
                "speed_ratio": speed_ratio,
                "volume_ratio": volume_ratio,
                "pitch_ratio": pitch_ratio,
            },
            "request": {
                "reqid": str(uuid.uuid4()),
                "text": text,
                "text_type": "plain",
                "operation": "query",
                "with_frontend": 1,
                "frontend_type": "unitTson",
            },
        }

    @staticmethod
    def _save_audio(data, output_file):
        """
        Decode Base64 audio and write it to ``output_file``.

        @param data: Base64-encoded audio taken from the response's ``data`` field
        @param output_file: path of the file to write (opened in binary mode)
        @return: True when the file was written, False when decoding or
                 writing failed (the failure is logged)
        """
        try:
            audio_data = base64.b64decode(data)
            with open(output_file, "wb") as f:
                f.write(audio_data)
        except (ValueError, TypeError, OSError) as decode_error:
            # binascii.Error (bad Base64) is a ValueError; OSError covers
            # unwritable paths.
            logger.error(f"Base64解码或文件保存失败: {str(decode_error)}")
            # Log a short prefix of the payload to help debugging.
            logger.error(f"数据前100个字符: {data[:100]}")
            return False

        logger.info(f"音频文件已成功保存为: {output_file}")
        return True

    def synthesize(self, text, output_file="tts_output.mp3", *, timeout=30, **kwargs):
        """
        Synthesise speech for ``text`` and save it to a file.

        This method never raises: every failure is logged and reported
        through the return value.

        @param text: text to convert to speech
        @param output_file: output file path
        @param timeout: seconds to wait for the HTTP request, passed straight
                        to ``requests.post``; ``None`` waits indefinitely
        @param kwargs: optional audio parameters forwarded to
                       ``_prepare_request`` (encoding, speed_ratio, ...)
        @return: True on success, False on failure
        """
        try:
            # Build the payload.
            request_json = self._prepare_request(text, **kwargs)
            logger.info(f"准备发送TTS请求: text='{text[:20]}...'")

            # Send the request. Without a timeout a stalled connection would
            # block the caller forever.
            resp = requests.post(
                self.api_url,
                json=request_json,
                headers=self.header,
                timeout=timeout,
            )
            logger.info(f"TTS请求响应状态码: {resp.status_code}")

            if resp.status_code != 200:
                logger.error(f"HTTP请求失败: 状态码={resp.status_code}")
                return False

            resp_data = resp.json()
            logger.info(f"TTS API返回码: {resp_data.get('code')}, 消息: {resp_data.get('message')}")

            if resp_data.get("code") != self.SUCCESS_CODE or "data" not in resp_data:
                logger.error(f"API返回错误: code={resp_data.get('code')}, message={resp_data.get('message')}")
                return False

            data = resp_data["data"]
            if not data:
                logger.error("错误: 返回的data字段为空")
                return False

            return self._save_audio(data, output_file)
        except Exception as e:
            # Top-level boundary: keep the "returns False, never raises"
            # contract, and route the traceback through the logger rather
            # than printing it to raw stderr.
            logger.exception(f"发生异常: {str(e)}")
            return False


def main():
    """Manual smoke test: synthesise one sample sentence to an MP3 file."""
    # Initialise the service with the credentials from the config module.
    tts_service = TTSService()

    # Sample text.
    test_text = "我是小何啊,这是什么鬼~"
    print(f"正在合成语音: {test_text}")

    # Run the synthesis.
    success = tts_service.synthesize(
        text=test_text,
        output_file="test_tts_output.mp3",
        speed_ratio=1.0,
        volume_ratio=1.0,
        pitch_ratio=1.0,
    )

    if success:
        print("语音合成成功!")
    else:
        print("语音合成失败!")


if __name__ == '__main__':
    main()