import base64
import datetime
import hashlib
import hmac
import json
import ssl
import time
import xml.etree.ElementTree as ET
# NOTE: this import rebinds the name ``datetime`` from the module imported
# above to the ``datetime`` class; the code below relies on the class.
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket

from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY


class XunFeiAudioEvaluator:
    """Client for the iFlytek (XunFei) speech-evaluation WebSocket service.

    Typical use: construct an instance, call :meth:`run_evaluation`, then
    read ``evaluation_results`` or call :meth:`get_evaluation_summary`.

    Args:
        appid: Application id sent in the ``common.app_id`` field.
        api_key: API key embedded in the authorization header.
        api_secret: Secret used as the HMAC-SHA256 signing key.
        audio_file: Path of the audio file that is streamed to the service.
        language: ``"chinese"`` selects the ``zh_cn`` engine; any other value
            selects ``en_vip``. Defaults to ``"english"`` so that callers
            which omit it keep working.
    """

    # Human-readable labels for the per-word ``dp_message`` codes.
    _DP_MESSAGE_DESCRIPTIONS = {
        0: "正常",
        16: "漏读",
        32: "增读",
        64: "回读",
        128: "替换",
    }

    def __init__(self, appid, api_key, api_secret, audio_file, language="english"):
        self.appid = appid
        self.api_key = api_key
        self.api_secret = api_secret
        self.audio_file = audio_file
        self.language = language
        self.host_url = "wss://ise-api.xfyun.cn/v2/open-ise"
        self.websocket_url = ""
        # Filled by parse_evaluation_results(); empty until a result arrives.
        self.evaluation_results = {}

    def generate_auth_url(self):
        """Build the signed WebSocket URL.

        The string ``host``/``date``/request-line is signed with HMAC-SHA256
        using ``api_secret``; the base64 signature is wrapped in an
        authorization string, which is itself base64-encoded and passed as a
        query parameter together with ``date`` and ``host``.

        Returns:
            The service URL with the URL-encoded auth query string appended.
        """
        host = "ise-api.xfyun.cn"
        now_date = format_date_time(mktime(datetime.now().timetuple()))

        origin_base = f"host: {host}\ndate: {now_date}\nGET /v2/open-ise HTTP/1.1"

        digest = hmac.new(
            self.api_secret.encode("utf-8"),
            origin_base.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        signature = base64.b64encode(digest).decode(encoding="utf-8")

        authorization_origin = (
            f'api_key="{self.api_key}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode("utf-8")
        ).decode(encoding="utf-8")

        query = {
            "authorization": authorization,
            "date": now_date,
            "host": host,
        }
        return self.host_url + "?" + urlencode(query)

    def on_message(self, ws, message):
        """Handle one server message; parse the result on the final frame.

        When ``data.status`` is 2, the base64 payload in ``data.data`` is
        decoded as UTF-8 XML, handed to :meth:`parse_evaluation_results`, and
        the socket is closed.
        """
        print(f"Received message: {message}")
        data = json.loads(message)

        # NOTE(review): assumes the service signals failure through a non-zero
        # top-level "code" (with a "message") — confirm against the API docs.
        code = data.get("code", 0)
        if code != 0:
            print(f"Error: code={code}, message={data.get('message')}")
            ws.close()
            return

        # Tolerate replies that carry no "data" object instead of raising
        # KeyError inside the callback.
        payload = data.get("data") or {}
        if payload.get("status") != 2:
            return

        try:
            xml_content = base64.b64decode(payload["data"]).decode("utf-8")
            self.parse_evaluation_results(xml_content)
        finally:
            # Always close after the final frame, even if decoding or parsing
            # raised; otherwise ws.close() would be skipped.
            ws.close()

    def on_error(self, ws, error):
        """Report an error delivered by the WebSocket client."""
        print(f"Error: {error},{ws}")

    def on_close(self, ws, reason, res):
        """Report that the connection was closed."""
        print(f"WebSocket connection closed,{ws}")

    def on_open(self, ws):
        """Send the session parameters, then stream the audio file.

        The first frame (``status`` 0) carries the business parameters and the
        reference text. The audio is then sent in 1280-byte chunks
        (``status`` 1) with a 40 ms pause between chunks, followed by an empty
        final frame (``status`` 2).
        """
        print(f"WebSocket connection opened,{ws},ws连接建立成功...")

        try:
            audio = open(self.audio_file, "rb")
        except OSError as exc:
            # Without audio the session can never produce a result; close the
            # socket rather than leaving the connection open.
            print(f"Error: {exc},{ws}")
            ws.close()
            return

        with audio:
            first_frame = {
                "common": {"app_id": self.appid},
                "business": {
                    "category": "read_sentence",
                    "rstcd": "utf8",
                    "sub": "ise",
                    "group": "pupil",
                    "ent": "zh_cn" if self.language == "chinese" else "en_vip",
                    "tte": "utf-8",
                    "cmd": "ssb",
                    "auf": "audio/L16;rate=16000",
                    "aue": "lame",
                    # The reference text is prefixed with a UTF-8 BOM.
                    "text": "\uFEFF" + "[content]\nnice to meet you.",
                },
                "data": {"status": 0, "data": ""},
            }
            ws.send(json.dumps(first_frame))

            # Intermediate audio frames: read until read() returns b"".
            for chunk in iter(lambda: audio.read(1280), b""):
                middle_frame = {
                    "business": {"cmd": "auw", "aus": 1, "aue": "lame"},
                    "data": {
                        "status": 1,
                        "data": base64.b64encode(chunk).decode(),
                        "data_type": 1,
                        "encoding": "raw",
                    },
                }
                ws.send(json.dumps(middle_frame))
                time.sleep(0.04)

        # Final frame: empty payload with status 2.
        last_frame = {
            "business": {"cmd": "auw", "aus": 4, "aue": "lame"},
            "data": {"status": 2, "data": ""},
        }
        ws.send(json.dumps(last_frame))
        time.sleep(1)

    def parse_evaluation_results(self, xml_content):
        """Extract scores from the result XML into ``evaluation_results``.

        Reads the first ``read_chapter`` element, its first ``sentence``
        descendant (if any), and every ``word`` descendant. Missing attributes
        default to 0 (or ``''`` / ``False``). If no ``read_chapter`` element
        exists, ``evaluation_results`` is left untouched. An XML syntax error
        is printed rather than raised.
        """
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            print(f"XML解析错误: {e}")
            return

        read_chapter = root.find(".//read_chapter")
        if read_chapter is None:
            return

        self.evaluation_results = {
            "accuracy_score": float(read_chapter.get("accuracy_score", 0)),
            "fluency_score": float(read_chapter.get("fluency_score", 0)),
            # The XML attribute is "integrity_score"; it is stored under the
            # key "completeness_score".
            "completeness_score": float(read_chapter.get("integrity_score", 0)),
            "standard_score": float(read_chapter.get("standard_score", 0)),
            "total_score": float(read_chapter.get("total_score", 0)),
            "word_count": int(read_chapter.get("word_count", 0)),
            "is_rejected": read_chapter.get("is_rejected", "false") == "true",
        }

        # Sentence-level scores.
        sentence = read_chapter.find(".//sentence")
        if sentence is not None:
            self.evaluation_results["sentence"] = {
                "accuracy_score": float(sentence.get("accuracy_score", 0)),
                "fluency_score": float(sentence.get("fluency_score", 0)),
                "total_score": float(sentence.get("total_score", 0)),
            }

        # Word-level scores.
        self.evaluation_results["words"] = [
            {
                "content": word.get("content", ""),
                "total_score": float(word.get("total_score", 0)),
                "dp_message": int(word.get("dp_message", 0)),
            }
            for word in read_chapter.findall(".//word")
        ]

    def get_evaluation_summary(self):
        """Return a printable multi-line summary of ``evaluation_results``.

        Returns:
            The summary text, or ``"暂无评测结果"`` when no result is stored.
        """
        results = self.evaluation_results
        if not results:
            return "暂无评测结果"

        parts = [
            "=== 语音评测结果摘要 ===\n",
            f"总得分: {results.get('total_score', 0):.4f}\n",
            f"准确度得分: {results.get('accuracy_score', 0):.4f}\n",
            f"流畅度得分: {results.get('fluency_score', 0):.4f}\n",
            # Read the key that parse_evaluation_results() actually writes;
            # the previous lookup of 'integrity_score' always yielded 0.
            f"完整度得分: {results.get('completeness_score', 0):.4f}\n",
            f"标准度得分: {results.get('standard_score', 0):.4f}\n",
            f"单词数量: {results.get('word_count', 0)}\n",
            f"是否被拒绝: {'是' if results.get('is_rejected', False) else '否'}\n",
        ]

        if "sentence" in results:
            sentence = results["sentence"]
            parts.append("\n=== 句子级别得分 ===\n")
            parts.append(f"句子准确度: {sentence.get('accuracy_score', 0):.4f}\n")
            parts.append(f"句子流畅度: {sentence.get('fluency_score', 0):.4f}\n")
            parts.append(f"句子总分: {sentence.get('total_score', 0):.4f}\n")

        if "words" in results:
            parts.append("\n=== 单词级别得分 ===\n")
            for index, word in enumerate(results["words"], start=1):
                dp_msg = self._get_dp_message_description(word["dp_message"])
                parts.append(
                    f"{index}. {word['content']}: {word['total_score']:.4f} ({dp_msg})\n"
                )

        return "".join(parts)

    def _get_dp_message_description(self, dp_message):
        """Map a ``dp_message`` code to its label, or ``未知(<code>)``."""
        return self._DP_MESSAGE_DESCRIPTIONS.get(dp_message, f"未知({dp_message})")

    def run_evaluation(self):
        """Open the WebSocket session and block until it ends.

        Returns:
            A tuple ``(evaluation_results, elapsed)`` where ``elapsed`` is the
            ``timedelta`` between the start and end of the session.
        """
        start_time = datetime.now()

        websocket.enableTrace(False)
        ws_entity = websocket.WebSocketApp(
            self.generate_auth_url(),
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open,
        )
        # NOTE(review): CERT_NONE disables TLS certificate verification.
        # Behavior is kept as-is; consider enabling verification.
        ws_entity.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

        evaluation_time = datetime.now() - start_time
        print(f"评测耗时: {evaluation_time}")

        return self.evaluation_results, evaluation_time


# Usage example
if __name__ == '__main__':
    # Credentials come from the project configuration module.
    appid = XF_APPID
    api_secret = XF_APISECRET
    api_key = XF_APIKEY
    audio_file = "./1.mp3"

    # The reference text sent in on_open() is English, so use the English engine.
    evaluator = XunFeiAudioEvaluator(
        appid, api_key, api_secret, audio_file, language="english"
    )

    results, eval_time = evaluator.run_evaluation()

    print("\n" + "=" * 50)
    print(evaluator.get_evaluation_summary())
    print(f"总评测时间: {eval_time}")
    print("=" * 50)