main
黄海 5 months ago
parent 525bd85c0a
commit dd481c2d04

@ -0,0 +1,143 @@
# -*- coding: utf8 -*-
import json
import time
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
# 录音文件识别
# https://common-buy.aliyun.com/?spm=5176.11801677.0.0.44e77a33oMMyT6&commodityCode=nlsService&orderType=UPGRADE&instanceId=1546399445482588
# 1. 开通商用即为付费使用服务根据后付费梯度计费标准单价为2.5元/小时;
# 2. 商用服务可在最长3小时内完成识别并返回识别文本
# 3. 注意:已经“商用”后再切换“试用”,等同于立即关停线上服务,请谨慎操作;
def fileTrans(akId, akSecret, appKey, fileLink):
# 服务配置
REGION_ID = "cn-shanghai"
PRODUCT = "nls-filetrans"
DOMAIN = "filetrans.cn-shanghai.aliyuncs.com"
API_VERSION = "2018-08-17"
# 请求参数
KEY_APP_KEY = "appkey"
KEY_FILE_LINK = "file_link"
KEY_VERSION = "version"
KEY_ENABLE_WORDS = "enable_words"
KEY_TASK = "Task"
KEY_TASK_ID = "TaskId"
KEY_STATUS_TEXT = "StatusText"
KEY_RESULT = "Result"
# 状态码
STATUS_SUCCESS = "SUCCESS"
STATUS_RUNNING = "RUNNING"
STATUS_QUEUEING = "QUEUEING"
# 初始化客户端
client = AcsClient(akId, akSecret, REGION_ID)
# 构建任务参数
task_config = {
KEY_APP_KEY: appKey,
KEY_FILE_LINK: fileLink,
KEY_VERSION: "4.0",
KEY_ENABLE_WORDS: False
}
task_json = json.dumps(task_config)
print("任务配置:", task_json)
# 提交识别请求
postRequest = CommonRequest()
postRequest.set_domain(DOMAIN)
postRequest.set_version(API_VERSION)
postRequest.set_product(PRODUCT)
postRequest.set_action_name("SubmitTask")
postRequest.set_method('POST')
postRequest.add_body_params(KEY_TASK, task_json)
try:
postResponse = client.do_action_with_exception(postRequest)
postData = json.loads(postResponse)
print("提交响应:", postData)
if postData.get(KEY_STATUS_TEXT) != STATUS_SUCCESS:
print(f"任务提交失败: {postData.get(KEY_STATUS_TEXT, '未知错误')}")
return None
taskId = postData[KEY_TASK_ID]
print(f"任务ID: {taskId}")
except (ServerException, ClientException) as e:
print(f"API请求异常: {e}")
return None
except KeyError:
print("响应缺少关键字段")
return None
# 查询任务结果
getRequest = CommonRequest()
getRequest.set_domain(DOMAIN)
getRequest.set_version(API_VERSION)
getRequest.set_product(PRODUCT)
getRequest.set_action_name("GetTaskResult")
getRequest.set_method('GET')
getRequest.add_query_param(KEY_TASK_ID, taskId)
retry_count = 0
max_retries = 20 # 最多等待200秒 (20次*10秒)
while retry_count < max_retries:
try:
getResponse = client.do_action_with_exception(getRequest)
resultData = json.loads(getResponse)
print(f"轮询结果({retry_count + 1}/{max_retries}):", resultData)
status = resultData.get(KEY_STATUS_TEXT, "")
if status == STATUS_SUCCESS:
return resultData.get(KEY_RESULT, {})
elif status in (STATUS_RUNNING, STATUS_QUEUEING):
time.sleep(10)
retry_count += 1
else:
print(f"识别失败,最终状态: {status}")
return None
except (ServerException, ClientException) as e:
print(f"查询异常: {e}")
return None
print("超过最大重试次数,任务未完成")
return None
# 配置信息(请替换为实际值)
accessKeyId = "LTAI5tE4tgpGcKWhbZg6C4bh"
accessKeySecret = "oizcTOZ8izbGUouboC00RcmGE8vBQ1"
appKey = "OIpiw501l4o6MYEe"
fileLink = "https://ylt.oss-cn-hangzhou.aliyuncs.com/HuangHai/123.wav"
# 执行识别
result = fileTrans(accessKeyId, accessKeySecret, appKey, fileLink)
# 处理结果
if not result:
print("未获取到有效结果")
exit(1)
text_content = []
if 'Sentences' in result:
for sentence in result['Sentences']:
text_content.append(sentence.get("Text", ""))
full_text = ''.join(text_content)
else:
print("响应中未包含有效文本")
exit(1)
# 保存结果
output_file = "识别结果.txt"
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(full_text)
print(f"结果已保存至 {output_file}")
except Exception as e:
print(f"文件保存失败: {str(e)}")
exit(1)

@ -1,13 +0,0 @@
package Tools.TestUnit;
import cn.hutool.core.io.FileUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
public class ReadSentence {
public static void main(String[] args) {
String content = FileUtil.readUtf8String(new File("D:\\dsWork\\QingLong\\音频文本.json"));
}
}
Loading…
Cancel
Save