# Source: dsProject/dsLightRag/KeDaXunFei/XunFeiAudioEvaluator_cn.py
# Last modified: 2025-09-06 10:15:18 +08:00 (314 lines, 12 KiB, Python)
#
# Note: the repository viewer flagged this file as containing ambiguous
# Unicode characters, i.e. characters that might be confused with other
# characters. If that is intentional, the warning can be safely ignored.

import base64
import datetime
import hashlib
import hmac
import json
import ssl
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY
class XunFeiAudioEvaluator_cn:
    """Speech-evaluation client for the XunFei ISE WebSocket service.

    The evaluator signs a WebSocket URL, streams an audio file to the
    service frame by frame, and parses the XML result that comes back.

    Attributes:
        evaluation_results: Numeric scores of the last evaluation (floats,
            plus optional ``sentence`` and ``words`` entries). Empty until a
            result has been parsed.
        results: Compact string-valued view of the last result, or
            ``{'error': ...}`` when the last message could not be used.
        websocket_url: The most recently generated signed URL.
    """

    _HOST = "ise-api.xfyun.cn"
    _PATH = "/v2/open-ise"
    # Audio is streamed in fixed-size chunks with a short pause between them.
    _FRAME_SIZE = 1280
    _FRAME_INTERVAL = 0.04
    # Pause after the closing frame before on_open returns.
    _FINAL_FRAME_WAIT = 1
    # Element names that may carry the overall scores, in lookup priority.
    _SCORE_TAGS = ("read_chapter", "read_sentence")
    _DP_MESSAGES = {
        0: "正常",
        16: "漏读",
        32: "增读",
        64: "回读",
        128: "替换",
    }

    def __init__(self, appid, api_key, api_secret, audio_file, language, txt):
        """Store credentials and the evaluation inputs.

        Args:
            appid: Application id sent in the ``common`` section.
            api_key: API key embedded in the authorization header.
            api_secret: Secret used as the HMAC-SHA256 signing key.
            audio_file: Path of the audio file to stream.
            language: ``"chinese"`` selects the ``cn_vip`` engine; any other
                value selects ``en_vip``.
            txt: Reference text the audio is evaluated against.
        """
        self.appid = appid
        self.api_key = api_key
        self.api_secret = api_secret
        self.audio_file = audio_file
        self.language = language
        self.host_url = "wss://ise-api.xfyun.cn/v2/open-ise"
        self.websocket_url = ""
        self.evaluation_results = {}
        # Previously only created lazily inside callbacks; initialise it so
        # readers never hit an AttributeError.
        self.results = {}
        self.txt = txt

    def generate_auth_url(self) -> str:
        """Build the signed WebSocket URL.

        The signature is an HMAC-SHA256 over the ``host``, ``date`` and
        request-line values, base64-encoded and wrapped in a base64-encoded
        authorization string passed as a query parameter.

        Returns:
            The full ``wss://`` URL including the authorization query string.
        """
        # RFC 1123 date in GMT, taken directly from the epoch clock.
        date_header = format_date_time(time.time())
        signing_string = "\n".join((
            f"host: {self._HOST}",
            f"date: {date_header}",
            f"GET {self._PATH} HTTP/1.1",
        ))
        digest = hmac.new(
            self.api_secret.encode('utf-8'),
            signing_string.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        signature = base64.b64encode(digest).decode('utf-8')
        authorization_origin = (
            f'api_key="{self.api_key}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')
        ).decode('utf-8')
        query = {
            "authorization": authorization,
            "date": date_header,
            "host": self._HOST,
        }
        self.websocket_url = self.host_url + '?' + urlencode(query)
        return self.websocket_url

    def on_message(self, ws, message):
        """Handle one server message.

        Any base64 XML payload found in the message is decoded and parsed.
        The socket is closed when the server reports an error, when the
        message is unusable, or when the final frame (``status == 2``) has
        been received, so that ``run_forever`` can return.
        """
        print(f"Received message: {message}")
        try:
            data = json.loads(message)
        except (TypeError, ValueError) as e:
            print(f"Error processing message: {e}")
            self.results = {'error': str(e)}
            ws.close()
            return
        if not isinstance(data, dict):
            self.results = {'error': 'Unexpected message format'}
            ws.close()
            return
        if data.get('code') != 0:
            self.results = {'error': data.get('message', 'Unknown error')}
            ws.close()
            return

        # "data" may be missing or null on frames without a payload.
        response_data = data.get('data')
        if not isinstance(response_data, dict):
            response_data = {}

        # Parse whenever a payload is present, regardless of status.
        xml_b64 = response_data.get('data', '')
        if xml_b64:
            try:
                xml_content = base64.b64decode(xml_b64).decode("utf-8")
            except ValueError as e:  # bad base64 or invalid UTF-8
                print(f"Error processing message: {e}")
                self.results = {'error': str(e)}
            else:
                self.parse_evaluation_results(xml_content)

        if response_data.get("status") == 2:
            ws.close()

    def on_error(self, ws, error):
        """Log a WebSocket error."""
        print(f"Error: {error},{ws}")

    def on_close(self, ws, reason, res):
        """Log that the WebSocket connection was closed."""
        print(f"WebSocket connection closed,{ws}")

    def on_open(self, ws):
        """Send the session parameters, then stream the audio file.

        An empty audio file is reported as an error and the socket is closed,
        because no closing frame would otherwise ever be sent.
        """
        print(f"WebSocket connection opened,{ws},ws连接建立成功...")
        ws.send(json.dumps(self._build_start_frame()))

        total_sent = 0
        with open(self.audio_file, "rb") as audio:
            # iter() with a sentinel stops at the first empty read (EOF).
            for chunk in iter(lambda: audio.read(self._FRAME_SIZE), b""):
                ws.send(json.dumps(self._build_audio_frame(chunk)))
                total_sent += len(chunk)
                time.sleep(self._FRAME_INTERVAL)

        if total_sent > 0:
            ws.send(json.dumps(self._build_end_frame()))
            time.sleep(self._FINAL_FRAME_WAIT)
        else:
            print(f"Error: audio file is empty: {self.audio_file}")
            self.results = {'error': 'Audio file is empty'}
            ws.close()

    def _build_start_frame(self) -> dict:
        """Return the initial frame carrying the session parameters."""
        return {
            "common": {
                "app_id": self.appid
            },
            "business": {
                "category": "read_sentence",
                "rstcd": "utf8",
                "sub": "ise",
                "group": "pupil",
                "ent": "cn_vip" if self.language == "chinese" else "en_vip",
                "tte": "utf-8",
                "cmd": "ssb",
                "auf": "audio/L16;rate=16000",
                "aue": "lame",
                # The reference text is prefixed with a BOM and a marker line.
                "text": '\uFEFF' + "[content]\n" + self.txt
            },
            "data": {
                "status": 0,
                "data": ""
            }
        }

    @staticmethod
    def _build_audio_frame(chunk: bytes) -> dict:
        """Return an audio frame carrying ``chunk`` base64-encoded."""
        return {
            "business": {
                "cmd": "auw",
                "aus": 1,
                "aue": "lame"
            },
            "data": {
                "status": 1,
                "data": base64.b64encode(chunk).decode(),
                "data_type": 1,
                "encoding": "raw"
            }
        }

    @staticmethod
    def _build_end_frame() -> dict:
        """Return the closing frame; an empty payload marks end of audio."""
        return {
            "business": {
                "cmd": "auw",
                "aus": 4,
                "aue": "lame"
            },
            "data": {
                "status": 2,
                "data": ""
            }
        }

    @staticmethod
    def _to_float(value, default: float = 0.0) -> float:
        """Convert an XML attribute to float, falling back to ``default``."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _to_int(value, default: int = 0) -> int:
        """Convert an XML attribute to int, falling back to ``default``."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _find_score_node(self, root):
        """Return the element holding the overall scores, or ``None``.

        Elements are compared with ``is None`` rather than by truthiness,
        since a childless element is falsy. A node that actually carries a
        ``total_score`` attribute is preferred.
        NOTE(review): this assumes a score-less wrapper may share its tag
        name with the scored node - confirm against real service responses.
        """
        for tag in self._SCORE_TAGS:
            for node in root.iter(tag):
                if 'total_score' in node.attrib:
                    return node
        # No node carries scores: fall back to the first structural match.
        for tag in self._SCORE_TAGS:
            node = root.find(f'.//{tag}')
            if node is not None:
                return node
        return None

    def parse_evaluation_results(self, xml_content):
        """Parse the result XML and extract the score information.

        On success both ``results`` (string values) and
        ``evaluation_results`` (numeric values, plus optional sentence-level
        and word-level details) are replaced. On malformed XML or when no
        score node exists, ``results`` is set to ``{'error': ...}`` and
        ``evaluation_results`` is left untouched.
        """
        print(xml_content)
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            print(f"XML解析错误: {e}")
            self.results = {'error': str(e)}
            return

        node = self._find_score_node(root)
        if node is None:
            # Dump the unmatched XML to help debugging.
            print(f"No evaluation nodes found in XML: {xml_content}")
            self.results = {'error': '未找到有效评分节点'}
            return

        self.results = {
            'total_score': node.attrib.get('total_score', '0'),
            'accuracy': node.attrib.get('accuracy_score', '0'),
            'fluency': node.attrib.get('fluency_score', '0'),
            'integrity': node.attrib.get('integrity_score', '0')
        }

        to_float = self._to_float
        # The XML attribute "integrity_score" is exposed as
        # "completeness_score", which is the key the summary reads.
        scores = {
            'accuracy_score': to_float(node.get('accuracy_score', 0)),
            'fluency_score': to_float(node.get('fluency_score', 0)),
            'completeness_score': to_float(node.get('integrity_score', 0)),
            'standard_score': to_float(node.get('standard_score', 0)),
            'total_score': to_float(node.get('total_score', 0)),
            'word_count': self._to_int(node.get('word_count', 0)),
            'is_rejected': node.get('is_rejected', 'false') == 'true'
        }

        # Sentence-level scores (first sentence only).
        sentence = node.find('.//sentence')
        if sentence is not None:
            scores['sentence'] = {
                'accuracy_score': to_float(sentence.get('accuracy_score', 0)),
                'fluency_score': to_float(sentence.get('fluency_score', 0)),
                'total_score': to_float(sentence.get('total_score', 0))
            }

        # Word-level scores.
        scores['words'] = [
            {
                'content': word.get('content', ''),
                'total_score': to_float(word.get('total_score', 0)),
                'dp_message': self._to_int(word.get('dp_message', 0))
            }
            for word in node.findall('.//word')
        ]
        self.evaluation_results = scores

    def get_evaluation_summary(self) -> str:
        """Return a human-readable summary of the last evaluation.

        Returns:
            A multi-line report, or a short placeholder string when no
            result has been parsed yet.
        """
        scores = self.evaluation_results
        if not scores:
            return "暂无评测结果"

        # Both branches used to be empty strings, so the flag never showed.
        rejected = '是' if scores.get('is_rejected', False) else '否'
        lines = [
            "=== 语音评测结果摘要 ===",
            f"总得分: {scores.get('total_score', 0):.4f}",
            f"准确度得分: {scores.get('accuracy_score', 0):.4f}",
            f"流畅度得分: {scores.get('fluency_score', 0):.4f}",
            f"完整度得分: {scores.get('completeness_score', 0):.4f}",
            f"标准度得分: {scores.get('standard_score', 0):.4f}",
            f"单词数量: {scores.get('word_count', 0)}",
            f"是否被拒绝: {rejected}",
        ]
        if 'sentence' in scores:
            sentence = scores['sentence']
            lines += [
                "\n=== 句子级别得分 ===",
                f"句子准确度: {sentence.get('accuracy_score', 0):.4f}",
                f"句子流畅度: {sentence.get('fluency_score', 0):.4f}",
                f"句子总分: {sentence.get('total_score', 0):.4f}",
            ]
        if 'words' in scores:
            lines.append("\n=== 单词级别得分 ===")
            for index, word in enumerate(scores['words'], start=1):
                dp_msg = self._get_dp_message_description(word['dp_message'])
                lines.append(
                    f"{index}. {word['content']}: "
                    f"{word['total_score']:.4f} ({dp_msg})"
                )
        return "\n".join(lines) + "\n"

    def _get_dp_message_description(self, dp_message):
        """Map a ``dp_message`` code to its description.

        Unknown codes are rendered with the raw value so nothing is hidden.
        """
        return self._DP_MESSAGES.get(dp_message, f"未知({dp_message})")

    def run_evaluation(self):
        """Run one evaluation and block until the socket closes.

        Returns:
            A ``(evaluation_results, elapsed)`` tuple, where ``elapsed`` is a
            ``timedelta`` covering the whole WebSocket session.
        """
        # Drop results from any previous run so a failure is not masked.
        self.evaluation_results = {}
        self.results = {}

        start_time = datetime.now()
        websocket.enableTrace(False)
        ws_entity = websocket.WebSocketApp(
            self.generate_auth_url(),
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )
        # NOTE(review): CERT_NONE disables TLS certificate verification and
        # exposes the signed URL to interception; kept for compatibility,
        # but consider enabling verification.
        ws_entity.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
        evaluation_time = datetime.now() - start_time
        print(f"评测耗时: {evaluation_time}")
        return self.evaluation_results, evaluation_time
# Usage example
if __name__ == '__main__':
    import sys

    # Credentials come from the project configuration. Do not hard-code them
    # here or leave them behind in commented-out code.
    appid = XF_APPID
    api_secret = XF_APISECRET
    api_key = XF_APIKEY

    # The audio path may be passed as the first command-line argument; the
    # machine-specific default is kept for backward compatibility.
    default_audio_file = r'D:\dsWork\dsProject\dsLightRag\static\audio\audio_afc0a96e382c428cba2f00e3f71e4e8f.mp3'
    audio_file = sys.argv[1] if len(sys.argv) > 1 else default_audio_file

    # Create the evaluator instance
    txt = "Hello everyone! Nice to meet you. Today is a beautiful day. I am learning English pronunciation with this tool."
    evaluator = XunFeiAudioEvaluator_cn(appid, api_key, api_secret, audio_file, "english", txt)

    # Run the evaluation
    results, eval_time = evaluator.run_evaluation()

    # Print the evaluation summary
    print("\n" + "=" * 50)
    print(evaluator.get_evaluation_summary())
    print(f"总评测时间: {eval_time}")
    print("=" * 50)