作業中のメモ

よく「計算機」を使って作業をする.知らなかったことを中心にまとめるつもり.

音声認識による赤外線機器の操作 その 7【全体制御編】

どうも,筆者です.

前回までで,認識した単語の解析まで行えるようになった.後は,選択した単語に対応する応答メッセージ(音声)を流しつつ,赤外線信号を送信すればよい.

前回までの記事は以下にある.

workspacememory.hatenablog.com

今回の対象

今回の実装対象を以下に示す.

f:id:mathematicsphysical:20180603202916p:plain
今回の実装対象

実装対象一覧

実装するクラスを以下に示す.

  • audioPlayer.py(応答メッセージ再生用)
  • getWeather.py(天気予報取得用)
  • voiceRecognition.py(音声認識用,メイン部分)

これらを順に実装する.

audioPlayer

音声再生には,Linux の aplay コマンドを利用する.「audioPlayer.py」の実装を以下に示す.

~/juliusKit $ touch audioPlayer.py
~/juliusKit $ vim audioPlayer.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# audioPlayer.py

import subprocess as sb
from VR_ConstClass import CONST_CLASS

class audioClass():
    def __init__(self):
        self.__audioFile = ''
        self.__process = sb.Popen(['/bin/echo', '0.001'], shell=True, stdout=sb.DEVNULL, stderr=sb.DEVNULL)
        self.__process.communicate()

    def setAudioFile(self, audio_file):
        self.__audioFile = audio_file

    def audioExitHandler(self):
        if self.__process.poll() is None:
            self.__process.terminate()

    def playAudio(self):
        if self.__audioFile == '':
            return
        self.audioExitHandler()
        useCmd = ['/usr/bin/aplay', CONST_CLASS.WAVE_DIR + self.__audioFile]
        self.__process = sb.Popen(useCmd, stdout=sb.DEVNULL, stderr=sb.DEVNULL)

getWeather

天気予報の取得には,Weather Hacks API を利用する.使い方は,以下のサイトを参考にした.

tarao-mendo.blogspot.com

また,受信した情報を読み上げたいと考えたため,「Open JTalk」を利用して音声再生を行った.ここだけ,天気予報受信→再生という流れを取っているため,非同期で処理できていない.インストール方法は以下を参考にした.

www.taneyats.com

インストール後,実装したコードを以下に示す.ファイル名を「getWeather.py」として保存した.

~/juliusKit $ touch getWeather.py
~/juliusKit $ vim getWeather.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# getWeather.py

import requests
import subprocess as sb
from VR_ConstClass import CONST_CLASS

class weatherClass():
    def __init__(self):
        self.__url = 'http://weather.livedoor.com/forecast/webservice/json/v1'
        self.__city = 130010
        self.__titleStr = 'title'
        self.__loopStr = 'forecasts'
        self.__dateStr = 'date'
        self.__telopStr = 'telop'
        self.__dicDir = '/usr/local/share/openJTalk/dic/'
        self.__voiceFile = '/usr/local/share/openJTalk/voice/mei_normal.htsvoice'
        self.__process = sb.Popen(['/bin/echo', '0.001'], shell=True, stdout=sb.DEVNULL, stderr=sb.DEVNULL)
        self.__process.communicate()

    def __getWeather(self):
        getURL = '{0}?city={1}'.format(self.__url, self.__city)
        apiData = requests.get(getURL).json()
        retList = [apiData[self.__titleStr]]

        for weather in apiData[self.__loopStr]:
            tmpData = list(map(lambda x: int(x, 10), weather[self.__dateStr].split('-')))
            weatherDate = '{0}年{1}月{2}日'.format(tmpData[0], tmpData[1], tmpData[2])
            weatherForecasts = weather[self.__telopStr]
            retList.append('{0},{1}'.format(weatherDate, weatherForecasts))
        return retList

    # 天気予報取得関数の終了処理
    def weatherExitHandler(self):
        if self.__process.poll() is None:
            self.__process.terminate()

    def run(self, wavFile='weather.wav'):
        listData = self.__getWeather()
        # 読み上げるテキスト
        text = ','.join(listData).replace(' ', '')
        output = CONST_CLASS.WAVE_DIR + wavFile
        outputList = ['/bin/echo', '"' + str(text) + '"', '|']
        outputList.extend(['/usr/local/bin/open_jtalk', '-m', self.__voiceFile, '-ow', output, '-x', self.__dicDir])
        # 実行コマンド
        command = ' '.join(outputList)
        try:
            # コマンドの実行
            self.__process = sb.Popen(command, shell=True, stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE)
            # 処理の完了待ち
            self.__process.communicate()
        except:
            self.weatherExitHandler()
        return text

