先日はジェネレータベースのコールチンを実装してみましたが、Python 3.10からはこのジェネレータベースのコールチンは廃止が予定されているようです。
MINDSTORMSのMicroPythonは3.4.0をベースとしたもののようです。
MicroPython v1.14-893-gbae6ff2ee on 2021-11-04; LEGO Technic Large Hub with STM32F413xx
Type "help()" for more information.
>>> import sys
>>> sys.version
'3.4.0'
>>>
現在のPythonの最新バージョンが3.10なのでMINDSTORMSのMicroPythonが3.10になることはまずないとは思いますが、プログラマを目指す子供たちはasyncioベースのコールチンに慣れておくべきかなと思います。(あまり使用することはないとは思いますがw)
コールチンをしっかりと理解しておくとスレッドやタスクといった概念についても理解しやすくなるでしょう。最近ではAndroidアプリの開発でもHTTPアクセス処理などでコールチンを利用した非同期処理が推奨されているようです。
uasyncioと一緒に使うコールチン
基本的なプログラムの作り方はジェネレータベースのコールチンとほぼ同じです。
async defで定義した関数の中でawaitでコールチンの処理を待つようにプログラミングするのが基本になります。実例を見た方が早いですね。
「yield True」を「await uasyncio.sleep_ms(1)」に書き換えた感じです。尚awaitするのはコールチンであれば良いので「uasyncio.sleep_ms(1)」でなく独自のメソッドを指定することも可能です。
コールチンの処理を終了させる方法はジェネレータベースのコールチンと同じ方法(コメントにしてあります)とuasyncio.Taskクラスが提供するcancel()メソッドを利用する方法があります。
uasyncioと一緒に使う方法ではメイン処理もコールチンになっているので適時awaitで処理を切り替えることが可能です。
from mindstorms import MSHub, Motor, MotorPair, ColorSensor, DistanceSensor, App
from mindstorms.control import wait_for_seconds, wait_until, Timer
from mindstorms.operator import greater_than, greater_than_or_equal_to, less_than, less_than_or_equal_to, equal_to, not_equal_to
import math
import uasyncio
# オブジェクトの定義
app = App()
hub = MSHub()
wheels = MotorPair(MSHub.PORT_B, MSHub.PORT_A)
arm = Motor(MSHub.PORT_C)
distance_sensor = DistanceSensor(MSHub.PORT_D)
color_sensor = ColorSensor(MSHub.PORT_E)
# ステートの定義
STATE_START = 1
STATE_EXIT = -1
# コールチンクラス
class Coroutine1:
def __init__(self):
self.__distance = -1
self.__loop = True
@property
def distance(self):
return self.__distance
def stop(self):
self.__loop = False
async def run(self):
try:
while self.__loop:
await self.__done()
finally:
print("Coroutine1 end!")
async def __done(self):
self.__distance = distance_sensor.get_distance_cm()
print(self.__distance)
await uasyncio.sleep_ms(1)
if self.__distance and self.__distance < 15:
print("The wall is near!")
await uasyncio.sleep_ms(1)
# メイン処理
class Main:
def __init__(self):
self.state = STATE_START
self.setup()
uasyncio.run(self.run())
def setup(self):
self.__coroutines = []
self.__coroutines.append(Coroutine1())
async def run(self):
self.__tasks = []
for c in self.__coroutines:
self.__tasks.append(uasyncio.create_task(c.run()))
while self.state != STATE_EXIT:
if hub.right_button.is_pressed():
print("right button is pressed !")
#for task in self.__coroutines:
# task.stop()
for task in self.__tasks:
task.cancel()
await uasyncio.sleep_ms(30)
self.state = STATE_EXIT
else:
self.__done()
await uasyncio.sleep_ms(1) # コールチンに処理を譲る
def __done(self):
print("Main keep alive!")
# プログラム開始指示を待つ
hub.left_button.wait_until_pressed()
hub.speaker.beep()
# プログラム開始
Main()
# プログラム終了(メニューに戻る)
raise SystemExit
0 件のコメント:
コメントを投稿