2021年12月31日金曜日

LEGO MINDSTORMS でステートマシン

なんちゃってスレッドができるようになりましたが、プログラムを勉強していくとまずはモジュール化という考え方が必要になってきます。

Scratchではスプライトというオブジェクト毎にプログラムを作成するということで自然とモジュール化されるようになっていますし、ブロック化やメッセージ送信・受信ということで意識的にモジュール化することも可能です。

私がプログラミングを始めた頃は構造化プログラミングという言葉が流行っていましたがその後、オブジェクト指向、イベント駆動、データ駆動、ドメイン駆動にテスト駆動とプログラミング界隈でもいろいろな言葉が流行りましたが、基本は構造化プログラミングかなと個人的には思っています。

おそらくなんでもそうなんでしょうがある程度のところまでは才能だけで行くことが出来ますが、その上に行くにはやはり基本が大切なのかなと思います。

以下にMINDSTORMSのMicroPythonでのステートマシンのプログラム例を示しますが、ステートマシンは複雑な振る舞いの制御によく使われていて、ROS(Robot Operating System)ではsmachパッケージとしてPythonで利用できるようになっています。

MINDSTORMS用ステートマシン

必要に応じてuasyncioを利用した非同期処理を組み込めば複雑な処理も可能になりますが、その場合はイベント(uasyncuo.Event)によるタスク同期なども考える必要が出てきます。

from mindstorms import MSHub, Motor, MotorPair, ColorSensor, DistanceSensor, App
from mindstorms.control import wait_for_seconds, wait_until, Timer
from mindstorms.operator import greater_than, greater_than_or_equal_to, less_than, less_than_or_equal_to, equal_to, not_equal_to
import math

# オブジェクトの定義
app = App()
hub = MSHub()
wheels = MotorPair(MSHub.PORT_B, MSHub.PORT_A)
arm = Motor(MSHub.PORT_C)
distance_sensor = DistanceSensor(MSHub.PORT_D)
color_sensor = ColorSensor(MSHub.PORT_E)

# ステートの定義
STATE_EXIT = "exit"
STATE_GO_STRAIGHT = "GoStraight"

# ステートクラス(基底クラス)
class State(object):
    def __init__(self, name: String):
        self.__name = name
        self.__action = None

    @property
    def name(self):
        return self.__name

    def entry(self):
        pass

    def do(self) -> String:
        return None

    def exit(self):
        pass

# ステートマシンクラス
class StateMachine(object):
    def __init__(self):
        self.__stateList = {}
        self.__now = None

    @property
    def currentState(self):
        return self.__now.name if self.__now else ""

    def add(self, state: State):
        self.__stateList[state.name] = state

    def changeTo(self, tag: String):
        if self.__now:
            self.__now.exit()
        self.__now = self.__stateList[tag]
        if self.__now:
            self.__now.entry()

    def run(self) -> String:
        if self.__now:
            return self.__now.do()
        return None

# 直進
class GoStraight(State):
    def __init__(self, name: String = STATE_GO_STRAIGHT):
        super().__init__(name)

    def entry(self):
        wheels.start(0, -20)

    def do(self) -> String:
        distance = distance_sensor.get_distance_cm()
        if distance and distance < 20: # 測定不能の場合None
            return STATE_EXIT
        return None

    def exit(self):
        wheels.stop()

# メイン処理
class Main:
    def __init__(self):
        self.__stateMachine = StateMachine()
        self.__stateMachine.add(State(STATE_EXIT))
        self.setup()
        self.loop()

    def setup(self):
        # ステートの登録
        self.__stateMachine.add(GoStraight())
        # 開始ステートへ切り替える
        self.__stateMachine.changeTo(STATE_GO_STRAIGHT)
        
    def loop(self):
        while self.__stateMachine.currentState != STATE_EXIT:
            nextState = self.__stateMachine.run()
            if nextState:
                self.__stateMachine.changeTo(nextState)
            # 外部よりのステートマシン終了
            if hub.right_button.is_pressed():
                print("right button is pressed !")
                self.__stateMachine.changeTo(STATE_EXIT)

# プログラム開始指示を待つ
hub.left_button.wait_until_pressed()
hub.speaker.beep()
# プログラム開始
Main()
# プログラム終了(メニューに戻る)
raise SystemExit

2021年12月30日木曜日

LEGO MINDSTORMS Pythonでなんちゃってスレッド2

 先日はジェネレータベースのコールチンを実装してみましたが、Python 3.10からはこのジェネレータベースのコールチンは廃止が予定されているようです。
MINDSTORMSのMicroPythonは3.4.0をベースとしたもののようです。

MicroPython v1.14-893-gbae6ff2ee on 2021-11-04; LEGO Technic Large Hub with STM32F413xx
Type "help()" for more information.
>>> import sys
>>> sys.version
'3.4.0'
>>>
現在のPythonの最新バージョンが3.10なのでMINDSTORMSのMicroPythonが3.10になることはまずないとは思いますが、プログラマを目指す子供たちはasyncioベースのコールチンに慣れておくべきかなと思います。(あまり使用することはないとは思いますがw)

コールチンをしっかりと理解しておくとスレッドやタスクといった概念についても理解しやすくなるでしょう。最近ではAndroidアプリの開発でもHTTPアクセス処理などでコールチンを利用した非同期処理が推奨されているようです。

uasyncioと一緒に使うコールチン


基本的なプログラムの作り方はジェネレータベースのコールチンとほぼ同じです。
async defで定義した関数の中でawaitでコールチンの処理を待つようにプログラミングするのが基本になります。実例を見た方が早いですね。
「yield True」を「await uasyncio.sleep_ms(1)」に書き換えた感じです。尚awaitするのはコールチンであれば良いので「uasyncio.sleep_ms(1)」でなく独自のメソッドを指定することも可能です。

コールチンの処理を終了させる方法はジェネレータベースのコールチンと同じ方法(コメントにしてあります)とuasyncio.Taskクラスが提供するcancel()メソッドを利用する方法があります。

uasyncioと一緒に使う方法ではメイン処理もコールチンになっているので適時awaitで処理を切り替えることが可能です。

from mindstorms import MSHub, Motor, MotorPair, ColorSensor, DistanceSensor, App
from mindstorms.control import wait_for_seconds, wait_until, Timer
from mindstorms.operator import greater_than, greater_than_or_equal_to, less_than, less_than_or_equal_to, equal_to, not_equal_to
import math
import uasyncio

# オブジェクトの定義
app = App()
hub = MSHub()
wheels = MotorPair(MSHub.PORT_B, MSHub.PORT_A)
arm = Motor(MSHub.PORT_C)
distance_sensor = DistanceSensor(MSHub.PORT_D)
color_sensor = ColorSensor(MSHub.PORT_E)

# ステートの定義
STATE_START = 1
STATE_EXIT = -1

# コールチンクラス
class Coroutine1:
    def __init__(self):
        self.__distance = -1
        self.__loop = True

    @property
    def distance(self):
        return self.__distance

    def stop(self):
        self.__loop = False

    async def run(self):
        try:
            while self.__loop:
                await self.__done()
        finally:
            print("Coroutine1 end!")

    async def __done(self):
        self.__distance = distance_sensor.get_distance_cm()
        print(self.__distance)
        await uasyncio.sleep_ms(1)
        if self.__distance and self.__distance < 15:
            print("The wall is near!")
            await uasyncio.sleep_ms(1)

# メイン処理
class Main:
    def __init__(self):
        self.state = STATE_START
        self.setup()
        uasyncio.run(self.run())

    def setup(self):
        self.__coroutines = []
        self.__coroutines.append(Coroutine1())

    async def run(self):
        self.__tasks = []
        for c in self.__coroutines:
            self.__tasks.append(uasyncio.create_task(c.run()))
        while self.state != STATE_EXIT:
            if hub.right_button.is_pressed():
                print("right button is pressed !")
                #for task in self.__coroutines:
                #    task.stop()
                for task in self.__tasks:
                    task.cancel()
                await uasyncio.sleep_ms(30)
                self.state = STATE_EXIT
            else:
                self.__done()
                await uasyncio.sleep_ms(1) # コールチンに処理を譲る

    def __done(self):
        print("Main keep alive!")

# プログラム開始指示を待つ
hub.left_button.wait_until_pressed()
hub.speaker.beep()
# プログラム開始
Main()
# プログラム終了(メニューに戻る)
raise SystemExit

2021年12月29日水曜日

LEGO MINDSTORMS Pythonでなんちゃってスレッド処理

LEGO MINDSTORMS Robot Inventor


こちらに紹介ページはあるのですが「今すぐ購入」ボタンを押すとページは存在せず公式サイトで購入することはできません。日本では教育用のSPIKEプライムセットしか購入できません。SPIKEは教育用なのでパーツを入れるプラスチックケース入りなのに対してInventorはホビー向けなのでケースは紙製となっています。ただ、標準でモータが4つついていたりLEGOパーツの数が多かったりと個人で遊ぶのにはInventorの方が良いので、米国Amazonで購入することにしました。

本体の色がSPIKEは黄色なの対してInventorは青系の色となっています。またプログラム作成用ツールもSPIKE用とInventor用は別になっています。SPIKE用は日本語化されているのですがInventor用は日本で発売されていないこともあってか日本語化はされていません。
最近は小学校でも英語教育が必須化されたりしていますが、遊びの中で英語に触れる(LEGOなので殆ど言葉を利用することはありませんがw)ことが自然と英語を学ぶ良い方法なのかと思ったりします。

プログラミング用言語としてはScratchベースのブロックプログラミングとPythonが利用できます。PythonはMicroPythonなのでthreadingは入っていませんし、ローレベルのAPIである_threadも利用することはできません。利用できるのは非同期 I/O スケジューラuasyncio(非同期処理を扱うための標準ライブラリasyncioのサブセット)となるのでコールチンを利用した非同期処理のコードを書くことは可能となります。

ただ、ここで大切なのはマルチタスクやマルチスレッド、非同期処理といった概念ではなく複数の処理があたかも同時に処理されているように見えることが子供達にとっては大切だということだと思います。

MINDSTORMSでPythonは必要か?


Scratchでプログラムを作成する場合、まずはゲーム的なプログラムを作ることが多いと思いますが、その場合それぞれのスプライト用にプログラムを作成する考え方は直感的ですし、それを動かす場合に非同期処理(ScratchはNode.jsをベースに開発されているのでシングルスレッド・シングルプロセスが基本)を意識せずにマルチスレッド的な動作が自然と実践できるScratchの考え方は非常に優れていると思います。

さてそれではMINDSTORMSをPythonでプログラムする場合マルチスレッドは必要になるのかな?という疑問が出てきます。
ロボット工学は畑違いですが、MINDSTORMSのプログラムを作っていて感じるのはマルチスレッドよりもステートマシンでプログラムした方がすっきりとできるかなという感じです。

MINDSTORMS用のScratchは必要なブロックもきめ細かく用意されているのであえてPythonでプログラムを作成する必要はないかなと思います。Pythonを使ってプログラムしたい方はプログラミング言語を勉強したい方が取り組むべきかなと感じました。

Pythonジェネレータベースのコールチン


ということでまずはジェネレータベースの非同期処理を作ってみました。

Scratchの場合はスプライトという単位でプログラムが分けられますが、MINDSTORMS用のPythonではファイルの分割は出来なそうです。ある程度の規模のプログラムになるとできればファイル分割できた方が管理がやりやすいのですがね。

ノンプリエンティブなマルチタスクになりますので適時「yield True」でCPUを解放するのがポイントになります。またMainの__done()は非同期処理の対象外になっているので、ここで長時間CPUを占有すると非同期処理に影響を与えるので極力小さい処理にする必要があります。

LEGOでの注意事項としてはモータ制御はstart、stopで行うようにしてmoveは(同期処理になるので、短い距離では問題ないと思いますが)極力使用しないように、またセンサーのwait系も使用しないようにします。
from mindstorms import MSHub, Motor, MotorPair, ColorSensor, DistanceSensor, App
from mindstorms.control import wait_for_seconds, wait_until, Timer
from mindstorms.operator import greater_than, greater_than_or_equal_to, less_than, less_than_or_equal_to, equal_to, not_equal_to
import math

# オブジェクトの定義
app = App()
hub = MSHub()
wheels = MotorPair(MSHub.PORT_B, MSHub.PORT_A)
arm = Motor(MSHub.PORT_C)
distance_sensor = DistanceSensor(MSHub.PORT_D)
color_sensor = ColorSensor(MSHub.PORT_E)

# ステートの定義
STATE_START = 1
STATE_EXIT = -1

# ジェネレータクラス
class Generator1:
    def __init__(self):
        self.__distance = -1
        self.__loop = True

    @property
    def distance(self):
        return self.__distance

    def stop(self):
        self.__loop = False

    # ジェネレータ関数本体
    def loop(self):
        while self.__loop:
            yield from self.__done()
        yield False

    def __done(self):
        self.__distance = distance_sensor.get_distance_cm()
        print(self.__distance)
        yield True
        if self.__distance and self.__distance < 15:
            print("The wall is near!")
            yield True

# メイン処理
class Main:
    def __init__(self):
        self.setup()
        self.loop()

    def setup(self):
        # ジェネレータの登録
        self.generators = []
        self.distance_sensor = Generator1()
        self.generators.append(self.distance_sensor)
        # ジェネレータ関数の登録
        self.generatorLoops = []
        for g in self.generators:
            self.generatorLoops.append(g.loop())

    def loop(self):
        while self.state != STATE_EXIT:
            if hub.right_button.is_pressed():
                for i, g in enumerate(self.generators):
                    g.stop()
                    while (next(self.generatorLoops[i])):
                        pass
                self.state = STATE_EXIT
            else:
                # ジェネレータの次の要素を取得(処理の再開)
                for loop in self.generatorLoops:
                    next(loop)
                # 自身の処理
                self.__done()
        
    def __done(self):
        print("Main keep alive!")

# プログラム開始指示を待つ
hub.left_button.wait_until_pressed()
hub.speaker.beep()
# プログラム開始
Main()
# プログラム終了(メニューに戻る)
raise SystemExit

bullseyeでGrove Base Hat for Raspberry Piが動かない!

 Raspberry Pi OS bullseyeがリリース 2021年10月30日にbullseyeがリリースされましたが、その後Busterも引き続きサポートされていく事がアナウンスされました。本家の New は2021年12月2日に出されましたが、2021年12月10日には...