voiceRecognition

メイン処理を実装している部分である.以前導入した「SimpleWebSocketServer」をメインで動作させる.メッセージを受信したら,対象の関数を呼び出し単語解析を実施する.解析結果から赤外線データの送信等の処理を行う.

以下のスクリプトを「voiceRecognition.py」として保存する.

~/juliusKit $ touch voiceRecognition.py
~/juliusKit $ vim voiceRecognition.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# voiceRecognition.py

from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
import signal
import parseJuliusData, adrsirlib, getWeather
import configuration, audioPlayer
from VR_ConstClass import CONST_CLASS

class voiceRecognitionClass():
    def __init__(self):
        # julius クラスのインスタンス
        self.__julius = parseJuliusData.parseJuliusDataClass()
        # configuration クラスのインスタンス
        self.__configure = configuration.voiceConfigClass()
        # voice の設定
        self.__audio = audioPlayer.audioClass()
        # weather の設定
        self.__weather = getWeather.weatherClass()

    # 処理開始
    def startExecution(self):
        self.__julius.startThread()

    # 処理終了
    def stopExecution(self):
        self.__julius.stopThread()          # julius の停止
        self.__audio.audioExitHandler()     # 音声の停止
        self.__weather.weatherExitHandler() # 天気予報のデータ生成停止

    # 実行用関数
    def analysis(self, wordData):
        # ToDo: スリープモード移行時
        if False:
            retVal = self.__configure.setJuliusState(False)
            # 動作中だった場合
            if retVal:
                # 停止状態に移行
                pass
        # ToDo: マニュアル操作時
        elif False:
            local_text = wordData
            sendCmd = self.__configure.getIrCmd(local_text)

            if sendCmd is not None:
                # ADRSIR にコマンドを送信
                adrsirlib.write(sendCmd)
        # それ以外
        else:
            retStatus, sendCmd, audioFile = self.__configure.chkCmdExection(wordData)
            retVal = (retStatus != self.__configure.juliusReturnState[False])
            self.__audio.setAudioFile(audioFile)

            if retVal:
                outputText = self.__printMsg[retVal]
                # 天気を読み上げる場合
                if retStatus == self.__configure.juliusReturnState['weather']:
                    outputText = self.__weather.run(wavFile=audioFile)
                self.__audio.playAudio()

                if sendCmd is not None:
                    # ADRSIR にコマンドを送信
                    adrsirlib.write(sendCmd)

class processStatusClass():
    def __init__(self):
        self.__running = True
        self.__stopped = False
        self.__status = self.__running
        signal.signal(signal.SIGINT, self.changeState)  # Ctrl + C の監視
        signal.signal(signal.SIGTERM, self.changeState) # kill コマンドの監視

    def changeState(self, signum, frame):
        self.__status = self.__stopped

    def getStatus(self):
        return self.__status

if __name__ == '__main__':
    # processStatus クラスのインスタンス
    procStat = processStatusClass()
    # voiceRecognitiond クラスのインスタンス
    vrc = voiceRecognitionClass()
    # 処理開始
    vrc.startExecution()
    # SimpleWebSocketServer 用のクラス定義
    class webSocketProcessClass(WebSocket):
        def handleMessage(self):
            vrc.analysis(self.data)
    # WebSocket 用のサーバのインスタンス生成
    server = SimpleWebSocketServer(
        CONST_CLASS.WEBSOCKET_HOST,
        CONST_CLASS.WEBSOCKET_PORT,
        webSocketProcessClass
    )

    # main loop
    while procStat.getStatus():
        server.serveonce()

    # 処理終了
    server.close()
    vrc.stopExecution()

