Linux 内核中的同步原语. 第一部分.

Introduction

这一部分为 linux-insides 这本书开启了新的章节。定时器和时间管理相关的概念在上一个章节已经描述过了。现在是时候继续了。就像你可能从这一部分的标题所了解的那样,本章节将会描述 Linux 内核中的同步原语。

像往常一样,在考虑一些同步相关的事情之前,我们会尝试去概括地了解什么是同步原语。事实上,同步原语是一种软件机制,提供了两个或者多个并行进程或者线程在不同时刻执行一段相同的代码段的能力。例如下面的代码片段:

  1. mutex_lock(&clocksource_mutex);
  2. ...
  3. ...
  4. ...
  5. clocksource_enqueue(cs);
  6. clocksource_enqueue_watchdog(cs);
  7. clocksource_select();
  8. ...
  9. ...
  10. ...
  11. mutex_unlock(&clocksource_mutex);

出自 kernel/time/clocksource.c 源文件。这段代码来自于 __clocksource_register_scale 函数,此函数添加给定的 clocksource 到时钟源列表中。这个函数在注册时钟源列表中生成两个不同的操作。例如 clocksource_enqueue 函数就是添加给定时钟源到注册时钟源列表——clocksource_list 中。注意这几行代码被两个函数所包围:mutex_lockmutex_unlock,这两个函数都带有一个参数——在本例中为 clocksource_mutex

这些函数展示了基于互斥锁 (mutex) 同步原语的加锁和解锁。当 mutex_lock 被执行,允许我们阻止两个或两个以上线程执行这段代码,而 mute_unlock 还没有被互斥锁的处理拥有者锁执行。换句话说,就是阻止在 clocksource_list上的并行操作。为什么在这里需要使用互斥锁? 如果两个并行处理尝试去注册一个时钟源会怎样。正如我们已经知道的那样,其中具有最大的等级(其具有最高的频率在系统中注册的时钟源)的列表中选择一个时钟源后,clocksource_enqueue 函数立即将一个给定的时钟源到 clocksource_list 列表:

  1. static void clocksource_enqueue(struct clocksource *cs)
  2. {
  3. struct list_head *entry = &clocksource_list;
  4. struct clocksource *tmp;
  5. list_for_each_entry(tmp, &clocksource_list, list)
  6. if (tmp->rating >= cs->rating)
  7. entry = &tmp->list;
  8. list_add(&cs->list, entry);
  9. }

如果两个并行处理尝试同时去执行这个函数,那么这两个处理可能会找到相同的 入口 (entry) 可能发生竞态条件 (race condition) 或者换句话说,第二个执行 list_add 的处理程序,将会重写第一个线程写入的时钟源。

