音声認識による赤外線機器の操作 その 7【全体制御編】
どうも,筆者です.
前回までで,認識した単語の解析まで行えるようになった.後は,選択した単語に対応する応答メッセージ(音声)を流しつつ,赤外線信号を送信すればよい.
前回までの記事は以下にある.
workspacememory.hatenablog.com
今回の対象
今回の実装対象を以下に示す.
実装対象一覧
実装するクラスを以下に示す.
- audioPlayer.py(応答メッセージ再生用)
- getWeather.py(天気予報取得用)
- voiceRecognition.py(音声認識用,メイン部分)
これらを順に実装する.
audioPlayer
音声再生には,Linux の aplay コマンドを利用する.「audioPlayer.py」の実装を以下に示す.
~/juliusKit $ touch audioPlayer.py ~/juliusKit $ vim audioPlayer.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3 # -*- coding: utf-8 -*- # audioPlayer.py import subprocess as sb from VR_ConstClass import CONST_CLASS class audioClass(): def __init__(self): self.__audioFile = '' self.__process = sb.Popen(['/bin/echo', '0.001'], shell=True, stdout=sb.DEVNULL, stderr=sb.DEVNULL) self.__process.communicate() def setAudioFile(self, audio_file): self.__audioFile = audio_file def audioExitHandler(self): if self.__process.poll() is None: self.__process.terminate() def playAudio(self): if self.__audioFile == '': return self.audioExitHandler() useCmd = ['/usr/bin/aplay', CONST_CLASS.WAVE_DIR + self.__audioFile] self.__process = sb.Popen(useCmd, stdout=sb.DEVNULL, stderr=sb.DEVNULL)
getWeather
天気予報の取得には,Weather Hacks API を利用する.使い方は,以下のサイトを参考にした.
また,受信した情報を読み上げたいと考えたため,「Open JTalk」を利用して音声再生を行った.ここだけ,天気予報受信→再生という流れを取っているため,非同期で処理できていない.インストール方法は以下を参考にした.
インストール後,実装したコードを以下に示す.ファイル名を「getWeather.py」として保存した.
~/juliusKit $ touch getWeather.py ~/juliusKit $ vim getWeather.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3 # -*- coding: utf-8 -*- # getWeather.py import requests import subprocess as sb from VR_ConstClass import CONST_CLASS class weatherClass(): def __init__(self): self.__url = 'http://weather.livedoor.com/forecast/webservice/json/v1' self.__city = 130010 self.__titleStr = 'title' self.__loopStr = 'forecasts' self.__dateStr = 'date' self.__telopStr = 'telop' self.__dicDir = '/usr/local/share/openJTalk/dic/' self.__voiceFile = '/usr/local/share/openJTalk/voice/mei_normal.htsvoice' self.__process = sb.Popen(['/bin/echo', '0.001'], shell=True, stdout=sb.DEVNULL, stderr=sb.DEVNULL) self.__process.communicate() def __getWeather(self): getURL = '{0}?city={1}'.format(self.__url, self.__city) apiData = requests.get(getURL).json() retList = [apiData[self.__titleStr]] for weather in apiData[self.__loopStr]: tmpData = list(map(lambda x: int(x, 10), weather[self.__dateStr].split('-'))) weatherDate = '{0}年{1}月{2}日'.format(tmpData[0], tmpData[1], tmpData[2]) weatherForecasts = weather[self.__telopStr] retList.append('{0},{1}'.format(weatherDate, weatherForecasts)) return retList # 天気予報取得関数の終了処理 def weatherExitHandler(self): if self.__process.poll() is None: self.__process.terminate() def run(self, wavFile='weather.wav'): listData = self.__getWeather() # 読み上げるテキスト text = ','.join(listData).replace(' ', '') output = CONST_CLASS.WAVE_DIR + wavFile outputList = ['/bin/echo', '"' + str(text) + '"', '|'] outputList.extend(['/usr/local/bin/open_jtalk', '-m', self.__voiceFile, '-ow', output, '-x', self.__dicDir]) # 実行コマンド command = ' '.join(outputList) try: # コマンドの実行 self.__process = sb.Popen(command, shell=True, stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE) # 処理の完了待ち self.__process.communicate() except: self.weatherExitHandler() return text
voiceRecognition
メイン処理を実装している部分である.以前導入した「SimpleWebSocketServer」をメインで動作させる.メッセージを受信したら,対象の関数を呼び出し単語解析を実施する.解析結果から赤外線データの送信等の処理を行う.
以下のスクリプトを「voiceRecognition.py」として保存する.
~/juliusKit $ touch voiceRecognition.py ~/juliusKit $ vim voiceRecognition.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3 # -*- coding: utf-8 -*- # voiceRecognition.py from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket import signal import parseJuliusData, adrsirlib, getWeather import configuration, audioPlayer from VR_ConstClass import CONST_CLASS class voiceRecognitionClass(): def __init__(self): # julius クラスのインスタンス self.__julius = parseJuliusData.parseJuliusDataClass() # configuration クラスのインスタンス self.__configure = configuration.voiceConfigClass() # voice の設定 self.__audio = audioPlayer.audioClass() # weather の設定 self.__weather = getWeather.weatherClass() # 処理開始 def startExecution(self): self.__julius.startThread() # 処理終了 def stopExecution(self): self.__julius.stopThread() # julius の停止 self.__audio.audioExitHandler() # 音声の停止 self.__weather.weatherExitHandler() # 天気予報のデータ生成停止 # 実行用関数 def analysis(self, wordData): # ToDo: スリープモード移行時 if False: retVal = self.__configure.setJuliusState(False) # 動作中だった場合 if retVal: # 停止状態に移行 pass # ToDo: マニュアル操作時 elif False: local_text = wordData sendCmd = self.__configure.getIrCmd(local_text) if sendCmd is not None: # ADRSIR にコマンドを送信 adrsirlib.write(sendCmd) # それ以外 else: retStatus, sendCmd, audioFile = self.__configure.chkCmdExection(wordData) retVal = (retStatus != self.__configure.juliusReturnState[False]) self.__audio.setAudioFile(audioFile) if retVal: outputText = self.__printMsg[retVal] # 天気を読み上げる場合 if retStatus == self.__configure.juliusReturnState['weather']: outputText = self.__weather.run(wavFile=audioFile) self.__audio.playAudio() if sendCmd is not None: # ADRSIR にコマンドを送信 adrsirlib.write(sendCmd) class processStatusClass(): def __init__(self): self.__running = True self.__stopped = False self.__status = self.__running signal.signal(signal.SIGINT, self.changeState) # Ctrl + C の監視 signal.signal(signal.SIGTERM, self.changeState) # kill コマンドの監視 def changeState(self, signum, frame): self.__status = self.__stopped def getStatus(self): return self.__status if __name__ == '__main__': # processStatus クラスのインスタンス procStat = processStatusClass() # voiceRecognitiond クラスのインスタンス vrc = voiceRecognitionClass() # 処理開始 vrc.startExecution() # SimpleWebSocketServer 用のクラス定義 class webSocketProcessClass(WebSocket): def handleMessage(self): vrc.analysis(self.data) # WebSocket 用のサーバのインスタンス生成 server = SimpleWebSocketServer( CONST_CLASS.WEBSOCKET_HOST, CONST_CLASS.WEBSOCKET_PORT, webSocketProcessClass ) # main loop while procStat.getStatus(): server.serveonce() # 処理終了 server.close() vrc.stopExecution()
ここまでのディレクトリ構成を以下に示す.
~/juliusKit |--dictationKit_v4.3.1 |--word.dic |--word.jconf |--grammarKit | |--controller | |--compile.sh | |--mkdfa.pl | |--mkfa | |--utf8_controller.grammar | |--utf8_controller.voca |--outYomi.sh |--word.yomi |--VR_ConstClass.py |--parseJuliusData.py |--webSocketClient.py |--configuration.py |--adrsirlib.py # 追加部分 |--audioPlayer.py # 追加部分 |--getWeather.py # 追加部分 |--voiceRecognitiond.py # 追加部分 |--jsonData | |--TVData.json | |--confParam.json | |--lightData.json |--wavFile |--TVOff.wav |--TVOn.wav |--lightDown.wav |--lightOff.wav |--lightOn.wav |--lightUp.wav |--nightLight.wa
現状,Ctrl + C または,kill コマンドによりプログラムが終了するようになっている.メイン処理の「webSocketProcessClass」というクラスの定義方法がこれで正しいかどうかは分からないが,現状では,この方法でしか WebSocket のサーバを立てることができなかった.他によい方法があればそちらを採用したい.
関数の呼び出し方法等が分かりづらいため,何かしら資料を作成し可視化しておきたい.その前に,この Python スクリプトをデーモン化して,起動時に自動実行するような設定をしようと思っている.
余談
音声認識に記述文法を用いているため,誤認識が時々ある.この認識精度を上げるよい方法はないのだろうか.信頼度とかを使うべきか?