This commit is contained in:
2025-08-21 14:24:35 +08:00
parent 2918f9597d
commit 5fda2cfeaf
8 changed files with 157 additions and 10 deletions

240
dsLightRag/Suno/sunoUtil.py Normal file
View File

@@ -0,0 +1,240 @@
import os
import time
import json
import logging
import requests
from pathlib import Path
from Config import Config
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger('SunoMusicGenerator')
class SunoMusicGenerator:
GENERATE_URL = "https://goapi.gptnb.ai/suno/v2/generate"
FEED_URL = "https://goapi.gptnb.ai/suno/v2/feed"
MAX_RETRIES = 30 # 最大重试次数
RETRY_INTERVAL = 5000 # 重试间隔(毫秒)
def __init__(self, ak):
# 初始化访问密钥
self.ak = ak
# 获取项目根目录路径
self.project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
# 拼接相对路径
self.base_path = os.path.join(self.project_root, 'src', 'main', 'python', 'com', 'dsideal', 'aiSupport', 'Util',
'Suno', 'Example')
# 确保目录存在
Path(self.base_path).mkdir(parents=True, exist_ok=True)
def generate_music(self, music_description, make_instrumental=True):
"""
生成音乐
:param music_description: 音乐描述
:param make_instrumental: 是否生成纯音乐
:return: 音频URL如果生成失败则返回None
"""
log.info(f"开始生成音乐: {music_description}")
# 构建JSON请求体
request_json = {
"gpt_description_prompt": music_description,
"mv": "chirp-v3-5",
"prompt": "",
"make_instrumental": make_instrumental
}
# 设置请求头
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.ak}"
}
# 执行生成请求
try:
response = requests.post(self.GENERATE_URL, headers=headers, json=request_json, timeout=30)
response.raise_for_status()
except requests.exceptions.RequestException as e:
log.error(f"音乐生成请求失败: {e}")
return None
# 解析响应
try:
generate_json = response.json()
log.info(f"音乐生成响应: {generate_json}")
except json.JSONDecodeError as e:
log.error(f"解析音乐生成响应失败: {e}")
return None
# 提取任务ID
task_id = None
if "id" in generate_json:
task_id = generate_json["id"]
elif "task_id" in generate_json:
task_id = generate_json["task_id"]
elif "clip_id" in generate_json:
task_id = generate_json["clip_id"]
if task_id is None:
log.error("无法从响应中提取任务ID无法继续查询")
return None
log.info(f"音乐生成任务已提交任务ID: {task_id}")
# 存储任务ID列表
task_ids = [task_id]
# 查询任务状态
is_complete = False
retry_count = 0
audio_url = None
while not is_complete and retry_count < self.MAX_RETRIES:
# 等待一段时间再查询
time.sleep(self.RETRY_INTERVAL / 1000)
# 构建查询URL
url_builder = [self.FEED_URL, "?"]
# 尝试从生成响应中获取clips的ID
clip_ids = []
if "clips" in generate_json:
clips_array = generate_json["clips"]
for clip in clips_array:
if "id" in clip:
clip_ids.append(clip["id"])
# 添加ids参数
if clip_ids:
ids_param = ",".join(clip_ids)
url_builder.append(f"ids={ids_param}")
log.info(f"使用clips ID查询: {ids_param}")
else:
ids_param = ",".join(task_ids)
url_builder.append(f"ids={ids_param}")
log.info(f"使用任务ID查询: {ids_param}")
url = "".join(url_builder)
log.info(f"查询URL: {url}")
# 设置请求头
feed_headers = {
"Authorization": f"Bearer {self.ak}",
"Accept": "application/json"
}
# 执行查询请求
try:
feed_response = requests.get(url, headers=feed_headers, timeout=30)
feed_response.raise_for_status()
except requests.exceptions.RequestException as e:
log.error(f"查询任务状态失败: {e}")
retry_count += 1
continue
# 解析查询响应
try:
json_response = feed_response.json()
clips = json_response.get("clips", [])
except json.JSONDecodeError as e:
log.error(f"解析查询响应失败: {e}")
retry_count += 1
continue
if clips:
# 遍历所有返回的音乐片段
for clip in clips:
clip_id = clip.get("id")
status = clip.get("status")
title = clip.get("title")
log.info(f"\n查询结果 (第{retry_count + 1}次):")
log.info(f"ID: {clip_id}")
log.info(f"标题: {title}")
log.info(f"状态: {status}")
# 检查是否完成
if status == "complete":
# 确保audio_url字段存在
if "audio_url" in clip and clip["audio_url"]:
audio_url = clip["audio_url"]
log.info("音乐生成已完成!")
log.info(f"音频URL: {audio_url}")
is_complete = True
break
else:
log.warning("音乐生成已完成但未找到音频URL!")
log.info(f"完整的片段信息: {clip}")
elif status == "streaming":
log.info("音乐生成中,继续等待...")
elif status == "failed":
log.error("音乐生成失败!")
is_complete = True
break
else:
log.info("未找到音乐片段,继续等待...")
retry_count += 1
# 下载音频文件(如果生成成功)
if is_complete and audio_url:
# 移除URL中可能存在的反引号
audio_url = audio_url.replace("`", "").strip()
file_name = f"suno_music_{int(time.time())}.mp3"
save_path = os.path.join(self.base_path, file_name)
log.info(f"准备下载音频到: {save_path}")
if self.download_audio(audio_url, save_path):
return save_path
elif retry_count >= self.MAX_RETRIES:
log.warning(f"达到最大重试次数,任务可能仍在处理中")
log.warning(f"请稍后手动查询任务ID: {task_id}")
elif not audio_url:
log.error("未能获取有效的音频URL无法下载")
return None
def download_audio(self, audio_url, save_path):
"""
下载音频文件
:param audio_url: 音频URL
:param save_path: 保存路径
:return: 下载是否成功
"""
log.info("开始下载音频文件...")
try:
response = requests.get(audio_url, stream=True, timeout=60)
response.raise_for_status()
# 确保目录存在
Path(os.path.dirname(save_path)).mkdir(parents=True, exist_ok=True)
# 保存文件
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
log.info(f"音频下载成功,保存路径: {save_path}")
return True
except requests.exceptions.RequestException as e:
log.error(f"下载音频失败: {e}")
return False
if __name__ == "__main__":
ak = Config.GPTNB_API_KEY
# 创建音乐生成器实例
generator = SunoMusicGenerator(ak)
# 生成音乐
music_description = "an anthemic dancepop song about dancing all night long"
audio_path = generator.generate_music(music_description)
if audio_path:
print(f"音乐生成并下载成功: {audio_path}")
else:
print("音乐生成失败")