跳转至

Lock & Synchronization

导言

  • 在PTA开发时,其中的多线程加速经常要用锁来隔离资源和逻辑。

互斥与同步

在进程/线程并发执行的过程中,进程/线程之间存在协作的关系,例如有互斥、同步的关系。

为了实现进程/线程间正确的协作,操作系统必须提供实现进程协作的措施和方法,主要的方法:

  • 基于原子操作指令(汇编) —— 测试和置位(Test-and-Set)指令, 或者(如Compare-And-Swap,CAS)
  • 锁:加锁、解锁操作;
    • 自旋锁(spin lock, 忙等待锁),
    • 无等待锁:思想,把当前线程放入到锁的等待队列,然后执行调度程序
  • 信号量:P、V 操作;

这两个都可以方便地实现进程/线程互斥,而信号量比锁的功能更强一些,它还可以方便地实现进程/线程同步。

常见问题

共享内存加锁之后释放锁,别的进程是如何知道锁释放的

  1. 常用的方法是在共享内存中设置标志位或信号量等,并在共享内存中保证这个标志位或信号量只有在锁被释放时才会被更新。这样,其它进程可以通过轮询或者等待通知的方式来获取锁并开始修改共享内存,从而避免冲突。在共享内存中设置的标志位或信号量通常需要原子操作的支持,以确保并发修改时的正确性。
    1. 轮询:轮询是指线程反复检查某个条件是否成立,直到条件成立为止。在锁机制中,当一个线程持有锁时,其他线程会不断地轮询锁的状态,直到该锁被释放。这种方式的优点是实现简单,不需要额外的通知机制,缺点是占用CPU资源,效率较低。
    2. 等待通知:等待通知是指线程在某个条件不满足时挂起等待,当条件满足时由其他线程通知它继续执行。在锁机制中,当一个线程持有锁时,其他线程会进入等待状态,直到该锁被释放,此时其他线程会被通知并继续执行。这种方式的优点是占用CPU资源少,效率高,缺点是实现稍微复杂一些,需要额外的通知机制。
  2. 另外,也可以使用一个中央锁服务器或者等待队列来管理锁,当一个进程获取锁时,会向中央锁服务器或等待队列发出请求,直到锁被成功获取,并在共享内存中记录锁的状态。当锁被释放时,中央锁服务器或等待队列会通知其它进程,并让其它进程开始自由修改共享内存。

如何保证操作的原子性

  1. 操作系统提供的原子操作:一些操作系统会提供线程安全的原子操作接口,如Compare-And-Swap(CAS)等,它们能够确保指令的原子性,从而保证线程安全。
  2. 事务:事务是指一组操作被视为一个不可分割的操作序列,要么全部执行成功,要么全部失败,具有原子性和一致性保证。常用于数据库操作等场景。
  3. 锁机制:锁机制是一种常用的多线程同步机制,能够确保同一时刻只有一个线程(或进程)可以访问共享资源,从而保证原子性。

如何避免死锁

  1. 避免使用多把锁并且同时持有多个锁。当需要持有多个锁时,可以通过加锁的顺序来避免死锁。如果所有可能的锁按照固定的顺序加锁,那么可以避免死锁。
  2. 设置请求超时时间。当一个进程请求锁时,如果在超时时间内没有获得锁,可以释放之前持有的锁,并尝试重新获取。这样可以避免某一个进程一直持有锁而导致死锁。
  3. 引入随机性。在获取锁的时候加入一些随机因素,让不同的程序在不同的时间获取锁。这样可以防止程序之间在自己的重试过程中的饥饿状态导致的死锁。

悲观锁

  1. 读写操作时,需要预先加锁,防止其他进程对资源的访问。
  2. 通过互斥锁(Mutex)和信号量(Semaphore)来实现。

乐观锁

  1. 在读取或修改共享资源时,并不先进行加锁操作,而是先读取资源,然后在对资源进行写操作时再进行一次比较,看看在这个时间间隔内是否发生了竞争。如果没有发生竞争,就可以将更新后的值写入共享资源,并结束操作;如果发生了竞争,则需要放弃本次更新,并进行重试
  2. 通过版本号的方式来实现。在共享资源中记录该资源的版本号,当一个进程想要修改共享资源时,需要先获取当前资源的版本号。如果当前版本号与自己保存的版本号相符,说明没有其他进程在这段时间内修改该资源,则可以进行写操作;如果版本号已经发生变化,则说明有其他进程对该资源进行了修改,当前进程需要放弃本次写操作,更新版本号,重新获取新的资源,并重新执行操作。

