diff --git a/dsLightRag/Suno/suno_music_generator.py b/dsLightRag/Suno/suno_music_generator.py new file mode 100644 index 00000000..ff35283a --- /dev/null +++ b/dsLightRag/Suno/suno_music_generator.py @@ -0,0 +1,240 @@ +import os +import time +import json +import logging +import requests +from pathlib import Path + +from Config import Config + +# 配置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +log = logging.getLogger('SunoMusicGenerator') + + +class SunoMusicGenerator: + GENERATE_URL = "https://goapi.gptnb.ai/suno/v2/generate" + FEED_URL = "https://goapi.gptnb.ai/suno/v2/feed" + MAX_RETRIES = 30 # 最大重试次数 + RETRY_INTERVAL = 5000 # 重试间隔(毫秒) + + def __init__(self, ak): + # 初始化访问密钥 + self.ak = ak + # 获取项目根目录路径 + self.project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) + # 拼接相对路径 + self.base_path = os.path.join(self.project_root, 'src', 'main', 'python', 'com', 'dsideal', 'aiSupport', 'Util', + 'Suno', 'Example') + # 确保目录存在 + Path(self.base_path).mkdir(parents=True, exist_ok=True) + + def generate_music(self, music_description, make_instrumental=True): + """ + 生成音乐 + :param music_description: 音乐描述 + :param make_instrumental: 是否生成纯音乐 + :return: 音频URL,如果生成失败则返回None + """ + log.info(f"开始生成音乐: {music_description}") + + # 构建JSON请求体 + request_json = { + "gpt_description_prompt": music_description, + "mv": "chirp-v3-5", + "prompt": "", + "make_instrumental": make_instrumental + } + + # 设置请求头 + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.ak}" + } + + # 执行生成请求 + try: + response = requests.post(self.GENERATE_URL, headers=headers, json=request_json, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + log.error(f"音乐生成请求失败: {e}") + return None + + # 解析响应 + try: + generate_json = response.json() + log.info(f"音乐生成响应: {generate_json}") + except json.JSONDecodeError as e: + log.error(f"解析音乐生成响应失败: {e}") + return None + + # 提取任务ID + task_id = None + if "id" in generate_json: + task_id = generate_json["id"] + elif "task_id" in generate_json: + task_id = generate_json["task_id"] + elif "clip_id" in generate_json: + task_id = generate_json["clip_id"] + + if task_id is None: + log.error("无法从响应中提取任务ID,无法继续查询") + return None + + log.info(f"音乐生成任务已提交,任务ID: {task_id}") + + # 存储任务ID列表 + task_ids = [task_id] + + # 查询任务状态 + is_complete = False + retry_count = 0 + audio_url = None + + while not is_complete and retry_count < self.MAX_RETRIES: + # 等待一段时间再查询 + time.sleep(self.RETRY_INTERVAL / 1000) + + # 构建查询URL + url_builder = [self.FEED_URL, "?"] + + # 尝试从生成响应中获取clips的ID + clip_ids = [] + if "clips" in generate_json: + clips_array = generate_json["clips"] + for clip in clips_array: + if "id" in clip: + clip_ids.append(clip["id"]) + + # 添加ids参数 + if clip_ids: + ids_param = ",".join(clip_ids) + url_builder.append(f"ids={ids_param}") + log.info(f"使用clips ID查询: {ids_param}") + else: + ids_param = ",".join(task_ids) + url_builder.append(f"ids={ids_param}") + log.info(f"使用任务ID查询: {ids_param}") + + url = "".join(url_builder) + log.info(f"查询URL: {url}") + + # 设置请求头 + feed_headers = { + "Authorization": f"Bearer {self.ak}", + "Accept": "application/json" + } + + # 执行查询请求 + try: + feed_response = requests.get(url, headers=feed_headers, timeout=30) + feed_response.raise_for_status() + except requests.exceptions.RequestException as e: + log.error(f"查询任务状态失败: {e}") + retry_count += 1 + continue + + # 解析查询响应 + try: + json_response = feed_response.json() + clips = json_response.get("clips", []) + except json.JSONDecodeError as e: + log.error(f"解析查询响应失败: {e}") + retry_count += 1 + continue + + if clips: + # 遍历所有返回的音乐片段 + for clip in clips: + clip_id = clip.get("id") + status = clip.get("status") + title = clip.get("title") + + log.info(f"\n查询结果 (第{retry_count + 1}次):") + log.info(f"ID: {clip_id}") + log.info(f"标题: {title}") + log.info(f"状态: {status}") + + # 检查是否完成 + if status == "complete": + # 确保audio_url字段存在 + if "audio_url" in clip and clip["audio_url"]: + audio_url = clip["audio_url"] + log.info("音乐生成已完成!") + log.info(f"音频URL: {audio_url}") + is_complete = True + break + else: + log.warning("音乐生成已完成,但未找到音频URL!") + log.info(f"完整的片段信息: {clip}") + elif status == "streaming": + log.info("音乐生成中,继续等待...") + elif status == "failed": + log.error("音乐生成失败!") + is_complete = True + break + else: + log.info("未找到音乐片段,继续等待...") + + retry_count += 1 + + # 下载音频文件(如果生成成功) + if is_complete and audio_url: + # 移除URL中可能存在的反引号 + audio_url = audio_url.replace("`", "").strip() + + file_name = f"suno_music_{int(time.time())}.mp3" + save_path = os.path.join(self.base_path, file_name) + log.info(f"准备下载音频到: {save_path}") + if self.download_audio(audio_url, save_path): + return save_path + elif retry_count >= self.MAX_RETRIES: + log.warning(f"达到最大重试次数,任务可能仍在处理中") + log.warning(f"请稍后手动查询任务ID: {task_id}") + elif not audio_url: + log.error("未能获取有效的音频URL,无法下载") + + return None + + def download_audio(self, audio_url, save_path): + """ + 下载音频文件 + :param audio_url: 音频URL + :param save_path: 保存路径 + :return: 下载是否成功 + """ + log.info("开始下载音频文件...") + + try: + response = requests.get(audio_url, stream=True, timeout=60) + response.raise_for_status() + + # 确保目录存在 + Path(os.path.dirname(save_path)).mkdir(parents=True, exist_ok=True) + + # 保存文件 + with open(save_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + log.info(f"音频下载成功,保存路径: {save_path}") + return True + except requests.exceptions.RequestException as e: + log.error(f"下载音频失败: {e}") + return False + + +if __name__ == "__main__": + ak = Config.GPTNB_API_KEY + + # 创建音乐生成器实例 + generator = SunoMusicGenerator(ak) + + # 生成音乐 + music_description = "an anthemic dancepop song about dancing all night long" + audio_path = generator.generate_music(music_description) + if audio_path: + print(f"音乐生成并下载成功: {audio_path}") + else: + print("音乐生成失败")