This commit is contained in:
2025-09-05 20:28:18 +08:00
parent e7aed0ecef
commit d8ecfd0d91
10 changed files with 1032 additions and 0 deletions

BIN
dsLightRag/KeDaXunFei/1.mp3 Normal file

Binary file not shown.

View File

@@ -0,0 +1,269 @@
import base64
import datetime
import hashlib
import hmac
import json
import ssl
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
class XunFeiAudioEvaluator:
"""讯飞语音评测类"""
def __init__(self, appid, api_key, api_secret, audio_file):
self.appid = appid
self.api_key = api_key
self.api_secret = api_secret
self.audio_file = audio_file
self.host_url = "ws://ise-api.xfyun.cn/v2/open-ise"
self.websocket_url = ""
self.evaluation_results = {}
def generate_auth_url(self):
"""生成鉴权URL"""
now_time = datetime.now()
now_date = format_date_time(mktime(now_time.timetuple()))
origin_base = "host: " + "ise-api.xfyun.cn" + "\n"
origin_base += "date: " + now_date + "\n"
origin_base += "GET " + "/v2/open-ise " + "HTTP/1.1"
# SHA256加密
signature_sha = hmac.new(
self.api_secret.encode('utf-8'),
origin_base.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
dict_data = {
"authorization": authorization,
"date": now_date,
"host": "ise-api.xfyun.cn"
}
return self.host_url + '?' + urlencode(dict_data)
def on_message(self, ws, message):
"""WebSocket消息处理"""
print(f"Received message: {message}")
data = json.loads(message)
status = data["data"]["status"]
if status == 2:
# 解析评测结果
xml_data = base64.b64decode(data["data"]["data"])
xml_content = xml_data.decode("utf-8")
#print(xml_content)
# 解析XML并提取得分信息
self.parse_evaluation_results(xml_content)
ws.close()
def on_error(self, ws, error):
"""错误处理"""
print(f"Error: {error},{ws}")
def on_close(self, ws, reason, res):
"""连接关闭处理"""
print(f"WebSocket connection closed,{ws}")
def on_open(self, ws):
"""连接建立处理"""
print(f"WebSocket connection opened,{ws},ws连接建立成功...")
# 发送初始参数
send_dict = {
"common": {
"app_id": self.appid
},
"business": {
"category": "read_sentence",
"rstcd": "utf8",
"sub": "ise",
"group": "pupil",
"ent": "en_vip",
"tte": "utf-8",
"cmd": "ssb",
"auf": "audio/L16;rate=16000",
"aue": "lame",
"text": '\uFEFF' + "[content]\nnice to meet you."
},
"data": {
"status": 0,
"data": ""
}
}
ws.send(json.dumps(send_dict))
# 发送音频数据
with open(self.audio_file, "rb") as file_flag:
while True:
buffer = file_flag.read(1280)
if not buffer:
# 发送最后一帧
my_dict = {
"business": {
"cmd": "auw",
"aus": 4,
"aue": "lame"
},
"data": {
"status": 2,
"data": str(base64.b64encode(buffer).decode())
}
}
ws.send(json.dumps(my_dict))
#print("发送最后一帧")
time.sleep(1)
break
# 发送中间音频帧
send_dict = {
"business": {
"cmd": "auw",
"aus": 1,
"aue": "lame"
},
"data": {
"status": 1,
"data": str(base64.b64encode(buffer).decode()),
"data_type": 1,
"encoding": "raw"
}
}
ws.send(json.dumps(send_dict))
time.sleep(0.04)
def parse_evaluation_results(self, xml_content):
"""解析评测结果XML并提取得分信息"""
try:
root = ET.fromstring(xml_content)
# 查找read_chapter节点
read_chapter = root.find('.//read_chapter')
if read_chapter is not None:
self.evaluation_results = {
'accuracy_score': float(read_chapter.get('accuracy_score', 0)),
'fluency_score': float(read_chapter.get('fluency_score', 0)),
'integrity_score': float(read_chapter.get('integrity_score', 0)),
'standard_score': float(read_chapter.get('standard_score', 0)),
'total_score': float(read_chapter.get('total_score', 0)),
'word_count': int(read_chapter.get('word_count', 0)),
'is_rejected': read_chapter.get('is_rejected', 'false') == 'true'
}
# 提取句子级别得分
sentence = read_chapter.find('.//sentence')
if sentence is not None:
self.evaluation_results['sentence'] = {
'accuracy_score': float(sentence.get('accuracy_score', 0)),
'fluency_score': float(sentence.get('fluency_score', 0)),
'total_score': float(sentence.get('total_score', 0))
}
# 提取单词级别得分
words = []
for word in read_chapter.findall('.//word'):
word_data = {
'content': word.get('content', ''),
'total_score': float(word.get('total_score', 0)),
'dp_message': int(word.get('dp_message', 0))
}
words.append(word_data)
self.evaluation_results['words'] = words
except ET.ParseError as e:
print(f"XML解析错误: {e}")
def get_evaluation_summary(self):
"""获取评测结果摘要"""
if not self.evaluation_results:
return "暂无评测结果"
summary = "=== 语音评测结果摘要 ===\n"
summary += f"总得分: {self.evaluation_results.get('total_score', 0):.4f}\n"
summary += f"准确度得分: {self.evaluation_results.get('accuracy_score', 0):.4f}\n"
summary += f"流畅度得分: {self.evaluation_results.get('fluency_score', 0):.4f}\n"
summary += f"完整度得分: {self.evaluation_results.get('integrity_score', 0):.4f}\n"
summary += f"标准度得分: {self.evaluation_results.get('standard_score', 0):.4f}\n"
summary += f"单词数量: {self.evaluation_results.get('word_count', 0)}\n"
summary += f"是否被拒绝: {'' if self.evaluation_results.get('is_rejected', False) else ''}\n"
if 'sentence' in self.evaluation_results:
sentence = self.evaluation_results['sentence']
summary += f"\n=== 句子级别得分 ===\n"
summary += f"句子准确度: {sentence.get('accuracy_score', 0):.4f}\n"
summary += f"句子流畅度: {sentence.get('fluency_score', 0):.4f}\n"
summary += f"句子总分: {sentence.get('total_score', 0):.4f}\n"
if 'words' in self.evaluation_results:
summary += f"\n=== 单词级别得分 ===\n"
for i, word in enumerate(self.evaluation_results['words']):
dp_msg = self._get_dp_message_description(word['dp_message'])
summary += f"{i+1}. {word['content']}: {word['total_score']:.4f} ({dp_msg})\n"
return summary
def _get_dp_message_description(self, dp_message):
"""获取dp_message描述"""
descriptions = {
0: "正常",
16: "漏读",
32: "增读",
64: "回读",
128: "替换"
}
return descriptions.get(dp_message, f"未知({dp_message})")
def run_evaluation(self):
"""运行评测"""
start_time = datetime.now()
websocket.enableTrace(False)
ws_url = self.generate_auth_url()
ws_entity = websocket.WebSocketApp(
ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
ws_entity.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
end_time = datetime.now()
evaluation_time = end_time - start_time
print(f"评测耗时: {evaluation_time}")
return self.evaluation_results, evaluation_time
# 使用示例
if __name__ == '__main__':
# 配置参数
appid = "5b83f8d6"
api_secret = "604fa6cb9c5ab664a0d153fe0ccc6802"
api_key = "5beb887923204000bfcb402046bb05a6"
audio_file = "./1.mp3"
# 创建评测器实例
evaluator = XunFeiAudioEvaluator(appid, api_key, api_secret, audio_file)
# 运行评测
results, eval_time = evaluator.run_evaluation()
# 输出评测结果摘要
print("\n" + "="*50)
print(evaluator.get_evaluation_summary())
print(f"总评测时间: {eval_time}")
print("="*50)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,8 @@
Websocket服务接口认证信息
APPID
5b83f8d6
APISecret
604fa6cb9c5ab664a0d153fe0ccc6802
APIKey
5beb887923204000bfcb402046bb05a6
*SDK调用方式只需APPID。APIKey或APISecret适用于WebAPI调用方式。

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,140 @@
import logging
import os
import uuid
import time
from fastapi import APIRouter, HTTPException, BackgroundTasks, Query, UploadFile, File
from pydantic import BaseModel
from typing import Optional
import tempfile
from Util.ObsUtil import ObsUploader
from Config.Config import OBS_BUCKET, OBS_SERVER
from fastapi.responses import StreamingResponse
import requests
# 导入讯飞语音评测类
from KeDaXunFei.XunFeiAudioEvaluator import XunFeiAudioEvaluator
# 配置日志
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/xunFei", tags=["讯飞"])
# 请求模型
class AudioEvaluationRequest(BaseModel):
language: str = "chinese" # chinese 或 english
text: str # 评测文本内容
group: Optional[str] = "adult" # 群体adult, youth, pupil
check_type: Optional[str] = "common" # 检错严格程度easy, common, hard
grade: Optional[str] = "middle" # 学段junior, middle, senior
# 响应模型
class AudioEvaluationResponse(BaseModel):
evaluation_id: str
status: str
results: Optional[dict] = None
evaluation_time: Optional[float] = None
error_message: Optional[str] = None
# 科大讯飞API配置需要根据实际情况配置
XUNFEI_CONFIG = {
"appid": "your_appid_here",
"api_key": "your_api_key_here",
"api_secret": "your_api_secret_here"
}
@router.post("/evaluate-audio", response_model=AudioEvaluationResponse)
async def evaluate_audio(
background_tasks: BackgroundTasks,
language: str = Query("chinese", description="评测语言: chinese 或 english"),
text: str = Query(..., description="评测文本内容"),
group: str = Query("adult", description="群体类型: adult, youth, pupil"),
check_type: str = Query("common", description="检错严格程度: easy, common, hard"),
grade: str = Query("middle", description="学段: junior, middle, senior"),
audio_file: UploadFile = File(...)):
"""
语音评测接口 - 支持中文和英文篇章朗读判分
"""
try:
# 验证语言参数
if language not in ["chinese", "english"]:
raise HTTPException(status_code=400, detail="language参数必须是'chinese''english'")
# 验证群体参数
if group not in ["adult", "youth", "pupil"]:
raise HTTPException(status_code=400, detail="group参数必须是'adult', 'youth''pupil'")
# 创建临时文件保存上传的音频
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
audio_content = await audio_file.read()
temp_audio.write(audio_content)
temp_audio_path = temp_audio.name
# 创建评测器实例
evaluator = XunFeiAudioEvaluator(
appid=XUNFEI_CONFIG["appid"],
api_key=XUNFEI_CONFIG["api_key"],
api_secret=XUNFEI_CONFIG["api_secret"],
audio_file=temp_audio_path
)
# 根据语言设置不同的评测参数
if language == "chinese":
# 中文评测配置
evaluator.business_params = {
"category": "read_chapter",
"ent": "cn_vip",
"group": group,
"check_type": check_type,
"grade": grade,
"text": '\uFEFF' + f"[content]\n{text}"
}
else:
# 英文评测配置
evaluator.business_params = {
"category": "read_chapter",
"ent": "en_vip",
"text": '\uFEFF' + f"[content]\n{text}"
}
# 运行评测
results, eval_time = evaluator.run_evaluation()
# 清理临时文件
os.unlink(temp_audio_path)
# 生成评测ID
evaluation_id = str(uuid.uuid4())
return AudioEvaluationResponse(
evaluation_id=evaluation_id,
status="success",
results=results,
evaluation_time=eval_time.total_seconds() if eval_time else None
)
except Exception as e:
logger.error(f"语音评测失败: {str(e)}")
# 确保临时文件被清理
if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path):
os.unlink(temp_audio_path)
return AudioEvaluationResponse(
evaluation_id=str(uuid.uuid4()),
status="error",
error_message=f"评测失败: {str(e)}"
)
@router.get("/evaluation-result/{evaluation_id}")
async def get_evaluation_result(evaluation_id: str):
"""
获取评测结果(示例接口,实际需要实现结果存储)
"""
# 这里需要实现从数据库或缓存中获取评测结果
# 目前返回示例数据
return {
"evaluation_id": evaluation_id,
"status": "completed",
"message": "请实现结果存储逻辑"
}
# 需要修改XunFeiAudioEvaluator类以支持参数配置
# 在XunFeiAudioEvaluator类中添加business_params属性并在on_open方法中使用

View File

@@ -32,6 +32,7 @@ from Routes.ttsRoute import router as tts_router
from Routes.CopyFaceRoute import router as copyFace_router
from Routes.WenShengTu import router as wenshengtu_router
from Routes.TeacherHelperRoute import router as teacherHelper_router
from Routes.XunFeiRoute import router as xunfei_router
# 控制日志输出
logger = logging.getLogger('lightrag')
logger.setLevel(logging.INFO)
@@ -90,6 +91,7 @@ app.include_router(tts_router) # 文本转语音
app.include_router(copyFace_router) # 抄脸
app.include_router(wenshengtu_router) # 文生图
app.include_router(teacherHelper_router) # 教师助手
app.include_router(xunfei_router) # 讯飞
# Teaching Model 相关路由
# 登录相关(不用登录)

View File

@@ -0,0 +1,318 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>语音评测系统</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.container {
background: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1 {
text-align: center;
color: #333;
margin-bottom: 30px;
}
.form-group {
margin-bottom: 20px;
}
label {
display: block;
margin-bottom: 5px;
font-weight: bold;
color: #555;
}
select, input[type="text"] {
width: 100%;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
font-size: 16px;
}
.btn {
background-color: #007bff;
color: white;
padding: 12px 24px;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
margin-right: 10px;
}
.btn:disabled {
background-color: #ccc;
cursor: not-allowed;
}
.btn-record {
background-color: #dc3545;
}
.btn-stop {
background-color: #28a745;
}
.status {
margin-top: 20px;
padding: 10px;
border-radius: 5px;
background-color: #e9ecef;
}
.result {
margin-top: 20px;
padding: 15px;
border: 1px solid #dee2e6;
border-radius: 5px;
background-color: #f8f9fa;
}
.error {
color: #dc3545;
background-color: #f8d7da;
border-color: #f5c6cb;
}
.success {
color: #155724;
background-color: #d4edda;
border-color: #c3e6cb;
}
</style>
</head>
<body>
<div class="container">
<h1>语音评测系统</h1>
<div class="form-group">
<label for="language">选择评测语言:</label>
<select id="language">
<option value="chinese">中文</option>
<option value="english">英文</option>
</select>
</div>
<div class="form-group">
<label for="text">评测文本内容:</label>
<input type="text" id="text" placeholder="请输入要朗读的文本内容">
</div>
<div class="form-group">
<button id="recordBtn" class="btn btn-record">开始录音</button>
<button id="stopBtn" class="btn btn-stop" disabled>停止录音</button>
<button id="evaluateBtn" class="btn" disabled>提交评测</button>
</div>
<div id="status" class="status">准备就绪</div>
<div id="result" class="result" style="display: none;">
<h3>评测结果</h3>
<div id="resultContent"></div>
</div>
</div>
<script>
let mediaRecorder;
let audioChunks = [];
let audioBlob;
// 获取DOM元素
const languageSelect = document.getElementById('language');
const textInput = document.getElementById('text');
const recordBtn = document.getElementById('recordBtn');
const stopBtn = document.getElementById('stopBtn');
const evaluateBtn = document.getElementById('evaluateBtn');
const statusDiv = document.getElementById('status');
const resultDiv = document.getElementById('result');
const resultContent = document.getElementById('resultContent');
// 语言切换时自动填充示例文本
languageSelect.addEventListener('change', () => {
if (languageSelect.value === 'chinese') {
textInput.value = '窗前明月光,疑是地上霜。';
} else {
textInput.value = 'Nice to meet you.';
}
});
// 页面加载时自动填充中文示例
window.addEventListener('load', () => {
textInput.value = '窗前明月光,疑是地上霜。';
});
// 开始录音
recordBtn.addEventListener('click', async () => {
try {
statusDiv.textContent = '正在获取麦克风权限...';
statusDiv.className = 'status';
// 使用更明确的提示并添加详细的错误处理
const stream = await navigator.mediaDevices.getUserMedia({ audio: true })
.catch(err => {
// 根据不同的错误类型提供更具体的提示
if (err.name === 'NotAllowedError') {
throw new Error('用户拒绝了麦克风访问权限。请在浏览器设置中允许麦克风访问,或刷新页面重试。');
} else if (err.name === 'NotFoundError') {
throw new Error('未检测到麦克风设备,请确保您的设备已正确连接麦克风。');
} else if (err.name === 'NotReadableError') {
throw new Error('麦克风设备正在被其他程序占用,请关闭其他可能使用麦克风的程序后重试。');
} else {
throw err;
}
});
mediaRecorder = new MediaRecorder(stream);
audioChunks = [];
mediaRecorder.ondataavailable = (event) => {
audioChunks.push(event.data);
};
mediaRecorder.onstop = () => {
audioBlob = new Blob(audioChunks, { type: 'audio/webm' });
statusDiv.textContent = '录音完成,可以提交评测';
evaluateBtn.disabled = false;
};
// 添加录音最大时长限制60秒
setTimeout(() => {
if (mediaRecorder && mediaRecorder.state === 'recording') {
alert('已达到最大录音时长60秒');
mediaRecorder.stop();
mediaRecorder.stream.getTracks().forEach(track => track.stop());
recordBtn.disabled = false;
stopBtn.disabled = true;
}
}, 60000);
mediaRecorder.start();
statusDiv.textContent = '正在录音...最多60秒';
recordBtn.disabled = true;
stopBtn.disabled = false;
} catch (error) {
console.error('录音失败:', error);
// 使用更明显的错误提示方式
alert('录音失败: ' + error.message);
statusDiv.textContent = '录音失败: ' + error.message;
statusDiv.className = 'status error';
// 确保按钮状态正确重置
recordBtn.disabled = false;
stopBtn.disabled = true;
}
});
// 停止录音
stopBtn.addEventListener('click', () => {
if (mediaRecorder && mediaRecorder.state === 'recording') {
mediaRecorder.stop();
mediaRecorder.stream.getTracks().forEach(track => track.stop());
recordBtn.disabled = false;
stopBtn.disabled = true;
}
});
// 提交评测
evaluateBtn.addEventListener('click', async () => {
if (!audioBlob) {
statusDiv.textContent = '请先完成录音';
statusDiv.className = 'status error';
return;
}
if (!textInput.value.trim()) {
statusDiv.textContent = '请输入评测文本内容';
statusDiv.className = 'status error';
return;
}
try {
statusDiv.textContent = '正在提交评测...';
statusDiv.className = 'status';
evaluateBtn.disabled = true;
const formData = new FormData();
formData.append('audio_file', audioBlob, 'recording.webm');
formData.append('language', languageSelect.value);
formData.append('text', textInput.value.trim());
formData.append('group', 'adult');
formData.append('check_type', 'common');
formData.append('grade', 'middle');
const response = await fetch('/api/xunFei/evaluate-audio', {
method: 'POST',
body: formData
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const result = await response.json();
if (result.status === 'success') {
displayResults(result.results);
statusDiv.textContent = '评测成功完成';
statusDiv.className = 'status success';
} else {
throw new Error(result.error_message || '评测失败');
}
} catch (error) {
console.error('评测失败:', error);
statusDiv.textContent = '评测失败: ' + error.message;
statusDiv.className = 'status error';
} finally {
evaluateBtn.disabled = false;
}
});
// 显示评测结果
function displayResults(results) {
resultDiv.style.display = 'block';
if (!results) {
resultContent.innerHTML = '<p>暂无评测结果</p>';
return;
}
let html = '<div class="result-summary">';
// 显示总分
if (results.total_score !== undefined) {
html += `<p><strong>总分:</strong> ${results.total_score.toFixed(4)} / 5.0</p>`;
}
// 显示各项分数
if (results.accuracy_score !== undefined) {
html += `<p><strong>准确度:</strong> ${results.accuracy_score.toFixed(4)}</p>`;
}
if (results.fluency_score !== undefined) {
html += `<p><strong>流利度:</strong> ${results.fluency_score.toFixed(4)}</p>`;
}
if (results.completeness_score !== undefined) {
html += `<p><strong>完整度:</strong> ${results.completeness_score.toFixed(4)}</p>`;
}
html += '</div>';
// 显示单词级评分(如果有)
if (results.words && results.words.length > 0) {
html += '<div class="word-scores"><h4>单词评分:</h4><ul>';
results.words.forEach(word => {
html += `<li>${word.content}: ${word.score.toFixed(4)}</li>`;
});
html += '</ul></div>';
}
resultContent.innerHTML = html;
}
</script>
</body>
</html>