# -*- coding: utf-8 -*-
from __future__ import division
from .__load__ import *
from snowboy import snowboydecoder


class Snowboy(object):
    """Hotword wake-up and active listening built on ``snowboydecoder``.

    The instance keeps a do-not-disturb flag (``bother``), the current
    detector, an interrupt flag polled by the detector, and the robot object
    whose player / ASR tool / ``talk`` callback are used.

    NOTE(review): ``Demeter`` and ``time`` are not imported explicitly here;
    they are presumably provided by ``from .__load__ import *`` -- confirm.
    """

    def init(self, robot):
        """Reset state, remember ``robot`` and start hotword detection.

        NOTE(review): this is named ``init`` rather than ``__init__``, so it
        is not run on construction; presumably the caller invokes it
        explicitly -- confirm before renaming.

        :param robot: object providing ``data``, ``talk``, ``play``,
            ``player``, ``tool``, ``interrupt`` and ``isRecording``.
        """
        # Do-not-disturb switch; False means do-not-disturb is off.
        self.bother = False
        # snowboy detector
        self.detector = None
        # Interrupt flag, reported to the detector via interruptCallback.
        self.interrupted = False
        # vecan robot
        self.robot = robot
        self.hotword()

    # Start hotword detection
    def hotword(self):
        """(Re)create the hotword detector and run its main loop.

        Any existing detector is terminated first, then the configuration is
        reloaded through :meth:`getConfig`. Exceptions raised while the
        detector is running are logged at critical level and swallowed.
        """
        if self.detector is not None:
            self.detector.terminate()
        self.getConfig()
        self.detector = snowboydecoder.HotwordDetector(
            self.config['models'],
            sensitivity=self.config['sensitivity'],
            outname=Demeter.config['vecan']['outname'],
            temp=Demeter.config['vecan']['temp'])
        # main loop
        try:
            self.detector.start(
                detected_callback=self.config['callbacks'],
                audio_recorder_callback=self.robot.talk,
                interrupt_check=self.interruptCallback,
                silent_count_threshold=int(self.config['silent_threshold']),
                recording_timeout=int(self.config['recording_timeout']) * 4,
                sleep_time=0.03)
            self.detector.terminate()
        except Exception as e:
            Demeter.logger.critical('离线唤醒机制初始化失败:{}'.format(e))

    # Active listening
    def listen(self, silent=False):
        """Record one utterance and return its ASR transcription.

        :param silent: when true, skip the one-second pause and the
            ``beep_hi.wav`` / ``beep_lo.wav`` prompts around the recording.
        :return: whatever ``robot.tool['asr'].asr`` returns for the recorded
            audio, or ``''`` when nothing was recorded or an error occurred
            (errors are logged, not raised).
        """
        Demeter.logger.debug('activeListen')
        try:
            if not silent:
                time.sleep(1)
                self.robot.play(self.robot.data + 'beep_hi.wav')
            listener = snowboydecoder.ActiveListener(
                [self.robot.data + self.config['hotword']],
                outname=Demeter.config['vecan']['outname'],
                temp=Demeter.config['vecan']['temp'])
            voice = listener.listen(
                silent_count_threshold=int(self.config['silent_threshold']),
                recording_timeout=int(self.config['recording_timeout']) * 4
            )
            if not silent:
                self.robot.play(self.robot.data + 'beep_lo.wav')
            if voice:
                try:
                    return self.robot.tool['asr'].asr(voice)
                finally:
                    # Remove the recording even when recognition fails.
                    Demeter.remove(voice)
        except Exception as e:
            Demeter.logger.error(e)
        # Nothing recorded, or an error was logged above: always return a
        # string so callers get a consistent type.
        return ''

    # snowboy configuration; to be read from the database later
    def getConfig(self):
        """Load the snowboy settings and derive ``models`` / ``callbacks``.

        With ``hotword_switch`` enabled three models (wake word, DND-on word,
        DND-off word) are paired positionally with three callbacks; otherwise
        a single model and a single callback are used.

        Note: ``self.config`` is the same object as
        ``Demeter.config['snowboy']``, so the derived keys are written into
        that shared configuration.
        """
        self.config = Demeter.config['snowboy']
        # Explicit comparison kept on purpose: the type of the configured
        # value is not visible here, so truthiness could behave differently.
        if self.config['hotword_switch'] == True:
            self.config['models'] = [
                self.robot.data + self.config['hotword'],
                self.robot.data + self.config['on_hotword'],
                self.robot.data + self.config['off_hotword']
            ]
            self.config['callbacks'] = [
                self.detectedCallback,
                self.onHotwordCallback,
                self.offHotwordCallback
            ]
        else:
            self.config['models'] = self.robot.data + self.config['hotword']
            self.config['callbacks'] = self.detectedCallback

    # stop
    def stop(self):
        """Raise the interrupt flag and run ``clean()``."""
        self.interrupted = True
        # NOTE(review): ``clean`` is not defined in this file; presumably it
        # comes from the ``.__load__`` star import -- confirm.
        clean()

    # callback
    def detectedCallback(self):
        """Handle detection of the wake word.

        Does nothing (beyond a warning) while do-not-disturb is in effect or
        while the robot is already recording. Otherwise plays the prompt
        sound, interrupts the robot and marks it as recording.
        """
        if not self.isProperTime():
            Demeter.logger.warning('勿扰模式开启中')
            return
        if self.robot.isRecording:
            Demeter.logger.warning('正在录音中,跳过')
            return
        self.robot.player.play(self.robot.data + 'beep_hi.wav')
        Demeter.logger.info('开始录音')
        self.robot.interrupt()
        self.robot.isRecording = True

    def onHotwordCallback(self):
        """Turn do-not-disturb on when the hotword switch is enabled."""
        if self.config['hotword_switch'] == True:
            self.bother = True
            self.robot.player.play(self.robot.data + 'off.wav')
            Demeter.logger.info('勿扰模式打开')

    def offHotwordCallback(self):
        """Turn do-not-disturb off when the hotword switch is enabled."""
        if self.config['hotword_switch'] == True:
            self.bother = False
            self.robot.player.play(self.robot.data + 'on.wav')
            Demeter.logger.info('勿扰模式关闭')

    def interruptCallback(self):
        """Return the interrupt flag; passed to the detector as its check."""
        return self.interrupted

    def isProperTime(self):
        """Return whether the robot may react to the wake word right now.

        Returns ``False`` while do-not-disturb was switched on by hotword.
        Returns ``True`` when the scheduled do-not-disturb window is not
        configured or is disabled. Otherwise the current local hour is
        checked against the window ``[bother_since, bother_till)``, which
        may wrap past midnight.
        """
        if self.bother == True:
            return False
        if 'bother_enable' not in self.config:
            return True
        if self.config['bother_enable'] == False:
            return True
        if 'bother_since' not in self.config or 'bother_till' not in self.config:
            return True
        since = self.config['bother_since']
        till = self.config['bother_till']
        current = time.localtime(time.time()).tm_hour
        if till > since:
            # Window within a single day.
            return current not in range(since, till)
        # Window wraps past midnight (or since == till, which covers every
        # hour): quiet from `since` to the end of the day and from the start
        # of the day up to `till`.
        return not (current in range(since, 25) or current in range(-1, till))