管道

本节导读

在上一节,我们实现了基于文件接口的标准输入和输出,这样一个进程可以根据不同的输入产生对应的输出。本节我们将基于上一节介绍的文件接口 File 来把不同进程的输入和输出连接起来,从而在不改变应用程序代码的情况下,让操作系统具有进程间信息交换和功能组合的能力。这需要我们实现实现一种父子进程间的单向进程间通信机制——管道,并为此实现两个新的系统调用 sys_pipesys_close

管道机制简介

首先来介绍什么是 管道 (Pipe) 。管道是一种进程间通信机制,由操作系统提供,并可通过直接编程或在shell程序的帮助下轻松地把不同进程(目前是父子进程之间或子子进程之间)的输入和输出对接起来。我们也可以将管道看成一个有一定缓冲区大小的字节队列,它分为读和写两端,需要通过不同的文件描述符来访问。读端只能用来从管道中读取,而写端只能用来将数据写入管道。由于管道是一个队列,读取的时候会从队头读取并弹出,而写入的时候则会写入到队列的队尾。同时,管道的缓冲区大小是有限的,一旦整个缓冲区都被填满就不能再继续写入,需要等到读端读取并从队列中弹出一些字符之后才能继续写入。当缓冲区为空的时候自然也不能继续从里面读取,需要等到写端写入了一些数据之后才能继续读取。

比如,一般在shell程序中, “|” 是管道符号,即两个命令之间的一道竖杠。我们通过管道符号组合的命令,就可以了解登录Linux的用户的各种情况:

  1. who # 登录Linux的用户信息
  2. who | grep chyyuu # 是否用户ID为chyyuu的用户登录了
  3. who | grep chyyuu | wc # chyyuu用户目前在线登录的个数

管道的系统调用原型及使用方法

接下来,我们将逐步尝试实现上面描述的管道的初步效果。我们新增一个系统调用来为当前进程打开一个代表管道的文件集(包含一个只读文件,一个只写文件):

  1. /// 功能:为当前进程打开一个管道。
  2. /// 参数:pipe 表示应用地址空间中的一个长度为 2 的 usize 数组的起始地址,内核需要按顺序将管道读端
  3. /// 和写端的文件描述符写入到数组中。
  4. /// 返回值:如果出现了错误则返回 -1,否则返回 0 。可能的错误原因是:传入的地址不合法。
  5. /// syscall ID:59
  6. pub fn sys_pipe(pipe: *mut usize) -> isize;

在用户库中会将其包装为 pipe 函数:

  1. // user/src/syscall.rs
  2. const SYSCALL_PIPE: usize = 59;
  3. pub fn sys_pipe(pipe: &mut [usize]) -> isize {
  4. syscall(SYSCALL_PIPE, [pipe.as_mut_ptr() as usize, 0, 0])
  5. }
  6. // user/src/lib.rs
  7. pub fn pipe(pipe_fd: &mut [usize]) -> isize { sys_pipe(pipe_fd) }

只有当一个管道的所有读端文件/写端文件都被关闭之后,管道占用的资源才会被回收,因此我们需要通过关闭文件的系统调用 sys_close 来尽可能早的关闭之后不再用到的读端的文件和写端的文件。

  1. /// 功能:当前进程关闭一个文件。
  2. /// 参数:fd 表示要关闭的文件的文件描述符。
  3. /// 返回值:如果成功关闭则返回 0 ,否则返回 -1 。可能的出错原因:传入的文件描述符并不对应一个打开的文件。
  4. /// syscall ID:57
  5. pub fn sys_close(fd: usize) -> isize;

它会在用户库中被包装为 close 函数。