除了这个简答的例子,同步原语在 Linux 内核无处不在。如果再翻阅之前的[章节] (https://xinqiu.gitbooks.io/linux-insides-cn/content/Timers/index.html) 或者其他章节或者如果大概看看 Linux 内核源码,就会发现许多地方都使用同步原语。我们不考虑 mutex 在 Linux 内核是如何实现的。事实上,Linux 内核提供了一系列不同的同步原语:

  • mutex;
  • semaphores;
  • seqlocks;
  • atomic operations;
  • 等等。

现在从自旋锁 (spinlock) 这个章节开始。

Linux 内核中的自旋锁。

自旋锁简单来说是一种低级的同步机制,表示了一个变量可能的两个状态:

  • acquired;
  • released.

每一个想要获取自旋锁的处理,必须为这个变量写入一个表示自旋锁获取 (spinlock acquire)状态的值,并且为这个变量写入锁释放 (spinlock released)状态。如果一个处理程序尝试执行受自旋锁保护的代码,那么代码将会被锁住,直到占有锁的处理程序释放掉。在本例中,所有相关的操作必须是 原子的 (atomic),来阻止竞态条件状态。自旋锁在 Linux 内核中使用 spinlock_t 类型来表示。如果我们查看 Linux 内核代码,我们会看到,这个类型被广泛地 (widely) 使用。spinlock_t 的定义如下:

  1. typedef struct spinlock {
  2. union {
  3. struct raw_spinlock rlock;
  4. #ifdef CONFIG_DEBUG_LOCK_ALLOC
  5. # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
  6. struct {
  7. u8 __padding[LOCK_PADSIZE];
  8. struct lockdep_map dep_map;
  9. };
  10. #endif
  11. };
  12. } spinlock_t;

这段代码在 include/linux/spinlock_types.h 头文件中定义。可以看出,它的实现依赖于 CONFIG_DEBUG_LOCK_ALLOC 内核配置选项这个状态。现在我们跳过这一块,因为所有的调试相关的事情都将会在这一部分的最后。所以,如果 CONFIG_DEBUG_LOCK_ALLOC 内核配置选项不可用,那么 spinlock_t 则包含联合体 (union),这个联合体有一个字段——raw_spinlock

  1. typedef struct spinlock {
  2. union {
  3. struct raw_spinlock rlock;
  4. };
  5. } spinlock_t;

raw_spinlock 结构的定义在相同的头文件中并且表达了普通 (normal) 自旋锁的实现。让我们看看 raw_spinlock结构是如何定义的:

  1. typedef struct raw_spinlock {
  2. arch_spinlock_t raw_lock;
  3. #ifdef CONFIG_GENERIC_LOCKBREAK
  4. unsigned int break_lock;
  5. #endif
  6. } raw_spinlock_t;

这里的 arch_spinlock_t 表示了体系结构指定的自旋锁实现并且 break_lock 字段持有值—— 为1,当一个处理器开始等待而锁被另一个处理器持有时,使用的对称多处理器 (SMP) 系统的例子中。这样就可以防止长时间加锁。考虑本书的 x86_64 架构,因此 arch_spinlock_t 被定义在 arch/x86/include/asm/spinlock_types.h 头文件中,并且看上去是这样:

  1. #ifdef CONFIG_QUEUED_SPINLOCKS
  2. #include <asm-generic/qspinlock_types.h>
  3. #else
  4. typedef struct arch_spinlock {
  5. union {
  6. __ticketpair_t head_tail;
  7. struct __raw_tickets {
  8. __ticket_t head, tail;
  9. } tickets;
  10. };
  11. } arch_spinlock_t;

正如我们所看到的,arch_spinlock 结构的定义依赖于 CONFIG_QUEUED_SPINLOCKS 内核配置选项的值。这个 Linux内核配置选项支持使用队列的 自旋锁。这个自旋锁的特殊类型替代了 acquiredreleased 原子值,在队列上使用原子操作。如果 CONFIG_QUEUED_SPINLOCKS 内核配置选项启动,那么 arch_spinlock_t 将会被表示成如下的结构:

  1. typedef struct qspinlock {
  2. atomic_t val;
  3. } arch_spinlock_t;

来自于 include/asm-generic/qspinlock_types.h 头文件。

目前我们不会在这个结构上停止探索,在考虑 arch_spinlockqspinlock 之前,先看看自旋锁上的操作。 Linux内核在自旋锁上提供了一下主要的操作:

  • spin_lock_init ——给定的自旋锁进行初始化;
  • spin_lock ——获取给定的自旋锁
  • spin_lock_bh ——禁止软件中断并且获取给定的自旋锁
  • spin_lock_irqsavespin_lock_irq——禁止本地处理器上的中断,并且保存/不保存之前的中断状态的标识 (flag)
  • spin_unlock ——释放给定的自旋锁;
  • spin_unlock_bh ——释放给定的自旋锁并且启动软件中断;
  • spin_is_locked - 返回给定的自旋锁的状态;
  • 等等。

来看看 spin_lock_init 宏的实现。就如我已经写过的一样,这个宏和其他宏定义都在 include/linux/spinlock.h 头文件里,并且 spin_lock_init 宏如下所示:

  1. #define spin_lock_init(_lock) \
  2. do { \
  3. spinlock_check(_lock); \
  4. raw_spin_lock_init(&(_lock)->rlock); \
  5. } while (0)

正如所看到的,spin_lock_init 宏有一个自旋锁,执行两步操作:检查我们看到的给定的自旋锁和执行 raw_spin_lock_initspinlock_check的实现相当简单,实现的函数仅仅返回已知的自旋锁raw_spinlock_t,来确保我们精确获得正常 (normal) 原生自旋锁:

  1. static __always_inline raw_spinlock_t *spinlock_check(spinlock_t *lock)
  2. {
  3. return &lock->rlock;
  4. }

raw_spin_lock_init 宏:

  1. # define raw_spin_lock_init(lock) \
  2. do { \
  3. *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); \
  4. } while (0) \

