作業中のメモ

よく「計算機」を使って作業をする.知らなかったことを中心にまとめるつもり.

音声認識による赤外線機器の操作 その 5【認識結果パース編】

どうも,筆者です.

前回までで,赤外線操作ができた.ここからは,これらを組み合わせて音声認識結果から赤外線操作を行う.

前回までの記事は以下にある.

workspacememory.hatenablog.com

全体像

ここで,全体像を示しておく.細かい部分や他との関連は,各々のコードを使いつつ確認する.

f:id:mathematicsphysical:20180527175400p:plain
全体像

現状の処理の流れとしては,以下のようになる.

  1. マイクから音声を拾う.
  2. Julius により音声認識を行い,認識結果を得る.
  3. 認識結果から,利用する赤外線データを選択する.
  4. 赤外線データを ADRSIR を用いて外部に出力する.

製作状況

現状では,音声認識結果と赤外線送信は機能としてある.(どちらも,外部ツールに頼っているため,自分で触った部分はほとんどないが...) 今回は,Julius の音声認識結果をパースし,パース結果(単語情報)から赤外線データを選択する部分までを解説する.

f:id:mathematicsphysical:20180527175413p:plain
今回の対象

認識結果解析の中身

ここでは,認識結果の解析を行うための方法を考える.将来的に別の外部入力から単語情報を入力しても操作できるようにしたいため,以下のように処理を分割する.

  • Julius から単語情報を抽出する.
    • Julius の起動モードを変更することで,プログラムから結果を受信できる.
    • データの受信には socket 通信を用いる.
  • 単語情報から赤外線情報を選択する.
    • 単語の受信には WebSocket を用いる.

f:id:mathematicsphysical:20180527175420p:plain
認識結果解析

Julius から単語情報を抽出

さて,Python を用いて実際にプログラミングをしていく.これまで,Julius を「-demo」モードで起動してきたが,プログラムからデータを受信するためには,「-module」モードで起動する必要がある.これは単に引数を変更するだけで対応できる. また,「-module」モードで起動した Julius は,localhost の 10500 番に結果を出力する仕様であるため,このホストとポートで入力待ちをすればよい.

グローバル変数の定義

今後,外部ツールや JSON ファイルを読み出すことになるため,必要なパスをグローバル変数として保持しておく.ここでは,以下のようなクラスをもつ Python スクリプトを作成した.ファイル名は「VR_ConstClass.py」である.

~/juliusKit $ touch VR_ConstClass.py
~/juliusKit $ vim VR_ConstClass.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# VR_ConstClass.py

class CONST_CLASS:
    # ==========================
    # = common const parameter =
    # ==========================
    ROOT_DIR = '/home/pi/juliusKit/' # root directory
    JSON_DIR = ROOT_DIR + 'jsonData/'  # json directory
    WAVE_DIR = ROOT_DIR + 'wavFile/'   # wave directory
    WEBSOCKET_HOST = 'localhost'       # WebSocket host name
    WEBSOCKET_PORT = 10510             # WebSocket port number

ここにあるように,最終的には発音に対し音声で対応できるようにしたいと考えている.

Julius の起動と接続の確立

Julius を起動と Julius に接続するまでの処理を記したスクリプト「parseJuliusData.py」を以下に示す.今後,これに変更を加えていく.

~/juliusKit $ touch parseJuliusData.py
~/juliusKit $ vim parseJuliusData.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# parseJuliusData.py

import subprocess as sb
import time, socket, threading, select
from VR_ConstClass import CONST_CLASS

class parseJuliusDataClass():
    def __init__(self):
        # julius ホスト名
        self.__juliusHost = 'localhost'
        # julius ポート番号
        self.__juliusPort = 10500
        # タイムアウトの最大時間 [sec]
        self.__maxTimeOut = 20.0
        # 受信データサイズ
        self.__receiveSize = 2048
        # Julius サーバ起動コマンド
        #jconfFile = 'dictationKit_v4.3.1/word.jconf' # 単語認識版
        jconfFile = 'grammarKit/control.jconf' # 文法認識版
        self.__juliusServerCmd = [
            '/usr/local/bin/julius', '-C',
            CONST_CLASS.ROOT_DIR + jconfFile,
            '-module', '-nostrip'
        ]
        self.__process = None
        self.__socket = None
        self.__isRunning = True

    # Julius の起動
    def __wakeUpJuliusServer(self):
        # Julius Server の起動
        self.__process = sb.Popen(self.__juliusServerCmd, stdout=sb.DEVNULL, stderr=sb.DEVNULL)

    # julius サーバに接続
    def __connectionJuliusServer(self, arg_maxTimeout):
        while True:
            try:
                # ソケットの生成
                self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.__socket.connect((self.__juliusHost, self.__juliusPort))
                break
            except:
                self.__socket = None
                time.sleep(arg_maxTimeout)

    # 実行用関数
    def __parseText(self):
        local_maxTimeOut = 3
        # Julius の起動
        self.__wakeUpJuliusServer()
        # サーバに接続
        self.__connectionJuliusServer(local_maxTimeOut)

subprocess や socket の使い方は各自調べてほしい.

WebSocket 利用のための準備

今回は,Julius から受信したデータを単語解析処理に送信する際に,WebSocket を利用するため,コマンドラインで以下をインストールする.

~/juliusKit $ sudo pip3 install git+https://github.com/dpallot/simple-websocket-server.git
~/juliusKit $ sudo pip3 install websocket-client

インストール後,まずは,client 側のコードを作成する.以下のように「webSocketClient.py」として保存する.

~/juliusKit $ touch webSocketClient.py
~/juliusKit $ vim webSocketClient.py # エディタは自分の使いやすいものを利用する
#!/usr/bin/python3
# -*- coding: utf-8 -*-

# webSocketClient.py

from websocket import create_connection

class wsClientClass():
    def __init__(self, host, port):
        self.__connectAddr = 'ws://{0}:{1}/'.format(host, port)

    def sendMsg(self, message):
        try:
            ws = create_connection(self.__connectAddr)
            ws.send(message)
            ws.close()
        except:
            pass

WebSocket の処理の追加

上記のコードを先程のコードに組み込むと,以下のようになる.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

# parseJuliusData.py

import subprocess as sb
import webSocketClient as wsClient # 追加部分
import time, socket, threading, select
from VR_ConstClass import CONST_CLASS

class parseJuliusDataClass():
    def __init__(self):
        # julius ホスト名
        self.__juliusHost = 'localhost'
        # julius ポート番号
        self.__juliusPort = 10500
        # タイムアウトの最大時間 [sec]
        self.__maxTimeOut = 20.0
        # 受信データサイズ
        self.__receiveSize = 2048
        # Julius サーバ起動コマンド
        #jconfFile = 'dictationKit_v4.3.1/word.jconf' # 単語認識版
        jconfFile = 'grammarKit/control.jconf' # 文法認識版
        self.__juliusServerCmd = [
            '/usr/local/bin/julius', '-C',
            CONST_CLASS.ROOT_DIR + jconfFile,
            '-module', '-nostrip'
        ]
        self.__process = None
        self.__socket = None
        self.__isRunning = True

    # Julius の起動
    def __wakeUpJuliusServer(self):
        # Julius Server の起動
        self.__process = sb.Popen(self.__juliusServerCmd, stdout=sb.DEVNULL, stderr=sb.DEVNULL)

    # julius サーバに接続
    def __connectionJuliusServer(self, arg_maxTimeout):
        while True:
            try:
                # ソケットの生成
                self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.__socket.connect((self.__juliusHost, self.__juliusPort))
                break
            except:
                self.__socket = None
                time.sleep(arg_maxTimeout)

    # 実行用関数
    def __parseText(self):
        local_maxTimeOut = 3
        # Julius の起動
        self.__wakeUpJuliusServer()
        # サーバに接続
        self.__connectionJuliusServer(local_maxTimeOut)
        # WebSocket Client のインスタンス生成
        client = wsClient.wsClientClass(
            CONST_CLASS.WEBSOCKET_HOST,
            CONST_CLASS.WEBSOCKET_PORT
        )

受信処理関数の定義

実際に Julius からデータを受信し処理する部分を以下に示す.

       readData = ''
        while self.__isRunning:
            # ソケットが読み込み可能状態になるまで待機
            inputReady, _, _ = select.select([self.__socket], [], [], self.__maxTimeOut)

            # 空リストの場合
            if len(inputReady) == 0:
                # タイムアウトしたため,スリープモードへ遷移
                # ToDo: タイムアウト処理
                pass
            else:
                # 読み込み可能状態のソケットが含まれる場合
                if self.__socket in inputReady:
                    readData += str(self.__socket.recv(self.__receiveSize).decode('utf-8'))

                    # 「認識結果」のデータがある場合
                    if '</RECOGOUT>\n.' in readData:
                        wordData = ''

                        for line in readData.split('\n'):
                            # WORD という単語を探す
                            searchStr = 'WORD="'
                            matchIndex = line.find(searchStr)

                            if matchIndex >= 0:
                                # 単語が存在する場合,その単語を抽出
                                startIdx = matchIndex + len(searchStr)
                                wordData += str(line[startIdx:line.find('"', startIdx)])
                        # 単語の入力があった場合
                        if wordData != '':
                            # サーバにデータを送信
                            client.sendMsg(wordData)
                        readData = ''