我们来从简单的管道测例 pipetest 中介绍管道的使用方法:

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  19. 19
  20. 20
  21. 21
  22. 22
  23. 23
  24. 24
  25. 25
  26. 26
  27. 27
  28. 28
  29. 29
  30. 30
  31. 31
  32. 32
  33. 33
  34. 34
  35. 35
  36. 36
  37. 37
  38. 38
  39. 39
  40. 40
  41. 41
  42. 42
  43. 43
  44. 44
  45. 45
  46. 46
  1. // user/src/bin/pipetest.rs
  2. #![no_std]
  3. #![no_main]
  4. #[macro_use]
  5. extern crate user_lib;
  6. use user_lib::{fork, close, pipe, read, write, wait};
  7. static STR: &str = Hello, world!”;
  8. #[no_mangle]
  9. pub fn main() -> i32 {
  10. // create pipe
  11. let mut pipe_fd = [0usize; 2];
  12. pipe(&mut pipe_fd);
  13. // read end
  14. assert_eq!(pipe_fd[0], 3);
  15. // write end
  16. assert_eq!(pipe_fd[1], 4);
  17. if fork() == 0 {
  18. // child process, read from parent
  19. // close write_end
  20. close(pipe_fd[1]);
  21. let mut buffer = [0u8; 32];
  22. let len_read = read(pipe_fd[0], &mut buffer) as usize;
  23. // close read_end
  24. close(pipe_fd[0]);
  25. assert_eq!(core::str::from_utf8(&buffer[..len_read]).unwrap(), STR);
  26. println!(“Read OK, child process exited!”);
  27. 0
  28. } else {
  29. // parent process, write to child
  30. // close read end
  31. close(pipe_fd[0]);
  32. assert_eq!(write(pipe_fd[1], STR.as_bytes()), STR.len() as isize);
  33. // close write end
  34. close(pipe_fd[1]);
  35. let mut child_exit_code: i32 = 0;
  36. wait(&mut child_exit_code);
  37. assert_eq!(child_exit_code, 0);
  38. println!(“pipetest passed!”);
  39. 0
  40. }
  41. }

在父进程中,我们通过 pipe 打开一个管道文件数组,其中 pipe_fd[0] 保存了管道读端的文件描述符,而 pipe_fd[1] 保存了管道写端的文件描述符。在 fork 之后,子进程会完全继承父进程的文件描述符表,于是子进程也可以通过同样的文件描述符来访问同一个管道的读端和写端。之前提到过管道是单向的,在这个测例中我们希望管道中的数据从父进程流向子进程,也即父进程仅通过管道的写端写入数据,而子进程仅通过管道的读端读取数据。

因此,在第 25 和第 34 行,分别第一时间在子进程中关闭管道的写端和在父进程中关闭管道的读端。父进程在第 35 行将字符串 STR 写入管道的写端,随后在第 37 行关闭管道的写端;子进程在第 27 行从管道的读端读取字符串,并在第 29 行关闭。

如果想在父子进程之间实现双向通信,我们就必须创建两个管道。有兴趣的读者可以参考测例 pipe_large_test

通过 sys_close 关闭文件

关闭文件的系统调用 sys_close 实现非常简单,我们只需将进程控制块中的文件描述符表对应的一项改为 None 代表它已经空闲即可,同时这也会导致内层的引用计数类型 Arc 被销毁,会减少一个文件的引用计数,当引用计数减少到 0 之后文件所占用的资源就会被自动回收。

  1. // os/src/syscall/fs.rs
  2. pub fn sys_close(fd: usize) -> isize {
  3. let task = current_task().unwrap();
  4. let mut inner = task.acquire_inner_lock();
  5. if fd >= inner.fd_table.len() {
  6. return -1;
  7. }
  8. if inner.fd_table[fd].is_none() {
  9. return -1;
  10. }
  11. inner.fd_table[fd].take();
  12. 0
  13. }

基于文件的管道

我们将管道的一端(读端或写端)抽象为 Pipe 类型:

  1. // os/src/fs/pipe.rs
  2. pub struct Pipe {
  3. readable: bool,
  4. writable: bool,
  5. buffer: Arc<Mutex<PipeRingBuffer>>,
  6. }

readablewritable 分别指出该管道端可否支持读取/写入,通过 buffer 字段还可以找到该管道端所在的管道自身。后续我们将为它实现 File Trait ,之后它便可以通过文件描述符来访问。

