Files
dsProject/dsLightRag/KeDaXunFei/XunFeiAudioEvaluator_cn.py
2025-09-06 10:29:48 +08:00

335 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import datetime
import hashlib
import hmac
import json
import ssl
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY
class XunFeiAudioEvaluator_cn:
    """Speech-evaluation client for the XunFei ISE WebSocket API (Chinese only).

    Typical use: construct with credentials, an audio file path and the
    reference text, call :meth:`run_evaluation`, then read
    ``evaluation_results`` or :meth:`get_evaluation_summary`.

    ``evaluation_results`` is always one of:

    * ``{}`` - nothing received yet;
    * ``{'error': <message>}`` - the request, the response or parsing failed;
    * a dict of scores produced by :meth:`parse_evaluation_results`.
    """

    # Host and request path; they are part of the signed string, so they must
    # stay in agreement with ``host_url`` set in ``__init__``.
    API_HOST = "ise-api.xfyun.cn"
    API_PATH = "/v2/open-ise"
    # Audio is streamed in chunks of FRAME_SIZE bytes, pausing FRAME_INTERVAL
    # seconds between chunks.
    FRAME_SIZE = 1280
    FRAME_INTERVAL = 0.04

    def __init__(self, appid, api_key, api_secret, audio_file, txt):
        """Store credentials and the evaluation input.

        Args:
            appid: Application id sent in the session frame.
            api_key: API key embedded in the authorization header.
            api_secret: Secret used to HMAC-sign the handshake.
            audio_file: Path of the audio file to stream.
            txt: Reference text to evaluate against; surrounding whitespace
                is stripped.
        """
        self.appid = appid
        self.api_key = api_key
        self.api_secret = api_secret
        self.audio_file = audio_file
        self.txt = txt.strip()
        self.host_url = "wss://ise-api.xfyun.cn/v2/open-ise"
        self.evaluation_results = {}  # single place where results/errors are stored
        self.websocket_url = ""  # signed URL of the most recent run

    def generate_auth_url(self):
        """Build the signed WebSocket URL.

        The string ``host``/``date``/request-line is signed with HMAC-SHA256
        using ``api_secret``; the signature is wrapped in an authorization
        string, base64-encoded and passed as a query parameter together with
        the date and host.

        Returns:
            str: ``host_url`` followed by the URL-encoded query string.
        """
        # HTTP-date (GMT) for the current instant.
        now_date = format_date_time(time.time())
        signature_origin = (
            f"host: {self.API_HOST}\n"
            f"date: {now_date}\n"
            f"GET {self.API_PATH} HTTP/1.1"
        )
        digest = hmac.new(
            self.api_secret.encode('utf-8'),
            signature_origin.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        signature = base64.b64encode(digest).decode('utf-8')
        authorization_origin = (
            f'api_key="{self.api_key}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')
        ).decode('utf-8')
        query = {
            "authorization": authorization,
            "date": now_date,
            "host": self.API_HOST,
        }
        return self.host_url + '?' + urlencode(query)

    def on_message(self, ws, message):
        """Handle one server message.

        A non-zero ``code`` is recorded as an error. Otherwise, if the message
        carries a base64 payload it is decoded and parsed exactly once. The
        socket is closed after an error code or when ``data.status == 2``.

        Args:
            ws: The WebSocket connection (only ``close()`` is used).
            message: Raw JSON text received from the server.
        """
        print(f"Received message: {message}")
        finished = False
        try:
            data = json.loads(message)
            if not isinstance(data, dict):
                raise ValueError(f"unexpected message type: {type(data).__name__}")
            if data.get('code') != 0:
                self.evaluation_results = {
                    'error': data.get('message', 'Unknown error')
                }
                finished = True
            else:
                payload = data.get('data') or {}
                xml_b64 = payload.get('data') or ''
                if xml_b64:
                    xml_content = base64.b64decode(xml_b64).decode("utf-8")
                    self.parse_evaluation_results(xml_content)
                finished = payload.get('status') == 2
        except Exception as e:
            # Callback boundary: record the failure as the result rather than
            # letting it escape into the websocket dispatch loop.
            print(f"Error processing message: {e}")
            self.evaluation_results = {'error': str(e)}
        if finished:
            ws.close()

    def on_error(self, ws, error):
        """Log a transport-level error reported by the websocket library."""
        print(f"Error: {error},{ws}")

    def on_close(self, ws, reason, res):
        """Log that the connection was closed."""
        print(f"WebSocket connection closed,{ws}")

    def on_open(self, ws):
        """Send the session frame, stream the audio, then send the end frame.

        If the audio file cannot be opened or is empty, nothing is sent: the
        error is stored in ``evaluation_results`` and the socket is closed so
        that ``run_evaluation`` does not wait for a reply that cannot come.

        Args:
            ws: The WebSocket connection (``send()`` and ``close()`` are used).
        """
        print(f"WebSocket connection opened,{ws},ws连接建立成功...")
        try:
            audio = open(self.audio_file, "rb")
        except OSError as e:
            print(f"Error: {e}")
            self.evaluation_results = {'error': str(e)}
            ws.close()
            return

        with audio:
            chunk = audio.read(self.FRAME_SIZE)
            if not chunk:
                self.evaluation_results = {'error': '音频文件为空'}
                ws.close()
                return

            # Session-start frame with the Chinese evaluation parameters.
            session_frame = {
                "common": {
                    "app_id": self.appid
                },
                "business": {
                    "category": "read_sentence",
                    "rstcd": "utf8",
                    "sub": "ise",
                    "group": "pupil",
                    "ent": "cn_vip",
                    "tte": "utf-8",
                    "cmd": "ssb",
                    "auf": "audio/L16;rate=16000",
                    "aue": "raw",
                    # Reference text is prefixed with a BOM and a section tag.
                    "text": '\uFEFF' + "[content]\n" + self.txt,
                    "extra_ability": "multi_dimension"
                },
                "data": {
                    "status": 0,
                    "data": ""
                }
            }
            ws.send(json.dumps(session_frame))

            # NOTE(review): the session frame declares aue="raw" while the
            # audio frames below say "lame". Values are kept exactly as found;
            # confirm the intended value against the ISE protocol reference.
            while chunk:
                audio_frame = {
                    "business": {
                        "cmd": "auw",
                        "aus": 1,
                        "aue": "lame"
                    },
                    "data": {
                        "status": 1,
                        "data": str(base64.b64encode(chunk).decode()),
                        "data_type": 1,
                        "encoding": "raw"
                    }
                }
                ws.send(json.dumps(audio_frame))
                time.sleep(self.FRAME_INTERVAL)
                chunk = audio.read(self.FRAME_SIZE)

        # End-of-audio frame: empty payload with status 2.
        final_frame = {
            "business": {
                "cmd": "auw",
                "aus": 4,
                "aue": "lame"
            },
            "data": {
                "status": 2,
                "data": ""
            }
        }
        ws.send(json.dumps(final_frame))
        # Pause kept from the original flow before returning from the callback.
        time.sleep(1)

    @staticmethod
    def _find_scored(root, tag):
        """Return the ``tag`` descendant of ``root`` that holds the scores.

        Prefers the first match carrying a ``total_score`` attribute so that
        an unscored wrapper element with the same tag cannot mask the real
        scores; falls back to the first match, or ``None`` if there is none.
        """
        nodes = root.findall(f'.//{tag}')
        for node in nodes:
            if 'total_score' in node.attrib:
                return node
        return nodes[0] if nodes else None

    def _parse_read_sentence(self, root):
        """Extract overall and per-sentence scores from a ``read_sentence`` node."""
        node = self._find_scored(root, 'read_sentence')
        if node is None:
            return None
        results = {
            'total_score': float(node.get('total_score', 0)),
            'accuracy_score': float(node.get('accuracy_score', 0)),
            'fluency_score': float(node.get('fluency_score', 0)),
            'integrity_score': float(node.get('integrity_score', 0)),
            'tone_score': float(node.get('tone_score', 0)),
            'is_rejected': node.get('is_rejected', 'false') == 'true'
        }
        results['sentences'] = [
            {
                'content': sent.get('content', ''),
                'total_score': float(sent.get('total_score', 0)),
                'fluency_score': float(sent.get('fluency_score', 0))
            }
            for sent in node.findall('.//sentence')
        ]
        return results

    def _parse_read_chapter(self, root):
        """Extract overall, first-sentence and per-word scores from ``read_chapter``."""
        node = self._find_scored(root, 'read_chapter')
        if node is None:
            return None
        results = {
            'accuracy_score': float(node.get('accuracy_score', 0)),
            'fluency_score': float(node.get('fluency_score', 0)),
            # The XML attribute is integrity_score; it is exposed under the
            # completeness_score key for this result shape.
            'completeness_score': float(node.get('integrity_score', 0)),
            'standard_score': float(node.get('standard_score', 0)),
            'total_score': float(node.get('total_score', 0)),
            'word_count': int(node.get('word_count', 0)),
            'is_rejected': node.get('is_rejected', 'false') == 'true'
        }
        sentence = node.find('.//sentence')
        if sentence is not None:
            results['sentence'] = {
                'accuracy_score': float(sentence.get('accuracy_score', 0)),
                'fluency_score': float(sentence.get('fluency_score', 0)),
                'total_score': float(sentence.get('total_score', 0))
            }
        results['words'] = [
            {
                'content': word.get('content', ''),
                'total_score': float(word.get('total_score', 0)),
                'dp_message': int(word.get('dp_message', 0))
            }
            for word in node.findall('.//word')
        ]
        return results

    def parse_evaluation_results(self, xml_content):
        """Parse the result XML once and store it in ``evaluation_results``.

        A ``read_chapter`` node, when present, takes precedence; otherwise the
        ``read_sentence`` node is used. Malformed XML, a non-numeric score
        attribute, or the absence of both nodes is stored as ``{'error': ...}``.

        Args:
            xml_content: Decoded XML text returned by the service.
        """
        try:
            root = ET.fromstring(xml_content)
            results = self._parse_read_chapter(root)
            if results is None:
                results = self._parse_read_sentence(root)
        except (ET.ParseError, ValueError) as e:
            print(f"XML解析失败: {e}, 内容: {xml_content[:200]}")
            self.evaluation_results = {'error': f'解析失败: {str(e)}'}
            return

        if results is None:
            # Show the start of the payload to help diagnose the mismatch.
            print(f"未找到中文评分节点: {xml_content[:200]}")
            results = {'error': '未找到read_sentence节点'}
        else:
            print(f"中文评测结果解析成功: {results}")
        self.evaluation_results = results

    def get_evaluation_summary(self):
        """Render ``evaluation_results`` as a human-readable report.

        Returns:
            str: A placeholder when there are no results, a one-line failure
            message when the results hold an error, otherwise a multi-line
            report covering whichever score fields are present.
        """
        results = self.evaluation_results
        if not results:
            return "暂无评测结果"
        if 'error' in results:
            return f"评测失败: {results['error']}"

        # The two result shapes name this score differently.
        completeness = results.get(
            'completeness_score', results.get('integrity_score', 0)
        )
        lines = [
            "=== 语音评测结果摘要 ===",
            f"总得分: {results.get('total_score', 0):.4f}",
            f"准确度得分: {results.get('accuracy_score', 0):.4f}",
            f"流畅度得分: {results.get('fluency_score', 0):.4f}",
            f"完整度得分: {completeness:.4f}",
        ]
        if 'tone_score' in results:
            lines.append(f"调型得分: {results['tone_score']:.4f}")
        if 'standard_score' in results:
            lines.append(f"标准度得分: {results['standard_score']:.4f}")
        if 'word_count' in results:
            lines.append(f"单词数量: {results['word_count']}")
        rejected = '是' if results.get('is_rejected', False) else '否'
        lines.append(f"是否被拒绝: {rejected}")

        if 'sentence' in results:
            sentence = results['sentence']
            lines.append("\n=== 句子级别得分 ===")
            lines.append(f"句子准确度: {sentence.get('accuracy_score', 0):.4f}")
            lines.append(f"句子流畅度: {sentence.get('fluency_score', 0):.4f}")
            lines.append(f"句子总分: {sentence.get('total_score', 0):.4f}")

        if results.get('sentences'):
            lines.append("\n=== 句子级别得分 ===")
            for i, sent in enumerate(results['sentences']):
                lines.append(
                    f"{i + 1}. {sent.get('content', '')}: "
                    f"总分 {sent.get('total_score', 0):.4f}, "
                    f"流畅度 {sent.get('fluency_score', 0):.4f}"
                )

        if 'words' in results:
            lines.append("\n=== 单词级别得分 ===")
            for i, word in enumerate(results['words']):
                dp_msg = self._get_dp_message_description(word['dp_message'])
                lines.append(
                    f"{i + 1}. {word['content']}: {word['total_score']:.4f} ({dp_msg})"
                )
        return "\n".join(lines) + "\n"

    def _get_dp_message_description(self, dp_message):
        """Map a word's ``dp_message`` code to its display label."""
        descriptions = {
            0: "正常",
            16: "漏读",
            32: "增读",
            64: "回读",
            128: "替换"
        }
        return descriptions.get(dp_message, f"未知({dp_message})")

    def run_evaluation(self):
        """Run one evaluation and block until the connection closes.

        Results left over from a previous run are cleared first. TLS
        certificates are verified against the ``certifi`` CA bundle.

        Returns:
            tuple: ``(evaluation_results, elapsed)`` where ``elapsed`` is a
            ``datetime.timedelta``.
        """
        import certifi

        start_time = datetime.now()
        self.evaluation_results = {}
        websocket.enableTrace(False)
        self.websocket_url = self.generate_auth_url()
        ws_entity = websocket.WebSocketApp(
            self.websocket_url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )
        ws_entity.run_forever(sslopt={
            "cert_reqs": ssl.CERT_REQUIRED,
            "ca_certs": certifi.where()
        })
        evaluation_time = datetime.now() - start_time
        print(f"评测耗时: {evaluation_time}")
        return self.evaluation_results, evaluation_time
# Usage example: evaluate one Chinese recording and print the report.
if __name__ == '__main__':
    appid = XF_APPID
    api_secret = XF_APISECRET
    api_key = XF_APIKEY
    # Chinese test recording and the text it is expected to contain.
    audio_file = r'D:\dsWork\dsProject\dsLightRag\static\audio\test_cn.wav'
    txt = "大家好。很高兴认识你们。今天天气很好。我正在使用这个工具练习中文发音。"

    # The constructor takes exactly (appid, api_key, api_secret, audio_file, txt).
    evaluator = XunFeiAudioEvaluator_cn(appid, api_key, api_secret, audio_file, txt)
    results, eval_time = evaluator.run_evaluation()

    print("\n" + "=" * 50)
    print(evaluator.get_evaluation_summary())
    print(f"总评测时间: {eval_time}")
    print("=" * 50)