import logging
import os
import shutil
import subprocess
import sys
import tempfile
import uuid

from fastapi import APIRouter, File, Form, UploadFile
from fastapi.concurrency import run_in_threadpool

# Module-level logger.
logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/xunFeiCn", tags=["讯飞"])

# Directory where uploaded audio files are kept permanently.
UPLOAD_DIR = os.path.join(os.path.dirname(__file__), "..", "static", "audio")
os.makedirs(UPLOAD_DIR, exist_ok=True)

# XunFei credentials. The parent directory is appended to sys.path first so
# that the project's Config package can be imported from here.
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from Config.Config import XF_APPID, XF_APIKEY, XF_APISECRET  # noqa: E402


def _convert_wav_to_mp3(wav_path, mp3_path):
    """Convert *wav_path* to *mp3_path* by invoking the ``ffmpeg`` executable.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status. The message
            carries ffmpeg's stderr output.
    """
    try:
        subprocess.run(
            ["ffmpeg", "-i", wav_path, "-y", mp3_path],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        # ffmpeg output is not guaranteed to be valid UTF-8; never let the
        # error report itself fail with a UnicodeDecodeError.
        detail = e.stderr.decode(errors="replace")
        logger.error(f"ffmpeg转换失败: {detail}")
        raise RuntimeError(f"音频格式转换失败: {detail}") from e
    logger.info(f"已将wav转换为mp3: {mp3_path}")


def _store_and_evaluate(content, txt, language):
    """Persist the uploaded bytes, convert them to mp3 and run the evaluation.

    This function is fully synchronous (disk I/O, an ffmpeg subprocess and the
    evaluator call), so it is meant to be executed in a worker thread.

    Args:
        content: Raw bytes of the uploaded audio file.
        txt: Text passed to the evaluator as ``txt``.
        language: Language passed to the evaluator as ``language``.

    Returns:
        The success payload returned by the ``/save-audio`` endpoint.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        # 1. Write the upload to a temporary wav file.
        temp_file = os.path.join(temp_dir, f"temp_{uuid.uuid4().hex}.wav")
        with open(temp_file, "wb") as f:
            f.write(content)

        # 2. Copy it into the permanent upload directory.
        file_name = f"audio_{uuid.uuid4().hex}.wav"
        file_path = os.path.join(UPLOAD_DIR, file_name)
        shutil.copy2(temp_file, file_path)
        logger.info(f"已保存文件到: {file_path}")

        # 3. Convert wav to mp3; the mp3 is the file handed to the evaluator.
        mp3_temp_file = os.path.join(temp_dir, f"temp_{uuid.uuid4().hex}.mp3")
        _convert_wav_to_mp3(temp_file, mp3_temp_file)

        # 4. Run the XunFei evaluation. The import stays local, as in the
        #    original code, so an import failure is reported per request.
        from KeDaXunFei.XunFeiAudioEvaluator_cn import XunFeiAudioEvaluator_cn

        evaluator = XunFeiAudioEvaluator_cn(
            appid=XF_APPID,
            api_key=XF_APIKEY,
            api_secret=XF_APISECRET,
            audio_file=mp3_temp_file,
            language=language,
            txt=txt,
        )
        results, eval_time = evaluator.run_evaluation()
        print(evaluator.get_evaluation_summary())

        return {
            "success": True,
            "file_name": file_name,
            "file_path": f"/static/audio/{file_name}",
            "evaluation": results,
            "evaluation_time": str(eval_time),
        }
    finally:
        # 5. Always remove the temporary directory, even when a step failed
        #    before any file was written into it (best-effort cleanup).
        shutil.rmtree(temp_dir, ignore_errors=True)


@router.post("/save-audio")
async def save_audio(
    audio: UploadFile = File(...),
    txt: str = Form(...),
    language: str = Form("english"),  # evaluation language, defaults to English
):
    """Save the uploaded audio file and score it.

    Args:
        audio: Uploaded audio file.
        txt: Text passed to the evaluator.
        language: Language passed to the evaluator; defaults to ``"english"``.

    Returns:
        On success, a dict with ``success=True``, the stored ``file_name`` and
        ``file_path``, the ``evaluation`` results and ``evaluation_time``.
        On any failure, ``{"success": False, "error": <message>}``.
    """
    try:
        content = await audio.read()
        # The pipeline blocks (file I/O, ffmpeg, evaluator), so run it in a
        # worker thread instead of stalling the event loop.
        return await run_in_threadpool(_store_and_evaluate, content, txt, language)
    except Exception as e:
        # Endpoint boundary: report the failure to the client as JSON and
        # keep the traceback in the log.
        logger.exception(f"处理失败: {str(e)}")
        return {"success": False, "error": str(e)}