# -*- coding: utf8 -*- import json import time from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest # 录音文件识别 # https://common-buy.aliyun.com/?spm=5176.11801677.0.0.44e77a33oMMyT6&commodityCode=nlsService&orderType=UPGRADE&instanceId=1546399445482588 # 1. 开通商用即为付费使用服务,根据后付费梯度计费,标准单价为2.5元/小时; # 2. 商用服务可在最长3小时内完成识别并返回识别文本; # 3. 注意:已经“商用”后再切换“试用”,等同于立即关停线上服务,请谨慎操作; def fileTrans(akId, akSecret, appKey, fileLink): # 服务配置 REGION_ID = "cn-shanghai" PRODUCT = "nls-filetrans" DOMAIN = "filetrans.cn-shanghai.aliyuncs.com" API_VERSION = "2018-08-17" # 请求参数 KEY_APP_KEY = "appkey" KEY_FILE_LINK = "file_link" KEY_VERSION = "version" KEY_ENABLE_WORDS = "enable_words" KEY_TASK = "Task" KEY_TASK_ID = "TaskId" KEY_STATUS_TEXT = "StatusText" KEY_RESULT = "Result" # 状态码 STATUS_SUCCESS = "SUCCESS" STATUS_RUNNING = "RUNNING" STATUS_QUEUEING = "QUEUEING" # 初始化客户端 client = AcsClient(akId, akSecret, REGION_ID) # 构建任务参数 task_config = { KEY_APP_KEY: appKey, KEY_FILE_LINK: fileLink, KEY_VERSION: "4.0", KEY_ENABLE_WORDS: False } task_json = json.dumps(task_config) print("任务配置:", task_json) # 提交识别请求 postRequest = CommonRequest() postRequest.set_domain(DOMAIN) postRequest.set_version(API_VERSION) postRequest.set_product(PRODUCT) postRequest.set_action_name("SubmitTask") postRequest.set_method('POST') postRequest.add_body_params(KEY_TASK, task_json) try: postResponse = client.do_action_with_exception(postRequest) postData = json.loads(postResponse) print("提交响应:", postData) if postData.get(KEY_STATUS_TEXT) != STATUS_SUCCESS: print(f"任务提交失败: {postData.get(KEY_STATUS_TEXT, '未知错误')}") return None taskId = postData[KEY_TASK_ID] print(f"任务ID: {taskId}") except (ServerException, ClientException) as e: print(f"API请求异常: {e}") return None except KeyError: print("响应缺少关键字段") return None # 查询任务结果 getRequest = CommonRequest() getRequest.set_domain(DOMAIN) getRequest.set_version(API_VERSION) getRequest.set_product(PRODUCT) getRequest.set_action_name("GetTaskResult") getRequest.set_method('GET') getRequest.add_query_param(KEY_TASK_ID, taskId) retry_count = 0 max_retries = 20 # 最多等待200秒 (20次*10秒) while retry_count < max_retries: try: getResponse = client.do_action_with_exception(getRequest) resultData = json.loads(getResponse) print(f"轮询结果({retry_count + 1}/{max_retries}):", resultData) status = resultData.get(KEY_STATUS_TEXT, "") if status == STATUS_SUCCESS: return resultData.get(KEY_RESULT, {}) elif status in (STATUS_RUNNING, STATUS_QUEUEING): time.sleep(10) retry_count += 1 else: print(f"识别失败,最终状态: {status}") return None except (ServerException, ClientException) as e: print(f"查询异常: {e}") return None print("超过最大重试次数,任务未完成") return None # 配置信息(请替换为实际值) accessKeyId = "LTAI5tE4tgpGcKWhbZg6C4bh" accessKeySecret = "oizcTOZ8izbGUouboC00RcmGE8vBQ1" appKey = "OIpiw501l4o6MYEe" fileLink = "https://ylt.oss-cn-hangzhou.aliyuncs.com/HuangHai/123.wav" # 执行识别 result = fileTrans(accessKeyId, accessKeySecret, appKey, fileLink) # 处理结果 if not result: print("未获取到有效结果") exit(1) text_content = [] if 'Sentences' in result: for sentence in result['Sentences']: text_content.append(sentence.get("Text", "")) full_text = ''.join(text_content) else: print("响应中未包含有效文本") exit(1) # 保存结果 output_file = "识别结果.txt" try: with open(output_file, 'w', encoding='utf-8') as f: f.write(full_text) print(f"结果已保存至 {output_file}") except Exception as e: print(f"文件保存失败: {str(e)}") exit(1)