原子操作

Test-and-SetCompare-and-Swap (CAS) 都是并发编程中的重要原子操作指令,通常用于同步和处理多线程或多进程环境中的竞争条件。它们的作用是确保某些操作在执行时不会被中断,以保持数据一致性和避免并发冲突。我们来详细了解这两个命令。

1. Test-and-Set (TAS) 指令

  • 通常用于控制锁定状态,通过设置标志位来获取锁,它返回原始值(通常是锁状态),用于判断锁是否已经被占用。
  • 它通常用于实现自旋锁、互斥锁(mutex)或锁标志位。

它的操作过程如下:

  • Test: 检查指定位置(通常是一个布尔标志位或整数)当前的值。
  • Set: 将这个位置的值设置为 1 或 "true"。

操作是原子的,即不会被中断或干扰,保证在并发环境下,多个线程或进程不能同时修改同一位置。

用途:

  • 实现自旋锁(Spinlock):线程反复检查一个共享变量,如果该变量表示未被锁定,线程会尝试获得锁。
  • 防止竞争条件:确保在并发操作时,只有一个线程能够设置锁标志,其他线程需要等待。
示例

假设有一个共享的锁标志 lock,初始化为 false

lock = false;

while (Test_and_Set(lock)) {
    // 如果 lock 已经被设置为 true,则继续自旋等待
}
// lock 成功设置为 true,获取锁

2. Compare-and-Swap (CAS) 指令

  • 通常用于无锁编程,它通过比较期望值与当前值来决定是否进行更新(检查指定的内存位置的值是否等于一个预期的值,如果相等,则将该位置的值替换为新的值),适用于复杂的数据结构操作。
  • 常用于无锁数据结构和其他更高效的并发操作。

它的操作步骤如下:

  • Compare: 比较目标内存位置的当前值是否与预期值相等。
  • Swap: 如果当前值等于预期值,就将目标位置的值替换为新的值。如果不相等,则什么也不做。

CAS 也具有原子性,这意味着它在操作期间不会被中断。CAS 是实现无锁编程和避免线程同步问题的常用工具,尤其在需要高性能的并发程序中。

用途:

  • 实现无锁数据结构(如无锁队列、栈、哈希表等)。
  • 实现线程安全的计数器或标志位操作。
  • 用于一些算法(如版本控制或并发计数器)中,确保多个线程不冲突地更新共享数据。
示例:

假设有一个变量 x,初始值为 10:

int expected = 10;
int new_value = 20;

if (Compare_and_Swap(&x, expected, new_value)) {
    // 如果 x 的值为 expected (10),就将 x 更新为 new_value (20)
} else {
    // 如果 x 的值不为 expected,CAS 操作失败
}

3. TAS的汇编实现

  • Test-and-Set (TAS) 操作通常可以通过 XCHG 指令(交换)在 x86 汇编中实现。虽然 x86 没有直接的 Test-and-Set 指令,XCHG 被用作实现类似功能的原子操作。
例如,在 x86 中:
; x86 示例,使用 XCHG 实现 Test-and-Set
; 假设锁变量位于 memory_location
; eax 是存储旧值的寄存器
mov eax, 0          ; eax 设为 0,表示“未锁定”
xchg eax, [lock]    ; 将 eax 与锁变量交换,如果锁变量是 0,就设置为 1
                    ; 如果返回的 eax 仍为 0,说明锁成功设置

这个操作会交换 eax[lock] 位置的值,并将原值存储在 eax 中。如果 [lock] 最初是 0eax 将变为 1,并且 lock 会被设置为 1,表示锁已经被占用。

虽然 Test-and-Set 并不一定直接映射到汇编中的特定指令,但它可以通过交换指令来模拟。

4. CAS的汇编实现

  • Compare-and-Swap (CAS) 指令在 x86 中有专门的 CMPXCHG 指令,ARM 等其他架构也有类似的原子操作指令(如 LDREXSTREX)。
例如,在 x86 中:

x86 提供了 CMPXCHG(比较和交换)指令,它就是 CAS 操作的硬件实现。其语法如下:

CMPXCHG destination, source
  • destination:是要比较的内存位置。
  • source:是用于替换的值。
  • 如果 destination 中的值与 source 中的值相等,destination 将被更新为 source 的值,且 AX 寄存器的值将保存 destination 原先的值。
  • 如果不相等,destination 的值保持不变,并且 AX 寄存器将保存 destination 的原值。
; x86 示例,使用 CMPXCHG 实现 Compare-and-Swap
; 假设目标内存位置是 lock,期望值是 eax,新的值是 ebx

mov eax, expected_value    ; 将期望的值加载到 eax
mov ebx, new_value        ; 将新的值加载到 ebx
cmpxchg [lock], ebx        ; 比较 [lock] 的值与 eax(期望值),如果相等则将 [lock] 更新为 ebx(新值)
                        ; 如果不相等,eax 会保存 [lock] 的原值

这个 CMPXCHG 指令会首先将内存中的值与 eax 比较,如果两者相等,它会将 ebx 的值存储到内存中,并且 eax 会保存原来的值;如果两者不相等,内存中的值不会改变,而 eax 会保存当前内存值,指示操作失败。

在 ARM 架构中:

ARM 也提供了类似的原子操作指令,称为 LDREXSTREXLDREX 加载一个值并锁定它,STREX 则用于条件性地写回值。

; ARM 示例,使用 LDREX 和 STREX 实现 CAS
LDREX R0, [lock]    ; 加载锁的当前值到 R0
CMP R0, R1           ; 比较 R0(当前锁的值)与 R1(期望值)
BNE CAS_FAIL         ; 如果不相等,跳转到失败处理
STREX R2, R3, [lock] ; 如果相等,将 R3(新值)写入 [lock],并将成功标志存储在 R2

atomic 封装

std::atomic 是对 Test-and-SetCompare-and-Swap 等底层原子操作的高级封装,使得 C++ 开发者可以在多线程环境中更方便、更安全地使用这些原子操作。

原子操作(std::atomic)有如下特点:

  • 线程安全:原子操作保证了即使在多线程环境下也能正确地修改和读取共享变量。
  • 高效:没有加锁开销,适用于需要频繁检查和修改标志的场景。
  • 原子操作的限制std::atomic 只能用于一些简单的类型(如整数、指针和布尔类型),对于复杂的数据结构,仍然需要其他同步机制(如锁)。

示例:使用 std::atomic<bool> 控制初始化

在控制初始化只执行一次的场景中,你可以使用 std::atomic<bool> 来替代普通的 bool 类型,这样就能确保线程安全地检查和设置初始化标志。

#include <atomic>

std::atomic<bool> isInited(false);

void initFunction() {
    // 原子操作,检查并更新 isInited
    if (!isInited.exchange(true)) { // mutex  std::call_once 和std::once_flag 也可以实现类似功能。
        // 执行初始化逻辑
        // ...
    }
}
  1. std::atomic<bool> isInited(false);:声明一个原子类型的 bool 变量,初始值为 false
  2. isInited.exchange(true):调用 exchange 方法,该方法会将 isInited 的当前值返回,并将其更新为 true。如果当前值为 false,说明还没有初始化,此时会执行初始化逻辑。
  3. 如果 isInited 已经是 true,则 exchange 会返回 true,跳过初始化部分。

避免直接读取 std::atomic 对象, 使用load store

std::atomic<int> counter(0);
counter = 5;  // 这种赋值操作不是原子操作,可能导致线程不安全。
int value = counter;  // 直接读取可能没有同步, 导致缓存不一致问题,多个线程读取的值不一定是最新的。

原子操作是不会失败的

  • load、store 和 exchange 都不会失败,它们保证原子操作的顺序性和正确性。
  • compare_exchange_strong 和 compare_exchange_weak 是可能会“失败”的操作,它们只有在预期值和原子变量的值一致时才会成功执行,否则会返回 false,并更新预期值。

内存顺序:std::memory_order

对于一个线程里的同一个代码块内的无依赖关系的指令,处理器在运行时会重排序乱序执行。这对原子指令也是一样的。但实际生产中,多个原子操作之间,可能会有依赖关系,比如一个线程读取另一个线程写入的数据。为了确保这些依赖关系的正确性,需要使用内存顺序来控制处理器对指令的重排序。

