5.4. PikaStdTask 多任务

PikaStdTask 多任务库提供了 Task (任务循环)的异步多任务功能。

5.4.1. 安装

  1. 在 requestment.txt 中加入 PikaStdLib 的依赖,PikaStdLib 的版本号应当与内核的版本号相同。
  1. PikaStdLib==v1.6.1
  1. 运行 pikaPackage.exe

5.4.2. class Task():

Task 类提供了任务循环功能,由 Task 类创建对象,即可创建一个任务循环。

5.4.2.1. Task 类的方法

  1. from PikaObj import *
  2. import PikaStdData
  3. class Task(TinyObj):
  4. calls = PikaStdData.List()
  5. def __init__(self):
  6. pass
  7. # regist a function to be called always
  8. def call_always(self, fun_todo: any):
  9. pass
  10. # regist a function to be called when fun_when() return 'True'
  11. def call_when(self, fun_todo: any, fun_when: any):
  12. pass
  13. # regist a function to be called periodically
  14. def call_period_ms(self, fun_todo: any, period_ms: int):
  15. pass
  16. # run all registed function once
  17. def run_once(self):
  18. pass
  19. # run all registed function forever
  20. def run_forever(self):
  21. pass
  22. # run all registed function until time is up
  23. def run_until_ms(self, until_ms: int):
  24. pass
  25. # need be overried to supply the system tick
  26. def platformGetTick(self):
  27. pass

5.4.2.2. 使用方法:

  1. 使用 call_xxx() 方法指定调用的方式,并将要执行的函数注册进 task 对象中。

  2. 使用 run_xxx() 方法指定任务循环执行的方式,并执行 task 对象中的所有函数。

  3. 和时间相关的功能,如 call_period_ms()run_until_ms()需要提供系统时钟,提供的方式为新建一个继承自 PikaStdTask的类,然后重写platformGetTick()方法。

5.4.2.3. 注意:

  1. 所有被注册的函数应当是 非阻塞 的,否则会导致整个任务循环堵死。

  2. 任务循环不是实时的。

5.4.2.4. 用例:

  • 新建继承自 PikaStdTask的类。
  1. # STM32G0.py
  2. class Task(PikaStdTask.Task):
  3. # override
  4. def platformGetTick():
  5. pass
  • 重写platformGetTick()方法。
  1. /* STM32G0_Task.c */
  2. void STM32G0_Task_platformGetTick(PikaObj* self) {
  3. obj_setInt(self, "tick", HAL_GetTick());
  4. }
  • python 用例
  1. import STM32G0
  2. import PikaPiZero
  3. import PikaStdLib
  4. pin = STM32G0.GPIO()
  5. rgb = PikaPiZero.RGB()
  6. mem = PikaStdLib.MemChecker()
  7. pin.setPin('PA8')
  8. pin.setMode('out')
  9. pin.enalbe()
  10. rgb.init()
  11. rgb.enable()
  12. print('task demo')
  13. print('mem used max:')
  14. mem.max()
  15. def rgb_task():
  16. rgb.flow()
  17. def led_task():
  18. if pin.read():
  19. pin.low()
  20. else:
  21. pin.high()
  22. task = STM32G0.Task()
  23. task.call_period_ms(rgb_task, 50)
  24. task.call_period_ms(led_task, 500)
  25. task.run_forever()