# -*- coding: utf-8 -*-
from __future__ import division
from .__load__ import *
class Robot(object):
tool = {}
def init(self, profiling=False):
self.start()
# 历史记录
self.history = []
# 数据文件夹
self.data = Demeter.config['vecan']['data']
# 日志
self.logger = Log.init(__name__)
# 是否正在录音
self.isRecording = False
# 插件验证--暂时无用
self.matchPlugin = None
# 勿扰模式
self.immersiveMode = None
# 优化
self.profiling = profiling
# 询问模式
self.hasPardon = False
# 播放器
self.player = None
# snowboy
self.snowboy = None
# 对话
self.onSay = None
# 技能
self.skill = None
def initSnowboy(self, snowboy):
self.snowboy = snowboy
def getSnowBoy(self):
if self.snowboy:
return self.snowboy
else:
self.snowboy = Demeter.service('snowboy')
self.snowboy.init(self)
return self.snowboy
def getHistory(self):
return self.history
def say(self, msg, cache=False, plugin='', completed=None):
print msg
if self.onSay:
self.logger.info('onSay: {}'.format(msg))
if plugin != '':
self.onSay("[{}] {}".format(plugin, msg))
else:
self.onSay(msg)
self.onSay = None
if plugin != '':
self.appendHistory(1, "[{}] {}".format(plugin, msg))
else:
self.appendHistory(1, msg)
pattern = r'^https?://.+'
if re.match(pattern, msg):
self.logger.info('内容包含URL,无法读取')
return
voice = ''
voice = self.tool['tts'].tts(msg)
if cache:
saveCache(voice, msg)
'''
if getCache(msg):
self.logger.info('命中缓存,播放缓存语音')
voice = getCache(msg)
else:
try:
voice = self.tool['tts'].tts(msg)
if cache:
saveCache(voice, msg)
except Exception as e:
self.logger.error('保存缓存失败:{}'.format(e))
'''
if completed is None:
completed = lambda: self.completed(msg)
self.play(voice, not cache, completed=completed)
def play(self, voice, delete=False, completed=None):
if self.player == None:
self.player = Demeter.service('sox', 'player')
self.player.play(voice, delete=delete, onCompleted=completed)
def start(self):
# 机器人配置,后续改成从数据库读取
self.config = Demeter.config['robot']
self.reload()
def reload(self):
try:
for item in self.config:
setting = self.setting(self.config[item])
if item not in self.tool:
self.tool[item] = Demeter.service(self.config[item], 'api')
self.tool[item].setting(**setting)
self.player = None
self.skill = Demeter.service('skill')
self.skill.init(self)
self.skill.printPlugins()
except Exception as e:
self.logger.critical('机器人初始化失败:{}'.format(e))
def checkRestore(self):
if self.immersiveMode:
self.skill.restore()
def checkPlayer(self):
return self.player is not None and self.player.is_playing()
def setting(self, name):
# 后续要改成数据库读取
config = Demeter.config[name]
return config
# 中断、停止
def interrupt(self):
if self.checkPlayer():
self.player.stop()
self.player = None
if self.immersiveMode:
self.skill.pause()
def talk(self, fp, callback=None):
self.play(self.data + 'beep_lo.wav')
self.logger.info('结束录音')
self.isRecording = False
if self.profiling:
self.logger.info('性能调试已打开')
pr = cProfile.Profile()
pr.enable()
self.onTalk(fp, callback)
pr.disable()
s = io.StringIO()
sortby = 'cumulative'
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
ps.print_stats()
print(s.getvalue())
else:
self.onTalk(fp, callback)
def onTalk(self, fp, callback=None, onSay=None):
try:
self.interrupt()
query = self.tool['asr'].asr(fp)
File.remove(fp)
self.response(query, callback, onSay)
except Exception as e:
self.logger.critical(e)
clean()
def doParse(self, query, **args):
return self.tool['nlu'].nlu(query, **args)
def setImmersiveMode(self, slug):
self.immersiveMode = slug
def getImmersiveMode(self):
return self.immersiveMode
def response(self, query, uid='', onSay=None):
# 统计
#statistic.report(1)
self.interrupt()
self.appendHistory(0, query, uid)
if onSay:
self.onSay = onSay
if query.strip() == '':
self.pardon()
return
lastImmersiveMode = self.immersiveMode
if not self.skill.query(query):
# 没命中技能,使用机器人回复
msg = self.tool['ai'].ai(query)
self.say(msg, True, completed=self.checkRestore)
else:
if lastImmersiveMode is not None and lastImmersiveMode != self.matchPlugin:
time.sleep(1)
if self.checkPlayer():
self.logger.debug('等说完再checkRestore')
self.player.appendOnCompleted(lambda: self.checkRestore())
else:
self.logger.debug('checkRestore')
self.checkRestore()
#将会话历史加进历史记录
def appendHistory(self, t, text, uid=''):
if t in (0, 1) and text != '':
if text.endswith(',') or text.endswith(','):
text = text[:-1]
if uid == '' or uid == None or uid == 'null':
uid = Demeter.uuid(__name__)
# 将图片处理成HTML
pattern = r'https?://.+\.(?:png|jpg|jpeg|bmp|gif|JPG|PNG|JPEG|BMP|GIF)'
url_pattern = r'^https?://.+'
imgs = re.findall(pattern, text)
for img in imgs:
text = text.replace(img, '
'.format(img))
urls = re.findall(url_pattern, text)
for url in urls:
text = text.replace(url, '{}'.format(url, url))
self.history.append({'type': t, 'text': text, 'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), 'uid': uid})
def completed(self, msg):
if Demeter.config['snowboy']['active_mode'] and (msg.endswith('?') or msg.endswith(u'?') or u'告诉我' in msg or u'请回答' in msg):
query = self.getSnowBoy().listen()
self.response(query)
def pardon(self):
if not self.hasPardon:
self.say('抱歉,刚刚没听清,能再说一遍吗?', completed=lambda: self.response(self.getSnowBoy().listen()))
self.hasPardon = True
else:
self.say('没听清呢')
self.hasPardon = False