# -*- coding: utf-8 -*-
from .__load__ import *
import json
import os
import time

from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest


class Audio(Base):
    """Submit an audio file to the Aliyun file-transcription API and return word timings.

    The audio location is read from ``self.file``, which this class does not
    set itself; it is expected to be provided by ``Base`` or by the caller
    before ``json()`` is invoked.
    """

    def __init__(self):
        # SECURITY: these literals are live credentials committed to source
        # control. They are kept only as fallbacks so existing deployments keep
        # working; rotate them and supply the values through the environment.
        self.akId = os.environ.get(
            'ALIBABA_CLOUD_ACCESS_KEY_ID', 'LTAI5tCFiVxuXz39MMkXFcMm')
        self.akSecret = os.environ.get(
            'ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'sdK3jVSrrqzz2nONAGyd6kvXZZwkie')
        self.appKey = os.environ.get('NLS_APP_KEY', 'm0mDna21AWao7b0A')
        self.region = 'cn-beijing'
        self.client = AcsClient(self.akId, self.akSecret, self.region)

        # Fixed service configuration.
        self.PRODUCT = 'nls-filetrans'
        self.DOMAIN = f'filetrans.{self.region}.aliyuncs.com'
        self.API_VERSION = '2018-08-17'
        self.POST_REQUEST_ACTION = 'SubmitTask'
        self.GET_REQUEST_ACTION = 'GetTaskResult'

    def _build_request(self, action, method):
        """Return a CommonRequest preconfigured for this service and *action*."""
        request = CommonRequest()
        request.set_domain(self.DOMAIN)
        request.set_version(self.API_VERSION)
        request.set_product(self.PRODUCT)
        request.set_action_name(action)
        request.set_method(method)
        return request

    @staticmethod
    def _words_to_segments(result_json):
        """Convert the ``Result.Words`` list of a task response to timed segments.

        Times are converted from milliseconds to seconds. Entries whose text is
        empty after stripping are dropped. A missing or null ``Result`` /
        ``Words`` yields an empty list.
        """
        words = (result_json.get('Result') or {}).get('Words') or []
        segments = []
        for word in words:
            text = (word.get('Word') or '').strip()
            if not text:
                continue
            segments.append({
                "start": word.get('BeginTime', 0) / 1000.0,
                "end": word.get('EndTime', 0) / 1000.0,
                "text": text,
            })
        return segments

    def json(self, *, poll_interval=5, timeout=None):
        """Transcribe ``self.file`` and return word-level timing entries.

        Args:
            poll_interval: Seconds to sleep between status queries.
            timeout: Maximum number of seconds to keep polling while the task
                is still queued or running. ``None`` (the default) polls
                without an upper bound.

        Returns:
            ``False`` when ``self.file`` is missing or empty; otherwise a list
            of ``{"start", "end", "text"}`` dicts with times in seconds.

        Raises:
            Exception: if submitting or querying the task fails, the task ends
                in a status other than ``SUCCESS``, or ``timeout`` elapses.
        """
        file_link = getattr(self, 'file', None)
        if not file_link:
            return False

        # Submit the transcription task.
        task = {
            'appkey': self.appKey,
            'file_link': file_link,
            'version': '4.0',
            'enable_words': True,
            'auto_split': True,
        }
        post_request = self._build_request(self.POST_REQUEST_ACTION, 'POST')
        post_request.add_body_params('Task', json.dumps(task))
        try:
            raw_response = self.client.do_action_with_exception(post_request)
        except (ServerException, ClientException) as e:
            raise Exception(f'提交任务异常: {str(e)}') from e
        post_response = json.loads(raw_response)
        if post_response.get('StatusText') != 'SUCCESS':
            raise Exception(f'提交任务失败: {post_response}')
        task_id = post_response['TaskId']

        # Poll until the task leaves the queued/running states.
        get_request = self._build_request(self.GET_REQUEST_ACTION, 'GET')
        get_request.add_query_param('TaskId', task_id)
        # monotonic() is immune to wall-clock adjustments during a long wait.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                raw_response = self.client.do_action_with_exception(get_request)
            except (ServerException, ClientException) as e:
                raise Exception(f'查询任务异常: {str(e)}') from e
            result_json = json.loads(raw_response)
            status_text = result_json.get('StatusText', '')
            if status_text not in ('RUNNING', 'QUEUEING'):
                break
            if deadline is not None and time.monotonic() >= deadline:
                raise Exception(f'查询任务超时: {task_id}')
            time.sleep(poll_interval)

        if status_text != 'SUCCESS':
            raise Exception(f'识别失败: {result_json}')

        # Flatten to word-level, SRT/WebVTT-style entries.
        return self._words_to_segments(result_json)