ここまでのディレクトリ構成を以下に示す.

~/juliusKit
   |--dictationKit_v4.3.1
       |--word.dic
       |--word.jconf
   |--grammarKit
   |   |--controller
   |        |--compile.sh
   |        |--mkdfa.pl
   |        |--mkfa
   |        |--utf8_controller.grammar
   |        |--utf8_controller.voca
   |--outYomi.sh
   |--word.yomi
   |--VR_ConstClass.py
   |--parseJuliusData.py
   |--webSocketClient.py
   |--configuration.py
   |--adrsirlib.py         # 追加部分
   |--audioPlayer.py       # 追加部分
   |--getWeather.py        # 追加部分
   |--voiceRecognitiond.py # 追加部分
   |--jsonData
   |   |--TVData.json
   |   |--confParam.json
   |   |--lightData.json
   |--wavFile
        |--TVOff.wav
        |--TVOn.wav
        |--lightDown.wav
        |--lightOff.wav
        |--lightOn.wav
        |--lightUp.wav
        |--nightLight.wa

現状,Ctrl + C または,kill コマンドによりプログラムが終了するようになっている.メイン処理の「webSocketProcessClass」というクラスの定義方法がこれで正しいかどうかは分からないが,現状では,この方法でしか WebSocket のサーバを立てることができなかった.他によい方法があればそちらを採用したい.

関数の呼び出し方法等が分かりづらいため,何かしら資料を作成し可視化しておきたい.その前に,この Python スクリプトをデーモン化して,起動時に自動実行するような設定をしようと思っている.

余談

音声認識に記述文法を用いているため,誤認識が時々ある.この認識精度を上げるよい方法はないのだろうか.信頼度とかを使うべきか?

音声認識による赤外線機器の操作 その 6【単語解析編】

どうも,筆者です.

前回は,以下の 3 つを実装した.

  • Julius の起動
  • Julius からの認識結果の取得
  • WebSocket を用いて認識結果を送信

前回までの記事は以下にある.

workspacememory.hatenablog.com

製作状況

製作状況を以下に示す.パーサーの部分は製作が完了している.今回は,単語解析の部分を実装する.

f:id:mathematicsphysical:20180603192707p:plain
製作状況

今回の対象

今回対象とする単語解析の部分の構成を以下に示す.

f:id:mathematicsphysical:20180603192721p:plain
単語解析

流れとして,受信した単語を単語解析関数に入力し,戻り値として,解析結果,赤外線コマンド,音声ファイルのデータを返すものとする.これらの結果から,音声を流すかどうか,赤外線コマンドを送信するかどうかを決める.

単語の認識方法

さて,問題となる単語の認識方法であるが,ここでは,受信した単語に規定の単語が含まれている場合,その単語が発音されたものとする.具体例を以下に示す.

f:id:mathematicsphysical:20180603192752p:plain
単語認識用データ

ここで,必須項目は,解析をするにあたり,必ず含まれている必要がある単語となる.必要項目は,列挙されているもののうち,ひとつでも含めばよいものである.空リストになっているものは,解析対象に含まれないことを意味する.それぞれの単語が見つかった場合の具体的な処理方法も別途データとして用意しておく必要がある.今回は,以下のようなデータを用意した.

f:id:mathematicsphysical:20180603192746p:plain
解析時に利用するデータ

起動時に上記のデータを読み込み,処理に利用する.それぞれの項目の働きを以下に示す.

  • machine:機器の状態を管理するために利用する(Boolean 型).
  • json:読み込む JSON ファイル
  • cmd:JSON ファイルに記述されているコマンドのうち,実行するコマンド
  • voice:コマンド実行時に再生する音声ファイル
  • chkState:現在の状態から次の内部状態を決めるパラメータ

