Compare commits

...

2 Commits

Author SHA1 Message Date
a12373b6d5 'commit' 2025-09-06 10:15:18 +08:00
94d9a34c63 'commit' 2025-09-06 10:14:59 +08:00
8 changed files with 409 additions and 9 deletions

View File

@@ -6,17 +6,17 @@ import json
import ssl
import time
import xml.etree.ElementTree as ET
import base64
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY
class XunFeiAudioEvaluator:
class XunFeiAudioEvaluator_cn:
"""讯飞语音评测类"""
def __init__(self, appid, api_key, api_secret, audio_file, language, txt):
@@ -301,7 +301,7 @@ if __name__ == '__main__':
audio_file = r'D:\dsWork\dsProject\dsLightRag\static\audio\audio_afc0a96e382c428cba2f00e3f71e4e8f.mp3'
# 创建评测器实例
txt="Hello everyone! Nice to meet you. Today is a beautiful day. I am learning English pronunciation with this tool."
evaluator = XunFeiAudioEvaluator(appid, api_key, api_secret, audio_file, "english",txt)
evaluator = XunFeiAudioEvaluator_cn(appid, api_key, api_secret, audio_file, "english", txt)
# 运行评测
results, eval_time = evaluator.run_evaluation()

View File

@@ -0,0 +1,313 @@
import base64
import datetime
import hashlib
import hmac
import json
import ssl
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from Config.Config import XF_APPID, XF_APISECRET, XF_APIKEY
class XunFeiAudioEvaluator_en:
"""讯飞语音评测类"""
def __init__(self, appid, api_key, api_secret, audio_file, language, txt):
self.appid = appid
self.api_key = api_key
self.api_secret = api_secret
self.audio_file = audio_file
self.language = language
self.host_url = "wss://ise-api.xfyun.cn/v2/open-ise"
self.websocket_url = ""
self.evaluation_results = {}
self.txt = txt
def generate_auth_url(self):
"""生成鉴权URL"""
now_time = datetime.now()
now_date = format_date_time(mktime(now_time.timetuple()))
origin_base = "host: " + "ise-api.xfyun.cn" + "\n"
origin_base += "date: " + now_date + "\n"
origin_base += "GET " + "/v2/open-ise " + "HTTP/1.1"
# SHA256加密
signature_sha = hmac.new(
self.api_secret.encode('utf-8'),
origin_base.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.api_key}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
dict_data = {
"authorization": authorization,
"date": now_date,
"host": "ise-api.xfyun.cn"
}
return self.host_url + '?' + urlencode(dict_data)
def on_message(self, ws, message):
"""WebSocket消息处理"""
print(f"Received message: {message}")
try:
data = json.loads(message)
if data.get('code') != 0:
self.results = {'error': data.get('message', 'Unknown error')}
return
# 修复移除状态码为2的硬性检查只要有数据就解析
inner_data = data.get('data', {})
xml_b64 = inner_data.get('data', '')
if xml_b64:
xml_data = base64.b64decode(xml_b64)
xml_content = xml_data.decode("utf-8")
self.parse_evaluation_results(xml_content) # 确保中文也能进入解析
except Exception as e:
print(f"Error processing message: {e}")
self.results = {'error': str(e)}
# 安全获取data字段
response_data = data.get("data", {})
status = response_data.get("status")
if status == 2 and response_data.get("data"):
# 解析评测结果
xml_data = base64.b64decode(response_data["data"])
xml_content = xml_data.decode("utf-8")
self.parse_evaluation_results(xml_content)
ws.close()
def on_error(self, ws, error):
"""错误处理"""
print(f"Error: {error},{ws}")
def on_close(self, ws, reason, res):
"""连接关闭处理"""
print(f"WebSocket connection closed,{ws}")
def on_open(self, ws):
"""连接建立处理"""
print(f"WebSocket connection opened,{ws},ws连接建立成功...")
# 发送初始参数
send_dict = {
"common": {
"app_id": self.appid
},
"business": {
"category": "read_sentence",
"rstcd": "utf8",
"sub": "ise",
"group": "pupil",
"ent": "cn_vip" if self.language == "chinese" else "en_vip",
"tte": "utf-8",
"cmd": "ssb",
"auf": "audio/L16;rate=16000",
"aue": "lame",
"text": '\uFEFF' + "[content]\n" + self.txt
},
"data": {
"status": 0,
"data": ""
}
}
ws.send(json.dumps(send_dict))
# 发送音频数据
with open(self.audio_file, "rb") as file_flag:
total_sent = 0
while True:
buffer = file_flag.read(1280)
if not buffer:
# 发送最后一帧(仅在有数据发送时)
if total_sent > 0:
my_dict = {
"business": {
"cmd": "auw",
"aus": 4,
"aue": "lame"
},
"data": {
"status": 2,
"data": "" # 空字符串表示结束
}
}
ws.send(json.dumps(my_dict))
time.sleep(1)
break
# 发送中间音频帧
send_dict = {
"business": {
"cmd": "auw",
"aus": 1,
"aue": "lame"
},
"data": {
"status": 1,
"data": str(base64.b64encode(buffer).decode()),
"data_type": 1,
"encoding": "raw"
}
}
ws.send(json.dumps(send_dict))
total_sent += len(buffer)
time.sleep(0.04)
def parse_evaluation_results(self, xml_content):
root = ET.fromstring(xml_content)
# 兼容中英文不同的根节点
read_node = root.find('.//read_sentence') or root.find('.//read_chapter')
if read_node is not None:
self.results = {
'total_score': read_node.attrib.get('total_score', '0'),
'accuracy': read_node.attrib.get('accuracy_score', '0'),
'fluency': read_node.attrib.get('fluency_score', '0'),
'integrity': read_node.attrib.get('integrity_score', '0')
}
else:
# 打印未匹配的XML结构用于调试
print(f"No evaluation nodes found in XML: {xml_content}")
self.results = {'error': '未找到有效评分节点'}
"""解析评测结果XML并提取得分信息"""
try:
print(xml_content)
root = ET.fromstring(xml_content)
# 查找read_chapter节点
read_chapter = root.find('.//read_chapter')
if read_chapter is not None:
# 保持字段名一致使用completeness_score
self.evaluation_results = {
'accuracy_score': float(read_chapter.get('accuracy_score', 0)),
'fluency_score': float(read_chapter.get('fluency_score', 0)),
'completeness_score': float(read_chapter.get('integrity_score', 0)),
'standard_score': float(read_chapter.get('standard_score', 0)),
'total_score': float(read_chapter.get('total_score', 0)),
'word_count': int(read_chapter.get('word_count', 0)),
'is_rejected': read_chapter.get('is_rejected', 'false') == 'true'
}
# 提取句子级别得分
sentence = read_chapter.find('.//sentence')
if sentence is not None:
self.evaluation_results['sentence'] = {
'accuracy_score': float(sentence.get('accuracy_score', 0)),
'fluency_score': float(sentence.get('fluency_score', 0)),
'total_score': float(sentence.get('total_score', 0))
}
# 提取单词级别得分
words = []
for word in read_chapter.findall('.//word'):
word_data = {
'content': word.get('content', ''),
'total_score': float(word.get('total_score', 0)),
'dp_message': int(word.get('dp_message', 0))
}
words.append(word_data)
self.evaluation_results['words'] = words
except ET.ParseError as e:
print(f"XML解析错误: {e}")
def get_evaluation_summary(self):
"""获取评测结果摘要"""
if not self.evaluation_results:
return "暂无评测结果"
summary = "=== 语音评测结果摘要 ===\n"
summary += f"总得分: {self.evaluation_results.get('total_score', 0):.4f}\n"
summary += f"准确度得分: {self.evaluation_results.get('accuracy_score', 0):.4f}\n"
summary += f"流畅度得分: {self.evaluation_results.get('fluency_score', 0):.4f}\n"
# 修复这里使用completeness_score
summary += f"完整度得分: {self.evaluation_results.get('completeness_score', 0):.4f}\n"
summary += f"标准度得分: {self.evaluation_results.get('standard_score', 0):.4f}\n"
summary += f"单词数量: {self.evaluation_results.get('word_count', 0)}\n"
summary += f"是否被拒绝: {'' if self.evaluation_results.get('is_rejected', False) else ''}\n"
if 'sentence' in self.evaluation_results:
sentence = self.evaluation_results['sentence']
summary += f"\n=== 句子级别得分 ===\n"
summary += f"句子准确度: {sentence.get('accuracy_score', 0):.4f}\n"
summary += f"句子流畅度: {sentence.get('fluency_score', 0):.4f}\n"
summary += f"句子总分: {sentence.get('total_score', 0):.4f}\n"
if 'words' in self.evaluation_results:
summary += f"\n=== 单词级别得分 ===\n"
for i, word in enumerate(self.evaluation_results['words']):
dp_msg = self._get_dp_message_description(word['dp_message'])
summary += f"{i + 1}. {word['content']}: {word['total_score']:.4f} ({dp_msg})\n"
return summary
def _get_dp_message_description(self, dp_message):
"""获取dp_message描述"""
descriptions = {
0: "正常",
16: "漏读",
32: "增读",
64: "回读",
128: "替换"
}
return descriptions.get(dp_message, f"未知({dp_message})")
def run_evaluation(self):
"""运行评测"""
start_time = datetime.now()
websocket.enableTrace(False)
ws_url = self.generate_auth_url()
ws_entity = websocket.WebSocketApp(
ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
ws_entity.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
end_time = datetime.now()
evaluation_time = end_time - start_time
print(f"评测耗时: {evaluation_time}")
return self.evaluation_results, evaluation_time
# 使用示例
if __name__ == '__main__':
# 配置参数
# appid = "5b83f8d6"
# api_secret = "604fa6cb9c5ab664a0d153fe0ccc6802"
# api_key = "5beb887923204000bfcb402046bb05a6"
appid = XF_APPID
api_secret = XF_APISECRET
api_key = XF_APIKEY
# audio_file = "./1.mp3"
audio_file = r'D:\dsWork\dsProject\dsLightRag\static\audio\audio_afc0a96e382c428cba2f00e3f71e4e8f.mp3'
# 创建评测器实例
txt="Hello everyone! Nice to meet you. Today is a beautiful day. I am learning English pronunciation with this tool."
evaluator = XunFeiAudioEvaluator_en(appid, api_key, api_secret, audio_file, "english", txt)
# 运行评测
results, eval_time = evaluator.run_evaluation()
# 输出评测结果摘要
print("\n" + "=" * 50)
print(evaluator.get_evaluation_summary())
print(f"总评测时间: {eval_time}")
print("=" * 50)

View File

@@ -8,7 +8,7 @@ from fastapi import APIRouter, UploadFile, File, Form
# 配置日志
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/xunFei", tags=["讯飞"])
router = APIRouter(prefix="/api/xunFeiCn", tags=["讯飞"])
# 音频保存目录
UPLOAD_DIR = os.path.join(os.path.dirname(__file__), "..", "static", "audio")
@@ -54,8 +54,8 @@ async def save_audio(audio: UploadFile = File(...), txt: str = Form(...), langua
raise Exception(f"音频格式转换失败: {e.stderr.decode()}")
# 2. 讯飞评分
from KeDaXunFei.XunFeiAudioEvaluator import XunFeiAudioEvaluator
evaluator = XunFeiAudioEvaluator(
from KeDaXunFei.XunFeiAudioEvaluator_cn import XunFeiAudioEvaluator_cn
evaluator = XunFeiAudioEvaluator_cn(
appid=XF_APPID,
api_key=XF_APIKEY,
api_secret=XF_APISECRET,

View File

@@ -0,0 +1,85 @@
import logging
import os
import uuid
import tempfile
import shutil
import sys
from fastapi import APIRouter, UploadFile, File, Form
# 配置日志
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/xunFeiEn", tags=["讯飞"])
# 音频保存目录
UPLOAD_DIR = os.path.join(os.path.dirname(__file__), "..", "static", "audio")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# 讯飞配置
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from Config.Config import XF_APPID, XF_APIKEY, XF_APISECRET
@router.post("/save-audio")
async def save_audio(audio: UploadFile = File(...), txt: str = Form(...), language: str = Form("english")): # 添加语言参数,默认英文
"""保存音频文件并评分"""
temp_file = None
try:
# 1. 保存音频到临时文件
temp_dir = tempfile.mkdtemp()
temp_file = os.path.join(temp_dir, f"temp_{uuid.uuid4().hex}.wav")
content = await audio.read()
with open(temp_file, "wb") as f:
f.write(content)
# 3. 保存到正式目录
file_name = f"audio_{uuid.uuid4().hex}.wav"
file_path = os.path.join(UPLOAD_DIR, file_name)
shutil.copy2(temp_file, file_path)
logger.info(f"已保存文件到: {file_path}")
# 添加wav转mp3功能
import subprocess
mp3_temp_file = os.path.join(temp_dir, f"temp_{uuid.uuid4().hex}.mp3")
try:
# 使用ffmpeg将wav转换为mp3
subprocess.run(
["ffmpeg", "-i", temp_file, "-y", mp3_temp_file],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.info(f"已将wav转换为mp3: {mp3_temp_file}")
except subprocess.CalledProcessError as e:
logger.error(f"ffmpeg转换失败: {e.stderr.decode()}")
raise Exception(f"音频格式转换失败: {e.stderr.decode()}")
# 2. 讯飞评分
from KeDaXunFei.XunFeiAudioEvaluator_en import XunFeiAudioEvaluator_en
evaluator = XunFeiAudioEvaluator_en(
appid=XF_APPID,
api_key=XF_APIKEY,
api_secret=XF_APISECRET,
audio_file=mp3_temp_file,
language=language, # 使用动态参数
txt=txt
)
results, eval_time = evaluator.run_evaluation()
print(evaluator.get_evaluation_summary())
return {
"success": True,
"file_name": file_name,
"file_path": f"/static/audio/{file_name}",
"evaluation": results,
"evaluation_time": str(eval_time)
}
except Exception as e:
logger.error(f"处理失败: {str(e)}")
return {"success": False, "error": str(e)}
finally:
# 4. 清理临时文件
if temp_file and os.path.exists(temp_file):
shutil.rmtree(os.path.dirname(temp_file), ignore_errors=True)

View File

@@ -32,7 +32,8 @@ from Routes.ttsRoute import router as tts_router
from Routes.CopyFaceRoute import router as copyFace_router
from Routes.WenShengTu import router as wenshengtu_router
from Routes.TeacherHelperRoute import router as teacherHelper_router
from Routes.XunFeiRoute import router as xunfei_router
from Routes.XunFeiRoute_en import router as xunfei_en_router
from Routes.XunFeiRoute_cn import router as xunfei_cn_router
# 控制日志输出
logger = logging.getLogger('lightrag')
logger.setLevel(logging.INFO)
@@ -91,7 +92,8 @@ app.include_router(tts_router) # 文本转语音
app.include_router(copyFace_router) # 抄脸
app.include_router(wenshengtu_router) # 文生图
app.include_router(teacherHelper_router) # 教师助手
app.include_router(xunfei_router) # 讯飞
app.include_router(xunfei_en_router) # 讯飞
app.include_router(xunfei_cn_router) # 讯飞
# Teaching Model 相关路由
# 登录相关(不用登录)

View File

@@ -747,7 +747,7 @@
try {
this.simulateUploadProgress();
const response = await fetch('/api/xunFei/save-audio', {
const response = await fetch('/api/xunFeiEn/save-audio', {
method: 'POST',
body: formData,
headers: {'Accept': 'application/json'}