__RAW_SPIN_LOCK_UNLOCKED 的值和给定的自旋锁赋值给给定的 raw_spinlock_t。就像我们能从 __RAW_SPIN_LOCK_UNLOCKED 宏的名字中了解的那样,这个宏为给定的自旋锁执行初始化操作,并且将锁设置为释放 (released) 状态。宏的定义在 include/linux/spinlock_types.h 头文件中,并且扩展了一下的宏:

  1. #define __RAW_SPIN_LOCK_UNLOCKED(lockname) \
  2. (raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)
  3. #define __RAW_SPIN_LOCK_INITIALIZER(lockname) \
  4. { \
  5. .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED, \
  6. SPIN_DEBUG_INIT(lockname) \
  7. SPIN_DEP_MAP_INIT(lockname) \
  8. }

正如之前所写的一样,我们不考虑同步原语调试相关的东西。在本例中也不考虑 SPIN_DEBUG_INITSPIN_DEP_MAP_INIT 宏。于是 __RAW_SPINLOCK_UNLOCKED 宏被扩展成:

  1. *(&(_lock)->rlock) = __ARCH_SPIN_LOCK_UNLOCKED;

__ARCH_SPIN_LOCK_UNLOCKED 宏是:

  1. #define __ARCH_SPIN_LOCK_UNLOCKED { { 0 } }

还有:

  1. #define __ARCH_SPIN_LOCK_UNLOCKED { ATOMIC_INIT(0) }

这是对于 [x86_64] 架构,如果 CONFIG_QUEUED_SPINLOCKS 内核配置选项启用的情况。那么,在 spin_lock_init 宏的扩展之后,给定的自旋锁将会初始化并且状态变为——解锁 (unlocked)

从这一时刻起我们了解了如何去初始化一个自旋锁,现在考虑 Linux 内核为自旋锁的操作提供的 API。首先是:

  1. static __always_inline void spin_lock(spinlock_t *lock)
  2. {
  3. raw_spin_lock(&lock->rlock);
  4. }

此函数允许我们获取一个自旋锁。raw_spin_lock 宏定义在同一个头文件中,并且扩展了 _raw_spin_lock 函数的调用:

  1. #define raw_spin_lock(lock) _raw_spin_lock(lock)

就像在 include/linux/spinlock.h 头文件所了解的那样,_raw_spin_lock 宏的定义依赖于 CONFIG_SMP 内核配置参数:

  1. #if defined(CONFIG_SMP) || defined(CONFIG_DEBUG_SPINLOCK)
  2. # include <linux/spinlock_api_smp.h>
  3. #else
  4. # include <linux/spinlock_api_up.h>
  5. #endif

因此,如果在 Linux内核中 SMP 启用了,那么 _raw_spin_lock 宏就在 arch/x86/include/asm/spinlock.h 头文件中定义,并且看起来像这样:

  1. #define _raw_spin_lock(lock) __raw_spin_lock(lock)

__raw_spin_lock 函数的定义:

  1. static inline void __raw_spin_lock(raw_spinlock_t *lock)
  2. {
  3. preempt_disable();
  4. spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
  5. LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
  6. }

