作業中のメモ

よく「計算機」を使って作業をする.知らなかったことを中心にまとめるつもり.

音声認識による赤外線機器の操作 その 7【全体制御編】

どうも,筆者です.

前回までで,認識した単語の解析まで行えるようになった.後は,選択した単語に対応する応答メッセージ(音声)を流しつつ,赤外線信号を送信すればよい.

前回までの記事は以下にある.

workspacememory.hatenablog.com

今回の対象

今回の実装対象を以下に示す.

今回の実装対象

実装対象一覧

実装するクラスを以下に示す.

  • audioPlayer.py(応答メッセージ再生用)
  • getWeather.py(天気予報取得用)
  • voiceRecognition.py(音声認識用,メイン部分)

これらを順に実装する.

audioPlayer

音声再生には,Linux の aplay コマンドを利用する.「audioPlayer.py」の実装を以下に示す.

~/juliusKit $ touch audioPlayer.py
~/juliusKit $ vim audioPlayer.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# audioPlayer.py

import subprocess as sb
from VR_ConstClass import CONST_CLASS

class audioClass():
    def __init__(self):
        self.__audioFile = ''
        self.__process = sb.Popen(['/bin/echo', '0.001'], shell=True, stdout=sb.DEVNULL, stderr=sb.DEVNULL)
        self.__process.communicate()

    def setAudioFile(self, audio_file):
        self.__audioFile = audio_file

    def audioExitHandler(self):
        if self.__process.poll() is None:
            self.__process.terminate()

    def playAudio(self):
        if self.__audioFile == '':
            return
        self.audioExitHandler()
        useCmd = ['/usr/bin/aplay', CONST_CLASS.WAVE_DIR + self.__audioFile]
        self.__process = sb.Popen(useCmd, stdout=sb.DEVNULL, stderr=sb.DEVNULL)

getWeather

天気予報の取得には,Weather Hacks API を利用する.使い方は,以下のサイトを参考にした.

tarao-mendo.blogspot.com

また,受信した情報を読み上げたいと考えたため,「Open JTalk」を利用して音声再生を行った.ここだけ,天気予報受信→再生という流れを取っているため,非同期で処理できていない.インストール方法は以下を参考にした.

www.taneyats.com

インストール後,実装したコードを以下に示す.ファイル名を「getWeather.py」として保存した.

~/juliusKit $ touch getWeather.py
~/juliusKit $ vim getWeather.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# getWeather.py

import requests
import subprocess as sb
from VR_ConstClass import CONST_CLASS

class weatherClass():
    def __init__(self):
        self.__url = 'http://weather.livedoor.com/forecast/webservice/json/v1'
        self.__city = 130010
        self.__titleStr = 'title'
        self.__loopStr = 'forecasts'
        self.__dateStr = 'date'
        self.__telopStr = 'telop'
        self.__dicDir = '/usr/local/share/openJTalk/dic/'
        self.__voiceFile = '/usr/local/share/openJTalk/voice/mei_normal.htsvoice'
        self.__process = sb.Popen(['/bin/echo', '0.001'], shell=True, stdout=sb.DEVNULL, stderr=sb.DEVNULL)
        self.__process.communicate()

    def __getWeather(self):
        getURL = '{0}?city={1}'.format(self.__url, self.__city)
        apiData = requests.get(getURL).json()
        retList = [apiData[self.__titleStr]]

        for weather in apiData[self.__loopStr]:
            tmpData = list(map(lambda x: int(x, 10), weather[self.__dateStr].split('-')))
            weatherDate = '{0}年{1}月{2}日'.format(tmpData[0], tmpData[1], tmpData[2])
            weatherForecasts = weather[self.__telopStr]
            retList.append('{0},{1}'.format(weatherDate, weatherForecasts))
        return retList

    # 天気予報取得関数の終了処理
    def weatherExitHandler(self):
        if self.__process.poll() is None:
            self.__process.terminate()

    def run(self, wavFile='weather.wav'):
        listData = self.__getWeather()
        # 読み上げるテキスト
        text = ','.join(listData).replace(' ', '')
        output = CONST_CLASS.WAVE_DIR + wavFile
        outputList = ['/bin/echo', '"' + str(text) + '"', '|']
        outputList.extend(['/usr/local/bin/open_jtalk', '-m', self.__voiceFile, '-ow', output, '-x', self.__dicDir])
        # 実行コマンド
        command = ' '.join(outputList)
        try:
            # コマンドの実行
            self.__process = sb.Popen(command, shell=True, stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE)
            # 処理の完了待ち
            self.__process.communicate()
        except:
            self.weatherExitHandler()
        return text

voiceRecognition

メイン処理を実装している部分である.以前導入した「SimpleWebSocketServer」をメインで動作させる.メッセージを受信したら,対象の関数を呼び出し単語解析を実施する.解析結果から赤外線データの送信等の処理を行う.

以下のスクリプトを「voiceRecognition.py」として保存する.

