import os import time import json import logging import requests from pathlib import Path from Config import Config # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') log = logging.getLogger('SunoMusicGenerator') class SunoMusicGenerator: GENERATE_URL = "https://goapi.gptnb.ai/suno/v2/generate" FEED_URL = "https://goapi.gptnb.ai/suno/v2/feed" MAX_RETRIES = 30 # 最大重试次数 RETRY_INTERVAL = 5000 # 重试间隔(毫秒) def __init__(self, ak): # 初始化访问密钥 self.ak = ak # 获取项目根目录路径 self.project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # 拼接相对路径 self.base_path = os.path.join(self.project_root, 'src', 'main', 'python', 'com', 'dsideal', 'aiSupport', 'Util', 'Suno', 'Example') # 确保目录存在 Path(self.base_path).mkdir(parents=True, exist_ok=True) def generate_music(self, music_description, make_instrumental=True): """ 生成音乐 :param music_description: 音乐描述 :param make_instrumental: 是否生成纯音乐 :return: 音频URL,如果生成失败则返回None """ log.info(f"开始生成音乐: {music_description}") # 构建JSON请求体 request_json = { "gpt_description_prompt": music_description, "mv": "chirp-v3-5", "prompt": "", "make_instrumental": make_instrumental } # 设置请求头 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.ak}" } # 执行生成请求 try: response = requests.post(self.GENERATE_URL, headers=headers, json=request_json, timeout=30) response.raise_for_status() except requests.exceptions.RequestException as e: log.error(f"音乐生成请求失败: {e}") return None # 解析响应 try: generate_json = response.json() log.info(f"音乐生成响应: {generate_json}") except json.JSONDecodeError as e: log.error(f"解析音乐生成响应失败: {e}") return None # 提取任务ID task_id = None if "id" in generate_json: task_id = generate_json["id"] elif "task_id" in generate_json: task_id = generate_json["task_id"] elif "clip_id" in generate_json: task_id = generate_json["clip_id"] if task_id is None: log.error("无法从响应中提取任务ID,无法继续查询") return None log.info(f"音乐生成任务已提交,任务ID: {task_id}") # 存储任务ID列表 task_ids = [task_id] # 查询任务状态 is_complete = False retry_count = 0 audio_url = None while not is_complete and retry_count < self.MAX_RETRIES: # 等待一段时间再查询 time.sleep(self.RETRY_INTERVAL / 1000) # 构建查询URL url_builder = [self.FEED_URL, "?"] # 尝试从生成响应中获取clips的ID clip_ids = [] if "clips" in generate_json: clips_array = generate_json["clips"] for clip in clips_array: if "id" in clip: clip_ids.append(clip["id"]) # 添加ids参数 if clip_ids: ids_param = ",".join(clip_ids) url_builder.append(f"ids={ids_param}") log.info(f"使用clips ID查询: {ids_param}") else: ids_param = ",".join(task_ids) url_builder.append(f"ids={ids_param}") log.info(f"使用任务ID查询: {ids_param}") url = "".join(url_builder) log.info(f"查询URL: {url}") # 设置请求头 feed_headers = { "Authorization": f"Bearer {self.ak}", "Accept": "application/json" } # 执行查询请求 try: feed_response = requests.get(url, headers=feed_headers, timeout=30) feed_response.raise_for_status() except requests.exceptions.RequestException as e: log.error(f"查询任务状态失败: {e}") retry_count += 1 continue # 解析查询响应 try: json_response = feed_response.json() clips = json_response.get("clips", []) except json.JSONDecodeError as e: log.error(f"解析查询响应失败: {e}") retry_count += 1 continue if clips: # 遍历所有返回的音乐片段 for clip in clips: clip_id = clip.get("id") status = clip.get("status") title = clip.get("title") log.info(f"\n查询结果 (第{retry_count + 1}次):") log.info(f"ID: {clip_id}") log.info(f"标题: {title}") log.info(f"状态: {status}") # 检查是否完成 if status == "complete": # 确保audio_url字段存在 if "audio_url" in clip and clip["audio_url"]: audio_url = clip["audio_url"] log.info("音乐生成已完成!") log.info(f"音频URL: {audio_url}") is_complete = True break else: log.warning("音乐生成已完成,但未找到音频URL!") log.info(f"完整的片段信息: {clip}") elif status == "streaming": log.info("音乐生成中,继续等待...") elif status == "failed": log.error("音乐生成失败!") is_complete = True break else: log.info("未找到音乐片段,继续等待...") retry_count += 1 # 下载音频文件(如果生成成功) if is_complete and audio_url: # 移除URL中可能存在的反引号 audio_url = audio_url.replace("`", "").strip() file_name = f"suno_music_{int(time.time())}.mp3" save_path = os.path.join(self.base_path, file_name) log.info(f"准备下载音频到: {save_path}") if self.download_audio(audio_url, save_path): return save_path elif retry_count >= self.MAX_RETRIES: log.warning(f"达到最大重试次数,任务可能仍在处理中") log.warning(f"请稍后手动查询任务ID: {task_id}") elif not audio_url: log.error("未能获取有效的音频URL,无法下载") return None def download_audio(self, audio_url, save_path): """ 下载音频文件 :param audio_url: 音频URL :param save_path: 保存路径 :return: 下载是否成功 """ log.info("开始下载音频文件...") try: response = requests.get(audio_url, stream=True, timeout=60) response.raise_for_status() # 确保目录存在 Path(os.path.dirname(save_path)).mkdir(parents=True, exist_ok=True) # 保存文件 with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) log.info(f"音频下载成功,保存路径: {save_path}") return True except requests.exceptions.RequestException as e: log.error(f"下载音频失败: {e}") return False if __name__ == "__main__": ak = Config.GPTNB_API_KEY # 创建音乐生成器实例 generator = SunoMusicGenerator(ak) # 生成音乐 music_description = "an anthemic dancepop song about dancing all night long" audio_path = generator.generate_music(music_description) if audio_path: print(f"音乐生成并下载成功: {audio_path}") else: print("音乐生成失败")