Linux 内核同步方式及原理
本文基于 linux kernel 3.10.105。
原子操作
原子操作是其他同步方法的基石。原子操作,可以保证指令以原子的方式执行——执行过程不被打断。
内核提供了两组原子操作接口——一组针对整数,一组针对单独的位。在Linux支持的所有体系结构上都实现了这两组接口。
原子整数操作
针对整数的原子操作只能对atomic_t类型的数据进行处理。引入特殊数据类型,主要是出于以下原因:
- 让原子函数只接收atomic_t类型的操作数,可以确保原子操作只与这种特殊类型数据一起使用。同时,这也保证了该类型的数据不会被传递给任何非原子函数。
- 使用atomic_t类型确保编译器不对相应的值进行访问优化——这点使得原子操作最终接收到正确的内存地址,而不只是一个别名(关于这一点,之前atomic_t定义是 volatile int counter,利用volatile读写时从内存取值。而现在定义只是 int counter,是在外部进行操作时指定volatile)。
最后,在不同体系结构上实现原子操作的时候,使用atomic_t可以屏蔽其间的差异。atomic_t类型定义如下:
1 | typedef struct { |
1 | Tips: |
使用原子整形操作需要的声明都在<asm/atomic.h>中。
原子整数操作最常见的用途就是实现计数器。原子操作通常是内联函数,往往是通过内联汇编指令来实现。如果某个函数本来就是原子的,那么它往往被定义成一个宏。 例如,在大部分体系结构上,读取一个字本身就是一种原子操作,也就是说,在对有个字进行写入操作期间不可能完成对该字的读取。所以,atomic_read()只需返回atomic_t类型的整数值就可以了。
1 | Tips: |
1 | /** |
下面分析几个函数的实现(x86架构):
atomic_add
1 |
|
两步分析
LOCK_PREFIX
参考 GNU Assembler (GAS)手册 ,如果连接失效,可点击 此处,Documentation 部分。
1
2
3
4
5
6.section name [, "flags"[, @type[,flag_specific_arguments]]]
使用.section指令将以下代码组装到名为name的节中。
flag "a" 表示 section is allocatable。
此指令会替代当前的section(.text)。
可以查看内核代码,以确定 smp_locks 的作用。简单来说是在单处理器中将锁变为空操作。1
2
3.balign 4 将当前section的位数计数器加到4字节对齐。
.long 671f - . .long(等于.int)是一个指令,告诉汇编程序在这里汇编一个32位的数量。当遇到的数据看起来不像任何已知的指令时,反汇编器通常发出这些数据。通常情况下,当存在一个文字池时就是这样,因为那些不包含机器码和数据的反汇编程序会打印它们包含的数据。.long指令对于.bss节无效。
.previous 指令继续处理上一节。1
2
3
4
5
6
7
8lock指令:多处理器环境中,"LOCK#"信号确保了处理器在信号有效时独占使用任何共享存储器。在所有的 X86 CPU 上都具有锁定一个特定内存地址的能力,当这个特定内存地址被锁定后,它就可以阻止其他的系统总线读取或修改这个内存地址。这种能力是通过 LOCK 指令前缀再加上下面的汇编指令来实现的。当使用 LOCK 指令前缀时,它会使 CPU 宣告一个 "LOCK#"信号,这样就能确保在多处理器系统或多线程竞争的环境下互斥地使用这个内存地址。当指令执行完毕,这个锁定动作也就会消失。
LOCK前缀只能作为以下指令的前缀:
ADD,ADC,AND,BTC,BTR,BTS,CMPXCHG,CMPXCH8B,CMPXCHG16B,DEC,INC,NEG,NOT,OR,SBB,SUB,XOR,XADD和XCHG。
如果LOCK前缀与其中一个一起使用这些指令和源操作数是内存操作数,未定义的操作码异常("#UD")可能产生。如果LOCK前缀不与任何指令一起使用,也会产生未定义的操作码异常。
注意:XCHG 和 XADD (以及所有以 'X' 开头的指令)都能够保证在多处理器系统下的原子操作,它们总会宣告一个 "LOCK#" 信号,而不管有没有 LOCK 前缀。在X86平台上,CPU提供了在指令执行期间对总线加锁的手段。CPU上有一根引线#HLOCK pin连到北桥,如果汇编语言的程序中在一条指令前面加上前缀“LOCK”,经过汇编以后的机器代码就使CPU在执行这条指令的时候把#HLOCK pin的电位拉低,持续到这条指令结束时放开,从而把总线锁住,这样同一总线上别的CPU就暂时不能通过总线访问内存了,保证了这条指令在多处理器环境中的原子性。
addl指令
1
addl %1,%0 : "+m" (v->counter) : "ir" (i)
addl指令含义为 将目的操作数和源操作数求和,并将值写到目的操作数。
DOS/Windows 下的汇编语言,是 Intel 风格的。但在 Unix 和 Linux 系统中,更多采用的还是 AT&T 格式。
AT&T 和 Intel 格式中的源操作数和目标操作数的位置正好相反。在 Intel 汇编格式中,目标操作数在源操作数的左边;而在 AT&T 汇编格式中,目标操作数在源操作数的右边。
GCC中asm格式为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15asm [volatile] ( AssemblerTemplate
: OutputOperands
[ : InputOperands
[ : Clobbers ] ])
asm [volatile] goto ( AssemblerTemplate
:
: InputOperands
: Clobbers
: GotoLabels)
OutputOperands(输出约束)必须以“=”(覆盖现有值的变量)或“+”(读写时)开始。
在前缀之后,必须有一个或多个附加约束来描述值所在的位置。常见的约束条件包括“r”表示寄存器,“m”表示内存。当列出多个可能的位置(例如,“= rm”)时,编译器将根据当前上下文选择最有效的位置。
InputOperands(输入约束)不能以'='或'+'开头。当列出多个可能位置(例如,“irm”)时,编译器将根据当前上下文选择最有效的位置。%0、%1可以简单的理解为将 OutputOperands和InputOperands按顺序排列。
“+m” 表示由内存中读取或直接写入内存。
“i” 立即整型操作数。
“r” 表示数据可以从寄存器读取或写入到寄存器。因此,代码中的语句就比较好理解了。
atomic_sub 与 atomic_add 类似。
atomic_sub_and_test
1 | static inline int atomic_sub_and_test(int i, atomic_t *v) |
atomic_add_return
1 | static inline int atomic_add_return(int i, atomic_t *v) |
原子位操作
内核也提供了一组针对位这一级数据进行操作的函数,定义在<asm/bitops.h>中。位操作函数是对普通的内存地址进行操作的,参数是一个指针和一个位号,第0位是给定地址的最低有效位。
set_bit
1 |
|
- GCC内置函数__builtin_constant_p用来检测值是否为常量。
- BITOP_ADDR 是将传入的指针进行截取,只保留需要操作的字节。
- CONST_MASK_ADDR 可理解为将 addr 以字节进行划分,根据 nr 找到需要操作的字节。
- CONST_MASK 字节偏移量。
- ‘orb’ 是 “位或”操作。’orb src, dest’,dest的第src位进行位或操作。
- ‘bts’ 是 ‘bit test and set’,’bts src, dest’,将所选位(dest 的 src位)的值写入CF寄存器,然后将所选位 置1。
- 约束符’I’表示值1到8的范围。可参考此处。
clear_bit 与 set_bit 类似,不过将’orb’ 变为 ‘andb’(位与),将’bts’ 变为 ‘btr’(bit test and reset)。
change_bit 使用了 ‘xorb’(位异或)和’btc’(bit test and complement,select bit <- not (select bit))。
test_and_set_bit
1 | static inline int test_and_set_bit(int nr, volatile unsigned long *addr) |
- ‘bts’ 第一步就是将所选位的值写入 CF 寄存器。
- ‘sbb’ 含义为 (DEST ← (DEST – (SRC + CF));),源目相同时,仅是将CF寄存器的值写入目的操作数。
其他原子位操作原理基本与这两种函数类似。
为方便起见内核还提供了一组与原子位操作对应的非原子位操作(无 ‘LOCK_PREFIX’ 锁操作)。非原子位操作函数名比原子位操作函数名前缀多两个下划线。如果已确定操作不需要原子性(已经用锁保护了自己的数据),那么这些非原子的操作可能会执行得更快些。
自旋锁
Linux内核中最常见的锁是自旋锁(spin lock)。自旋锁最多只能被一个可执行线程持有。如果一个执行线程试图获得一个被己经持有(即所谓的争用)的自旋锁,那么该线程就会一直进行忙循环——旋转——等待锁重新可用。要是锁未被争用,请求锁的执行线程便能立刻得到它,继续执行。在任意时间,自旋锁都可以防止多于一个的执行线程同时进入临界区。同一个锁可以用在多个位置。
一个被争用的自旋锁使得请求它的线程在等待锁重新可用时自旋(特别浪费处理器时间),这种行为是自旋锁的要点。所以自旋锁不应该被长时间持有。事实上,这点正是使用自旋锁的初衷:在短期间内进行轻量级加锁。还可以采取另外的方式来处理对锁的争用:让请求线程睡眠,直到锁重新可用时再唤醒它。这样处理器就不必循环等待,可以去执行其他代码。这也会带来一定的开销—这里有两次明显的上下文切换,被阻塞的线程要换出和换入,与实现自旋锁的少数几行代码相比,上下文切换当然有较多的代码。因此,持有自旋锁的时间最好小于完成两次上下文切换的耗时。当然我们大多数人都不会无聊到去测量上下文切换的耗时,所以我们让持有自旋锁的时间应尽可能的短就可以。
自旋锁的实现和体系结构密切相关,代码往往通过汇编实现。与体系结构相关的代码定义在文件<asm/spinlock.h>中,实际需要用到的接口定义在文件<linux/spinlock.h>中。
自旋锁在同一时刻只能被一个执行线程持有,因此一个时刻只能有一个线程位于临界区内,这就为多处理器机器提供了防止并发访问所需的保护机制。注意在单处理器机器上,编译的时候并不会加入自旋锁。它仅仅被当做一个设置内核抢占机制是否被启用的开关。如果禁止内核抢占,那么在编译时自旋锁会被完全剔除出内核。
警告:自旋锁是不可递归的!
内核实现的自旋锁是不可递归的,这点不同于自旋锁在其他操作系统中的实现。如果你试图得到一个你正持有的锁,你必须自旋,等待你自己释放这个锁。但你处于自旋忙等待中,所以你永远没有机会释放锁,于是你被自己锁死了。千万小心自旋锁!
自旋锁可以使用在中断处理程序中。在中断处理程序中使用自旋锁时,一定要在获取锁之前,首先禁止本地中断(在当前处理器上的中断请求),否则,中断处理程序就会打断正持有锁的内核代码,有可能会试图去争用这个已经被持有的自旋锁。这样一来,中断处理程序就会自旋,等待该锁重新可用,但是锁的持有者在这个中断处理程序执行完毕前不可能运行。这正是我们在前面的内容中提到的双重请求死锁。注意,需要关闭的只是当前处理器上的中断。如果中断发生在不同的处理器上,即使中断处理程序在同一锁上自旋,也不会妨碍锁的持有者(在不同处理器上)最终释放锁。
调调试自旋锁 配置选项CONFIG_DEBUG_SPINLOCK为使用自旋锁的代码加入了许多调试检测手段。例如,激活了该选项,内核就会检查是否使用了未初始化的锁,是否在还没加锁的时候就要对锁执行开锁操作。在测试代码时,总是应该激活这个选项。如果需要进一步全程调试锁,还应该打开CONFIG_DEBUG_LOCK_ALLOC选项。
方法
spin_lock_init
1 | include/linux/spinlock.h |
1 | include/linux/spinlock.h |
1 | include/linux/spinlock_types.h |
1 | /*分析x86架构*/ |
x86架构,初始化自旋锁,就是对最内层的lock置0。
spin_lock
1 | include/linux/spinlock.h |
单处理器
1
2
3//单处理器 UP
include/linux/spinlock_api_up.h1
2
3include/linux/spinlock_api_up.h
1
2
3
4
5
6
7
8
9/*
当前线程preempt_count加1
barrier()为内存屏障,以保证数据 顺序性
*/1
2
3
4
5
6/*
Sparse 的 GCC 扩展,利用 __context__ 来对代码进行检查,参数x 的引用计数 +1
*/
include/linux/compiler.h单处理器时,并没有发生真正的锁定,所以内核唯一要做的就是保持抢占计数和irq标志,来抑制未使用的锁变量的编译器警告,并添加适当的检查器注释。
多处理器
1
2
3
4
5
6//多处理器 SMP
kernel/spinlock.c
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
__raw_spin_lock(lock);
}1
2
3
4
5
6
7
8
9include/linux/spinlock_api_smp.h
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
/*禁止内核抢占,本CPU生效*/
preempt_disable();
/*未定义锁调试时,函数为空。todo*/
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14//提供锁的统计信息 todo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
设置了CONFIG_DEBUG_SPINLOCK, 引用文件 include/linux/spinlock.h 中的函数。
*/
extern void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock);
/*
不明白函数定义后加一个宏是什么意思?
todo
*/
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
__acquire(lock);
arch_spin_lock(&lock->raw_lock);
}
--->>>
arch/x86/include/asm/spinlock.h //选择x86架构分析
static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
__ticket_spin_lock(lock);
}
--->>>
arch/x86/include/asm/spinlock.h
/*
CPU数量
32位环境中默认是 32
64位环境中默认是 5120
*/32位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17static __always_inline void __ticket_spin_lock(arch_spinlock_t *lock)
{
short inc = 0x0100;
asm volatile (
LOCK_PREFIX "xaddw %w0, %1\n"
"1:\t"
"cmpb %h0, %b0\n\t"
"je 2f\n\t"
"rep ; nop\n\t"
"movb %1, %b0\n\t"
/* don't need lfence here, because loads are in-order */
"jmp 1b\n"
"2:"
: "+Q" (inc), "+m" (lock->slock)
:
: "memory", "cc");
}“xaddw %w0, %1\n”
‘xaddw ‘指令含义为 交换后求和,’xaddw SRC , DEST’—>>> (TEMP ← SRC + DEST;SRC ← DEST;DEST ← TEMP;),’%w0’ 在 GCC 编译器中表示 ax 寄存器,’%h0’ 代表 ax的高8位,即ah; ‘%b0’ 代表 ax的低8位,即al。
此时, ax寄存器中的值为 inc的值,即 0x0100。
那么,此句含义即为: xaddw %ax, lock->slock。操作之后,tmp = lock->slock + inc; inc = lock->slock;lock->slock = tmp;
锁初始化时,值为0。经过操作之后,lock->slock = 0x0100,%ax = 0。
‘1:’
定义一个标号。
“cmpb %h0, %b0\n\t”
‘cmpb’ 位比较。’cmpb %ah, %al’,由于 %ax为0,源目操作数相同。
“je 2f\n\t”
如果上一步比较结果为真(相同),则跳转到标号为’2’的地方,本段中’2’表示退出。
“rep ; nop\n\t”
此条指令与’pause’指令相同,用于不支持’pause’指令的汇编程序。在不支持超线程的处理器上,就像’nop’一样不做任何事,但在支持超线程的处理器上,它被用作向处理器提示正在执行spinloop以提高性能。
“movb %1, %b0\n\t”
‘movb SRC, DEST’—>>> (DEST ← SRC),此处为 %al ← lock->slock。
“jmp 1b\n”
跳到标志’1’。
“2:”
定义一个标号。
此段含义简单概括为,将锁的值赋给 ax ,然后比较 ax 的高8位和低8位,相同则跳出(锁已经有了新值,其高位已加1,但是低位没变),不同则代表锁已经执行过加锁这一步,那么进入循环,循环中是将 内存中锁的值赋给 al,然后继续比较。为什么仅赋值低8位呢 ?因为 unlock 是 低位加1。
因此,可理解为,lock 高位加1,就是加锁,低位加1就是解锁。解锁之后不在分析。
64位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20static __always_inline void __ticket_spin_lock(arch_spinlock_t *lock)
{
int inc = 0x00010000;
int tmp;
asm volatile(LOCK_PREFIX "xaddl %0, %1\n"
"movzwl %w0, %2\n\t"
"shrl $16, %0\n\t"
"1:\t"
"cmpl %0, %2\n\t"
"je 2f\n\t"
"rep ; nop\n\t"
"movzwl %1, %2\n\t"
/* don't need lfence here, because loads are in-order */
"jmp 1b\n"
"2:"
: "+r" (inc), "+m" (lock->slock), "=&r" (tmp)
:
: "memory", "cc");
}64位与32位原理一样,不过32位是 高8位 和 低8位进行比较,而64位是 高16位 与 低16位 进行比较。下面仅仅说明几个指令的含义:
“movzwl %w0, %2\n\t”
将 %w0 即 eax (64位) 的值赋值到 tmp,但是高16位用0填充,即 低16位赋给 tmp。
“shrl $16, %0\n\t”
%0 逻辑右移 16位,即 %0 仅存 高16位。
之后流程和32位相同。
spin_lock_bh
之后的函数直接分析SMP系统(x86)。
1 | include/linux/spinlock.h |
1 | kernel/spinlock.c |
1 | include/linux/spinlock_api_smp.h |
BH 和 非BH区别主要是增加了 函数 “local_bh_disable” ,看一下其定义:
1 | kernel/softirq.c |
- “__builtin_return_address” 接收一个称为
level
的参数。这个参数定义希望获取返回地址的调用堆栈级别。例如,如果指定level
为0
,那么就是请求当前函数的返回地址。如果指定level
为1
,那么就是请求进行调用的函数的返回地址,依此类推。使用__builtin_return_address
捕捉返回地址,以便在以后进行跟踪时使用这个地址。 - preempt_count 加 本地软中断偏移(SOFTIRQ_OFFSET),之后可用 宏 ‘softirq_count()’ 进行判断是否已禁用软中断。
spin_lock_irq
irq 锁是禁止了硬中断。过程代码不在赘述,直接到最后 多的 “raw_local_irq_disable()”函数:
1 | arch/x86/include/asm/irqflags.h |
x86架构直接调用 “CLI” 指令。大多数情况下,”CLI”会清除EFLAGS寄存器中的IF标志,并且不会影响其他标志。 清除IF标志会导致处理器忽略可屏蔽的外部中断。
spin_lock_irqsave
1 |
|
1 | include/linux/typecheck.h |
1 | kernel/spinlock.c |
仅分析 宏’local_irq_save()’ 及 函数’do_raw_spin_lock_flags()’:
local_irq_save
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18include/linux/irqflags.h
--->>>
arch/x86/include/asm/irqflags.h
--->>>
static inline unsigned long __raw_local_irq_save(void)
{
unsigned long flags = __raw_local_save_flags();
raw_local_irq_disable();
return flags;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16static inline unsigned long __raw_local_save_flags(void)
{
return native_save_fl();
}
--->>>
static inline unsigned long native_save_fl(void)
{
unsigned long flags;
asm volatile("# __raw_save_flags\n\t"
"pushf ; pop %0"
: "=rm" (flags)
:
: "memory");
return flags;
}“# __raw_save_flags\n\t”
注释。
“pushf”
将eflags寄存器的内容入栈。
“pop %0”
栈顶内容载入 目的操作数中,此处为 flags。
1
2
3
4
5
6
7
8
9
10
11arch/x86/include/asm/irqflags.h
static inline void raw_local_irq_disable(void)
{
native_irq_disable();
}
--->>>
static inline void native_irq_disable(void)
{
/*CLI 指令前面已做介绍*/
asm volatile("cli": : :"memory");
}do_raw_spin_lock_flags
1
2
3
4
5
6static inline void
do_raw_spin_lock_flags(raw_spinlock_t *lock, unsigned long *flags) __acquires(lock)
{
__acquire(lock);
arch_spin_lock_flags(&lock->raw_lock, *flags);
}1
2
3
4
5
6
7
8arch/x86/include/asm/spinlock.h
static __always_inline void arch_spin_lock_flags(arch_spinlock_t *lock,
unsigned long flags)
{
arch_spin_lock(lock);
}
--->>>
arch_spin_lock() 之前已经分析过。
这些函数的对应函数都是其逆操作。
自旋锁和下半部
由于下半部可以抢占进程上下文中的代码,所以当下半部和进程上下文共享数据时,必须对进程上下文中的共享数据进行保护,所以需要加锁的同时还要禁止下半部执行。同样,由于中断处理程序可以抢占下半部,所以如果中断处理程序和下半部共享数据,那么就必须在获取恰当的锁的同时还要禁止中断。
同类的tasklet不可能同时运行,所以对于同类tasklet中的共享数据不需要保护。但是当数据被两个不同种类的tasklet共享时,就需要在访问下半部中的数据前先获得一个普通的自旋锁。这里不需要禁止下半部,因为在同一个处理器上绝不会有tasklet相互抢占的情况。
对于软中断,无论是否同种类型,如果数据被软中断共享,那么它必须得到锁的保护。这是因为,即使是同种类型的两个软中断也可以同时运行在一个系统的多个处理器上。但是,同一处理器上的一个软中断绝不会抢占另一个软中断,因此,根本投必要禁止下半部。
读写自旋锁
有时候,锁的用途明确的分为读取和写入两个场景。当更新(写入)链表时,不能有其他代码井发地写链表或从链表中读取数据,写操作要求完全互斥。另一方面,当对其检索(读取)链表时,只要其他程序不对链表进行写操作就行了。
只要没有写操作,多个并发的读操作都是安全的。
当对某个数据结构的操作可以像这样被划分为读/写或者消费者/生产者两种类别时,类似读/写锁这样的机制就很有帮助了。为此,Linux内核提供了专门的读一写自旋锁。这种自旋锁为读和写分别提供了不同的锁。一个或多个读任务可以并发地持有读者锁;相反,用于写的锁最多只能被一个写任务持有,而且此时不能有并发的读操作。有时把读/写锁叫做共享/排斥锁,或者并发/排斥锁,因为这种锁以共亨(对读者而言)和排斥(对写者而言)的形式获得使用。
下面开始分析原理。
rwlock_init
1 | include/linux/rwlock.h |
1 | include/linux/rwlock_types.h |
1 |
初始化结果为 将 ‘0x01000000’ 赋值给 raw_lock 。
read_lock
1 | include/linux/rwlock.h |
1 | include/linux/rwlock.h |
“ subl $1,(%0)\n\t”
目的操作数值减1。在 AT&T 汇编格式中,用 ‘$’ 前缀表示一个立即操作数;而在 Intel 汇编格式中,立即数的表示不用带任何前缀。
“jns 1f\n”
指令 JNS 表示 :如果符号位 (SF)不为1,就跳转。
“call __read_lock_failed\n\t”
调用符号 ‘__read_lock_failed’,此符号定义在文件”arch/x86/lib/semaphore_32.S”。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15ENTRY(__read_lock_failed)
CFI_STARTPROC
FRAME
2: LOCK_PREFIX
incl (%eax)
1: rep; nop
cmpl $1,(%eax)
js 1b
LOCK_PREFIX
decl (%eax)
js 2b
ENDFRAME
ret
CFI_ENDPROC
ENDPROC(__read_lock_failed)1
2
3
4
5
6
7arch/x86/include/asm/dwarf2.h
.cfi_startproc用于每个函数的开头,这些函数应该在.eh_frame中有一个入口。 它初始化一些内部数据结构。 用.cfi_endproc关闭函数。
除非.cfi_startproc与参数"simple"一起使用,否则它还会发出一些与体系结构有关的初始CFI指令。1
2
3
4
5
6
7
8
9
10伪代码如下:
2:
incl (%eax); // eax 代表 lock ,因为之前减1没有加锁成功,所以先恢复原值。
1:
if(lock - 1 < 0) // write lock 直接减去 0x01000000, 为0,也就是说 write locked则一直循环。
goto 1;
decl lock;
if(lock < 0) // 如果小于0,则说明在decl 之前,又被write 把锁抢占了,那么从头开始
goto 2;
return
读锁是减1,值不为负则加锁成功,因此最多可同时有’0x01000000’个读锁,完全足够。但是按照底层代码分析,即使加了读锁,写数据也是有可能的,这就需要内核开发人员必须能够分清需要读还是写。
write_lock
1 | include/linux/rwlock.h |
1 | include/linux/rwlock.h |
1 | 伪代码如下: |
1 | ENTRY(__write_lock_failed) |
write_lock 伪代码和 read_lock 类似,可试着自己分析一下。
read_lock_bh
直接上最后的函数
1 | include/linux/rwlock_api_smp.h |
函数 ‘local_bh_disable()’ 和 宏 ‘do_raw_read_lock’ 为核心,上面已经分析过。
write_lock_bh
1 | include/linux/rwlock_api_smp.h |
read_lock_irq
1 | static inline void __raw_read_lock_irq(rwlock_t *lock) |
write_lock_irq
1 | static inline void __raw_write_lock_irq(rwlock_t *lock) |
信号量
Linux中的信号量是一种睡眠锁。如果有一个任务试图获得一个不可用(已经被占用)的信号量时,信号且会将其推进一个等待队列,然后让其睡眠。这时处理器能重获自由,从而去执行其他代码。当持有的信号量可用(被释放)后,处于等待队列中的那个任务将被唤醒,并获得该信号量。
- 由于争用信号量的进程在等待锁重新变为可用时会睡眠,所以信号量适用于锁会被长时间持有的情况。
- 相反,锁被短时间持有时,使用信号量就不太适宜了。因为睡眠、维护等待队列以及唤醒所花费的开销可能比锁被占用的全部时间还要一长。
- 由于执行线程在锁被争用时会睡眠,所以只能在进程上下文中才能获取信号量锁,因为在中断上下文中是不能进行调度的。
- 可以在持有信号量时去睡眠(当然你也可能并不需要睡眠),因为当其他进程试图获得同一信号量时不会因此而死锁(因为该进程也只是去睡眠而已,而你最终会继续执行的)。
- 在占用信号量的同时不能占用自旋锁。因为在你等待信号量时可能会睡眠,而在持有自旋锁时是不允许睡眠的。
以上这些结论阐明了信号量和自旋锁在使用上的差异。
信号量可以同时允许任意数量的锁持有者,而自旋锁在一个时刻最多允许一个任务持有它。信号量同时允许的持有者数量可以在声明信号量时指定。这个值称为使用者数量(usage count)或简单地叫数量(count)。通常情况下,信号量和自旋锁一样,在一个时刻仅允许有一个锁持有者。这时计数等于1,这样的信号量被称为二值信号量或互斥信号量(因为它强制进行互斥)。另一方面,初始化时也可以把数量设置为大于1的非0值。这种情况,信号量被称为计数信号童(counting semaphone),它允许在一个时刻至多有count个锁持有者。计数信号量不能用来进行强制互斥,因为它允许多个执行线程同时访问临界区。相反,这种信号量用来对特定代码加以限制,内核中使用它的机会不多。在使用信号量时,基本上用到的都是互斥信号量(计数等于1的信号量)。
信号量支持两个原子操作P()和V(),这两个名字来自荷兰语Proberen和Vershogen。前者叫做测试操作(字面意思是探查),后者叫做增加操作。后来的系统把两种操作分别叫做down()和up()。
down()操作通过对信号量计数减1来请求获得一个信号量。如果结果是0或大于0,获得信号量锁,任务就可以进入临界区。如果结果是负数,任务会被放入等待队列,处理器执行其他任务。相反,当临界区中的操作完成后,up()操作用来释放信号量。如果在该信号量上的等待队列不为空,那么处于队列中等待的任务在被唤醒的同时会获得该信号量。
下面开始看代码
sema_init
1 | include/linux/semaphore.h |
初始化仅仅是将结构体内的3个字段进行初始。
down
1 | kernel/semaphore.c |
down_interruptible、down_killable、down_trylock和down_timeout,这几个函数不在分析,内部函数都分析过,只是状态或timeout重设而已。
up
1 | void up(struct semaphore *sem) |
读写信号量
与自旋锁类似,信号量也可优化为读写信号量,直接开始分析代码:
init_rwsem
1 | /*此处定义了 两种结构体,根据是否开启RWSEM选项而用不同结构体及实现*/ |
rwsem.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14void __init_rwsem(struct rw_semaphore *sem, const char *name,
struct lock_class_key *key)
{
/*
* Make sure we are not reinitializing a held semaphore:
*/
debug_check_no_locks_freed((void *)sem, sizeof(*sem));
lockdep_init_map(&sem->dep_map, name, key, 0);
sem->count = RWSEM_UNLOCKED_VALUE;
raw_spin_lock_init(&sem->wait_lock);
INIT_LIST_HEAD(&sem->wait_list);
}和普通信号量相同的初始化。
rwsem-spinlock.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14void __init_rwsem(struct rw_semaphore *sem, const char *name,
struct lock_class_key *key)
{
/*
* Make sure we are not reinitializing a held semaphore:
*/
debug_check_no_locks_freed((void *)sem, sizeof(*sem));
lockdep_init_map(&sem->dep_map, name, key, 0);
sem->activity = 0;
raw_spin_lock_init(&sem->wait_lock);
INIT_LIST_HEAD(&sem->wait_list);
}
down_read
x86架构 普通实现
1
2
3
4
5
6
7
8
9
10
11
12
13static inline void __down_read(struct rw_semaphore *sem)
{
asm volatile("# beginning down_read\n\t"
LOCK_PREFIX _ASM_INC "(%1)\n\t"
/* adds 0x00000001 */
" jns 1f\n"
" call call_rwsem_down_read_failed\n"
"1:\n\t"
"# ending down_read\n\t"
: "+m" (sem->count)
: "a" (sem)
: "memory", "cc");
}自加 1,结果为正则退出,为负则 跳转到函数 ‘call_rwsem_down_read_failed’,请参考之前的查找方式自己查找其实现。
专用锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45void __sched __down_read(struct rw_semaphore *sem)
{
struct rwsem_waiter waiter;
struct task_struct *tsk;
unsigned long flags;
raw_spin_lock_irqsave(&sem->wait_lock, flags);
/*初始化状态 或 仅有读者则 自加1 退出*/
if (sem->activity >= 0 && list_empty(&sem->wait_list)) {
/* granted */
sem->activity++;
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
goto out;
}
/*此处说明,有写者已经加锁*/
/*将任务设置为 TASK_UNINTERRUPTIBLE 状态*/
tsk = current;
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
/*初始化 waiter 并将其加入 读写信号量 链表*/
waiter.task = tsk;
waiter.type = RWSEM_WAITING_FOR_READ;
/*增加当前进程使用计数 usage*/
get_task_struct(tsk);
list_add_tail(&waiter.list, &sem->wait_list);
/*解锁之后继续等待*/
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
/*循环等锁,进程切换再次判断。
waiter列表无任务则退出。
*/
for (;;) {
if (!waiter.task)
break;
schedule();
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
}
tsk->state = TASK_RUNNING;
out:
;
}读者无锁应该等写者解锁:up_write。之后分析专用文件。
up_write
1 | enum rwsem_waiter_type { |
1 | void __up_write(struct rw_semaphore *sem) |
down_write
1 | void __sched __down_write(struct rw_semaphore *sem) |
up_read
1 | void __up_read(struct rw_semaphore *sem) |
互斥体
多数用户使用信号量只使用计数1,把它作为一个互斥的排它锁。信号量用户通用且没多少使用限制,这使得信号量适合用于那些较复杂的、未明情况下的互斥访问,比如内核于用户空间复杂的交互行为。
但这也意味着简单的锁定而使用信号量不方便,并且信号量也缺乏强制的规则来行使任何形式的自动调试,即便受限的调试也不可能。为了找到一个更简单的睡眠锁,内核开发者们引入了互斥体(mutex)。
mutex在内核中对应数据结构体mutex,其行为和使用计数为1的信号量类似,但操作接口更简单,实现也更高效,而且使用限制更强。
1 | struct mutex { |
mutex_init
1 |
|
mutex_lock
1 | void __sched mutex_lock(struct mutex *lock) |
1 | x86架构 |
1 | static __used noinline void __sched |
1 | static inline int __sched |
1 | static noinline |
mutex_unlock
TODO
完成变量
init_completion
1 | struct __wait_queue_head { |
1 | static inline void init_completion(struct completion *x) |
wait_for_completion
1 | void __sched wait_for_completion(struct completion *x) |
可自己分析函数变体 wait_for_completion_*。
complete
1 | void complete(struct completion *x) |
可以自己分析 函数变体 complete_all。
大内核锁(BKL)
对整个内核加锁,现在已不在使用。
顺序锁
顺序锁,简称seq锁,是在2.6版本内核中引入的一种新型锁。这种锁提供了一种很简单的机制,用于读写共享数据。实现这种锁主要依靠一个序列计数器。当有疑义的数据被写入时,会得到一个锁,并且序列值会增加。在读取数据之前和之后,序列号都被读取。如果读取的序列号值相同,说明在读操作进行的过程中没有被写操作打断过。此外,如果读取的值是偶数,那么久表明写操作没有发生(锁的初始值是0,写锁会使值成奇数,释放会使值变成偶数)。
初始化 DEFINE_SEQLOCK
seq锁的基本结构为
1 | /*一个计数器;一个自旋锁*/ |
1 |
|
初始化即为 将计数值为0并初始化自旋锁。也可以利用宏 seqlock_init进行初始化。
write_seqlock
1 | static inline void write_seqlock(seqlock_t *sl) |
write_sequnlock
1 | static inline void write_sequnlock(seqlock_t *sl) |
写的顺序锁基本无难度,下面举例看一下read的用法。
1 |
|
read_seqbegin
1 | static inline unsigned read_seqbegin(const seqlock_t *sl) |
read_seqretry
1 | static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start) |
seq锁有助于提供一种非常轻量级和具有扩展性的外观。但是seq锁对写者更有利。seq锁在遇到如下需求时将是最理想的选择:
- 数据存在很多读者。
- 数据写者很少。
- 写者很少,但是希望写优先于读,而且不允许读者让写着饥饿。
- 数据简单,如简单结构,甚至是简单的整型——在某些场合,是不能使用原子量的。(啥场合暂时未遇到)
jiffies中利用seq锁(函数 get_jiffies_64)。
禁止抢占
由于内核是抢占性的,内核中的进程在任何时刻都可能停下来以便另一个具有更高优先权的进程运行。这意味着一个任务与被枪占的任务可能会在同一个临界区内运行。为了避免这种情况,内核抢占代码使用自旋锁作为非抢占区域的标记。如果一个自旋锁被持有,内核便不能进行抢占。因为内核抢占和SMP面对相同的并发问题,并且内核已经是SMP安全的(SMP-safe),所以,这种简单的变化使得内核也是抢占安全的(preempt-safe)。
实际中,某些情况并不需要自旋锁,但是仍然需要关闭内核抢占。最频繁出现的情况就是每个处理器上的数据。如果数据对每个处理器是唯一的,那么,这样的数据可能就不需要使用锁来保护,因为数据只能被一个处理器访问。如果自旋锁没有被持有,内核又是抢占式的,那么一个新调度的任务就可能访问同一个变量。
为了解决这个问题,可以通过preempt_disable()禁止内核抢占。这是一个可以嵌套调用的函数,可以调用任意次。每次调用都必须有一个相应的preempt_enable()调用。当最后一次preempt_enable()被调用后,内核抢占才重新启用。
抢占计数存放着被持有锁的数量和preempt_disable()的调用次数,如果计数是0,那么内核可以进行枪占;如果为1或更大的值,那么,内核就不会进行抢占。
preempt_disable() 和 preempt_enable()实现比较简单,此处不在分析。
顺序和屏障
当处理多处理器之间或硬件设备之间的同步问题时,有时需要在你的程序代码中以指定的顺序发出读内存和写内存指令。在和硬件交互时,时常需要确保一个给定的读操作发生在其他读或写操作之前。另外,在多处理器上,可能需要按写数据的顺序读数据。但是编译器和处理器为了提高效率,可能对读和写程序排序(x86处理器不会这样做)。
不过,所有可能重新排序和写的处理器提供了机器指令来确保顺序要求。同样也可以指示编译器不要对给定点周围的指令序列进行重新排序。这些确保顺序的指令称为屏障(barriers)。
rmb()方法提供了一个“读”内存屏障,它确保跨越rmb()的载入动作不会发生重排序。
wmb()方法提供了一个“写”内存屏障,功能和rmb()类似,区别仅仅是它是针对存储而非载入——它确保跨越屏障的存储不发生重排序。
mb()方法既提供了读屏障也提供了写屏障。载入和存储动作都不会跨越屏障重排序。
read_barrier_depends()是rmb()的变种,它提供了一个读屏障,但是仅仅是针对后续读操作所依靠的那些载入。因为屏障后的读操作依赖于屏障前的读操作,因此该屏障确保屏障前的读操作在屏障后的读操作之前完成。
宏smp_rmb()、smp_wmb()、smp_mb()和smp_read_barrier_depends()提供了一个有用的优化。在SMP内核中它们被定义成常用的内存屏障,而在单处理机内核中,它们被定义成编译器的屏障。
barrier()方法可以防止编译器跨屏障对载入或存储操作进行优化。编译器不会重新组织存储或载入操作,而防止改变C代码的效果和现有数据的依赖关系。但是,它不知道在当前上下文之外会发生什么事。
注意,对于不同的体系结构,屏障的实际效果差别很大。
参考资料
What .long 0xXXXXXXXX stands for in asm?