基于 C++ Concurrency In Action Second Edition 编写

C++ 并发世界

并发 concurrency 是指两件或更多分开的活动在同时发生

单核 CPU 通过 context switch 来形成多个任务在同时执行的假象

多核 CPU 原理也类似

并发的方法:

  • 多进程
  • 多线程

并发和并行 parallelism 概念类似,但是

  • 并行通常更考虑效率
  • 并发则关心关注点分离

故使用并发的情况:

  • 关注点分离
  • 任务和数据的并行

C++ 中原来没有关于并发的标准,和 C 语言一样使用操作系统提供的 API。从 C++11 开始,封装了较一般的 thread

从并发版的 hello world 开始:

#include <iostream>
#include <thread>
void hello() {
std::cout << "Hello, World" << std::endl;
}
int main() {
std::thread t(hello);
t.join();
return 0;
}

管理线程

基本线程管理

创建线程:std::thread my_thread(my_func);

等待线程完成:

  • my_thread.join() 会等待线程完成
  • my_thread.datach() 可以不需要等待线程完成,相当于将任务放到了后台

注意如果线程可能会抛出异常,要保证出现异常时也要调用 join()

可以使用 RAII 来保证每个线程被 join

向线程函数传递参数

简单的传递参数:

void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

对于非 const 引用传递参数,则会悄悄地复制了一份对象,故应该在传递时指明:

void update_data_for_widget(widget_id w, widget_data& data);
std::thread t(update_data_for_widget, w, std::ref(data));

也可以传递成员函数指针,第二个参数则为调用该成员函数的对象:

class X {
public:
void do_lengthy_work();
} my_X;
std::thread t(&X::do_lengthy_work, &my_x);

对于一个不可复制的对象,可以使用 std::move()

转移一个线程的所有权

std::thread 是不可复制的,只能 move,即转移了其所有权

可以批量创建多个线程:

void do_work(unsigned id);
void f() {
std::vector<std::thread> threads;
for (auto i = 0; i < 20; i++)
threads.emplace_back(do_work, i);
for (auto& entry: threads)
entry.join();
}

识别线程

可以通过 std::thread::id 来识别线程,支持比较,可以用于让主线程做一些额外的工作,如:

std::thread::id master_thread;
void some_core_part_of_algorithm() {
if (std::this_thread::get_id() == master_thread)
do_master_thread_work();
do_common_work();
}

线程之间共享数据

线程之间共享数据的问题

竞争 race:本质上是多线程打破了顺序执行中潜在的不变量 invariant

避免方法:

  • 通过保护机制来包装数据结构
  • 修改数据结构,使得成为一个无锁的

使用 mutex 来保护共享的数据

一个简单的例子:

#include <mutex>
std::mutex some_mutex;
std::list<int> some_list;
void add_to_list(int new_value) {
std::lock_guard guard(some_mutex);
some_list.push_back(new_value);
}

但是注意 mutex 不是万能的,如果有代码持有能够访问被保护的数据的指针或引用,则可以在没有上锁的情况下访问修改数据

因此,不要将对保护数据的指针和引用传递到锁的外部,无论是通过将其从函数中返回、存在外部可见的内存中、还是将其作为用户提供的函数的参数传递

事实上,一些数据结构的接口设计中就是会存在竞争情况的,如:

stack<int> s;
if (!s.empty()) {
int const value = s.top();
s.pop();
do_something(value);
}

我们可以对 pop() 接口作如下的修改:

  • 传递引用
  • 需要不会抛出异常的复制构造函数或移动构造函数
  • 返回对被 pop 的物品的指针

例如接口设计如下:

std::shared_ptr<T> pop();
void pop(T& value);

如果对一个操作有多个锁,则可能出现死锁 deadlock,即两个线程在等待一个不会发生的情况

可以同时给两个 mutex 上锁,如:

void swap(X& lhs, X& rhs) {
if (&lhs == &rhs)
return;
std::scoped_lock guard(lhs.m, rhs.m);
swap(lhs.some_detail, rhs.some_detail);
}

其中 scoped_lock 和普通的 lock_guard 没什么区别,只不过其接受的是可变数量的参数

避免死锁的方法:

  • 避免嵌套锁
  • 避免当保持锁时调用用户提供的代码,因为其可能也会上锁,故可能出现嵌套锁
  • 以固定的顺序获得锁
  • 使用锁层次结构

unique_lock 和普通的 lock_guard 相比开销大一些,但更加灵活,如可以传递 std::defer_lock 作为第二个参数使构造时不上锁,std::adopt_lock 可以让改锁对象管理 mutex 上的锁

正如其名字,不能复制,只能 move

其还可以通过调用 .lock().unlock() 来进行细粒度的上锁