~/juliusKit $ touch voiceRecognition.py
~/juliusKit $ vim voiceRecognition.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# voiceRecognition.py

from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
import signal
import parseJuliusData, adrsirlib, getWeather
import configuration, audioPlayer
from VR_ConstClass import CONST_CLASS

class voiceRecognitionClass():
    def __init__(self):
        # julius クラスのインスタンス
        self.__julius = parseJuliusData.parseJuliusDataClass()
        # configuration クラスのインスタンス
        self.__configure = configuration.voiceConfigClass()
        # voice の設定
        self.__audio = audioPlayer.audioClass()
        # weather の設定
        self.__weather = getWeather.weatherClass()

    # 処理開始
    def startExecution(self):
        self.__julius.startThread()

    # 処理終了
    def stopExecution(self):
        self.__julius.stopThread()          # julius の停止
        self.__audio.audioExitHandler()     # 音声の停止
        self.__weather.weatherExitHandler() # 天気予報のデータ生成停止

    # 実行用関数
    def analysis(self, wordData):
        # ToDo: スリープモード移行時
        if False:
            retVal = self.__configure.setJuliusState(False)
            # 動作中だった場合
            if retVal:
                # 停止状態に移行
                pass
        # ToDo: マニュアル操作時
        elif False:
            local_text = wordData
            sendCmd = self.__configure.getIrCmd(local_text)

            if sendCmd is not None:
                # ADRSIR にコマンドを送信
                adrsirlib.write(sendCmd)
        # それ以外
        else:
            retStatus, sendCmd, audioFile = self.__configure.chkCmdExection(wordData)
            retVal = (retStatus != self.__configure.juliusReturnState[False])
            self.__audio.setAudioFile(audioFile)

            if retVal:
                outputText = self.__printMsg[retVal]
                # 天気を読み上げる場合
                if retStatus == self.__configure.juliusReturnState['weather']:
                    outputText = self.__weather.run(wavFile=audioFile)
                self.__audio.playAudio()

                if sendCmd is not None:
                    # ADRSIR にコマンドを送信
                    adrsirlib.write(sendCmd)

class processStatusClass():
    def __init__(self):
        self.__running = True
        self.__stopped = False
        self.__status = self.__running
        signal.signal(signal.SIGINT, self.changeState)  # Ctrl + C の監視
        signal.signal(signal.SIGTERM, self.changeState) # kill コマンドの監視

    def changeState(self, signum, frame):
        self.__status = self.__stopped

    def getStatus(self):
        return self.__status

if __name__ == '__main__':
    # processStatus クラスのインスタンス
    procStat = processStatusClass()
    # voiceRecognitiond クラスのインスタンス
    vrc = voiceRecognitionClass()
    # 処理開始
    vrc.startExecution()
    # SimpleWebSocketServer 用のクラス定義
    class webSocketProcessClass(WebSocket):
        def handleMessage(self):
            vrc.analysis(self.data)
    # WebSocket 用のサーバのインスタンス生成
    server = SimpleWebSocketServer(
        CONST_CLASS.WEBSOCKET_HOST,
        CONST_CLASS.WEBSOCKET_PORT,
        webSocketProcessClass
    )

    # main loop
    while procStat.getStatus():
        server.serveonce()

    # 処理終了
    server.close()
    vrc.stopExecution()

ここまでのディレクトリ構成を以下に示す.

~/juliusKit
   |--dictationKit_v4.3.1
       |--word.dic
       |--word.jconf
   |--grammarKit
   |   |--controller
   |        |--compile.sh
   |        |--mkdfa.pl
   |        |--mkfa
   |        |--utf8_controller.grammar
   |        |--utf8_controller.voca
   |--outYomi.sh
   |--word.yomi
   |--VR_ConstClass.py
   |--parseJuliusData.py
   |--webSocketClient.py
   |--configuration.py
   |--adrsirlib.py         # 追加部分
   |--audioPlayer.py       # 追加部分
   |--getWeather.py        # 追加部分
   |--voiceRecognitiond.py # 追加部分
   |--jsonData
   |   |--TVData.json
   |   |--confParam.json
   |   |--lightData.json
   |--wavFile
        |--TVOff.wav
        |--TVOn.wav
        |--lightDown.wav
        |--lightOff.wav
        |--lightOn.wav
        |--lightUp.wav
        |--nightLight.wa

現状,Ctrl + C または,kill コマンドによりプログラムが終了するようになっている.メイン処理の「webSocketProcessClass」というクラスの定義方法がこれで正しいかどうかは分からないが,現状では,この方法でしか WebSocket のサーバを立てることができなかった.他によい方法があればそちらを採用したい.

関数の呼び出し方法等が分かりづらいため,何かしら資料を作成し可視化しておきたい.その前に,この Python スクリプトをデーモン化して,起動時に自動実行するような設定をしようと思っている.

余談

音声認識に記述文法を用いているため,誤認識が時々ある.この認識精度を上げるよい方法はないのだろうか.信頼度とかを使うべきか?