std::atomic 的操作支持指定内存顺序,这控制了编译器和硬件对操作的优化和重排序。常见的内存顺序选项有:

  • std::memory_order_relaxed:没有限制,运行时会乱序。
  • std::memory_order_acquirestd::memory_order_release:分别用于“获取”和“释放”同步,确保操作顺序。
    • std::memory_order_acquire:保证在此操作之前的所有操作不会被重排序到它之后。
    • std::memory_order_release:保证在此操作之后的所有操作不会被重排序到它之前。
  • std::memory_order_seq_cst默认的内存顺序,保证在多线程程序中,所有原子操作按顺序执行,不允许重排序。但性能可能会略受影响。
memory_order_acquire

这两者提供了获取和释放同步的保证:

  • std::memory_order_acquire:保证在此操作之前的所有操作不会被重排序到它之后。
  • std::memory_order_release:保证在此操作之后的所有操作不会被重排序到它之前。
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> counter(0);
std::atomic<bool> ready(false);

void producer() {
    counter.store(42, std::memory_order_release);  // 确保 counter 写入在 ready 之前
    ready.store(true, std::memory_order_release);  // 标记准备好
}

void consumer() {
    while (!ready.load(std::memory_order_acquire)) {}  // 等待 ready 为 true
    std::cout << "Counter: " << counter.load(std::memory_order_acquire) << std::endl;
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);

    t1.join();
    t2.join();

    return 0;
}

解释

  • consumer 线程在 ready.load(std::memory_order_acquire) 后才能读取 counter,确保 ready 被设置为 true 后,counter 的写入才是可见的。
  • memory_order_release 确保 producer 线程的 counter 写操作不会被重排序到 ready.store 之前。
  • memory_order_acquire 确保 consumer 在读取 ready 后,才能正确地读取到 counter 的值。

1. 直接赋值:store

在原子操作中,store 用来将一个值存储到原子变量中。与普通赋值不同,store 会确保在多线程环境下,赋值操作是原子性的(即不会被打断)。

std::atomic<int> counter(0);
counter.store(5, std::memory_order_relaxed);  // 存储 5
- store 会保证在指定的内存顺序下将值存储到原子变量中,因此它是线程安全的。

2. 加载:load

load 用于从原子变量中读取值。它会确保在多线程环境中,读取操作是线程安全的,并且可以指定内存顺序。

int value = counter.load(std::memory_order_acquire);  // 从原子变量读取
  • load 会确保读取的是正确同步后的值,避免在多线程场景下出现读取错误。

3. 比较并交换:compare_exchange_strong

  • compare_exchange_strong(以及 compare_exchange_weak)广泛应用于实现锁和无锁算法。它会尝试将原子变量的值与一个预期值进行比较,如果相同,则将其更新为新值。如果比较失败,原子变量的值不会更改,并且返回 false
compare_exchange_strong 与compare_exchange_weak 的区别
  • compare_exchange_strong 是标准的、严格的 Compare-and-Swap 操作,失败时立即返回,并不会自动重试。
  • compare_exchange_weak 是一种可能会在某些实现中自动重试的版本,适用于高并发场景,特别是在竞争激烈时可以减少操作的开销。
  • 虽然它们有细微的区别,但两者的核心作用是相同的:原子地比较并更新共享变量。在选择使用哪种方法时,主要取决于你对性能的要求以及是否需要在失败时进行重试。
std::atomic<int> value(0);
int expected = 0;
int desired = 42;

if (value.compare_exchange_strong(expected, desired)) {
    std::cout << "CAS succeeded! Value updated to " << value.load() << "\n";
} else {
    std::cout << "CAS failed! Expected: " << expected << ", Actual: " << value.load() << "\n";
}

// 尝试比较并交换
while (!value.compare_exchange_weak(expected, desired)) {
    std::cout << "CAS failed, retrying...\n";
    // 可以执行一些其他的操作或稍微延迟再重试
}
std::cout << "CAS succeeded! Value updated to " << value.load() << "\n";

4. exchange

exchange 是一种常见的原子操作,类似于 store,但它会返回原子变量的先前值。

int oldValue = counter.exchange(5);  // 返回更新前的值,并将 counter 设置为 5
  • exchange 在多线程环境下是安全的,并且可以返回修改前的值,适用于某些需要获取旧值的场景。

常见锁的优缺点和实现

自旋锁(spinlock)

