Files
dsProject/dsLightRag/KeDaXunFei/XunFeiAudioEvaluator_cn.py
2025-09-06 17:25:52 +08:00

285 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import datetime
import hashlib
import hmac
import json
import ssl
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY
class XunFeiAudioEvaluator_cn:
"""讯飞语音评测类(中文专用)"""
def __init__(self, appid, api_key, api_secret, audio_file, txt):
self.appid = appid
self.api_key = api_key
self.api_secret = api_secret
self.audio_file = audio_file
self.txt = txt.strip() # 确保文本预处理
self.host_url = "wss://ise-api.xfyun.cn/v2/open-ise"
self.evaluation_results = {} # 统一结果存储变量
self.websocket_url = ""
def generate_auth_url(self):
"""生成鉴权URL"""
now_time = datetime.now()
now_date = format_date_time(mktime(now_time.timetuple()))
origin_base = "host: " + "ise-api.xfyun.cn" + "\n"
origin_base += "date: " + now_date + "\n"
origin_base += "GET " + "/v2/open-ise " + "HTTP/1.1"
# SHA256加密
signature_sha = hmac.new(
self.api_secret.encode('utf-8'),
origin_base.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
dict_data = {
"authorization": authorization,
"date": now_date,
"host": "ise-api.xfyun.cn"
}
return self.host_url + '?' + urlencode(dict_data)
def parse_evaluation_results(self, xml_content):
"""解析评测结果XML并提得分信息整合版"""
try:
root = ET.fromstring(xml_content)
# 查找包含评分数据的节点优先read_chapter兼容read_sentence
score_nodes = root.findall('.//read_chapter') + root.findall('.//read_sentence')
if not score_nodes:
print("未找到评分节点")
return
# 优先使用第一个包含total_score的节点
target_node = None
for node in score_nodes:
if 'total_score' in node.attrib:
target_node = node
break
if not target_node:
print("未找到包含评分数据的有效节点")
return
# 提取核心评分字段
self.evaluation_results = {
'total_score': float(target_node.get('total_score', 0)),
'accuracy_score': float(target_node.get('accuracy_score', 0)),
'fluency_score': float(target_node.get('fluency_score', 0)),
'integrity_score': float(target_node.get('integrity_score', 0)),
'tone_score': float(target_node.get('tone_score', 0)),
'phone_score': float(target_node.get('phone_score', 0)),
'emotion_score': float(target_node.get('emotion_score', 0)),
'is_rejected': target_node.get('is_rejected', 'false') == 'true'
}
# 提取句子级得分
sentences = []
for sent in target_node.findall('.//sentence'):
sentences.append({
'content': sent.get('content', ''),
'total_score': float(sent.get('total_score', 0)),
'fluency_score': float(sent.get('fluency_score', 0))
})
self.evaluation_results['sentences'] = sentences
print(f"解析成功,提取到{len(self.evaluation_results)}个评分字段")
except ET.ParseError as e:
print(f"XML解析失败: {e}, 内容: {xml_content[:200]}")
self.evaluation_results = {'error': f'解析失败: {str(e)}'}
def on_message(self, ws, message):
"""WebSocket消息处理"""
print(f"Received message: {message}")
try:
data = json.loads(message)
if data.get('code') != 0:
self.evaluation_results = {'error': data.get('message', 'Unknown error')}
return
inner_data = data.get('data', {})
xml_b64 = inner_data.get('data', '')
if xml_b64:
xml_data = base64.b64decode(xml_b64)
xml_content = xml_data.decode("utf-8")
self.parse_evaluation_results(xml_content) # 直接调用整合后的解析方法
ws.close()
except Exception as e:
print(f"Error processing message: {e}")
self.evaluation_results = {'error': str(e)}
# 安全获取data字段
response_data = data.get("data", {})
status = response_data.get("status")
if status == 2 and response_data.get("data"):
# 解析评测结果
xml_data = base64.b64decode(response_data["data"])
xml_content = xml_data.decode("utf-8")
self.parse_evaluation_results(xml_content)
ws.close()
def on_error(self, ws, error):
"""错误处理"""
print(f"Error: {error},{ws}")
def on_close(self, ws, reason, res):
"""连接关闭处理"""
print(f"WebSocket connection closed,{ws}")
def on_open(self, ws):
"""连接建立处理(中文专用配置)"""
print(f"WebSocket connection opened,{ws},ws连接建立成功...")
# 发送初始参数(中文专用配置)
send_dict = {
"common": {
"app_id": self.appid
},
"business": {
"category": "read_chapter",
"rstcd": "utf8",
"sub": "ise",
"group": "adult", # 根据文档设置为小学组
"ent": "cn_vip", # 固定中文引擎
"tte": "utf-8",
"cmd": "ssb",
#"auf": "audio/L16;rate=16000", # 修复参数格式错误
"aue": "lame",# mp3
"text": '\uFEFF' + "[content]\n" + self.txt, # 确保UTF8 BOM
"extra_ability": "multi_dimension" # 启用多维度评分
},
"data": {
"status": 0,
}
}
ws.send(json.dumps(send_dict))
# 发送音频数据
with open(self.audio_file, "rb") as file_flag:
total_sent = 0
while True:
buffer = file_flag.read(1280)
if not buffer:
# 发送最后一帧(仅在有数据发送时)
if total_sent > 0:
my_dict = {
"business": {
"cmd": "auw",
"aus": 4,
"aue": "lame"
},
"data": {
"status": 2,
"data": "" # 空字符串表示结束
}
}
ws.send(json.dumps(my_dict))
time.sleep(1)
break
# 发送中间音频帧
send_dict = {
"business": {
"cmd": "auw",
"aus": 1,
"aue": "lame"
},
"data": {
"status": 1,
"data": str(base64.b64encode(buffer).decode()),
"data_type": 1,
"encoding": "raw"
}
}
ws.send(json.dumps(send_dict))
total_sent += len(buffer)
time.sleep(0.04)
def get_evaluation_summary(self):
"""获取评测结果摘要"""
if not self.evaluation_results:
return "暂无评测结果"
summary = "=== 语音评测结果摘要 ===\n"
summary += f"总得分: {self.evaluation_results.get('total_score', 0):.4f}\n"
summary += f"准确度得分: {self.evaluation_results.get('accuracy_score', 0):.4f}\n"
summary += f"流畅度得分: {self.evaluation_results.get('fluency_score', 0):.4f}\n"
# 修复这里使用completeness_score
summary += f"完整度得分: {self.evaluation_results.get('completeness_score', 0):.4f}\n"
summary += f"标准度得分: {self.evaluation_results.get('standard_score', 0):.4f}\n"
summary += f"单词数量: {self.evaluation_results.get('word_count', 0)}\n"
summary += f"是否被拒绝: {'' if self.evaluation_results.get('is_rejected', False) else ''}\n"
if 'sentence' in self.evaluation_results:
sentence = self.evaluation_results['sentence']
summary += f"\n=== 句子级别得分 ===\n"
summary += f"句子准确度: {sentence.get('accuracy_score', 0):.4f}\n"
summary += f"句子流畅度: {sentence.get('fluency_score', 0):.4f}\n"
summary += f"句子总分: {sentence.get('total_score', 0):.4f}\n"
if 'words' in self.evaluation_results:
summary += f"\n=== 单词级别得分 ===\n"
for i, word in enumerate(self.evaluation_results['words']):
dp_msg = self._get_dp_message_description(word['dp_message'])
summary += f"{i + 1}. {word['content']}: {word['total_score']:.4f} ({dp_msg})\n"
return summary
def _get_dp_message_description(self, dp_message):
"""获取dp_message描述"""
descriptions = {
0: "正常",
16: "漏读",
32: "增读",
64: "回读",
128: "替换"
}
return descriptions.get(dp_message, f"未知({dp_message})")
def run_evaluation(self):
"""运行评测修复SSL配置"""
start_time = datetime.now()
websocket.enableTrace(False)
ws_url = self.generate_auth_url()
ws_entity = websocket.WebSocketApp(
ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
# 修复SSL配置启用证书验证
import certifi
ws_entity.run_forever(sslopt={
"cert_reqs": ssl.CERT_REQUIRED,
"ca_certs": certifi.where()
})
end_time = datetime.now()
evaluation_time = end_time - start_time
print(f"评测耗时: {evaluation_time}")
return self.evaluation_results, evaluation_time