就像你们可能了解的那样, 首先我们禁用了抢占,通过 include/linux/preempt.h (在 Linux 内核初始化进程章节的第九部分会了解到更多关于抢占)中的 preempt_disable 调用实现禁用。当我们将要解开给定的自旋锁,抢占将会再次启用:

  1. static inline void __raw_spin_unlock(raw_spinlock_t *lock)
  2. {
  3. ...
  4. ...
  5. ...
  6. preempt_enable();
  7. }

当程序正在自旋锁时,这个已经获取锁的程序必须阻止其他程序方法的抢占。spin_acquire 宏通过其他宏宏展调用实现:

  1. #define spin_acquire(l, s, t, i) lock_acquire_exclusive(l, s, t, NULL, i)
  2. #define lock_acquire_exclusive(l, s, t, n, i) lock_acquire(l, s, t, 0, 1, n, i)

lock_acquire 函数:

  1. void lock_acquire(struct lockdep_map *lock, unsigned int subclass,
  2. int trylock, int read, int check,
  3. struct lockdep_map *nest_lock, unsigned long ip)
  4. {
  5. unsigned long flags;
  6. if (unlikely(current->lockdep_recursion))
  7. return;
  8. raw_local_irq_save(flags);
  9. check_flags(flags);
  10. current->lockdep_recursion = 1;
  11. trace_lock_acquire(lock, subclass, trylock, read, check, nest_lock, ip);
  12. __lock_acquire(lock, subclass, trylock, read, check,
  13. irqs_disabled_flags(flags), nest_lock, ip, 0, 0);
  14. current->lockdep_recursion = 0;
  15. raw_local_irq_restore(flags);
  16. }

就像之前所写的,我们不考虑这些调试或跟踪相关的东西。lock_acquire 函数的主要是通过 raw_local_irq_save 宏调用禁用硬件中断,因为给定的自旋锁可能被启用的硬件中断所获取。以这样的方式获取的话程序将不会被抢占。注意 lock_acquire 函数的最后将使用 raw_local_irq_restore 宏的帮助再次启动硬件中断。正如你们可能猜到的那样,主要工作将在 __lock_acquire 函数中定义,这个函数在 kernel/locking/lockdep.c 源代码文件中。

__lock_acquire 函数看起来很大。我们将试图去理解这个函数要做什么,但不是在这一部分。事实上这个函数于 Linux内核锁验证器 (lock validator) 密切相关,而这也不是此部分的主题。如果我们要返回 __raw_spin_lock 函数的定义,我们将会发现最终这个定义包含了以下的定义:

  1. LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);

LOCK_CONTENDED 宏的定义在 include/linux/lockdep.h 头文件中,而且只是使用给定自旋锁调用已知函数:

  1. #define LOCK_CONTENDED(_lock, try, lock) \
  2. lock(_lock)

在本例中,lock 就是 include/linux/spinlock.h 头文件中的 do_raw_spin_lock,而_lock 就是给定的 raw_spinlock_t

  1. static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
  2. {
  3. __acquire(lock);
  4. arch_spin_lock(&lock->raw_lock);
  5. }

这里的 __acquire 只是[稀疏(sparse)]相关宏,并且当前我们也对这些不感兴趣。arch_spin_lock 函数定义的位置依赖于两件事:第一是系统架构,第二是我们是否使用了队列自旋锁(queued spinlocks)。本例中我们仅以 x86_64 架构为例介绍,因此 arch_spin_lock 的定义的宏表示源自 include/asm-generic/qspinlock.h 头文件中:

  1. #define arch_spin_lock(l) queued_spin_lock(l)

如果使用 队列自旋锁,或者其他例子中,arch_spin_lock 函数定在 arch/x86/include/asm/spinlock.h 头文件中,如何处理?现在我们只考虑普通的自旋锁队列自旋锁相关的信息将在以后了解。来再看看 arch_spinlock 结构的定义,理解以下 arch_spin_lock 函数的实现:

  1. typedef struct arch_spinlock {
  2. union {
  3. __ticketpair_t head_tail;
  4. struct __raw_tickets {
  5. __ticket_t head, tail;
  6. } tickets;
  7. };
  8. } arch_spinlock_t;