多线程同步的一种忙等待锁,线程反复检查锁变量是否可用。

  • 优点:避免了操作系统进程调度和线程切换,所以自旋锁通常适用在时间比较短的情况下。由于这个原因,操作系统的内核经常使用自旋锁。
  • 缺点:如果长时间上锁的话,自旋锁会非常耗费性能,它阻止了其他线程的运行和调度
    • 线程持有锁的时间越长,则持有该锁的线程将被 OS(Operating System) 调度程序中断的风险越大。
  • 解决办法:
    • TicketLock 是采用排队叫号的机制。
    • CLHLock和MCSLock通过链表的方式避免了减少了处理器缓存同步,极大的提高了性能,区别在于CLHLock是通过轮询其前驱节点的状态,而MCS则是查看当前节点的锁状态。
std::atomic_flag lock = ATOMIC_FLAG_INIT;

void spinlock() {
    while (lock.test_and_set(std::memory_order_acquire)) {
        // 自旋等待,直到锁被释放
    }
    // 临界区代码
    lock.clear(std::memory_order_release);  // 解锁
}

互斥锁(mutex)

把自己阻塞起来(内核态和用户态之间的切换进入阻塞状态,可能上下文切换),等待重新调度请求。

  • 互斥锁的实现
    1. 软件实现:软件互斥锁需要借助操作系统提供的原子操作(如Compare-And-Swap,CAS)来实现 优点是灵活性高 缺点是性能较低,
    2. CAS操作需要三个参数,内存地址A,期望值V,新值N。执行过程如下:
      • 读取内存地址A的原始值,保存在临时变量Value中
      • 比较Value和期待值V是否相等,如果相等则将内存地址A的值更新为新值N
      • 如果内存地址A的值已经被其他线程改变,则不进行更新操作
  • TAS(test and set)
    • 一个TAS指令包括两个子步骤,把给定的内存地址设置为1,然后返回之前的旧值。
    • 硬件实现:硬件互斥锁使用计算机硬件提供的特殊指令(如锁总线指令)来实现。当线程要获取锁时,它会发出一个锁总线指令,这个指令会占用系统总线,使得其他CPU无法读写内存。
      1. 当lock前缀指令执行时,它将锁定处理器总线,确保其他处理器无法同时访问同一内存区域,
代码实例:lock_guard

std::lock_guard<std::mutex> 是 C++ 标准库中的一个 RAII(Resource Acquisition Is Initialization)类,它用于在作用域内自动锁住一个 std::mutex,并在作用域结束时自动解锁。它可以帮助你避免手动管理锁,减少死锁和忘记解锁的风险。

用法

std::lock_guard 通过构造时自动锁住互斥量,并且在作用域结束时自动释放互斥量的锁。因此,你不需要手动调用 mutex.unlock()。只需要确保 lock_guard 的生命周期结束时,互斥量会被自动解锁。

示例代码

#include <iostream>
#include <mutex>
#include <thread>

std::mutex init_mutex_;  // 互斥量,防止多个线程同时初始化

void some_function() {
    // 创建一个 lock_guard 对象,自动锁住 init_mutex_
    std::lock_guard<std::mutex> lock(init_mutex_);

    // 在此作用域内,init_mutex_ 被锁定
    std::cout << "Doing some work inside the critical section...\n";

    // lock_guard 在作用域结束时会自动解锁 mutex
}

int main() {
    std::thread t1(some_function);
    std::thread t2(some_function);

    t1.join();
    t2.join();

    return 0;
}

解释

  1. std::lock_guard<std::mutex> lock(init_mutex_);
  2. 在这行代码中,lock_guard 对象被创建,并立即锁住 init_mutex_
  3. std::lock_guard 的构造函数会调用 mutex.lock() 来锁定互斥量。
  4. lock 变量在作用域结束时自动销毁,这会触发析构函数 lock_guard::~lock_guard(),并自动解锁互斥量(即调用 mutex.unlock())。

  5. some_function 中,互斥量在整个函数内都是被锁定的,因此其他线程无法进入这段代码,避免了并发冲突。

为什么使用 std::lock_guard

  • 避免死锁std::lock_guard 会自动解锁互斥量,减少了忘记解锁的风险。
  • 简洁易懂:它为你管理锁,减少了显式的锁管理代码,使代码更简洁。
  • 线程安全:当多个线程访问同一资源时,std::lock_guard 可以确保每次只有一个线程能够访问临界区,防止数据竞争。

适用场景

通常用于在函数、代码块中保护共享资源。它适合需要访问互斥量的代码块,但不想手动管理锁和解锁时。