而管道自身,也就是那个带有一定大小缓冲区的字节队列,我们抽象为 PipeRingBuffer 类型:

  1. // os/src/fs/pipe.rs
  2. const RING_BUFFER_SIZE: usize = 32;
  3. #[derive(Copy, Clone, PartialEq)]
  4. enum RingBufferStatus {
  5. FULL,
  6. EMPTY,
  7. NORMAL,
  8. }
  9. pub struct PipeRingBuffer {
  10. arr: [u8; RING_BUFFER_SIZE],
  11. head: usize,
  12. tail: usize,
  13. status: RingBufferStatus,
  14. write_end: Option<Weak<Pipe>>,
  15. }
  • RingBufferStatus 记录了缓冲区目前的状态:FULL 表示缓冲区已满不能再继续写入; EMPTY 表示缓冲区为空无法从里面读取;而 NORMAL 则表示除了 FULLEMPTY 之外的其他状态。

  • PipeRingBufferarr/head/tail 三个字段用来维护一个循环队列,其中 arr 为存放数据的数组, head 为循环队列队头的下标, tail 为循环队列队尾的下标。

  • PipeRingBufferwrite_end 字段还保存了它的写端的一个弱引用计数,这是由于在某些情况下需要确认该管道所有的写端是否都已经被关闭了,通过这个字段很容易确认这一点。

从内存管理的角度,每个读端或写端中都保存着所属管道自身的强引用计数,且我们确保这些引用计数只会出现在管道端口 Pipe 结构体中。于是,一旦一个管道所有的读端和写端均被关闭,便会导致它们所属管道的引用计数变为 0 ,循环队列缓冲区所占用的资源被自动回收。虽然 PipeRingBuffer 中保存了一个指向写端的引用计数,但是它是一个弱引用,也就不会出现循环引用的情况导致内存泄露。

管道创建

通过 PipeRingBuffer::new 可以创建一个新的管道:

  1. // os/src/fs/pipe.rs
  2. impl PipeRingBuffer {
  3. pub fn new() -> Self {
  4. Self {
  5. arr: [0; RING_BUFFER_SIZE],
  6. head: 0,
  7. tail: 0,
  8. status: RingBufferStatus::EMPTY,
  9. write_end: None,
  10. }
  11. }
  12. }

Piperead/write_end_with_buffer 方法可以分别从一个已有的管道创建它的读端和写端:

  1. // os/src/fs/pipe.rs
  2. impl Pipe {
  3. pub fn read_end_with_buffer(buffer: Arc<Mutex<PipeRingBuffer>>) -> Self {
  4. Self {
  5. readable: true,
  6. writable: false,
  7. buffer,
  8. }
  9. }
  10. pub fn write_end_with_buffer(buffer: Arc<Mutex<PipeRingBuffer>>) -> Self {
  11. Self {
  12. readable: false,
  13. writable: true,
  14. buffer,
  15. }
  16. }
  17. }

可以看到,读端和写端的访问权限进行了相应设置:不允许向读端写入,也不允许从写端读取。

通过 make_pipe 方法可以创建一个管道并返回它的读端和写端:

  1. // os/src/fs/pipe.rs
  2. impl PipeRingBuffer {
  3. pub fn set_write_end(&mut self, write_end: &Arc<Pipe>) {
  4. self.write_end = Some(Arc::downgrade(write_end));
  5. }
  6. }
  7. /// Return (read_end, write_end)
  8. pub fn make_pipe() -> (Arc<Pipe>, Arc<Pipe>) {
  9. let buffer = Arc::new(Mutex::new(PipeRingBuffer::new()));
  10. let read_end = Arc::new(
  11. Pipe::read_end_with_buffer(buffer.clone())
  12. );
  13. let write_end = Arc::new(
  14. Pipe::write_end_with_buffer(buffer.clone())
  15. );
  16. buffer.lock().set_write_end(&write_end);
  17. (read_end, write_end)
  18. }

注意,我们调用 PipeRingBuffer::set_write_end 在管道中保留它的写端的弱引用计数。

