线程管理器

线程状态

从调度器的角度来看,每个线程都有一个独一无二的 Tid 来区分它和其他线程。

  1. // in process/mod.rs
  2. pub type Tid = usize;
  3. pub type ExitCode = usize;

同时,线程的状态有下面几种:

  1. // src/process/struct.rs
  2. #[derive(Clone)]
  3. pub enum Status {
  4. // 就绪:可以运行,但是要等到 CPU 的资源分配给它
  5. Ready,
  6. // 正在运行
  7. Running(Tid),
  8. // 睡眠:当前被阻塞,要满足某些条件才能继续运行
  9. Sleeping,
  10. // 退出:该线程执行完毕并退出
  11. Exited(ExitCode),
  12. }

调度算法接口设计

在一个线程运行的过程中,调度器需要定期查看当前线程的已运行时间,如果已经达到一个阈值,那么出于公平起见,应该将 CPU 资源交给其他线程,也即切换到其他线程。 查看的间隔不能太长,这样的话等于线程调度根本没起到作用;但是也不能过于频繁, CPU 的资源大量投资在调度器上更是得不偿失。 我们的线程调度算法基于时钟中断,我们会在时钟中断中进入调度器看看当前线程是否需要切换出去。 因此,调度算法的接口 Scheduler 如下:

  1. // src/process/scheduler.rs
  2. pub trait Scheduler {
  3. // 如果 tid 不存在,表明将一个新线程加入线程调度
  4. // 否则表明一个已有的线程要继续运行
  5. fn push(&mut self, tid: Tid);
  6. // 从若干可运行线程中选择一个运行
  7. fn pop(&mut self) -> Option<Tid>;
  8. // 时钟中断中,提醒调度算法当前线程又运行了一个 tick
  9. // 返回的 bool 表示调度算法认为当前线程是否需要被切换出去
  10. fn tick(&mut self) -> bool;
  11. // 告诉调度算法一个线程已经结束
  12. fn exit(&mut self, tid: Tid);
  13. }

线程池接口设计

调度算法 Scheduler 只管理 Tid ,和线程并没有关系。因此,我们使用线程池 ThreadPool 来给线程和 Tid 建立联系,将 Scheduler 的 Tid 调度变成线程调度。 事实上,每个线程刚被创建时并没有一个 Tid ,这是线程池给线程分配的。

  1. // src/process/thread_pool.rs
  2. // 线程池每个位置的信息
  3. struct ThreadInfo {
  4. // 占据这个位置的线程当前运行状态
  5. status: Status,
  6. // 占据这个位置的线程
  7. thread: Option<Box<Thread>>,
  8. }
  9. pub struct ThreadPool {
  10. // 线程池
  11. // 如果一个位置是 None 表示未被线程占据
  12. threads: Vec<Option<ThreadInfo>>,
  13. // 调度算法
  14. // 这里的 dyn Scheduler 是 Trait object 语法
  15. // 表明 Box 里面的类型实现了 Scheduler Trait
  16. scheduler: Box<dyn Scheduler>,
  17. }

线程池的方法

作为一个线程池,需要实现调度相关的一系列操作:

  • alloc_tid:为新线程分配一个新的 Tid
  • add:添加一个可立即开始运行的线程
  • acquire:从线程池中取一个线程开始运行
  • retrieve:让当前线程交出 CPU 资源
  • tick:时钟中断时查看当前所运行线程是否要切换出去
  • exit:退出线程

下面,我们依次来看看线程池的方法:

  1. // src/process/thread_pool.rs
  2. impl ThreadPool {
  3. // 新建一个线程池,其最大可容纳 size 个线程,使用调度器 scheduler
  4. pub fn new(size: usize, scheduler: Box<dyn Scheduler>) -> ThreadPool {
  5. ThreadPool {
  6. threads: {
  7. let mut v = Vec::new();
  8. v.resize_with(size, Default::default);
  9. v
  10. },
  11. scheduler,
  12. }
  13. }
  14. // 在线程池中找一个编号最小的空着的位置
  15. // 将编号作为 Tid 返回
  16. fn alloc_tid(&self) -> Tid {
  17. for (i, info) in self.threads.iter().enumerate() {
  18. if info.is_none() {
  19. return i;
  20. }
  21. }
  22. panic!("alloc tid failed!");
  23. }
  24. // 加入一个可立即开始运行的线程
  25. // 线程状态 Uninitialized -> Ready
  26. pub fn add(&mut self, _thread: Box<Thread>) {
  27. // 分配 Tid
  28. let tid = self.alloc_tid();
  29. // 修改线程池对应位置的信息
  30. self.threads[tid] = Some(
  31. ThreadInfo {
  32. // 状态:随时准备运行,等待 CPU 资源中
  33. status: Status::Ready,
  34. // 传入线程
  35. thread: Some(_thread),
  36. }
  37. );
  38. // 将线程的 Tid 加入调度器
  39. // 提醒调度器给这个线程分配 CPU 资源
  40. self.scheduler.push(tid);
  41. }
  42. // 从线程池中取一个线程开始运行
  43. // 线程状态 Ready -> Running
  44. pub fn acquire(&mut self) -> Option<(Tid, Box<Thread>)> {
  45. // 调用 Scheduler::pop ,从调度算法中获取接下来要运行的 Tid
  46. if let Some(tid) = self.scheduler.pop() {
  47. // 获取并更新线程池对应位置的信息
  48. let mut thread_info = self.threads[tid].as_mut().expect("thread not exist!");
  49. // 将线程状态改为 Running
  50. thread_info.status = Status::Running(tid);
  51. return Some((tid, thread_info.thread.take().expect("thread not exist!")));
  52. }
  53. else {
  54. return None;
  55. }
  56. }
  57. // 这个线程已运行了太长时间或者已运行结束,需要交出CPU资源
  58. // 但是要提醒线程池它仍需要分配 CPU 资源
  59. pub fn retrieve(&mut self, tid: Tid, thread: Box<Thread>) {
  60. // 线程池位置为空,表明这个线程刚刚通过 exit 退出
  61. if self.threads[tid].is_none() {
  62. // 不需要 CPU 资源了,退出
  63. return;
  64. }
  65. // 获取并修改线程池对应位置的信息
  66. let mut thread_info = self.threads[tid].as_mut().expect("thread not exist!");
  67. thread_info.thread = Some(thread);
  68. // 此时状态可能是 Status::Sleeping(线程可能会自动放弃 CPU 资源,进入睡眠状态),
  69. // 直到被唤醒之前都不必给它分配。
  70. // 而如果此时状态是Running,就说明只是单纯的耗尽了这次分配CPU资源,但还要占用CPU资源继续执行。
  71. if let Status::Running(_) = thread_info.status {
  72. // Running -> Ready
  73. thread_info.status = Status::Ready;
  74. // 通知线程池继续给此线程分配资源
  75. self.scheduler.push(tid);
  76. }
  77. }
  78. // Scheduler 的简单包装:时钟中断时查看当前所运行线程是否要切换出去
  79. pub fn tick(&mut self) -> bool {
  80. let ret = self.scheduler.tick();
  81. ret
  82. }
  83. // 这个线程已经退出了,线程状态 Running -> Exited
  84. pub fn exit(&mut self, tid: Tid) {
  85. // 清空线程池对应位置
  86. self.threads[tid] = None;
  87. // 通知调度器
  88. self.scheduler.exit(tid);
  89. }
  90. }

现在我们有了一个线程池 ThreadPool ,它内含调度器,是一个不错的线程管理器。下一节我们将介绍调度线程 idle 以及调度单元 Processor