代码实例:cv.wait 实现多线程的先后顺序
  1. 使用 std::thread 和 std::mutex + std::condition_variable

如果你希望在销毁环境的 Final 函数中等待某些子线程完成,可以使用 std::mutex 和 std::condition_variable 来同步子线程的完成状态。

示例代码: 假设你有一个主线程(执行 Final 函数)和一个子线程,主线程需要等待子线程完成后才能销毁环境。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>

std::mutex mtx;
std::condition_variable cv;
std::atomic<bool> thread_done{false};

void worker_thread() {
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟工作
    std::cout << "Worker thread finished." << std::endl;
    thread_done = true;  // 设置线程完成标志
    cv.notify_one();     // 通知主线程
}

void Final() {
    std::cout << "Final: Waiting for worker thread to finish..." << std::endl;

    // 等待子线程完成
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [] { return thread_done.load(); });
    // cv.wait(lock, [this] { return thread_done.load(); }); // in class

    std::cout << "Final: Worker thread finished. Proceeding with cleanup." << std::endl;
}

int main() {
    // 启动子线程
    std::thread t(worker_thread);

    // 在主线程中调用 Final
    Final();

    t.join();  // 等待子线程结束

    return 0;
}

关键点:

  • std::mutex mtx: 用于锁住条件变量,确保线程间的同步。
  • std::condition_variable cv: 用于实现主线程等待子线程的完成。
  • std::atomic<bool> thread_done: 用于标记子线程是否完成。
  • cv.wait(lock, condition): 主线程会等待子线程设置 thread_done 为 true,然后才会继续执行。

读写锁(ReadWrite Lock)

  • 在读操作和写操作之间提供了更细粒度的同步控制。
  • 多个线程可以同时获取读锁,但只有一个线程能够获取写锁。
    • 读写锁有三种状态:读加锁状态、写加锁状态和不加锁状态
    • 规则
    • 当读写锁在写加锁模式下,任何试图对这个锁进行加锁的线程都会被阻塞,直到写进程对其解锁。
    • 当读写锁在读加锁模式先,任何线程都可以对其进行读加锁操作,但是所有试图进行写加锁操作的线程都会被阻塞,直到所有的读线程都解锁。
  • 缺点:当读者源源不断到来的时候,写者总是得不到读写锁,就会造成不公平的状态。
    • 避免方法: 当处于读模式的读写锁接收到一个试图对其进行写模式加锁操作时,便会阻塞后面对其进行读模式加锁操作的线程。这样等到已经加读模式的锁解锁后,写进程能够访问此锁保护的资源。
  • 优点:
    • 读写锁可以提高并发性,允许多个线程同时读取数据,而只有在需要修改数据时才会互斥。
    • 适合对数据结构读的次数远远大于写的情况。

RCU(Read-Copy-Update)

  • 对读写锁的一种改进。适用于读多写少场景的数据同步机制。
  • 具体内容
    • 并发读取数据不再需要加锁
    • 写数据时,RCU机制通过创建一个副本来实现读写分离,确保在更新过程中没有线程正在读取旧的数据。
      • 写者修改数据前首先拷贝一个被修改元素的副本,然后在副本上进行修改,修改完毕后它向垃圾回收器注册一个回调函数以便在适当的时机执行真正的修改操作。
      • 读者必须提供一个信号给写者以便写者能够确定数据可以被安全地释放或修改的时机。
      • 有一个专门的垃圾收集器来探测读者的信号,一旦所有的读者都已经发送信号告知它们都不在使用被RCU保护的数据结构,垃圾收集器就调用回调函数完成最后的数据释放或修改操作。

通知与同步

适用于大段的修改和同步。

eventfd 事件通知

eventfd 是 Linux 提供的一种用于线程安全的事件通知机制,类似于文件描述符,可以通过读写操作来实现跨线程或跨进程的同步。通过 eventfd,线程或进程可以通过某些事件(例如计数器增减、通知等)来触发其他线程或进程的动作。

eventfd 可以通过两种方式工作:

  • 计数器模式:通过 eventfd_write 增加计数器,其他线程或进程可以通过 eventfd_read 来读取这些计数器的值。
  • 通知模式:使用 eventfd 进行事件通知,线程或进程通过 eventfd_read 来等待并响应这些事件。