「machine」と「chkState」は関連しており,例えば,テレビがついているのに「テレビつけて」と発声した場合,ほとんどのテレビはオン/オフが同一ボタンであるため,テレビの電源がオフになってしまう.これを防ぐために,現状のテレビの状態を監視しておく必要がある.その際に利用するのが「machine」の部分となる.

また,誤認識防止のため,過去 1 回前までの単語のキーを記憶しておくようにしている.ただ,これに伴い,電気を明るくしたり,(ここにはないが)テレビの音量を上げる場合に必要となる「連続動作」ができなくなる.連続動作の対象となるものは,今回認識した単語を記憶しないようにしたいため,「chkState」を利用する.「chkState」は他にも,テレビがオンのとき「テレビをオンにして」という命令を棄却するのにも利用している.

内部状態を決めるパラメータの定義

現状,「chkState」では,1 Byte のデータを用いて,いくつかの状態を表せるようになっている.その内訳を以下に示す.

f:id:mathematicsphysical:20180603192727p:plainf:id:mathematicsphysical:20180603192733p:plain
内部状態の内訳

これらのうち,予約済みのものを以下に示す.

f:id:mathematicsphysical:20180603192740p:plain
予約済みの内部状態パラメータ

単語ファイルの更新が追いついていないが,最終的には,「Ok,Google」と話しかけたら認識開始,「ありがとう(仮)」と話しかけたら認識終了としようと考えている.

単語解析コード

上記の設定を実装したものを以下に示す.ここでは,「configuration.py」として保存する.また,実行には,「jsonData」のディレクトリに対象とするデータが追加されている必要がある.

~/juliusKit $ touch configuration.py
~/juliusKit $ vim configuration.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# configuration.py

import json
from VR_ConstClass import CONST_CLASS

