import logging import os import uuid import time import asyncio # 添加此行 from fastapi import APIRouter, HTTPException, BackgroundTasks, Query, UploadFile, File, Form from pydantic import BaseModel from typing import Optional import tempfile from starlette.websockets import WebSocket from Util.ObsUtil import ObsUploader from Config.Config import OBS_BUCKET, OBS_SERVER, XF_APPID, XF_APISECRET, XF_APIKEY from fastapi.responses import StreamingResponse import requests # 导入讯飞语音评测类 from KeDaXunFei.XunFeiAudioEvaluator import XunFeiAudioEvaluator # 配置日志 logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/xunFei", tags=["讯飞"]) # 请求模型 class AudioEvaluationRequest(BaseModel): language: str = "chinese" # chinese 或 english text: str # 评测文本内容 group: Optional[str] = "adult" # 群体:adult, youth, pupil check_type: Optional[str] = "common" # 检错严格程度:easy, common, hard grade: Optional[str] = "middle" # 学段:junior, middle, senior # 响应模型 class AudioEvaluationResponse(BaseModel): evaluation_id: str status: str results: Optional[dict] = None evaluation_time: Optional[float] = None error_message: Optional[str] = None # 科大讯飞API配置(需要根据实际情况配置) XUNFEI_CONFIG = { "appid": XF_APPID, # 修复参数名颠倒问题 "api_key": XF_APIKEY, "api_secret": XF_APISECRET } @router.post("/evaluate-audio", response_model=AudioEvaluationResponse) async def evaluate_audio( # 添加async关键字 background_tasks: BackgroundTasks, language: str = Form(..., description="语言类型: chinese/english"), group: str = Form("adult", description="群体类型: adult, youth, pupil"), check_type: str = Form("common", description="检错严格程度: easy, common, hard"), grade: str = Form("middle", description="学段: junior, middle, senior"), audio_file: UploadFile = File(...)): """ 语音评测接口 - 支持中文和英文篇章朗读判分 """ try: # 验证语言参数 if language not in ["chinese", "english"]: raise HTTPException(status_code=400, detail="language参数必须是'chinese'或'english'") # 新增参数验证 if check_type not in ["easy", "common", "hard"]: raise HTTPException(status_code=400, detail="check_type参数必须是'easy'、'common'或'hard'") if grade not in ["junior", "middle", "senior"]: raise HTTPException(status_code=400, detail="grade参数必须是'junior'、'middle'或'senior'") # 验证群体参数 if group not in ["adult", "youth", "pupil"]: raise HTTPException(status_code=400, detail="group参数必须是'adult', 'youth'或'pupil'") # 创建临时文件保存上传的音频 # 修改临时文件处理逻辑 import os import uuid # 创建持久化保存目录 output_dir = os.path.join(os.path.dirname(__file__), '../static/audio') os.makedirs(output_dir, exist_ok=True) # 生成唯一文件名 temp_audio_path = os.path.join(output_dir, f"eval_audio_{uuid.uuid4()}.wav") with open(temp_audio_path, 'wb') as temp_audio: # 先读取音频内容 audio_content = await audio_file.read() # 再进行格式转换 import wave with wave.open(temp_audio, 'wb') as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(16000) wf.writeframes(audio_content) # 创建评测器实例 evaluator = XunFeiAudioEvaluator( appid=XUNFEI_CONFIG["appid"], # 与AudioEvaluator示例用法保持一致 api_key=XUNFEI_CONFIG["api_key"], api_secret=XUNFEI_CONFIG["api_secret"], audio_file=temp_audio_path, language=language # 添加语言参数 ) # 根据语言设置不同的评测参数 if language == "chinese": # 中文评测配置 evaluator.business_params = { "category": "read_chapter", "ent": "cn_vip", "group": group, "check_type": check_type, "grade": grade, } else: # 英文评测配置 evaluator.business_params = { "category": "read_chapter", "ent": "en_vip", "group": group, # 添加缺失参数 "check_type": check_type, # 添加缺失参数 "grade": grade, # 添加缺失参数 } # 运行评测 from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4) results, eval_time = await asyncio.get_event_loop().run_in_executor( executor, evaluator.run_evaluation ) # 清理临时文件 - 注释掉此行以便保留文件 # os.unlink(temp_audio_path) # 生成评测ID evaluation_id = str(uuid.uuid4()) return AudioEvaluationResponse( evaluation_id=evaluation_id, status="success", results=results, evaluation_time=eval_time.total_seconds() if eval_time else None ) except Exception as e: logger.error(f"语音评测失败: {str(e)}") # 确保临时文件被清理 if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path): os.unlink(temp_audio_path) return AudioEvaluationResponse( evaluation_id=str(uuid.uuid4()), status="error", error_message=f"评测失败: {str(e)}" ) @router.get("/evaluation-result/{evaluation_id}") async def get_evaluation_result(evaluation_id: str): """ 获取评测结果(示例接口,实际需要实现结果存储) """ # 这里需要实现从数据库或缓存中获取评测结果 # 目前返回示例数据 return { "evaluation_id": evaluation_id, "status": "completed", "message": "请实现结果存储逻辑" } @router.websocket("/xunfei/streaming-evaluate") async def streaming_evaluate(websocket: WebSocket): await websocket.accept() logger.info("讯飞语音评测WebSocket连接已建立") # 生成唯一音频文件名 output_dir = os.path.join(os.path.dirname(__file__), '../static/audio') os.makedirs(output_dir, exist_ok=True) temp_audio_path = os.path.join(output_dir, f"eval_audio_{uuid.uuid4()}.wav") try: # 初始化WAV文件 with wave.open(temp_audio_path, 'wb') as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(16000) # 持续接收音频流 while True: data = await websocket.receive_bytes() if not data: # 结束标记 break wf.writeframes(data) # 执行评测(复用现有逻辑) evaluator = XunFeiAudioEvaluator( appid=XUNFEI_CONFIG["appid"], api_key=XUNFEI_CONFIG["api_key"], api_secret=XUNFEI_CONFIG["api_secret"], audio_file=temp_audio_path, language=await websocket.receive_json().get("language", "chinese") ) # 根据语言设置不同的评测参数 if language == "chinese": # 中文评测配置 evaluator.business_params = { "category": "read_chapter", "ent": "cn_vip", "group": group, "check_type": check_type, "grade": grade, } else: # 英文评测配置 evaluator.business_params = { "category": "read_chapter", "ent": "en_vip", "group": group, # 添加缺失参数 "check_type": check_type, # 添加缺失参数 "grade": grade, # 添加缺失参数 } # 运行评测 from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4) results, eval_time = await asyncio.get_event_loop().run_in_executor( executor, evaluator.run_evaluation ) # 清理临时文件 - 注释掉此行以便保留文件 # os.unlink(temp_audio_path) # 生成评测ID evaluation_id = str(uuid.uuid4()) return AudioEvaluationResponse( evaluation_id=evaluation_id, status="success", results=results, evaluation_time=eval_time.total_seconds() if eval_time else None ) except Exception as e: logger.error(f"评测失败: {str(e)}") await websocket.send_json({ "status": "error", "message": str(e) }) await websocket.close()