eventfd_read 的功能 : eventfd_read 是一个用于从 eventfd 文件描述符读取事件的系统调用。它会从 eventfd 中读取一个 64 位无符号整数,并返回这个值。这个值通常表示某个事件的状态或计数器的值。

线程间同步

如果两个线程同时对同一个 eventfd 文件描述符进行读写操作:

  • eventfd_read 会阻塞直到 eventfd 中有事件(即计数器的值大于 0),并且在读取之后会减少计数器。
  • eventfd_write 会增加计数器的值,并通知正在等待的线程。

函数原型

#include <sys/eventfd.h>

ssize_t eventfd_read(int efd, uint64_t *value);
ssize_t eventfd_write(int efd, uint64_t value);
  • efdeventfd 文件描述符。
  • * value:指向一个 uint64_t 类型的变量,用于存储从 eventfd 读取的值。
  • value:要写入 eventfd 的值,通常是一个 64 位无符号整数 (uint64_t),表示事件计数器的增加量或特定事件的通知值。

返回值

  • 如果成功,eventfd_read 返回读取到的字节数,通常为 8(因为它读取一个 64 位的值)。
  • 如果失败,返回 -1,并将 errno 设置为合适的错误码。

错误码

  • EAGAINeventfd 中没有事件可读(即没有数据)。
  • EBADF:无效的文件描述符。
  • 其他常见的文件描述符错误。

代码示例

假设你使用 eventfd 来同步不同的线程:

#include <sys/eventfd.h>
#include <unistd.h>
#include <iostream>
#include <cstring>

int main() {
    // 创建 eventfd 文件描述符
    int efd = eventfd(0, EFD_NONBLOCK);
    if (efd == -1) {
        std::cerr << "Failed to create eventfd: " << strerror(errno) << std::endl;
        return -1;
    }

    // 写入事件
    uint64_t u = 1; // 写入一个事件值
    ssize_t s = write(efd, &u, sizeof(u));
    if (s == -1) {
        std::cerr << "Failed to write to eventfd: " << strerror(errno) << std::endl;
        close(efd);
        return -1;
    }

    // 读取事件
    uint64_t read_value;
    s = eventfd_read(efd, &read_value);
    if (s == -1) {
        std::cerr << "Failed to read from eventfd: " << strerror(errno) << std::endl;
    } else {
        std::cout << "Read value: " << read_value << std::endl;
    }

    close(efd);
    return 0;
}

概念题

RedStar (小红书) 笔试:图中有依赖的任务的,需要几个信号量来实现同步

CSDN,有一条依赖线,需要一个信号量

在使用信号量(Semaphore)进行线程同步时,P(proberen)和V(verhogen)操作是非常重要的概念。

  1. P操作(也称为Wait操作或Down操作):

  2. 表示获取或等待信号量。

  3. 如果信号量内部计数值大于0,获取信号量并将计数值减1。
  4. 如果计数值等于0,线程将等待,直到计数值大于0。如果信号量的值大于0,表示资源可用,进程可以继续执行。如果信号量的值为0,表示资源不可用,P操作将阻塞(即等待)进程,直到该信号量的值大于0为止。

伪代码表示为:

P(S):
  while S <= 0:
    // 等待,直到S大于0
  S = S - 1
  1. V操作(也称为Signal操作或Up操作):

  2. 表示释放或增加信号量。

  3. 将信号量内部计数值加1。
  4. 如果存在等待线程,唤醒其中一个线程继续执行。

伪代码表示为:

V(S):
  S = S + 1

P和V操作保证了对共享资源的互斥访问。

一个线程使用P操作等待获取信号量,V操作在使用完共享资源后释放信号量。

信号量的值通常用于控制共享资源的数量,它可以是非负整数。当信号量被初始化为1时,称为二进制信号量(Binary Semaphore),因为它只能取0或1的值,通常用于实现互斥访问临界区。如果信号量的值大于1,称为计数信号量,可用于限制对资源的并发访问数。

在实际编程中,P操作和V操作通常是原子操作,确保在多线程或多进程环境下的正确同步和竞争条件的安全处理。

TP-link笔试:设计的程序在多个CPU上运行时,不应使用哪个实现多个CPU间的数据访问同步?

参考文献

https://www.cswiki.top/pages/f398f1/#blocking-i-o

原文链接:https://blog.csdn.net/qq_15437629/article/details/79116590

https://zhuanlan.zhihu.com/p/161936748