保护共享数据的替代方案

在初始化过程中保护共享数据

可以通过 std::once_flagstd::call_once() 来保证只被初始化一次

std::once_flag resource_flag;
void init_resource();
void foo() {
std::call_once(resource_flag, init_resource);
}

当然,static 也提供了类似的保证

保护很少被更新的数据结构

std::shared_mutexstd::shared_lock 配合,可以同时获取多个锁,而普通的 lock_guard 则会被阻隔,非常便于实现读者-写者模型:

#include <shared_mutex>
std::map<std::string, dns_entry> entries;
mutable std::shared_mutex entry_mutex;
dns_entry find_entry(std::string const& domain) const {
std::shared_lock lk(entry_mutex); // reader
return ...
}
void update(std::string const& domain) {
std::lock_guard lk(entry_mutex); // writer
...
}

如果给一个已经上锁的 mutex 再次上锁,则会导致未定义的行为

同步并发操作

等待一个事件或其他情况

可以通过条件变量 std::condition_variable 来等待:

std::condition_variable data_cond;
void data_preparation_thread() {
while (more_data_to_prepare()) {
data_chunk const data = prepare_data();
// 放入数据
data_cond.notify_one(); // 通知
}
}
void data_processing_thread() {
while (true) {
std::unique_lock lk(mut);
data_cond.wait(lk, []{return !data_queue.empty();}); // 等待
// 处理数据
}
}

其中 .wait() 如果后面的函数返回值为 false,则释放锁 lk,然后将线程挂起。直到收到了 .notify_one() 的通知,才会再次检测返回值。如果为 true,则上锁,并继续执行。

所以事实上 .wait() 本质上是一个优化版的忙碌等待

当然,也有对应的 .notify_all()

用 future 等待一次性的事件

使用 std::future

#include <future>
int find_the_answer();
int main() {
std::future<int> the_answer = std::async(find_the_answer);
// 做其他事
std::cout << the_answer.get() << std::endl;
}

std::asyncthread 用法类似,同时支持第一个参数传递 std::launch::asyncstd::launch::deferred 选项

std::packaged_task<> 能够将 future 绑定到一个函数或可调用对象上:

std::future<void> post_task_for_gui_thread(Func f) {
std::package_task<void()> task(f);
std::future<void> res = task.get_future();
tasks.push_back(std::move(task));
retur res;
}

std::promise/std::future 对提供了一个可能的处理机制:等待的线程会在 future 被 block,提供数据的线程可能使用 promise 来设置相关的值并使 future 准备好

promise 设置值 .set_value() 也可以设置异常 .set_exception()

std::future 只可以 move,std::shared_future 能够复制,多个线程都可以等待 future:

std::promise<int> p;
std::shared_future<int> sf = p.get_future().share();

等待时间限制

clock 是一个提供四种不同信息的类:

  • 时间 now
  • 表示从始终获取的时间的类型的值
  • 时钟滴答的周期
  • 时钟是否以均匀速率滴答

时间间隔:

using namespace std::chorno_literals; // 可以较方便地使用时钟单位
auto one_day = 24h;
auto half_an_hour = 30min;

.wait_for 可以等待某一时间间隔,如:

std::future<int> f = std::async(some_task);
if (f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready)
do_something_with(f.get());

.wait_until 等待某一时间点:

auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
if (cv.wait_until(lk, timeout) == std::cv_status::timeout)
// ...

很多函数都有相应的 wait_forwait_until 版本,如 lockfuture

C++ 内存模型和对原子类型的操作

内存模型基础

由对象构成,在内存中有一块对应的区域

C++ 中原子操作和类型

原子类型有时会通过 mutex 实现,使平台等而定,可以通过 ::is_lock_free 等判断

几乎所有原始类型和定义的类型都有相关的原子版本,如 std::atomic<int>

std::atomic_flag 是最最简单的标准原子类型,类似于一个 Bool flag,只有两种状态:set 和 clear

可以用于实现 mutex

class spinlock_mutex {
std::atomic_flag flag;
public:
spinlock_mutex() : flag(ATOMIC_FLAG_INIT) { }
void lock() {
while (flag.test_and_set(std::memory_order_acquire));
}
void unlock() {
flag.clear(std::memory_order_release);
}
}

其中 test_and_set读-修改-写的操作

因为这种类型相当局限,故不可用于替代 std::atomic<bool>

std::atomic<bool> 支持更多操作:

std::atomic<bool> b;
bool x = b.load(std::memory_order_acquire); // 读取
b.store(true); // 储存
x = b.exchange(false, std::memory_order_acq_rel); // 读-修改-写

.compare_exchange_weak().compare_exchange_strong() 都能够基于当前值存储新的值,其中前者可能会虚假地出错,故通常用在循环中:

bool expected = false;
extern atomic<bool> b;
while (!b.compare_exchange_weak(expected, true) && !expected) ;

std::atomic<T*> 支持指针的一部分运行,同时包括 fetch_add() 等读-修改-写的操作

原子整数类型的操作更多

也可以为自定义写一个 std::atomic<> 类模板

设计基于锁的并发数据结构

线程安全的数据结构的设计:

  • 没有线程能够看到其他线程破坏的数据结构的不变量
  • 通过提供给完整的操作的函数来避免接口固有的竞争条件
  • 即使异常发生时,数据结构也要保持不变量
  • 通过限制锁的范围等最小化死锁的机会

另一方面要考虑的是并发访问:

  • 能否通过限制锁的范围来允许一些操作在锁外部执行?
  • 数据结构的不同部分能否用不同的锁保护?
  • 所有的操作都需要相同的保护级别吗?

之前已经编写了线程安全的栈,这里不再举例

设计并发代码

在线程中划分工作的技术

在处理开始之前在线程间划分数据,代表性的有 OpenMP

递归划分数据,一般使用 async() 自动管理

通过任务类型划分工作,类似于流水线

影响并发代码性能的因素

处理器的数量

数据的竞争和缓存 ping-pong:如何处理不同处理器之间的缓存失效问题

一种解决方法:尽可能让不同处理器使用的数据位置远离,使得不同处理器不会相互干扰各自的缓存

这里分别给出手动划分和使用 async() 递归划分的并行 accumulate:

// 手动划分
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length=std::distance(first, last);
if(!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length+min_per_thread-1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;

std::vector<std::future<T>> futures(num_threads - 1);
std::vector<std::thread> threads(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for(unsigned long i = 0; i < num_threads - 1; ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<T(Iterator, Iterator)> task(accumulate_block<Iterator, T>());
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task), block_start, block_end);
block_start = block_end;
}

T last_result = accumulate_block<Iterator, T>()(block_start, last);
T result = init;
for(unsigned long i = 0; i < num_threads - 1; ++i)
result += futures[i].get();
result += last_result;
return result;
}

// 使用 async 自动划分
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
unsigned long const max_chunk_size = 25;
if(length <= max_chunk_size) {
return std::accumulate(first, last, init);
Iterator mid_point = first;
std::advance(mid_point, length / 2);
std::future<T> first_half_result = std::async(parallel_accumulate<Iterator, T>, first, mid_point, init);
T second_half_result = parallel_accumulate(mid_point, last, T());
return first_half_result.get() + second_half_result;
}

进阶线程管理

线程池

最简单的线程池用于固定 worker 线程的数量,减小频率创建和销毁线程的开销:

class thread_pool {
std::atomic_bool done;
threadsafe_queue<std::function<void()>> work_queue;
std::vector<std::thread> threads;
join_threads joiner;
void worker_thread() {
while(!done) {
std::function<void()> task;
if(work_queue.try_pop(task)) {
task();
} else {
std::this_thread::yield();
}
}
}
public:
thread_pool(): done(false), joiner(threads) {
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for(unsigned i = 0; i < thread_count; ++i) {
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
}
catch(...) {
done = true;
throw;
}
}
~thread_pool() {
done = true;
}
template<typename FunctionType>
void submit(FunctionType f) {
work_queue.push(std::function<void()>(f));
}
};

等待提交到线程池中的任务,submit 返回的是一个 future

template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(std::move(f));
std::future<result_type> res(task.get_future());
work_queue.push(std::move(task));
return res;
}

等待其他任务的任务:

void thread_pool::run_pending_task() {
function_wrapper task;
if (work_queue.try_pop(task)) {
task();
} else {
std::this_thread::yield();
}
}

为了避免工作队列的竞争,可以为每个线程添加只属于自己的队列

当然,这么做了以后为了防止某个队列过满,另一个队列过空,可以将队列设置为可以 steal 的

并行算法

和普通的算法相比,只多加了第一个参数:

  • std::execution::seq
  • std::execution::par
  • std::execution::par_unseq

越往后,越松散

测试和调试多线程应用

并发相关的 bug 类型

不想要的阻塞:

  • 死锁
  • 活锁:不会永远停止,但可能大量增加延迟
  • IO 或外部输入阻塞

竞争情况:

  • 数据竞争
  • 破坏不变量
  • 生命时间问题:一个线程还在使用某个对象,另一个线程已经将其销毁了

定位并发相关 bug 的方法

  • 观察代码
  • 测试
  • 设计的时候,代码要尽可能可测试
  • 多线程的测试技术:
    • 暴力
    • 穷举所有组合
    • 特殊的库