受信処理関数のスレッド化

また,これだけが動作するわけではなく,今後作成するほかのプログラムと並列に動作してほしいため,スレッド化する.その処理を加えたものを以下に示す.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

# parseJuliusData.py

import subprocess as sb
import webSocketClient as wsClient
import time, socket, threading, select
from VR_ConstClass import CONST_CLASS

class parseJuliusDataClass():
    def __init__(self):
        # julius ホスト名
        self.__juliusHost = 'localhost'
        # julius ポート番号
        self.__juliusPort = 10500
        # タイムアウトの最大時間 [sec]
        self.__maxTimeOut = 20.0
        # 受信データサイズ
        self.__receiveSize = 2048
        # Julius サーバ起動コマンド
        #jconfFile = 'dictationKit_v4.3.1/word.jconf' # 単語認識版
        jconfFile = 'grammarKit/control.jconf' # 文法認識版
        self.__juliusServerCmd = [
            '/usr/local/bin/julius', '-C',
            CONST_CLASS.ROOT_DIR + jconfFile,
            '-module', '-nostrip'
        ]
        self.__process = None
        self.__socket = None
        self.__isRunning = True
        self.__thread = threading.Thread(target=self.__parseText)

    # スレッドを開始する
    def startThread(self):
        self.__isRunning = True
        self.__thread.start()

    # スレッドを停止する
    def stopThread(self):
        self.__isRunning = False   # 停止イベントを設定
        self.__thread.join()       # スレッドが停止するのを待つ
        self.__process.terminate() # サブプロセスを終了する
        self.__socket.close()      # ソケットを閉じる

    # Julius の起動
    def __wakeUpJuliusServer(self):
        # Julius Server の起動
        self.__process = sb.Popen(self.__juliusServerCmd, stdout=sb.DEVNULL, stderr=sb.DEVNULL)

    # julius サーバに接続
    def __connectionJuliusServer(self, arg_maxTimeout):
        while True:
            try:
                # ソケットの生成
                self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.__socket.connect((self.__juliusHost, self.__juliusPort))
                break
            except:
                self.__socket = None
                time.sleep(arg_maxTimeout)

    # 実行用関数
    def __parseText(self):
        local_maxTimeOut = 3
        # Julius の起動
        self.__wakeUpJuliusServer()
        # サーバに接続
        self.__connectionJuliusServer(local_maxTimeOut)
        # WebSocket Client のインスタンス生成
        client = wsClient.wsClientClass(
            CONST_CLASS.WEBSOCKET_HOST,
            CONST_CLASS.WEBSOCKET_PORT
        )

        readData = ''
        while self.__isRunning:
            # ソケットが読み込み可能状態になるまで待機
            inputReady, _, _ = select.select([self.__socket], [], [], self.__maxTimeOut)

            # 空リストの場合
            if len(inputReady) == 0:
                # タイムアウトしたため,スリープモードへ遷移
                # ToDo: タイムアウト処理
                pass
            else:
                # 読み込み可能状態のソケットが含まれる場合
                if self.__socket in inputReady:
                    readData += str(self.__socket.recv(self.__receiveSize).decode('utf-8'))

                    # 「認識結果」のデータがある場合
                    if '</RECOGOUT>\n.' in readData:
                        wordData = ''

                        for line in readData.split('\n'):
                            # WORD という単語を探す
                            searchStr = 'WORD="'
                            matchIndex = line.find(searchStr)

                            if matchIndex >= 0:
                                # 単語が存在する場合,その単語を抽出
                                startIdx = matchIndex + len(searchStr)
                                wordData += str(line[startIdx:line.find('"', startIdx)])
                        # 単語の入力があった場合
                        if wordData != '':
                            # サーバにデータを送信
                            client.sendMsg(wordData)
                        readData = ''

ここまでのディレクトリ構成を以下に示す.

~/juliusKit
   |--dictationKit_v4.3.1
       |--word.dic
       |--word.jconf
   |--grammarKit
   |   |--controller
   |        |--compile.sh
   |        |--mkdfa.pl
   |        |--mkfa
   |        |--utf8_controller.grammar
   |        |--utf8_controller.voca
   |--outYomi.sh
   |--word.yomi
   |--VR_ConstClass.py   # 追加部分
   |--parseJuliusData.py # 追加部分
   |--webSocketClient.py # 追加部分

長くなったため,一旦ココまでとする.今後,単語解析処理側のプログラムを作成する.