123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- # -*- coding: utf-8 -*-
- from demeter.core import *
- from datetime import *
- import time
- import uuid
- import os
- import os.path
- import wave
- import tempfile
- import cProfile
- import base64
- from pydub import AudioSegment
- from uuid import getnode as get_mac
- from abc import ABCMeta, abstractmethod
- from ctypes import CFUNCTYPE, c_char_p, c_int, cdll
- from contextlib import contextmanager
- def py_error_handler(filename, line, function, err, fmt):
- pass
- ERROR_HANDLER_FUNC = CFUNCTYPE(None, c_char_p, c_int, c_char_p, c_int, c_char_p)
- c_error_handler = ERROR_HANDLER_FUNC(py_error_handler)
- @contextmanager
- def no_alsa_error():
- try:
- asound = cdll.LoadLibrary('libasound.so')
- asound.snd_lib_error_set_handler(c_error_handler)
- yield
- asound.snd_lib_error_set_handler(None)
- except:
- yield
- pass
- def getPlayerByFileName(fname):
- foo, ext = os.path.splitext(fname)
- if ext == '.mp3':
- return Demeter.service('sox', 'player')
- elif ext == '.wav':
- return Demeter.service('wav', 'player')
- """
- 将 mp3 文件转成 wav
- :param file: mp3 文件路径
- :return: wav 文件路径
- """
- def convertMp3ToWav(file):
- target = file.replace(".mp3", ".wav")
- if not os.path.exists(file):
- Demeter.logger.critical("文件错误 {}".format(file))
- return None
- AudioSegment.from_mp3(file).export(target, format="wav")
- return target
- """
- 将 wav 文件转成 mp3
- :param file: wav 文件路径
- :return: mp3 文件路径
- """
- def convertWavToMp3(file):
- target = file.replace('.wav', '.mp3')
- if not os.path.exists(file):
- Demeter.logger.critical("文件错误 {}".format(file))
- return None
- AudioSegment.from_wav(file).export(target, format="mp3")
- return target
- """
- 从 wav 文件中读取 pcm
- :param file: wav 文件路径
- :return: pcm 数据
- """
- def getPcmFromWav(file):
- wav = wave.open(file, 'rb')
- return wav.readframes(wav.getnframes())
- """
- 二进制形式写入临时文件
- :param data: 二进制数据
- :param suffix: 后缀名
- :return: 文件保存后的路径
- """
- def writeTempFile(data, suffix):
-
- with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
- f.write(data)
- tmpfile = f.name
- return tmpfile
- # 清理目录
- def clean():
- temp = Demeter.config['vecan']['temp']
- temp_files = os.listdir(temp)
- for f in temp_files:
- if os.path.isfile(os.path.join(temp, f)) and re.match(r''+Demeter.config['vecan']['outname']+'[\d]*\.wav', os.path.basename(f)):
- os.remove(os.path.join(temp, f))
- """ 获取缓存的语音 """
- def getCache(msg):
- md5 = Demeter.md5(msg)
- mp3_cache = os.path.join(Demeter.config['vecan']['temp'], md5 + '.mp3')
- wav_cache = os.path.join(Demeter.config['vecan']['temp'], md5 + '.wav')
- if os.path.exists(mp3_cache):
- return mp3_cache
- elif os.path.exists(wav_cache):
- return wav_cache
- return None
- """ 获取缓存的语音 """
- def saveCache(voice, msg):
- foo, ext = os.path.splitext(voice)
- md5 = Demeter.md5(msg)
- shutil.copyfile(voice, os.path.join(Demeter.config['vecan']['temp'], md5+ext))
|