5.4. PikaStdTask 多任务

PikaStdTask 多任务库提供了 Task (任务循环)的异步多任务功能。

5.4.1. 安装

  1. 在 requestment.txt 中加入 PikaStdLib 的依赖,PikaStdLib 的版本号应当与内核的版本号相同。
  1. PikaStdLib==v1.11.0
  1. 运行 pikaPackage.exe

5.4.2. class Task():

Task 类提供了任务循环功能,由 Task 类创建对象,即可创建一个任务循环。

5.4.2.1. Task 类的方法

  1. import PikaStdData
  2. class Task:
  3. calls = PikaStdData.List()
  4. def __init__(self):
  5. pass
  6. # regist a function to be called always
  7. def call_always(self, fun_todo: any):
  8. pass
  9. # regist a function to be called when fun_when() return 'True'
  10. def call_when(self, fun_todo: any, fun_when: any):
  11. pass
  12. # regist a function to be called periodically
  13. def call_period_ms(self, fun_todo: any, period_ms: int):
  14. pass
  15. # run all registed function once
  16. def run_once(self):
  17. pass
  18. # run all registed function forever
  19. def run_forever(self):
  20. pass
  21. # run all registed function until time is up
  22. def run_until_ms(self, until_ms: int):
  23. pass
  24. # need be overried to supply the system tick
  25. def platformGetTick(self):
  26. pass

5.4.2.2. 使用方法:

  1. 使用 call_xxx() 方法指定调用的方式,并将要执行的函数注册进 task 对象中。

  2. 使用 run_xxx() 方法指定任务循环执行的方式,并执行 task 对象中的所有函数。

  3. 和时间相关的功能,如 call_period_ms()run_until_ms()需要提供系统时钟,提供的方式为新建一个继承自 PikaStdTask的类,然后重写platformGetTick()方法。

5.4.2.3. 注意:

  1. 所有被注册的函数应当是 非阻塞 的,否则会导致整个任务循环堵死。

  2. 任务循环不是实时的。

5.4.2.4. 用例:

  • 新建继承自 PikaStdTask的类。
  1. # STM32G0.py
  2. class Task(PikaStdTask.Task):
  3. # override
  4. def platformGetTick():
  5. pass
  • 重写platformGetTick()方法。
  1. /* STM32G0_Task.c */
  2. void STM32G0_Task_platformGetTick(PikaObj* self) {
  3. obj_setInt(self, "tick", HAL_GetTick());
  4. }
  • python 用例
  1. import STM32G0
  2. import PikaPiZero
  3. import PikaStdLib
  4. pin = STM32G0.GPIO()
  5. rgb = PikaPiZero.RGB()
  6. mem = PikaStdLib.MemChecker()
  7. pin.setPin('PA8')
  8. pin.setMode('out')
  9. pin.enalbe()
  10. rgb.init()
  11. rgb.enable()
  12. print('task demo')
  13. print('mem used max:')
  14. mem.max()
  15. def rgb_task():
  16. rgb.flow()
  17. def led_task():
  18. if pin.read():
  19. pin.low()
  20. else:
  21. pin.high()
  22. task = STM32G0.Task()
  23. task.call_period_ms(rgb_task, 50)
  24. task.call_period_ms(led_task, 500)
  25. task.run_forever()