(05)PythonでTOF距離センサー(バッファについて)

 2nd January 2023 at 11:20am

Terabee社製の「TeraRanger Evo Mini USB」(以降、EvoMini)を、Pythonで利用する方法をこちらに書きました。

使っていると気づくと思いますが、計測値が最新ではないことがあります。

PySerialのread()を実行すると、EvoMiniのバッファ内の古い値から読み込んでいくことが原因です。

これは(03)Python+OpenCVでUVC規格のカメラ制御(バッファについて)と同じように対策することで解決します。別スレッドでEvoMiniから常に値を読み取り、Pythonのqueueに最新の値のみ入るようする方法です。

以下はサンプルプログラムです。

# -*- coding: utf-8 -*-

"""
Distance sensor by TeraRanger Evo Mini.

・PySerial is required.
・PRINT OUT MODE = TEXT / PIXEL MODE = PIXEL1 / RANGE MODE = LONG
"""

import serial, queue, threading, time

PRINTOUT_TEXT = bytearray(b'\x00\x11\x01\x45')
PX1 = bytearray(b'\x00\x21\x01\xBC')
RANGE_LONG = bytearray(b'\x00\x61\x03\xE9')

class TeraRangerEvoMini:
    def __init__(self, port='COM1', baudrate=115200, bytesize=8, parity='N', stopbits=1):
        """
        Initialize.
        """
        self.conn = serial.Serial(
            port=port,
            baudrate=baudrate,
            bytesize=bytesize,
            parity=parity,
            stopbits=stopbits)
        self.conn.write(PX1)
        self.conn.write(RANGE_LONG)
        self.conn.write(PRINTOUT_TEXT)
        print('------------------------')
        print('TeraRangerEvoMini.__init__')
        print('------------------------')
        print('Port = {0}'.format(port))
        print('Baud Rate = {0}'.format(baudrate))
        print('Byte Size = {0}'.format(bytesize))
        print('Parity = {0}'.format(parity))
        print('Stop Bits = {0}'.format(stopbits))
        print('PRINT OUT MODE = TEXT')
        print('PIXEL MODE = PIXEL1')
        print('RANGE MODE = LONG')
        for i in range(20):
            val = self._get_until_lf()
            print('Check value : {0}'.format(val))
        self.q = queue.Queue()
        self.t = threading.Thread(target=self._reader)
        self.t.daemon = True
        self.t.start()
    def __del__(self):
        """
        Shutdown.
        """
        self.t.join(0)
        self.conn.close()
    def _get_until_lf(self):
        """
        Read until LF.
        """
        try:
            val = ''
            res = ''
            while res != '\n':
                val = val + res
                res = self.conn.read().decode()
            return int(val)
        except:
            return -1
    def _reader(self):
        """
        The queue always contains the latest values.
        """
        while True:
            val = self._get_until_lf()
            if not self.q.empty():
                try:
                    self.q.get_nowait()
                except queue.Empty:
                    pass
            self.q.put(val)
    def open(self):
        """
        Open.
        """
        try:
            if self.conn.isOpen():
                return
            else:
                self.conn.open()
            print('Reopen.')
        except Exception as e:
            print(e)
    def isConnect(self):
        """
        Check connection.
        
        * Closes when an error occurs.
        """
        try:
            if not self.conn.isOpen():
                self.conn.open()
            bret = self.conn.write(PRINTOUT_TEXT) == 4
            return bret
        except Exception as e:
            print(e)
            self.conn.close()
            return False
    def getDistance(self):
        """
        Measure the distance.
        """
        try:
            if not self.conn.isOpen():
                self.conn.open()
            t_start = time.time()
            val = self.q.get()
            proc_time = time.time() - t_start
            print('calc time : {0} ms'.format(proc_time))
            return val
        except Exception as e:
            print(e)
            return -1

上のサンプルのポイントの1番目は、キューに最新の計測値が入るようにするところです。

    def _reader(self):
        """
        The queue always contains the latest values.
        """
        while True:
            val = self._get_until_lf()
            if not self.q.empty():
                try:
                    self.q.get_nowait()
                except queue.Empty:
                    pass
            self.q.put(val)

ポイントの2番目は、起動時に計測値が不安定になるので、初期化の部分で20回計測して安定させるところです。

        for i in range(20):
            val = self._get_until_lf()
            print('Check value : {0}'.format(val))

ポイントの3番目は、実際にシリアルコマンドを書き込みレスポンスを評価して接続確認するところです。PySerialのisOpen()は接続確認はしてくれません。

            bret = self.conn.write(PRINTOUT_TEXT) == 4

上のサンプルプログラムの実行例です。


Homeへプログラミングの記事Topへ