import logging import os import tempfile from datetime import datetime from fastapi import APIRouter, Request, File, UploadFile from fastapi.responses import JSONResponse # 创建路由路由器 router = APIRouter(prefix="/api", tags=["学伴"]) # 配置日志 logger = logging.getLogger(__name__) # 导入学伴工具函数 from Util.XueBanUtil import get_xueban_response_async @router.post("/xueban/upload-audio") async def upload_audio(file: UploadFile = File(...)): """ 上传音频文件并进行ASR处理 - 参数: file - 音频文件 - 返回: JSON包含识别结果 """ try: # 记录日志 logger.info(f"接收到音频文件: {file.filename}") # 保存临时文件 timestamp = datetime.now().strftime("%Y%m%d%H%M%S") file_ext = os.path.splitext(file.filename)[1] temp_file_name = f"temp_audio_{timestamp}{file_ext}" temp_file_path = os.path.join(tempfile.gettempdir(), temp_file_name) with open(temp_file_path, "wb") as f: content = await file.read() f.write(content) logger.info(f"音频文件已保存至临时目录: {temp_file_path}") # 这里应该调用ASR服务进行处理 # 示例:asr_result = await process_asr(temp_file_path) # 为了演示,这里返回模拟结果 asr_result = { "text": "这是一段测试音频的识别结果", "confidence": 0.95, "audio_duration": len(content) / 1024 / 16 # 估算音频时长 } # 删除临时文件 os.remove(temp_file_path) logger.info(f"临时文件已删除: {temp_file_path}") # 返回识别结果 return JSONResponse(content={ "success": True, "message": "音频识别成功", "data": asr_result }) except Exception as e: logger.error(f"音频处理失败: {str(e)}") return JSONResponse(content={ "success": False, "message": f"音频处理失败: {str(e)}" }, status_code=500) # 实际应用中,您需要实现ASR处理函数 async def process_asr(audio_path: str) -> dict: """ 调用ASR服务处理音频文件 :param audio_path: 音频文件路径 :return: 识别结果字典 """ # 这里应该集成实际的ASR服务 # 例如百度AI、阿里云、讯飞等ASR服务 # 或者本地的ASR模型 pass @router.post("/xueban/chat") async def chat_with_xueban(request: Request): """ 与学伴大模型聊天的接口 - 参数: request body 中的 query_text (用户查询文本) - 返回: JSON包含聊天响应 """ try: # 获取请求体数据 data = await request.json() query_text = data.get("query_text", "") if not query_text.strip(): return JSONResponse(content={ "success": False, "message": "查询文本不能为空" }, status_code=400) # 记录日志 logger.info(f"接收到学伴聊天请求: {query_text}") # 调用异步接口获取学伴响应 response_content = [] async for chunk in get_xueban_response_async(query_text, stream=True): response_content.append(chunk) full_response = "".join(response_content) # 返回响应 return JSONResponse(content={ "success": True, "message": "聊天成功", "data": { "response": full_response } }) except Exception as e: logger.error(f"学伴聊天失败: {str(e)}") return JSONResponse(content={ "success": False, "message": f"聊天处理失败: {str(e)}" }, status_code=500)