class voiceConfigClass():
    def __init__(self):
        # 定数定義(public,結果解析で利用するため,外部から参照可能にしている)
        self.juliusReturnState = {
            True:      int('0x00', 16),
            False:     int('0x01', 16),
            'weather': int('0x02', 16)
        }

        # コンフィグ情報のマスク処理用パラメータ
        self.__maskDict = {'upper': int('0xF0', 16), 'lower': int('0x0F', 16)}
        # 機器の状態(True: 起動状態,False: 停止状態)
        self.__state = {'julius': False}
        # Julius の処理対象
        self.__JuliusConfig = {
            'invalid':     int('0x01', 16), # 無効値
            'wakeUp':      int('0xFF', 16), # 命令解析開始
            'sleep':       int('0xF8', 16), # 命令解析終了
            'weather':     int('0xF0', 16), # 天気予報読み上げ
            'init':        int('0xF1', 16), # 状態変数の初期化
            'active':      int('0x10', 16), # 実行中
            'stopped':     int('0x80', 16), # 停止中
            'update':      int('0x01', 16), # 状態を更新する
            'delPrevWord': int('0x02', 16)  # 直前の単語を記憶しない
        }
        # 語彙情報(定数)
        self.__confParamFile = 'confParam.json' # 図に示した「解析時に利用するデータ」が格納されている JSON ファイル
        self.__vocabInfo = None
        # 語彙リスト(図に示した「単語認識用データ」)
        # key: 単語ラベル
        # val: 語彙候補(mandatory: 必須項目,required: 必要項目)
        self.__vocabListData = {
            'lightOn': {
                'mandatory': [str('電気')],
                'required': [str('つけて'), str('オン')]
            },
            'lightOff': {
                'mandatory': [str('電気')],
                'required':  [str('オフ'), str('切って')]
            },
            'lightUp': {
                'mandatory': [str('明るくして')],
                'required':  []
            },
            'lightDown': {
                'mandatory': [str('暗くして')],
                'required':  []
            },
            'nightLight': {
                'mandatory': [str('こだまにして')],
                'required':  []
            },
            'TVOn': {
                'mandatory': [str('テレビ')],
                'required':  [str('つけて'), str('オン')]
            },
            'TVOff': {
                'mandatory': [str('テレビ')],
                'required':  [str('オフ'), str('切って')]
            }
        }
        # 作業用変数
        self.__jsonData = {}
        self.__prevWordIdx = ''
        self.__initConfigData()

    # 状態変数の初期化
    def __initStat(self, invalidList):
        keyList = [key for key, _ in self.__state.items()]
        for key in list(set(keyList) - set(invalidList)):
            self.__state[key] = False

    # 初期化処理関数
    def __initConfigData(self):
        # config データの取得
        jsonFileList = []
        machineTypeList = []
        # json file から config 情報を読み込む
        with open(CONST_CLASS.JSON_DIR + self.__confParamFile, 'r') as fin:
            self.__vocabInfo = json.load(fin)

        for key in self.__vocabInfo.keys():
            useMachine = self.__vocabInfo[key]['machine'].lower()
            self.__vocabInfo[key]['machine'] = useMachine
            jsonFileList.append(self.__vocabInfo[key]['json'])
            machineTypeList.append(useMachine)

        # 状態変数のキーの追加
        for key in list(set(machineTypeList)):
            self.__state[key] = False

        # json file の読み込み(list(set(---)) で重複を削除)
        for targetJsonFile in list(set(jsonFileList)):
            try:
                with open(CONST_CLASS.JSON_DIR + targetJsonFile, 'r') as fin:
                    self.__jsonData[targetJsonFile] = json.load(fin)
            except:
                # 該当する json file が存在しない場合,処理せず読み飛ばす
                pass

    # julius の状態の設定
    def setJuliusState(self, status):
        retVal = self.__state['julius']
        self.__state['julius'] = status
        return retVal

    # 状態の出力
    def getStatus(self):
        return [(key, self.__state[key]) for key in sorted(list(self.__state.keys()))]

    # 合致する単語を探す
    def __findMatchWord(self, wordData):
        retWordIdx = None

        # 単語の一覧から合致するパターンを抽出
        for key, wordDictList in self.__vocabListData.items():
            mandatoryData = wordDictList['mandatory']
            requiredData = wordDictList['required']
            # 必須項目の単語が含まれているか調査
            isExistMandatory = True
            for targetWord in mandatoryData:
                if targetWord not in wordData:
                    isExistMandatory = False
                    break
            # 必要項目の単語が含まれているか調査
            if len(requiredData) == 0:
                isExistRequired = True
            else:
                isExistRequired = False
                for targetWord in requiredData:
                    if targetWord in wordData:
                        isExistRequired = True

            if isExistMandatory and isExistRequired:
                retWordIdx = key
                break
        return retWordIdx

    # コマンドの実行可能判定
    def chkCmdExection(self, wordData):
        local_tmpVal = False
        returnState = self.juliusReturnState[local_tmpVal]
        retCmd = None
        wordIdx = self.__findMatchWord(wordData)
        retAudio = ''

        # 単語が候補に存在するかつ,前回と同じ単語でない
        if wordIdx is not None and wordIdx != self.__prevWordIdx:
            useMachine = self.__vocabInfo[wordIdx]['machine']
            useState = int(self.__vocabInfo[wordIdx]['chkState'], 16)
            useJsonFile = self.__vocabInfo[wordIdx]['json']
            useCmd = self.__vocabInfo[wordIdx]['cmd']
            tmpAudio = self.__vocabInfo[wordIdx]['voice']
            useAudio = {False: '', True: tmpAudio, 'weather': tmpAudio}

            try:
                retCmd = self.__jsonData[useJsonFile][useCmd]
            except:
                # コマンドが存在しない場合
                retCmd = None
            # 命令解析開始メッセージの場合
            if useState == self.__JuliusConfig['wakeUp']:
                self.setJuliusState(True)
                local_tmpVal = True
                self.__prevWordIdx = ''
            elif self.__state['julius']:
                # 命令解析終了メッセージの場合
                if useState == self.__JuliusConfig['sleep']:
                    self.setJuliusState(False)
                    local_tmpVal = True
                    self.__prevWordIdx = ''
                # Julius の初期化要求の場合
                elif useState == self.__JuliusConfig['init']:
                    # 状態を初期化
                    self.__initStat(['julius'])
                    local_tmpVal = True
                    self.__prevWordIdx = ''
                # 天気予報読み上げ要求の場合
                elif useState == self.__JuliusConfig['weather']:
                    local_tmpVal = 'weather'
                    self.__prevWordIdx = ''
                # 無効値でない場合
                elif useState != self.__JuliusConfig['invalid']:
                    # 入力コマンドの動作条件を確認し,コマンドを実行するか決める
                    upperInfo = (useState & self.__maskDict['upper'])
                    lowerInfo = (useState & self.__maskDict['lower'])

                    if upperInfo == self.__JuliusConfig['active']:
                        # [期待値]対象機器が動作中
                        local_tmpVal = self.__state[useMachine]
                    elif upperInfo == self.__JuliusConfig['stopped']:
                        # [期待値]対象機器が停止中
                        local_tmpVal = not self.__state[useMachine]
                    else:
                        # [期待値]なし で初期化
                        local_tmpVal = True

                    # コマンドを実行する場合
                    if local_tmpVal:
                        # 現在の入力単語を記録
                        self.__prevWordIdx = wordIdx
                        # 下位 4 ビットを確認し処理する
                        if lowerInfo == self.__JuliusConfig['update']:
                            # 状態を反転
                            self.__state[useMachine] = not self.__state[useMachine]
                        elif lowerInfo == self.__JuliusConfig['delPrevWord']:
                            # 記録した単語を削除
                            self.__prevWordIdx = ''
            retAudio = useAudio[local_tmpVal]
            returnState = self.juliusReturnState[local_tmpVal]
        return returnState, retCmd, retAudio

