1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- # -*- coding: utf-8 -*-
- from .__load__ import *
- import json
- import time
- from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
- from aliyunsdkcore.client import AcsClient
- from aliyunsdkcore.request import CommonRequest
- class Audio(Base):
- def __init__(self):
- self.akId = 'LTAI5tCFiVxuXz39MMkXFcMm'
- self.akSecret = 'sdK3jVSrrqzz2nONAGyd6kvXZZwkie'
- self.appKey = 'm0mDna21AWao7b0A'
- self.region = 'cn-beijing'
- self.client = AcsClient(self.akId, self.akSecret, self.region)
- # 固定配置
- self.PRODUCT = 'nls-filetrans'
- self.DOMAIN = f'filetrans.{self.region}.aliyuncs.com'
- self.API_VERSION = '2018-08-17'
- self.POST_REQUEST_ACTION = 'SubmitTask'
- self.GET_REQUEST_ACTION = 'GetTaskResult'
- def json(self):
- if not self.file:
- return False
- # 提交任务
- task = {
- 'appkey': self.appKey,
- 'file_link': self.file,
- 'version': '4.0',
- 'enable_words': True,
- 'auto_split': True,
- }
- postRequest = CommonRequest()
- postRequest.set_domain(self.DOMAIN)
- postRequest.set_version(self.API_VERSION)
- postRequest.set_product(self.PRODUCT)
- postRequest.set_action_name(self.POST_REQUEST_ACTION)
- postRequest.set_method('POST')
- postRequest.add_body_params('Task', json.dumps(task))
- try:
- postResponse = self.client.do_action_with_exception(postRequest)
- postResponse = json.loads(postResponse)
- if postResponse.get('StatusText') != 'SUCCESS':
- raise Exception(f'提交任务失败: {postResponse}')
- taskId = postResponse['TaskId']
- except (ServerException, ClientException) as e:
- raise Exception(f'提交任务异常: {str(e)}')
- # 查询任务结果
- getRequest = CommonRequest()
- getRequest.set_domain(self.DOMAIN)
- getRequest.set_version(self.API_VERSION)
- getRequest.set_product(self.PRODUCT)
- getRequest.set_action_name(self.GET_REQUEST_ACTION)
- getRequest.set_method('GET')
- getRequest.add_query_param('TaskId', taskId)
- statusText = ''
- result_json = {}
- while True:
- try:
- getResponse = self.client.do_action_with_exception(getRequest)
- getResponse = json.loads(getResponse)
- statusText = getResponse.get('StatusText', '')
- if statusText in ['RUNNING', 'QUEUEING']:
- time.sleep(5)
- continue
- else:
- result_json = getResponse
- break
- except (ServerException, ClientException) as e:
- raise Exception(f'查询任务异常: {str(e)}')
- if statusText != 'SUCCESS':
- raise Exception(f'识别失败: {result_json}')
- # 处理成单词级别 SRT/WebVTT 风格 JSON
- final_result = []
- if 'Result' in result_json and 'Words' in result_json['Result']:
- for w in result_json['Result']['Words']:
- start = w.get('BeginTime', 0) / 1000.0 # 毫秒转秒
- end = w.get('EndTime', 0) / 1000.0
- text = w.get('Word', '').strip()
- if text:
- final_result.append({
- "start": start,
- "end": end,
- "text": text
- })
- return final_result
|