This commit is contained in:
2025-09-06 08:56:25 +08:00
parent ada13af11e
commit 93ccd0e7f5
7 changed files with 60 additions and 53 deletions

View File

@@ -18,8 +18,8 @@ from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY
class XunFeiAudioEvaluator:
"""讯飞语音评测类"""
def __init__(self, appid, api_key, api_secret, audio_file, language):
def __init__(self, appid, api_key, api_secret, audio_file, language, txt):
self.appid = appid
self.api_key = api_key
self.api_secret = api_secret
@@ -28,50 +28,51 @@ class XunFeiAudioEvaluator:
self.host_url = "wss://ise-api.xfyun.cn/v2/open-ise"
self.websocket_url = ""
self.evaluation_results = {}
self.txt = txt
def generate_auth_url(self):
"""生成鉴权URL"""
now_time = datetime.now()
now_date = format_date_time(mktime(now_time.timetuple()))
origin_base = "host: " + "ise-api.xfyun.cn" + "\n"
origin_base += "date: " + now_date + "\n"
origin_base += "GET " + "/v2/open-ise " + "HTTP/1.1"
# SHA256加密
signature_sha = hmac.new(
self.api_secret.encode('utf-8'),
self.api_secret.encode('utf-8'),
origin_base.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
dict_data = {
"authorization": authorization,
"date": now_date,
"host": "ise-api.xfyun.cn"
}
return self.host_url + '?' + urlencode(dict_data)
def on_message(self, ws, message):
"""WebSocket消息处理"""
print(f"Received message: {message}")
data = json.loads(message)
# 检查是否存在错误码
if data.get("code") != 0:
print(f"API错误: {data.get('message', '未知错误')}")
ws.close()
return
# 安全获取data字段
response_data = data.get("data", {})
status = response_data.get("status")
if status == 2 and response_data.get("data"):
# 解析评测结果
xml_data = base64.b64decode(response_data["data"])
@@ -106,7 +107,7 @@ class XunFeiAudioEvaluator:
"cmd": "ssb",
"auf": "audio/L16;rate=16000",
"aue": "lame",
"text": '\uFEFF' + "[content]\nnice to meet you."
"text": '\uFEFF' + "[content]\n" + self.txt
},
"data": {
"status": 0,
@@ -125,19 +126,19 @@ class XunFeiAudioEvaluator:
if total_sent > 0:
my_dict = {
"business": {
"cmd": "auw",
"aus": 4,
"cmd": "auw",
"aus": 4,
"aue": "lame"
},
"data": {
"status": 2,
"status": 2,
"data": "" # 空字符串表示结束
}
}
ws.send(json.dumps(my_dict))
time.sleep(1)
break
# 发送中间音频帧
send_dict = {
"business": {
@@ -155,12 +156,12 @@ class XunFeiAudioEvaluator:
ws.send(json.dumps(send_dict))
total_sent += len(buffer)
time.sleep(0.04)
def parse_evaluation_results(self, xml_content):
"""解析评测结果XML并提取得分信息"""
try:
root = ET.fromstring(xml_content)
# 查找read_chapter节点
read_chapter = root.find('.//read_chapter')
if read_chapter is not None:
@@ -174,7 +175,7 @@ class XunFeiAudioEvaluator:
'word_count': int(read_chapter.get('word_count', 0)),
'is_rejected': read_chapter.get('is_rejected', 'false') == 'true'
}
# 提取句子级别得分
sentence = read_chapter.find('.//sentence')
if sentence is not None:
@@ -183,7 +184,7 @@ class XunFeiAudioEvaluator:
'fluency_score': float(sentence.get('fluency_score', 0)),
'total_score': float(sentence.get('total_score', 0))
}
# 提取单词级别得分
words = []
for word in read_chapter.findall('.//word'):
@@ -193,17 +194,17 @@ class XunFeiAudioEvaluator:
'dp_message': int(word.get('dp_message', 0))
}
words.append(word_data)
self.evaluation_results['words'] = words
except ET.ParseError as e:
print(f"XML解析错误: {e}")
def get_evaluation_summary(self):
"""获取评测结果摘要"""
if not self.evaluation_results:
return "暂无评测结果"
summary = "=== 语音评测结果摘要 ===\n"
summary += f"总得分: {self.evaluation_results.get('total_score', 0):.4f}\n"
summary += f"准确度得分: {self.evaluation_results.get('accuracy_score', 0):.4f}\n"
@@ -213,53 +214,53 @@ class XunFeiAudioEvaluator:
summary += f"标准度得分: {self.evaluation_results.get('standard_score', 0):.4f}\n"
summary += f"单词数量: {self.evaluation_results.get('word_count', 0)}\n"
summary += f"是否被拒绝: {'' if self.evaluation_results.get('is_rejected', False) else ''}\n"
if 'sentence' in self.evaluation_results:
sentence = self.evaluation_results['sentence']
summary += f"\n=== 句子级别得分 ===\n"
summary += f"句子准确度: {sentence.get('accuracy_score', 0):.4f}\n"
summary += f"句子流畅度: {sentence.get('fluency_score', 0):.4f}\n"
summary += f"句子总分: {sentence.get('total_score', 0):.4f}\n"
if 'words' in self.evaluation_results:
summary += f"\n=== 单词级别得分 ===\n"
for i, word in enumerate(self.evaluation_results['words']):
dp_msg = self._get_dp_message_description(word['dp_message'])
summary += f"{i+1}. {word['content']}: {word['total_score']:.4f} ({dp_msg})\n"
summary += f"{i + 1}. {word['content']}: {word['total_score']:.4f} ({dp_msg})\n"
return summary
def _get_dp_message_description(self, dp_message):
"""获取dp_message描述"""
descriptions = {
0: "正常",
16: "漏读",
32: "增读",
32: "增读",
64: "回读",
128: "替换"
}
return descriptions.get(dp_message, f"未知({dp_message})")
def run_evaluation(self):
"""运行评测"""
start_time = datetime.now()
websocket.enableTrace(False)
ws_url = self.generate_auth_url()
ws_entity = websocket.WebSocketApp(
ws_url,
on_message=self.on_message,
on_error=self.on_error,
ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
ws_entity.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
end_time = datetime.now()
evaluation_time = end_time - start_time
print(f"评测耗时: {evaluation_time}")
return self.evaluation_results, evaluation_time
@@ -272,16 +273,16 @@ if __name__ == '__main__':
appid = XF_APPID
api_secret = XF_APISECRET
api_key = XF_APIKEY
#audio_file = "./1.mp3"
audio_file=r'D:\dsWork\dsProject\dsLightRag\static\audio\audio_1f0a8c47db2d4f9ba7674055a352cab8.wav'
# audio_file = "./1.mp3"
audio_file = r'D:\dsWork\dsProject\dsLightRag\static\audio\audio_1f0a8c47db2d4f9ba7674055a352cab8.wav'
# 创建评测器实例
evaluator = XunFeiAudioEvaluator(appid, api_key, api_secret, audio_file, "english")
evaluator = XunFeiAudioEvaluator(appid, api_key, api_secret, audio_file, "english","nice to meet you.")
# 运行评测
results, eval_time = evaluator.run_evaluation()
# 输出评测结果摘要
print("\n" + "="*50)
print("\n" + "=" * 50)
print(evaluator.get_evaluation_summary())
print(f"总评测时间: {eval_time}")
print("="*50)
print("=" * 50)

View File

@@ -4,8 +4,7 @@ import uuid
import tempfile
import shutil
import sys
import logging
from fastapi import APIRouter, UploadFile, File
from fastapi import APIRouter, UploadFile, File, Form
# 配置日志
logger = logging.getLogger(__name__)
@@ -20,7 +19,7 @@ sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from Config.Config import XF_APPID, XF_APIKEY, XF_APISECRET
@router.post("/save-audio")
async def save_audio(audio: UploadFile = File(...)):
async def save_audio(audio: UploadFile = File(...), txt: str = Form(...)):
"""保存音频文件并评分"""
temp_file = None
try:
@@ -45,7 +44,8 @@ async def save_audio(audio: UploadFile = File(...)):
api_key=XF_APIKEY,
api_secret=XF_APISECRET,
audio_file=temp_file,
language="english"
language="english",
txt=txt
)
results, eval_time = evaluator.run_evaluation()
print(evaluator.get_evaluation_summary())

View File

@@ -450,7 +450,11 @@
</div>
</div>
</div>
<!-- 添加英文朗读文本输入区域 -->
<div class="text-container">
<h3 style="color: #5eead4; margin-bottom: 10px;">📝 请朗读以下文本</h3>
<textarea id="readingText" rows="4" style="width: 100%; padding: 12px; border-radius: 8px; background: rgba(15, 23, 42, 0.7); border: 1px solid rgba(94, 234, 212, 0.2); color: #e2e8f0; font-size: 16px; resize: vertical; font-family: inherit;">Hello everyone! Nice to meet you. Today is a beautiful day. I am learning English pronunciation with this tool.</textarea>
</div>
<div class="visualizer">
<canvas id="visualizerCanvas"></canvas>
</div>
@@ -760,7 +764,9 @@
const fileName = `recording_${Date.now()}.${extensions[format]}`;
formData.append('audio', this.recordedBlob, fileName);
// 添加朗读文本到表单数据
const readingText = document.getElementById('readingText').value;
formData.append('txt', readingText);
try {
this.simulateUploadProgress();