现在来实现创建管道的系统调用 sys_pipe

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  19. 19
  20. 20
  21. 21
  22. 22
  23. 23
  24. 24
  25. 25
  26. 26
  27. 27
  28. 28
  29. 29
  1. // os/src/task/task.rs
  2. impl TaskControlBlockInner {
  3. pub fn alloc_fd(&mut self) -> usize {
  4. if let Some(fd) = (0..self.fd_table.len())
  5. .find(|fd| self.fd_table[fd].is_none()) {
  6. fd
  7. } else {
  8. self.fd_table.push(None);
  9. self.fd_table.len() - 1
  10. }
  11. }
  12. }
  13. // os/src/syscall/fs.rs
  14. pub fn sys_pipe(pipe: mut usize) -> isize {
  15. let task = current_task().unwrap();
  16. let token = current_user_token();
  17. let mut inner = task.acquire_inner_lock();
  18. let (pipe_read, pipe_write) = make_pipe();
  19. let read_fd = inner.alloc_fd();
  20. inner.fd_table[read_fd] = Some(pipe_read);
  21. let write_fd = inner.alloc_fd();
  22. inner.fd_table[write_fd] = Some(pipe_write);
  23. translated_refmut(token, pipe) = read_fd;
  24. translated_refmut(token, unsafe { pipe.add(1) }) = write_fd;
  25. 0
  26. }

TaskControlBlockInner::alloc_fd 可以在进程控制块中分配一个最小的空闲文件描述符来访问一个新打开的文件。它先从小到大遍历所有曾经被分配过的文件描述符尝试找到一个空闲的,如果没有的话就需要拓展文件描述符表的长度并新分配一个。

sys_pipe 中,第 21 行我们调用 make_pipe 创建一个管道并获取其读端和写端,第 22~25 行我们分别为读端和写端分配文件描述符并将它们放置在文件描述符表中的相应位置中。第 26~27 行我们则是将读端和写端的文件描述符写回到应用地址空间。

管道读写

首先来看如何为 Pipe 实现 File Trait 的 read 方法,即从管道的读端读取数据。在此之前,我们需要对于管道循环队列进行封装来让它更易于使用:

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  19. 19
  20. 20
  21. 21
  22. 22
  23. 23
  24. 24
  25. 25
  26. 26
  27. 27
  1. // os/src/fs/pipe.rs
  2. impl PipeRingBuffer {
  3. pub fn read_byte(&mut self) -> u8 {
  4. self.status = RingBufferStatus::NORMAL;
  5. let c = self.arr[self.head];
  6. self.head = (self.head + 1) % RING_BUFFER_SIZE;
  7. if self.head == self.tail {
  8. self.status = RingBufferStatus::EMPTY;
  9. }
  10. c
  11. }
  12. pub fn available_read(&self) -> usize {
  13. if self.status == RingBufferStatus::EMPTY {
  14. 0
  15. } else {
  16. if self.tail > self.head {
  17. self.tail - self.head
  18. } else {
  19. self.tail + RING_BUFFER_SIZE - self.head
  20. }
  21. }
  22. }
  23. pub fn all_write_ends_closed(&self) -> bool {
  24. self.write_end.as_ref().unwrap().upgrade().is_none()
  25. }
  26. }

PipeRingBuffer::read_byte 方法可以从管道中读取一个字节,注意在调用它之前需要确保管道缓冲区中不是空的。它会更新循环队列队头的位置,并比较队头和队尾是否相同,如果相同的话则说明管道的状态变为空 EMPTY 。仅仅通过比较队头和队尾是否相同不能确定循环队列是否为空,因为它既有可能表示队列为空,也有可能表示队列已满。因此我们需要在 read_byte 的同时进行状态更新。

PipeRingBuffer::available_read 可以计算管道中还有多少个字符可以读取。我们首先需要需要判断队列是否为空,因为队头和队尾相等可能表示队列为空或队列已满,两种情况 available_read 的返回值截然不同。如果队列为空的话直接返回 0,否则根据队头和队尾的相对位置进行计算。

PipeRingBuffer::all_write_ends_closed 可以判断管道的所有写端是否都被关闭了,这是通过尝试将管道中保存的写端的弱引用计数升级为强引用计数来实现的。如果升级失败的话,说明管道写端的强引用计数为 0 ,也就意味着管道所有写端都被关闭了,从而管道中的数据不会再得到补充,待管道中仅剩的数据被读取完毕之后,管道就可以被销毁了。

