import base64
import datetime
import hashlib
import hmac
import json
import ssl
import time
import xml.etree.ElementTree as ET
# NOTE: this rebinds the name ``datetime`` from the module to the class; the
# code below relies on the class (``datetime.now()``).
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket

from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY


class XunFeiAudioEvaluator_cn:
    """iFlytek (XunFei) speech-evaluation client for Chinese text.

    One instance evaluates one audio file against one reference text.
    ``run_evaluation`` opens a WebSocket, streams the audio, and stores the
    parsed scores in ``self.evaluation_results``. On failure that dict
    holds a single ``'error'`` key instead of scores.
    """

    _API_HOST = "ise-api.xfyun.cn"
    _API_PATH = "/v2/open-ise"

    # Bytes of audio per frame and pause between frames while streaming.
    _FRAME_SIZE = 1280
    _FRAME_INTERVAL = 0.04

    # Numeric attributes copied from the score node into the results dict.
    _SCORE_FIELDS = (
        'total_score',
        'accuracy_score',
        'fluency_score',
        'integrity_score',
        'tone_score',
        'phone_score',
        'emotion_score',
    )

    _DP_MESSAGES = {
        0: "正常",
        16: "漏读",
        32: "增读",
        64: "回读",
        128: "替换",
    }

    def __init__(self, appid, api_key, api_secret, audio_file, txt):
        """Store credentials, the audio path and the reference text.

        Args:
            appid: Application id sent in the session-start frame.
            api_key: API key embedded in the authorization parameter.
            api_secret: Secret used as the HMAC-SHA256 signing key.
            audio_file: Path of the audio file to stream.
            txt: Reference text; surrounding whitespace is stripped.
        """
        self.appid = appid
        self.api_key = api_key
        self.api_secret = api_secret
        self.audio_file = audio_file
        self.txt = txt.strip()
        self.host_url = f"wss://{self._API_HOST}{self._API_PATH}"
        # Single place where scores (or an 'error' entry) are stored.
        self.evaluation_results = {}
        # Kept for compatibility; nothing in this class assigns it later.
        self.websocket_url = ""

    def generate_auth_url(self):
        """Return the WebSocket URL carrying the signed auth query string.

        The signature is an HMAC-SHA256 over the host, date and request
        line, keyed with ``api_secret`` and base64-encoded.
        """
        now_date = format_date_time(mktime(datetime.now().timetuple()))

        signature_origin = (
            f"host: {self._API_HOST}\n"
            f"date: {now_date}\n"
            f"GET {self._API_PATH} HTTP/1.1"
        )
        digest = hmac.new(
            self.api_secret.encode('utf-8'),
            signature_origin.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        signature = base64.b64encode(digest).decode('utf-8')

        authorization_origin = (
            f'api_key="{self.api_key}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')
        ).decode('utf-8')

        query = {
            "authorization": authorization,
            "date": now_date,
            "host": self._API_HOST,
        }
        return self.host_url + '?' + urlencode(query)

    def parse_evaluation_results(self, xml_content):
        """Parse the result XML and store the scores.

        The first ``read_chapter`` (preferred) or ``read_sentence`` element
        that has a ``total_score`` attribute is used. On success
        ``self.evaluation_results`` holds the score fields, ``is_rejected``
        and a ``sentences`` list; on any failure it holds ``{'error': ...}``.

        Args:
            xml_content: The XML document as text.
        """
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            print(f"XML解析失败: {e}, 内容: {xml_content[:200]}")
            self.evaluation_results = {'error': f'解析失败: {str(e)}'}
            return

        # ``iter`` also yields the root itself when its tag matches.
        candidates = list(root.iter('read_chapter'))
        candidates += list(root.iter('read_sentence'))
        if not candidates:
            print("未找到评分节点")
            self.evaluation_results = {'error': '未找到评分节点'}
            return

        target_node = next(
            (node for node in candidates if 'total_score' in node.attrib),
            None,
        )
        # Compare with None explicitly: an Element without children is
        # falsy, so ``not target_node`` would reject a valid leaf node.
        if target_node is None:
            print("未找到包含评分数据的有效节点")
            self.evaluation_results = {'error': '未找到包含评分数据的有效节点'}
            return

        try:
            results = {
                name: float(target_node.get(name, 0))
                for name in self._SCORE_FIELDS
            }
            results['is_rejected'] = (
                target_node.get('is_rejected', 'false') == 'true'
            )
            results['sentences'] = [
                {
                    'content': sent.get('content', ''),
                    'total_score': float(sent.get('total_score', 0)),
                    'fluency_score': float(sent.get('fluency_score', 0)),
                }
                for sent in target_node.findall('.//sentence')
            ]
        except ValueError as e:
            # A score attribute was present but not numeric.
            print(f"评分字段格式错误: {e}")
            self.evaluation_results = {'error': f'评分字段格式错误: {e}'}
            return

        self.evaluation_results = results
        print(f"解析成功,提取到{len(self.evaluation_results)}个评分字段")

    def on_message(self, ws, message):
        """Handle one server message.

        A non-zero ``code`` is recorded as an error. A message carrying a
        base64 result payload is decoded and parsed. In both cases, and when
        the final frame (``status == 2``) arrives without a payload, the
        socket is closed so ``run_forever`` returns.
        """
        print(f"Received message: {message}")
        should_close = False
        try:
            data = json.loads(message)
            if data.get('code') != 0:
                self.evaluation_results = {
                    'error': data.get('message', 'Unknown error')
                }
                should_close = True
            else:
                # ``or {}`` also covers an explicit JSON null.
                response_data = data.get('data') or {}
                payload = response_data.get('data')
                if payload:
                    xml_content = base64.b64decode(payload).decode("utf-8")
                    self.parse_evaluation_results(xml_content)
                    should_close = True
                elif response_data.get('status') == 2:
                    # Session ended but no result was delivered.
                    if not self.evaluation_results:
                        self.evaluation_results = {
                            'error': 'No evaluation result returned'
                        }
                    should_close = True
        except (ValueError, AttributeError, TypeError) as e:
            # Bad JSON, bad base64/UTF-8, or an unexpected message shape.
            print(f"Error processing message: {e}")
            self.evaluation_results = {'error': str(e)}

        if should_close:
            ws.close()

    def on_error(self, ws, error):
        """Log a WebSocket error and record it if no result exists yet."""
        print(f"Error: {error},{ws}")
        if not self.evaluation_results:
            self.evaluation_results = {'error': str(error)}

    def on_close(self, ws, reason, res):
        """Log that the connection was closed."""
        print(f"WebSocket connection closed,{ws}")

    def on_open(self, ws):
        """Send the session-start frame, then stream the audio file.

        Audio goes out in ``_FRAME_SIZE``-byte frames with a short pause
        between them, followed by an empty final frame. If the file cannot
        be opened or is empty, the error is recorded and the socket closed
        instead of leaving the session waiting for audio.
        """
        print(f"WebSocket connection opened,{ws},ws连接建立成功...")

        try:
            audio = open(self.audio_file, "rb")
        except OSError as e:
            print(f"Error opening audio file: {e}")
            self.evaluation_results = {'error': str(e)}
            ws.close()
            return

        start_frame = {
            "common": {
                "app_id": self.appid
            },
            "business": {
                "category": "read_chapter",
                "rstcd": "utf8",
                "sub": "ise",
                "group": "adult",
                "ent": "cn_vip",
                "tte": "utf-8",
                "cmd": "ssb",
                "aue": "lame",  # mp3 audio
                # The text is prefixed with a BOM and the [content] marker.
                "text": '\uFEFF' + "[content]\n" + self.txt,
                "extra_ability": "multi_dimension"
            },
            "data": {
                "status": 0,
            }
        }

        total_sent = 0
        with audio:
            ws.send(json.dumps(start_frame))
            for chunk in iter(lambda: audio.read(self._FRAME_SIZE), b""):
                audio_frame = {
                    "business": {
                        "cmd": "auw",
                        # 1 marks the first audio frame, 2 a middle frame.
                        "aus": 1 if total_sent == 0 else 2,
                        "aue": "lame"
                    },
                    "data": {
                        "status": 1,
                        "data": base64.b64encode(chunk).decode(),
                        "data_type": 1,
                        "encoding": "raw"
                    }
                }
                ws.send(json.dumps(audio_frame))
                total_sent += len(chunk)
                time.sleep(self._FRAME_INTERVAL)

        if total_sent == 0:
            # Nothing to evaluate; end the session rather than hang.
            print("Audio file is empty")
            self.evaluation_results = {'error': 'Audio file is empty'}
            ws.close()
            return

        final_frame = {
            "business": {
                "cmd": "auw",
                "aus": 4,
                "aue": "lame"
            },
            "data": {
                "status": 2,
                "data": ""  # empty payload marks the end of the audio
            }
        }
        ws.send(json.dumps(final_frame))
        time.sleep(1)

    def get_evaluation_summary(self):
        """Return a human-readable summary of ``self.evaluation_results``.

        Returns a fixed placeholder when there are no results, a one-line
        failure message when the results hold an ``'error'`` entry, and
        otherwise the overall scores followed by per-sentence scores.
        """
        results = self.evaluation_results
        if not results:
            return "暂无评测结果"
        if 'error' in results:
            return f"评测失败: {results['error']}"

        # The parser stores completeness as 'integrity_score'; the older
        # 'completeness_score' key is still honoured if present.
        integrity = results.get(
            'integrity_score', results.get('completeness_score', 0)
        )
        lines = [
            "=== 语音评测结果摘要 ===",
            f"总得分: {results.get('total_score', 0):.4f}",
            f"准确度得分: {results.get('accuracy_score', 0):.4f}",
            f"流畅度得分: {results.get('fluency_score', 0):.4f}",
            f"完整度得分: {integrity:.4f}",
            f"声调得分: {results.get('tone_score', 0):.4f}",
            f"声韵得分: {results.get('phone_score', 0):.4f}",
            f"情感得分: {results.get('emotion_score', 0):.4f}",
        ]
        # Keys the parser does not produce are shown only when supplied.
        if 'standard_score' in results:
            lines.append(f"标准度得分: {results['standard_score']:.4f}")
        if 'word_count' in results:
            lines.append(f"单词数量: {results['word_count']}")
        rejected = '是' if results.get('is_rejected', False) else '否'
        lines.append(f"是否被拒绝: {rejected}")

        sentences = results.get('sentences')
        if sentences:
            lines.append("")
            lines.append("=== 句子级别得分 ===")
            for i, sent in enumerate(sentences, start=1):
                lines.append(
                    f"{i}. {sent.get('content', '')}: "
                    f"总分 {sent.get('total_score', 0):.4f}, "
                    f"流畅度 {sent.get('fluency_score', 0):.4f}"
                )

        if 'sentence' in results:
            sentence = results['sentence']
            lines.append("")
            lines.append("=== 句子级别得分 ===")
            lines.append(f"句子准确度: {sentence.get('accuracy_score', 0):.4f}")
            lines.append(f"句子流畅度: {sentence.get('fluency_score', 0):.4f}")
            lines.append(f"句子总分: {sentence.get('total_score', 0):.4f}")

        if 'words' in results:
            lines.append("")
            lines.append("=== 单词级别得分 ===")
            for i, word in enumerate(results['words'], start=1):
                dp_msg = self._get_dp_message_description(
                    word.get('dp_message')
                )
                lines.append(
                    f"{i}. {word.get('content', '')}: "
                    f"{word.get('total_score', 0):.4f} ({dp_msg})"
                )

        return "\n".join(lines) + "\n"

    def _get_dp_message_description(self, dp_message):
        """Map a ``dp_message`` code to its label, or an "unknown" marker."""
        return self._DP_MESSAGES.get(dp_message, f"未知({dp_message})")

    def run_evaluation(self):
        """Run one evaluation over a certificate-verified WebSocket.

        Blocks until the connection closes.

        Returns:
            A ``(evaluation_results, elapsed)`` tuple where ``elapsed`` is a
            ``timedelta`` covering the whole call.
        """
        # Imported here, as in the original, so the module can be imported
        # without certifi installed.
        import certifi

        start_time = datetime.now()
        # Do not let a previous run's outcome leak into this one.
        self.evaluation_results = {}

        websocket.enableTrace(False)
        ws_entity = websocket.WebSocketApp(
            self.generate_auth_url(),
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open,
        )
        ws_entity.run_forever(sslopt={
            "cert_reqs": ssl.CERT_REQUIRED,
            "ca_certs": certifi.where(),
        })

        evaluation_time = datetime.now() - start_time
        print(f"评测耗时: {evaluation_time}")
        return self.evaluation_results, evaluation_time