自分が利用している「confParam.json」ファイルを以下に示す.

{
    "lightOn": {
        "cmd": "all_light",
        "machine": "light",
        "json": "lightData.json",
        "voice": "lightOn.wav",
        "chkState": "0x81"
    },
    "lightOff": {
        "cmd": "light_off",
        "machine": "light",
        "json": "lightData.json",
        "voice": "lightOff.wav",
        "chkState": "0x11"
    },
    "lightUp": {
        "cmd": "up",
        "machine": "light",
        "json": "lightData.json",
        "voice": "lightUp.wav",
        "chkState": "0x12"
    },
    "lightDown": {
        "cmd": "down",
        "machine": "light",
        "json": "lightData.json",
        "voice": "lightDown.wav",
        "chkState": "0x12"
    },
    "nightLight": {
        "cmd": "night_light",
        "machine": "light",
        "json": "lightData.json",
        "voice": "nightLight.wav",
        "chkState": "0x11"
    },
    "TVOn": {
        "cmd": "power",
        "machine": "tv",
        "json": "TVData.json",
        "voice": "TVOn.wav",
        "chkState": "0x81"
    },
    "TVOff": {
        "cmd": "power",
        "machine": "tv",
        "json": "TVData.json",
        "voice": "TVOff.wav",
        "chkState": "0x11"
    }
}

長くなってしまったため,main 関数部分と解析結果部分に関しては,次回にまわす.main 関数部分が軸となって動作するため,これまでの関数を利用してスレッドを立ち上げつつ動作する.ここまでのディレクトリ構成を以下に示す.

~/juliusKit
   |--dictationKit_v4.3.1
       |--word.dic
       |--word.jconf
   |--grammarKit
   |   |--controller
   |        |--compile.sh
   |        |--mkdfa.pl
   |        |--mkfa
   |        |--utf8_controller.grammar
   |        |--utf8_controller.voca
   |--outYomi.sh
   |--word.yomi
   |--VR_ConstClass.py
   |--parseJuliusData.py
   |--webSocketClient.py
   |--configuration.py    # 追加部分
   |--jsonData            # 追加部分
   |   |--TVData.json
   |   |--confParam.json
   |   |--lightData.json
   |--wavFile             # 追加部分
         |--TVOff.wav
         |--TVOn.wav
         |--lightDown.wav
         |--lightOff.wav
         |--lightOn.wav
         |--lightUp.wav
         |--nightLight.wav