这个自旋锁的变体被称为——标签自旋锁 (ticket spinlock)。 就像我们锁了解的,标签自旋锁包括两个部分。当锁被获取,如果有程序想要获取自旋锁它就会将尾部(tail)值加1。如果尾部不等于头部, 那么程序就会被锁住,直到这些变量的值不再相等。来看看arch_spin_lock函数上的实现:

  1. static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
  2. {
  3. register struct __raw_tickets inc = { .tail = TICKET_LOCK_INC };
  4. inc = xadd(&lock->tickets, inc);
  5. if (likely(inc.head == inc.tail))
  6. goto out;
  7. for (;;) {
  8. unsigned count = SPIN_THRESHOLD;
  9. do {
  10. inc.head = READ_ONCE(lock->tickets.head);
  11. if (__tickets_equal(inc.head, inc.tail))
  12. goto clear_slowpath;
  13. cpu_relax();
  14. } while (--count);
  15. __ticket_lock_spinning(lock, inc.tail);
  16. }
  17. clear_slowpath:
  18. __ticket_check_and_clear_slowpath(lock, inc.head);
  19. out:
  20. barrier();
  21. }

arch_spin_lock 函数在一开始能够使用尾部—— 1__raw_tickets 结构初始化:

  1. #define __TICKET_LOCK_INC 1

inclock->tickets 的下一行执行 xadd 操作。这个操作之后 inc将存储给定标签 (tickets) 的值,然后 tickets.tail 将增加 inc1尾部值增加 1 意味着一个程序开始尝试持有锁。下一步做检查,检查头部尾部是否有相同的值。如果值相等,这意味着没有程序持有锁并且我们去到了 out 标签。在 arch_spin_lock 函数的最后,我们可能了解了 barrier 宏表示 屏障指令 (barrier instruction),该指令保证了编译器将不更改进入内存操作的顺序(更多关于内存屏障的知识可以阅读内核文档 (documentation))。

如果前一个程序持有锁而第二个程序开始执行 arch_spin_lock 函数,那么 头部将不会等于``尾部,因为尾部头部1。这样,程序将循环发生。在每次循坏迭代的时候头部尾部的值进行比较。如果值不相等,cpu_relax ,也就是 NOP 指令将会被调用:

  1. #define cpu_relax() asm volatile("rep; nop")

然后将开始循环的下一次迭代。如果值相等,这意味着持有锁的程序,释放这个锁并且下一个程序获取这个锁。

spin_unlock 操作遍布所有有 spin_lock 的宏或函数中,当然,使用的是 unlock 前缀。最后,arch_spin_unlock 函数将会被调用。如果看看 arch_spin_lock 函数的实现,我们将了解到这个函数增加了 lock tickets 列表的头部

  1. __add(&lock->tickets.head, TICKET_LOCK_INC, UNLOCK_LOCK_PREFIX);

spin_lockspin_unlock 的组合使用中,我们得到一个队列,其头部包含了一个索引号,映射了当前执行的持有锁的程序,而尾部包含了一个索引号,映射了最后尝试持有锁的程序:

  1. +-------+ +-------+
  2. | | | |
  3. head | 7 | - - - | 7 | tail
  4. | | | |
  5. +-------+ +-------+
  6. |
  7. +-------+
  8. | |
  9. | 8 |
  10. | |
  11. +-------+
  12. |
  13. +-------+
  14. | |
  15. | 9 |
  16. | |
  17. +-------+

目前这就是全部。这一部分不涵盖所有的自旋锁 API,但我认为这个概念背后的主要思想现在一定清楚了。

结论

涵盖 Linux 内核中的同步原语的第一部分到此结束。在这一部分,我们遇见了第一个 Linux 内核提供的同步原语自旋锁。下一部分将会继续深入这个有趣的主题,而且将会了解到其他同步相关的知识。

如果您有疑问或者建议,请在twitter 0xAX 上联系我,通过 email 联系我,或者创建一个 issue

友情提示:英语不是我的母语,对于译文给您带来了的不便我感到非常抱歉。如果您发现任何错误请给我发送PR到 linux-insides

链接