下面是 Piperead 方法的实现:

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  19. 19
  20. 20
  21. 21
  22. 22
  23. 23
  24. 24
  25. 25
  26. 26
  27. 27
  28. 28
  29. 29
  30. 30
  1. // os/src/fs/pipe.rs
  2. impl File for Pipe {
  3. fn read(&self, buf: UserBuffer) -> usize {
  4. asserteq!(self.readable, true);
  5. let mut buf_iter = buf.into_iter();
  6. let mut read_size = 0usize;
  7. loop {
  8. let mut ring_buffer = self.buffer.lock();
  9. let loop_read = ring_buffer.available_read();
  10. if loop_read == 0 {
  11. if ring_buffer.all_write_ends_closed() {
  12. return read_size;
  13. }
  14. drop(ring_buffer);
  15. suspend_current_and_run_next();
  16. continue;
  17. }
  18. // read at most loop_read bytes
  19. for in 0..loop_read {
  20. if let Some(byte_ref) = buf_iter.next() {
  21. unsafe { *byte_ref = ring_buffer.read_byte(); }
  22. read_size += 1;
  23. } else {
  24. return read_size;
  25. }
  26. }
  27. }
  28. }
  29. }
  • 第 6 行的 buf_iter 将传入的应用缓冲区 buf 转化为一个能够逐字节对于缓冲区进行访问的迭代器,每次调用 buf_iter.next() 即可按顺序取出用于访问缓冲区中一个字节的裸指针。

  • 第 7 行的 read_size 用来维护实际有多少字节从管道读入应用的缓冲区。

  • File::read 的语义是要从文件中最多读取应用缓冲区大小那么多字符。这可能超出了循环队列的大小,或者由于尚未有进程从管道的写端写入足够的字符,因此我们需要将整个读取的过程放在一个循环中,当循环队列中不存在足够字符的时候暂时进行任务切换,等待循环队列中的字符得到补充之后再继续读取。

    这个循环从第 8 行开始,第 10 行我们用 loop_read 来保存循环这一轮次中可以从管道循环队列中读取多少字符。如果管道为空则会检查管道的所有写端是否都已经被关闭,如果是的话,说明我们已经没有任何字符可以读取了,这时可以直接返回;否则我们需要等管道的字符得到填充之后再继续读取,因此我们调用 suspend_current_and_run_next 切换到其他任务,等到切换回来之后回到循环开头再看一下管道中是否有字符了。在调用之前我们需要手动释放管道自身的锁,因为切换任务时候的 __switch 并不是一个正常的函数调用。

    如果 loop_read 不为 0 ,在这一轮次中管道中就有 loop_read 个字节可以读取。我们可以迭代应用缓冲区中的每个字节指针并调用 PipeRingBuffer::read_byte 方法来从管道中进行读取。如果这 loop_read 个字节均被读取之后还没有填满应用缓冲区就需要进入循环的下一个轮次,否则就可以直接返回了。

Pipewrite 方法——即通过管道的写端向管道中写入数据的实现和 read 的原理类似,篇幅所限在这里不再赘述,感兴趣的读者可自行参考其实现。

小结

这一章讲述的重点是一种有趣的进程间通信的机制–管道。通过管道,能够把不同进程的输入和输出连接在一起,实现进程功能的组合。为了能够统一表示输入,输出,以及管道,我们给出了与 地址空间进程 齐名的操作系统抽象 文件 ,并基于文件重构了操作系统的输入/输出机制。目前,仅仅实现了非常简单的基于父子进程的管道机制。在操作系统层面,还缺乏对命令行参数的支持,在应用层面,还缺少I/O重定向和shell程序中基于 “|” 管道符号的支持。但我们已经建立了基本的进程通信机制。

在下面一章,我们将在操作系统中实现支持数据持久化存储的文件系统,形成更完成的文件机制,并进步改进执行进程的系统调用,支持进程执行的命令行参数。在应用程序的层面,完善I/O重定向,并在shell中支持基于 “|” 管道符号,形成更加灵活的进程间通信能力和shell命令行支持。