基于 C++ Concurrency In Action Second Edition 编写
C++ 并发世界
并发 concurrency 是指两件或更多分开的活动在同时发生
单核 CPU 通过 context switch 来形成多个任务在同时执行的假象
多核 CPU 原理也类似
并发的方法:
并发和并行 parallelism 概念类似,但是
故使用并发的情况:
C++ 中原来没有关于并发的标准,和 C 语言一样使用操作系统提供的 API。从 C++11 开始,封装了较一般的 thread
等
从并发版的 hello world
开始:
#include <iostream> #include <thread> void hello () { std::cout << "Hello, World" << std::endl; }int main () { std::thread t (hello) ; t.join (); return 0 ; }
管理线程
基本线程管理
创建线程:std::thread my_thread(my_func);
等待线程完成:
my_thread.join()
会等待线程完成
my_thread.datach()
可以不需要等待线程完成,相当于将任务放到了后台
注意如果线程可能会抛出异常,要保证出现异常时也要调用 join()
可以使用 RAII 来保证每个线程被 join
向线程函数传递参数
简单的传递参数:
void f (int i, std::string const & s) ;std::thread t (f, 3 , "hello" ) ;
对于非 const 引用传递参数,则会悄悄地复制了一份对象,故应该在传递时指明:
void update_data_for_widget (widget_id w, widget_data& data) ;std::thread t (update_data_for_widget, w, std::ref(data)) ;
也可以传递成员函数指针,第二个参数则为调用该成员函数的对象:
class X { public : void do_lengthy_work () ; } my_X;std::thread t (&X::do_lengthy_work, &my_x) ;
对于一个不可复制的对象,可以使用 std::move()
转移一个线程的所有权
std::thread
是不可复制的,只能 move,即转移了其所有权
可以批量创建多个线程:
void do_work (unsigned id) ;void f () { std::vector<std::thread> threads; for (auto i = 0 ; i < 20 ; i++) threads.emplace_back (do_work, i); for (auto & entry: threads) entry.join (); }
识别线程
可以通过 std::thread::id
来识别线程,支持比较,可以用于让主线程做一些额外的工作,如:
std::thread::id master_thread;void some_core_part_of_algorithm () { if (std::this_thread::get_id () == master_thread) do_master_thread_work (); do_common_work (); }
线程之间共享数据
线程之间共享数据的问题
竞争 race :本质上是多线程打破了顺序执行中潜在的不变量 invariant
避免方法:
通过保护机制来包装数据结构
修改数据结构,使得成为一个无锁的
使用 mutex 来保护共享的数据
一个简单的例子:
#include <mutex> std::mutex some_mutex; std::list<int > some_list;void add_to_list (int new_value) { std::lock_guard guard (some_mutex) ; some_list.push_back (new_value); }
但是注意 mutex 不是万能的,如果有代码持有能够访问被保护的数据的指针或引用,则可以在没有上锁的情况下访问修改数据
因此,不要将对保护数据的指针和引用传递到锁的外部,无论是通过将其从函数中返回、存在外部可见的内存中、还是将其作为用户提供的函数的参数传递
事实上,一些数据结构的接口设计中就是会存在竞争情况的,如:
stack<int > s;if (!s.empty ()) { int const value = s.top (); s.pop (); do_something (value); }
我们可以对 pop()
接口作如下的修改:
传递引用
需要不会抛出异常的复制构造函数或移动构造函数
返回对被 pop 的物品的指针
例如接口设计如下:
std::shared_ptr<T> pop () ;void pop (T& value) ;
如果对一个操作有多个锁,则可能出现死锁 deadlock ,即两个线程在等待一个不会发生的情况
可以同时给两个 mutex 上锁,如:
void swap (X& lhs, X& rhs) { if (&lhs == &rhs) return ; std::scoped_lock guard (lhs.m, rhs.m) ; swap (lhs.some_detail, rhs.some_detail); }
其中 scoped_lock
和普通的 lock_guard
没什么区别,只不过其接受的是可变数量的参数
避免死锁的方法:
避免嵌套锁
避免当保持锁时调用用户提供的代码,因为其可能也会上锁,故可能出现嵌套锁
以固定的顺序获得锁
使用锁层次结构
unique_lock
和普通的 lock_guard
相比开销大一些,但更加灵活,如可以传递 std::defer_lock
作为第二个参数使构造时不上锁,std::adopt_lock
可以让改锁对象管理 mutex 上的锁
正如其名字,不能复制,只能 move
其还可以通过调用 .lock()
和 .unlock()
来进行细粒度的上锁
保护共享数据的替代方案
在初始化过程中保护共享数据
可以通过 std::once_flag
和 std::call_once()
来保证只被初始化一次
std::once_flag resource_flag;void init_resource () ;void foo () { std::call_once (resource_flag, init_resource); }
当然,static
也提供了类似的保证
保护很少被更新的数据结构
std::shared_mutex
和 std::shared_lock
配合,可以同时获取多个锁,而普通的 lock_guard
则会被阻隔,非常便于实现读者-写者模型:
#include <shared_mutex> std::map<std::string, dns_entry> entries;mutable std::shared_mutex entry_mutex;dns_entry find_entry (std::string const & domain) const { std::shared_lock lk (entry_mutex) ; return ... }void update (std::string const & domain) { std::lock_guard lk (entry_mutex); ... }
如果给一个已经上锁的 mutex 再次上锁,则会导致未定义的行为
同步并发操作
等待一个事件或其他情况
可以通过条件变量 std::condition_variable
来等待:
std::condition_variable data_cond;void data_preparation_thread () { while (more_data_to_prepare ()) { data_chunk const data = prepare_data (); data_cond.notify_one (); } }void data_processing_thread () { while (true ) { std::unique_lock lk (mut) ; data_cond.wait (lk, []{return !data_queue.empty ();}); } }
其中 .wait()
如果后面的函数返回值为 false,则释放锁 lk
,然后将线程挂起。直到收到了 .notify_one()
的通知,才会再次检测返回值。如果为 true,则上锁,并继续执行。
所以事实上 .wait()
本质上是一个优化版的忙碌等待
当然,也有对应的 .notify_all()
用 future 等待一次性的事件
使用 std::future
:
#include <future> int find_the_answer () ;int main () { std::future<int > the_answer = std::async (find_the_answer); std::cout << the_answer.get () << std::endl; }
std::async
和 thread
用法类似,同时支持第一个参数传递 std::launch::async
和 std::launch::deferred
选项
std::packaged_task<>
能够将 future 绑定到一个函数或可调用对象上:
std::future<void > post_task_for_gui_thread (Func f) { std::package_task<void () > task (f) ; std::future<void > res = task.get_future (); tasks.push_back (std::move (task)); retur res; }
std::promise/std::future
对提供了一个可能的处理机制:等待的线程会在 future 被 block,提供数据的线程可能使用 promise 来设置相关的值并使 future 准备好
promise 设置值 .set_value()
也可以设置异常 .set_exception()
std::future
只可以 move,std::shared_future
能够复制,多个线程都可以等待 future:
std::promise<int > p; std::shared_future<int > sf = p.get_future ().share ();
等待时间限制
clock 是一个提供四种不同信息的类:
时间 now
表示从始终获取的时间的类型的值
时钟滴答的周期
时钟是否以均匀速率滴答
时间间隔:
using namespace std::chorno_literals; auto one_day = 24 h;auto half_an_hour = 30 min;
.wait_for
可以等待某一时间间隔,如:
std::future<int > f = std::async (some_task);if (f.wait_for (std::chrono::milliseconds (35 )) == std::future_status::ready) do_something_with (f.get ());
.wait_until
等待某一时间点:
auto const timeout = std::chrono::steady_clock::now () + std::chrono::milliseconds (500 );if (cv.wait_until (lk, timeout) == std::cv_status::timeout)
很多函数都有相应的 wait_for
和 wait_until
版本,如 lock
、future
C++ 内存模型和对原子类型的操作
内存模型基础
由对象构成,在内存中有一块对应的区域
C++ 中原子操作和类型
原子类型有时会通过 mutex
实现,使平台等而定,可以通过 ::is_lock_free
等判断
几乎所有原始类型和定义的类型都有相关的原子版本,如 std::atomic<int>
std::atomic_flag
是最最简单的标准原子类型,类似于一个 Bool flag,只有两种状态:set 和 clear
可以用于实现 mutex
:
class spinlock_mutex { std::atomic_flag flag; public : spinlock_mutex () : flag (ATOMIC_FLAG_INIT) { } void lock () { while (flag.test_and_set (std::memory_order_acquire)); } void unlock () { flag.clear (std::memory_order_release); } }
其中 test_and_set
是读-修改-写 的操作
因为这种类型相当局限,故不可用于替代 std::atomic<bool>
std::atomic<bool>
支持更多操作:
std::atomic<bool > b;bool x = b.load (std::memory_order_acquire); b.store (true ); x = b.exchange (false , std::memory_order_acq_rel);
.compare_exchange_weak()
和 .compare_exchange_strong()
都能够基于当前值存储新的值,其中前者可能会虚假地出错,故通常用在循环中:
bool expected = false ;extern atomic<bool > b;while (!b.compare_exchange_weak (expected, true ) && !expected) ;
std::atomic<T*>
支持指针的一部分运行,同时包括 fetch_add()
等读-修改-写的操作
原子整数类型的操作更多
也可以为自定义写一个 std::atomic<>
类模板
设计基于锁的并发数据结构
线程安全的数据结构的设计:
没有线程能够看到其他线程破坏的数据结构的不变量
通过提供给完整的操作的函数来避免接口固有的竞争条件
即使异常发生时,数据结构也要保持不变量
通过限制锁的范围等最小化死锁的机会
另一方面要考虑的是并发访问:
能否通过限制锁的范围来允许一些操作在锁外部执行?
数据结构的不同部分能否用不同的锁保护?
所有的操作都需要相同的保护级别吗?
之前已经编写了线程安全的栈,这里不再举例
设计并发代码
在线程中划分工作的技术
在处理开始之前在线程间划分数据,代表性的有 OpenMP
递归划分数据,一般使用 async()
自动管理
通过任务类型划分工作,类似于流水线
影响并发代码性能的因素
处理器的数量
数据的竞争和缓存 ping-pong :如何处理不同处理器之间的缓存失效问题
一种解决方法:尽可能让不同处理器使用的数据位置远离,使得不同处理器不会相互干扰各自的缓存
这里分别给出手动划分和使用 async()
递归划分的并行 accumulate:
template <typename Iterator, typename T>T parallel_accumulate (Iterator first, Iterator last, T init) { unsigned long const length=std::distance (first, last); if (!length) return init; unsigned long const min_per_thread = 25 ; unsigned long const max_threads = (length+min_per_thread-1 ) / min_per_thread; unsigned long const hardware_threads = std::thread::hardware_concurrency (); unsigned long const num_threads = std::min (hardware_threads != 0 ? hardware_threads : 2 , max_threads); unsigned long const block_size = length / num_threads; std::vector<std::future<T>> futures (num_threads - 1 ); std::vector<std::thread> threads (num_threads - 1 ) ; join_threads joiner (threads) ; Iterator block_start = first; for (unsigned long i = 0 ; i < num_threads - 1 ; ++i) { Iterator block_end = block_start; std::advance (block_end, block_size); std::packaged_task<T (Iterator, Iterator) > task (accumulate_block<Iterator, T>()) ; futures[i] = task.get_future (); threads[i] = std::thread (std::move (task), block_start, block_end); block_start = block_end; } T last_result = accumulate_block <Iterator, T>()(block_start, last); T result = init; for (unsigned long i = 0 ; i < num_threads - 1 ; ++i) result += futures[i].get (); result += last_result; return result; }template <typename Iterator, typename T>T parallel_accumulate (Iterator first, Iterator last, T init) { unsigned long const length = std::distance (first, last); unsigned long const max_chunk_size = 25 ; if (length <= max_chunk_size) { return std::accumulate (first, last, init); Iterator mid_point = first; std::advance (mid_point, length / 2 ); std::future<T> first_half_result = std::async (parallel_accumulate<Iterator, T>, first, mid_point, init); T second_half_result = parallel_accumulate (mid_point, last, T ()); return first_half_result.get () + second_half_result; }
进阶线程管理
线程池
最简单的线程池用于固定 worker 线程的数量,减小频率创建和销毁线程的开销:
class thread_pool { std::atomic_bool done; threadsafe_queue<std::function<void ()>> work_queue; std::vector<std::thread> threads; join_threads joiner; void worker_thread () { while (!done) { std::function<void ()> task; if (work_queue.try_pop (task)) { task (); } else { std::this_thread::yield (); } } } public : thread_pool (): done (false ), joiner (threads) { unsigned const thread_count = std::thread::hardware_concurrency (); try { for (unsigned i = 0 ; i < thread_count; ++i) { threads.push_back (std::thread (&thread_pool::worker_thread, this )); } } catch (...) { done = true ; throw ; } } ~thread_pool () { done = true ; } template <typename FunctionType> void submit (FunctionType f) { work_queue.push (std::function <void ()>(f)); } };
等待提交到线程池中的任务,submit 返回的是一个 future
:
template <typename FunctionType> std::future<typename std::result_of<FunctionType ()>::type> submit (FunctionType f) { typedef typename std::result_of<FunctionType ()>::type result_type; std::packaged_task<result_type () > task (std::move(f)) ; std::future<result_type> res (task.get_future()) ; work_queue.push (std::move (task)); return res; }
等待其他任务的任务:
void thread_pool::run_pending_task () { function_wrapper task; if (work_queue.try_pop (task)) { task (); } else { std::this_thread::yield (); } }
为了避免工作队列的竞争,可以为每个线程添加只属于自己的队列
当然,这么做了以后为了防止某个队列过满,另一个队列过空,可以将队列设置为可以 steal 的
并行算法
和普通的算法相比,只多加了第一个参数:
std::execution::seq
std::execution::par
std::execution::par_unseq
越往后,越松散
测试和调试多线程应用
并发相关的 bug 类型
不想要的阻塞:
死锁
活锁:不会永远停止,但可能大量增加延迟
IO 或外部输入阻塞
竞争情况:
数据竞争
破坏不变量
生命时间问题:一个线程还在使用某个对象,另一个线程已经将其销毁了
定位并发相关 bug 的方法
观察代码
测试
设计的时候,代码要尽可能可测试
多线程的测试技术: