第18章 并发
第18章 并发
- 介绍
- 任务与 线程
- 传递参数
- 返回结果
- 共享数据
- mutex 和锁
- atomic
- 等待事件
- 任务间通信
- future 和 promise
- packaged_task
- async()
- 停止 线程
- 协程
- 协作式多任务处理
- 建议
18.1 介绍
并发——即同时执行多个任务——被广泛用于提高吞吐量(通过使用多个处理器进行单次计算)或提高响应速度(允许程序的一部分在另一部分等待响应时继续执行)。所有现代编程语言都提供了对此的支持。
C++标准库提供的支持是一种可移植且类型安全的变体,这种变体已经在C++中使用了20多年,并且几乎得到了现代硬件的普遍支持。标准库的支持主要旨在支持系统级并发,而不是直接提供复杂的高级并发模型;这些可以作为使用标准库设施构建的库来提供。
标准库直接支持在单个地址空间中并发执行多个线程。为了实现这一点,C++提供了合适的内存模型和一组原子操作。原子操作允许无锁编程[Dechev,2010]。
内存模型确保只要程序员避免数据竞争(对可变数据的无控制并发访问),一切都能如人们所天真地期望的那样工作。然而,大多数用户只会从标准库和在此基础上构建的库的角度来看待并发。本节简要给出了主要标准库并发支持设施的示例: 线程 、 互斥锁 、 lock() 操作、 packaged_task 和 future 。这些特性是直接建立在操作系统提供的功能之上的,与之相比不会带来性能损失。它们也不保证与操作系统提供的功能相比有显著的性能提升。
不要把并发视为万灵药。如果一个任务可以顺序执行,那么通常更简单、更快。从一个线程向另一个线程传递信息可能会出乎意料地昂贵。作为使用显式并发特性的替代方案,我们通常可以使用并行算法来利用多个执行引擎以提高性能(§ 13.6、§ 17.3.1)。
最后,C++支持协程,即那些在调用之间保持其状态的函数(§ 18.6)。
18.2 任务和线程
我们将那些可能与其他计算同时执行的计算称为 任务 。 线程 是程序中任务的系统级表示。要与其他任务并发执行的任务,是通过构造一个 线程 (在 <thread> 中)来启动的,任务作为其参数。任务是一个函数或一个函数对象:
void f(); // 函数
struct F { // 函数对象
void operator()(); // F的调用运算符(§7.3.2
};
void user()
{
thread t1 {f}; // f() 在单独的线程中执行
thread t2 {F{}}; // F{}() 在单独的线程中执行
}
t1.join(); // 等待 t1
t2.join(); // 等待 t2
join() 确保我们不会在线程完成之前退出 user() 。要使一个线程“加入”意味着“等待线程终止”。 很容易忘记调用 join() ,结果通常很糟糕,所以标准库提供了 jthread ,它是一个“加入线程”,遵循RAII原则,其析构函数会调用 join() :
void user()
{
jthread t1 {f};
jthread t2 {F{}};
}
加入操作是由析构函数完成的,所以顺序与构造顺序相反。在这里,我们在 t1 之前等待 t2 。
程序中的线程共享一个单一的地址空间。在这方面,线程与进程不同,进程通常不直接共享数据。由于线程共享一个地址空间,它们可以通过共享对象(§18.3)进行通信。这种通信通常由锁或其他机制控制,以防止数据竞争(对变量的无控制并发访问)。
编程并发任务可能非常棘手。考虑任务 f (一个函数)和 F (一个函数对象)的可能实现:
void f()
{
cout << "Hello ";
}
struct F {
void operator()() { cout << "Parallel World!\n"; }
};
这是一个错误的例子:在这里, f 和 F{} 都使用对象 cout ,但没有任何形式的同步。结果输出将是不可预测的,并且可能在不同程序执行之间有所不同,因为两个任务中各个操作的执行顺序没有定义。程序可能产生“奇怪”的输出,如:
PaHerallllel o World!
只有在标准中对 ostream 的定义中有一个特定的保证,才能避免可能导致崩溃的数据竞争。 为了避免输出流出现此类问题,可以让只有一个线程使用流,或者使用 osyncstream (§11.7.5)。 在定义并发程序的任务时,我们的目标是使任务完全分离,除非它们以简单且明显的方式进行通信。思考并发任务的最简单方式是将其视为一个函数,该函数恰好与其调用者并发运行。为了使其工作,我们只需要传递参数,获取结果,并确保之间没有使用共享数据(没有数据竞争)。
18.2.1 传递参数
通常,任务需要对数据进行操作。我们可以轻松地将数据(或数据的指针或引用)作为参数传递。考虑以下代码:
void f(vector<double>& v); // 函数:对v进行操作
struct F { // 函数对象:对v进行操作
vector<double>& v;
F(vector<double>& vv) :v{vv} { }
void operator()(); // 应用运算符;§7.3.2
};
int main()
{
vector<double> some_vec {1, 2, 3, 4, 5, 6, 7, 8, 9};
vector<double> vec2 {10, 11, 12, 13, 14};
}
jthread t1 {f,ref(some_vec)}; // f(some_vec) 在单独的线程中执行
jthread t2 {F{vec2}}; // F(vec2)() 在单独的线程中执行
F{vec2} 在 F 中保存了对参数向量的引用。现在 F 可以使用该向量,希望没有其他任务在 F 执行时访问 vec2 。通过值传递 vec2 可以消除这种风险。
使用 {f,ref(some_vec)} 进行初始化时,会使用线程的变参模板构造函数,它可以接受任意数量的参数(§8.4)。 ref() 是来自 <functional> 的类型函数,不幸的是,需要它来告诉变参模板将 some_vec 视为引用,而不是对象。如果没有这个 ref() , some_vec 将按值传递。编译器会检查给定参数是否可以调用第一个参数,并构建必要的函数对象以传递给线程。因此,如果 F::operator()() 和 f() 执行相同的算法,则这两个任务的处理大致相等:在这两种情况下,都会为线程构造一个函数对象以执行。
18.2.2 返回结果
在§18.2.1中的示例中,我通过非常量引用来传递参数。我只有在期望任务修改所引用数据的值时才会这样做(§1.7)。这是一种有些狡猾但并非不常见的返回结果的方式。一个不那么晦涩的技术是通过常量引用传递输入数据,并将存放结果的位置作为单独的参数传递:
void f(const vector<double>& v, double* res); // 从v中取得输入;将结果存放在*res中
class F {
public:
F(const vector<double>& vv, double* p) :v{vv}, res{p} { }
void operator()(); // 将结果存放在*res中
private:
const vector<double>& v; // 输入来源
double* res; // 输出目标
};
double g(const vector<double>&); // 使用返回值
void user(vector<double>& vec1, vector<double> vec2, vector<double> vec3)
{
double res1;
double res2;
double res3;
thread t1 {f,cref(vec1),&res1}; // f(vec1,&res1)在单独的线程中执行
thread t2 {F{vec2,&res2}}; // F{vec2,&res2}()在单独的线程中执行
thread t3 { [&](){ res3 = g(vec3); } }; // 通过引用捕获局部变量
t1.join();// 在使用结果之前等待
t2.join();
t3.join();
}
cout << res1 << ' ' << res2 << ' ' << res3 << '\n';
在这里, cref(vec1) 将 vec1 的常量引用作为参数传递给 t1 。
这种方法可行且非常常见,但我认为通过引用来返回结果并不特别优雅,因此我将在§18.5.1中再次讨论这个话题。
18.3 共享数据
有时任务需要共享数据。在这种情况下,访问必须同步,以便一次最多只有一个任务可以访问。有经验的程序员会认识到这是一种简化(例如,许多任务同时读取不可变数据是没有问题的),但要考虑如何确保一次最多只有一个任务可以访问给定的一组对象。
18.3.1 互斥锁和锁
mutex 是一种“互斥对象,mutual exclusion object”,是 线程 间共享数据的关键元素。线程使用 lock() 操作获取 mutex :
mutex m; // 控制互斥锁
int sh; // 共享数据
void f()
{
scoped_lock lck {m}; // 获取互斥锁
sh += 7; // 操作共享数据
} // 隐式释放互斥锁
lck 的类型被推导为 scoped_lock<mutex> (§7.2.3)。 scoped_lock 的构造函数获取互斥锁(通过调用 m.lock() )。如果另一个线程已经获取了该互斥锁,则该线程将等待(“阻塞”),直到另一个线程完成其访问。一旦线程完成了对共享数据的访问, scoped_lock 就会释放互斥锁(通过调用 m.unlock() )。当互斥锁被释放时,等待它的 线程 将恢复执行(“被唤醒”)。互斥和锁设施在 <mutex> 中。
注意RAII(§6.3)的使用。使用资源句柄,如 scoped_lock 和 unique_lock (§18.4),比显式锁定和解锁互斥锁更简单、更安全。
共享数据和互斥锁之间的对应关系依赖于约定:程序员必须知道哪个互斥锁应该对应于哪些数据。显然,这是容易出错的,同样显然的是,我们试图通过各种语言手段使这种对应关系更加清晰。例如:
class Record {
public:
mutex rm;
// ...
};
不需要天才也能猜到,对于一个名为 rec 的 Record ,你应该在访问 rec 的其余部分之前获取 rec.rm ,尽管注释或更好的名称可能有助于读者理解。
同时访问多个资源以执行某些操作并不罕见。这可能导致死锁。例如,如果 线程1 获取了 mutex1 ,然后尝试获取 mutex2 ,而 线程2 获取了 mutex2 ,然后尝试获取 mutex1 ,那么两个任务都将无法继续执行。 scoped_lock 通过使我们能够同时获取多个锁来帮助解决这个问题:
void f()
{
scoped_lock lck {mutex1,mutex2,mutex3}; // 获取所有三个锁
// ... 操作共享数据 ...
} // 隐式释放所有互斥锁
这个 scoped_lock 只有在获取了其所有 互斥锁 参数之后才会继续执行,并且在持有 互斥锁 时永远不会阻塞(“进入睡眠状态”)。 scoped_lock 的析构函数确保当线程离开作用域时释放互斥锁。
通过共享数据进行通信是相当底层的。特别是,程序员必须设计方法来知道各种任务已经完成了哪些工作以及尚未完成哪些工作。在这方面,使用共享数据不如调用和返回的概念。另一方面,有些人认为共享必须比复制参数和返回值更高效。当涉及大量数据时,这确实可能是这样,上锁和解锁是相对昂贵的操作。另一方面,现代机器非常擅长复制数据,尤其是紧凑的数据,如 vector 元素。因此,不要因为“效率”而选择共享数据进行通信,而不加思考,最好也不要不加测量。
基本的 互斥锁 允许一次只有一个线程访问数据。共享数据最常见的方式之一是多个读者和一个单一的写者之间。这种“读写锁”习语由 shared_mutex 支持。读者将以“共享”方式获取互斥锁,以便其他读者仍然可以获得访问权限,而写者将要求独占访问权限。例如:
shared_mutex mx; // 一个可以共享的互斥锁
void reader()
{
shared_lock lck {mx}; // 愿意与其他读者共享访问权限
// ... 读取 ...
}
void writer()
{
unique_lock lck {mx}; // 需要独占(唯一)访问权限
// ... 写入 ...
}
18.3.2 原子操作
互斥锁 是一种相当重量级的机制,涉及操作系统。它允许任意量的工作在无数据竞争的情况下完成。然而,对于只需执行少量工作的情况,有一个更简单且成本更低的机制: 原子 (atomic)变量。例如,以下是经典双重检查锁定的一种简单变体:
mutex mut;
atomic<bool>init_x; // 初始化为false。
X x; // 需要非平凡初始化的变量
if (!init_x) {
lock_guard lck {mut};
if (!init_x) {
// ... 对x进行非平凡初始化 ...
init_x = true;
}
}
// ... 使用x ...
原子 操作使我们避免了大多数对成本更高的 互斥锁 的使用。如果 init_x 不是原子的,那么初始化将会非常罕见地失败,导致神秘且难以发现的错误,因为 init_x 上会发生数据竞争。 在这里,我使用了 lock_guard 而不是 scoped_lock ,因为我只需要一个互斥锁,所以最简单的锁( lock_guard )就足够了。
18.4 等待事件
有时,线程需要等待某种外部事件,如另一个线程完成任务或经过一定时间。最简单的“事件”就是时间的流逝。 使用 <chrono> 中的时间设施,我可以这样写:
using namespace chrono;
auto t0 = high_resolution_clock::now();
this_thread::sleep_for(milliseconds{20});
auto t1 = high_resolution_clock::now();
cout << duration_cast<nanoseconds>(t1-t0).count() << " nanoseconds passed\n";
我甚至不需要启动一个线程;默认情况下, this_thread 可以指代唯一的一个线程。
我使用 duration_cast 来调整时钟的单位到我想要的纳秒。
使用外部事件进行通信的基本支持是由 <condition_variable> 中的 condition_variable 提供的。 condition_variable 是一种机制,允许一个 线程 等待另一个 线程 。特别是,它允许一个 线程 等待由其他线程完成的工作导致的一些 条件 (通常称为 事件 )发生。
使用 condition_variable 支持许多优雅和高效的共享形式,但可能相当棘手。考虑两个线程通过队列传递消息进行通信的经典示例。为了简化,我声明了队列和避免在该队列上发生竞争条件的机制,这些对生产者和消费者来说是全局的:
class Message { // 要通信的对象
// ...
};
queue<Message> mqueue; // 消息队列
condition_variable mcond; // 通信事件变量
mutex mmutex; // 用于同步访问mcond
queue 、 condition_variable 和 mutex 类型由标准库提供。 consumer() 读取并处理 Messages :
void consumer()
{
while(true) {
unique_lock lck {mmutex}; // 获取mmutex
mcond.wait(lck,[] { return !mqueue.empty(); }); // 释放mmutex并等待;
// 唤醒时重新获取mmutex 除非mqueue非空,否则不唤醒
auto m = mqueue.front(); // 获取消息
mqueue.pop();
lck.unlock(); // 释放mmutex
// ... 处理m ...
}
}
在这里,我明确地使用 mutex 上的 unique_lock 来保护对队列和 condition_variable 的操作。在 condition_variable 上等待会释放其锁参数,直到等待结束(即队列非空),然后重新获取它。显式检查条件(这里是 !mqueue.empty() )可以防止唤醒后发现其他任务已经“先到先得”,因此条件不再成立。
我使用 unique_lock 而不是 scoped_lock 有两个原因:
• 我们需要将锁传递给 condition_variable 的 wait() 。 scoped_lock 不能移动,但 unique_lock 可以。
• 我们希望在处理消息之前解锁保护条件变量的 互斥锁 。 unique_lock 提供了 lock() 和 unlock() 等操作,用于低级别的同步控制。 另一方面, unique_lock 只能处理一个互斥锁。 相应的生产者看起来像这样:
void producer()
{
while(true) {
Message m;
// ... 填充消息 ...
scoped_lock lck {mmutex}; // 保护操作
mqueue.push(m);
mcond.notify_one(); // 通知
} // 释放mmutex(在作用域结束时)
}
18.5 任务通信
标准库提供了一些工具,允许程序员在任务(可能并发执行的工作)的概念层面上操作,而不是直接在较低级别的线程和锁上进行操作:
• future 和 promise 用于从在单独线程上生成的任务返回一个值
• packaged_task 用于帮助启动任务并连接返回结果的机制
• async() 用于以非常类似于调用函数的方式启动任务 这些工具在 <future> 中可以找到。
18.5.1 future和promise
关于 future 和 promise 的重要一点是,它们能够在两个任务之间传输一个值,而无需显式使用锁;“系统”会高效地实现这种传输。基本思想很简单:当一个任务想要将值传递给另一个任务时,它会将该值放入一个 promise 中。不知何故,实现机制会使得该值出现在相应的 future 中,然后可以从 future 中读取该值(通常由任务的启动者读取)。我们可以用图形来表示这一点:
如果我们有一个名为 fx 的 future<X> ,我们可以通过调用 get() 从中获取一个类型为 X 的值:
X v = fx.get(); // 如果需要,等待值被计算出来
如果值还没有到达,我们的线程会被阻塞,直到它到达。如果值无法被计算, get() 可能会抛出一个异常(来自系统或由 promise 传递)。
promise 的主要目的是提供简单的“ put ”操作(称为 set_value() 和 set_exception() ),以匹配 future 的 get() 。名称“ future ”和“ promise ”是历史遗留的,请不要为此责怪我或赞扬我。它们又是另一个充满双关语的源泉。
如果你有一个 promise 并需要向一个 future 发送一个类型为 X 的结果,你可以做两件事之一:传递一个值或传递一个异常。例如:
void f(promise<X>& px) // 一个任务:将结果放入px中
{
// ...
try {
X res;
// ... 为res计算一个值 ...
px.set_value(res);
}
catch (...) { // 哎呀:无法计算res
px.set_exception(current_exception()); // 将异常传递给future的线程
}
}
这里的 current_exception() 指的是捕获到的异常。
为了处理通过 future 传递的异常,调用 get() 的人必须准备好在某处捕获它。例如:
void g(future<X>& fx) // 一个任务:从fx中获取结果
{
// ...
try {
X v = fx.get(); // 如果需要,等待值被计算出来
// ... 使用v ...
}
catch (...) { // 哎呀:有人无法计算v
// ... 处理错误 ...
}
}
如果错误不需要由 g() 本身处理,代码可以简化为最小形式:
void g(future<X>& fx) // 一个任务:从fx中获取结果
{
// ...
X v = fx.get(); // 如果需要,等待值被计算出来
// ... 使用v ...
}
现在,从 fx 的函数( f() )中抛出的异常会隐式地传播到 g() 的调用者,就像 g() 直接调用 f() 时一样。
18.5.2 packaged_task
如何将一个需要结果的 future 以及相应的 promise 放入应该产生该结果的线程中呢?提供了 packaged_task 类型来简化设置与 future 和 promise 相关联的任务,以便在 线程 上运行。 packaged_task 提供了包装代码,用于将任务的返回值或异常放入 promise 中(如§ 18.5.1所示的代码)。如果你通过调用 get_future 来请求, packaged_task 会给你与其 promise 相对应的 future 。例如,我们可以设置两个任务,每个任务都使用标准库的 accumulate() (§ 17.3)来计算 vector<double> 中一半元素的和:
double accum(vector<double>::iterator beg, vector<double>::iterator end, double init)
// 计算从初始值init开始的[beg:end)区间的和
{
return accumulate(&*beg, &*end, init);
}
double comp2(vector<double>& v)
{
packaged_task pt0 {accum}; // 包装任务(即accum)
packaged_task pt1 {accum};
future<double> f0 {pt0.get_future()}; // 获取pt0的future
future<double> f1 {pt1.get_future()}; // 获取pt1的future
double* first = &v[0];
thread t1 {move(pt0), first, first + v.size() / 2, 0}; // 为pt0启动一个线程
thread t2 {move(pt1), first + v.size() / 2, first + v.size(), 0}; // 为pt1启动一个线程
// ...
return f0.get() + f1.get(); // 获取结果
}
packaged_task 模板将任务的类型作为其模板参数(此处为 double(double*,double*,double) ** ),并将任务作为其构造函数参数(此处为 ** accum ** )。需要 ** move() ** 操作,因为 ** packaged_task ** 不能被复制。 ** packaged_task ** 不能被复制的原因是它是一个资源句柄:它拥有其 ** promise ,并且(间接地)负责其任务可能拥有的任何资源。
请注意,此代码中未明确提及锁:我们能够专注于要完成的任务,而不是用于管理它们通信的机制。这两个任务将在单独的线程上运行,因此可能并行执行。
18.5.3 async()
本章我所追求的思路是我认为最简单但仍是最强大的思路之一:将一个任务视为一个可能与其他任务并发运行的函数。这远非C++标准库支持的唯一模型,但它能满足广泛的需求。在需要时,可以使用更微妙和棘手的模型(例如,依赖于共享内存的编程风格)。
为了启动可能异步运行的任务,我们可以使用 async() :
double comp4(vector<double>& v)
// 如果v足够大,则生成多个任务
{
if (v.size()<10'000) // 是否值得使用并发?
return accum(v.begin(),v.end(),0.0);
auto v0 = &v[0];
auto sz = v.size();
auto f0 = async(accum,v0,v0+sz/4,0.0); //第一个四分之一
auto f1 = async(accum,v0+sz/4,v0+sz/2,0.0); //第二个四分之一
auto f2 = async(accum,v0+sz/2,v0+sz*3/4,0.0); //第三个四分之一
auto f3 = async(accum,v0+sz*3/4,v0+sz,0.0); //第四个四分之一
}
return f0.get()+f1.get()+f2.get()+f3.get(); // 收集并组合结果
基本上, async() 将函数调用的“调用部分”与“获取结果部分”分开,并将这两者与实际任务的执行分开。使用 async() ,你不必考虑线程和锁。相反,你会考虑可能异步计算结果的任务。有一个明显的限制:不要考虑使用 async() 来执行需要锁定资源的任务。使用 async() 时,你甚至不知道将使用多少个线程,因为这取决于 async() 根据调用时可用的系统资源来决定。例如, async() 可能会检查是否有空闲核心(处理器)可用,然后再决定使用多少个 线程 。
使用关于计算成本与启动 线程 成本之间的猜测,如 v.size()<10'000 ,是非常原始的,并且容易对性能做出重大错误判断。然而,这并不是讨论如何管理线程的合适场合。不要把这个估计看作是一个比简单且可能很差的猜测更好的东西。
很少需要手动并行化标准库算法,如 accumulate() ,因为并行算法(例如, reduce(par_unseq,/.../) )通常在这方面做得更好(§17.3.1)。然而,这种技术是通用的。
请注意, async() 不仅仅是一种专门用于并行计算以提高性能的机制。例如,它还可以用于生成一个任务来获取用户信息,同时让“主程序”忙于其他事情(§18.5.3)。
18.5.4 停止线程
有时,我们想停止一个 线程 ,因为我们不再关心它的结果。仅仅“杀死”它通常是不可接受的,因为一个 线程 可能拥有必须释放的资源(例如,锁、子线程和数据库连接)。相反,标准库提供了一种机制,可以礼貌地请求 线程 进行清理并退出:即 stop_token 。如果一个线程拥有 stop_token 并且被请求停止,那么它可以被编程为终止。
考虑一个并行算法 find_any() ,它生成许多线程来寻找结果。当一个线程带着答案返回时,我们希望停止剩余的线程。 find_any() 生成的每个线程都会调用 find() 来完成实际工作。这个 find() 是一个常见任务风格的非常简单的例子,其中有一个主循环,我们可以在其中插入一个测试,以决定是继续还是停止:
atomic<int> result = -1; // 在此处放置结果索引
template<class T> struct Range { T* first; T* last; }; // 传递T范围的一种方式
void find(stop_token tok, const string* base, const Range<string> r, const string target)
{
for (string* p = r.first; p!=r.last && !tok.stop_requested(); ++p)
if (match(*p, target)) { // match()对两个字符串应用一些匹配标准
result = p - base; // 找到元素的索引
return;
}
}
在这里, !tok.stop_requested() 测试是否有其他线程请求此线程终止。 stop_token 是安全(无数据竞争)地传达此类请求的机制。 以下是一个简单的 find_any() ,它只生成两个运行 find() 的 线程 :
void find_all(vector<string>& vs, const string& key)
{
int mid = vs.size()/2;
string* pvs = &vs[0];
stop_source ss1{};
jthread t1(find, ss1.get_token(), pvs, Range{pvs,pvs+mid}, key); // vs的第一半
stop_source ss2{};
jthread t2(find, ss2.get_token(), pvs, Range{pvs+mid,pvs+vs.size()} , key); // vs的第二半
while (result == -1)
this_thread::sleep_for(10ms);
ss1.request_stop(); // 我们有了结果:停止所有线程
ss2.request_stop();
}
// ... 使用结果 ...
stop_source 通过 stop_token 生成请求,以将停止请求传达给 线程 。
同步和返回结果的实现是我能想到的最简单的:将结果放入一个原子变量(§18.3.2)中,并对其进行旋转循环。
当然,我们可以将这个简单示例进行扩展,使用多个搜索线程,使结果的返回更加通用,并使用不同的元素类型。然而,这将掩盖 stop_source 和 stop_token 的基本作用。
18.6 协程
协程是一种在调用之间保持其状态的函数。在这方面,它有点像函数对象,但它在调用之间保存和恢复状态是隐式的且完整的。考虑一个经典的例子:
generator<long long> fib() // 生成斐波那契数列
{
long long a = 0;
long long b = 1;
while (a<b) {
auto next = a+b;
co_yield next; // 保存状态,返回值,并等待
a = b;
b = next;
}
co_return 0; // 一个过远的斐波那契数
}
void user(int max)
{
for (int i=0; i++<max;)
cout << fib() << ' ';
}
这将生成:
1 2 3 5 8 13 ...
生成器的返回值是协程在调用之间存储其状态的地方。当然,我们本来可以制作一个以同样方式工作的函数对象 Fib ,但那时我们就必须自己维护它的状态了。对于更大的状态和更复杂的计算,保存和恢复状态会变得乏味、难以优化,并且容易出错。实际上,协程是一种在调用之间保存其栈帧的函数。 co_yield 返回一个值并等待下一次调用。 co_return 返回一个值并终止协程。 协程可以是同步的(调用者等待结果)或异步的(调用者执行一些其他工作,直到它查找来自协程的结果)。斐波那契示例显然是同步的。这允许进行一些很好的优化。例如,一个好的优化器可以内联对 fib() 的调用并展开循环,只留下一系列的 << 调用,它们本身可以优化为:
cout << "1 2 3 5 7 12"; // fib(6)
协程被实现为一个极其灵活的框架,能够满足广泛的潜在用途。它是由专家为专家设计的,带有一些委员会设计的风格。这很好,只是C++20中仍然缺少使简单用途变得简单的库设施。例如, generator 还不是(尚未成为)标准库的一部分。不过,已经有一些提案,并且在网上搜索可以找到一些很好的实现; [Cppcoro] 就是一个例子。
18.6.1 协同多任务处理
在 《The Art of Computer Programming》 的第一卷中,唐纳德·克努特(Donald Knuth)赞扬了协程的实用性,但也哀叹难以给出简短的示例,因为协程在简化复杂系统方面最为有用。在这里,我将给出一个简单的示例,展示进行事件驱动模拟所需的基本元素,而事件驱动模拟是C++早期成功的主要原因之一。核心思想是将系统表示为一个简单任务(协程)网络,这些任务协作完成复杂任务。基本上,每个任务都是一个执行大型任务一小部分的参与者。有些任务是生成器,产生请求流(可能使用随机数生成器,也可能处理现实世界的数据),有些是网络计算结果的一部分,还有一些产生输出。我个人倾向于通过消息队列来让任务(协程)进行通信。组织这种系统的一种方式是,每个任务在产生结果后将自己置于事件队列中等待更多工作。然后,调度器在需要时从事件队列中选择下一个任务来运行。这是一种 协同多任务处理 的形式。我从Simula [Dahl,1970]借用了这些关键思想(并予以致谢),以形成最早的C++库的基础(§ 19.1.2)。
这种设计的关键是:
- 多种不同的 协程 ,它们在调用之间保持状态。
- 一种 多态 形式,允许我们保持包含不同类型协程的事件列表,并独立于它们的类型调用它们。
- 一个 调度器 ,它从列表中选择下一个要运行的协程(或多个协程)。
在这里,我将只展示几个协程并交替执行它们。对于这样的系统来说,不使用太多空间是至关重要的。这就是为什么我们不使用进程或线程来处理这类应用的原因。一个线程需要一兆字节或两兆字节的空间(主要用于其堆栈),而协程通常只需要几十字节。如果你需要数千个任务,这会产生很大的差异。协程之间的上下文切换也比线程或进程之间的上下文切换快得多。
首先,我们需要一些运行时多态性,以便我们能够统一调用几十种或几百种不同类型的协程:
struct Event_base {
virtual void operator()() = 0;
virtual ~Event_base() {}
};
template<class Act>
struct Event : Event_base {
Event(const string n, Act a) : name{n}, act{move(a)} {}
string name;
Act act;
void operator()() override { act(); }
};
Event 简单地存储一个操作并允许调用它;该操作通常是一个协程。我添加了一个名称只是为了说明事件通常携带比协程句柄更多的信息。
下面是一个简单的用法示例:
void test()
{
vector<Event_base*> events = { // 创建几个包含协程的事件
new Event{ "integers ", sequencer(10) },
new Event{ "chars ", char_seq('a') }
};
vector<int> order {0, 1, 1, 0, 1, 0, 1, 0, 0}; // 选择一些顺序
for (int x : order) // 按顺序调用协程
(*events[x])();
for (auto p : events) // 清理
delete p;
}
到目前为止,这里并没有特别针对协程的内容;它只是一个常规的面向对象框架,用于在一组可能类型不同的对象上执行操作。然而, sequencer 和 char_seq 恰好是协程。它们在调用之间保持状态,这对于此类框架的实际应用至关重要:
task sequencer(int start, int step = 1)
{
auto value = start;
while (true) {
cout << "value: " << value << '\n'; // 传递结果
co_yield 0; // 休眠直到有人恢复此协程
value += step; // 更新状态
}
}
我们可以看到 sequencer 是一个协程,因为它使用了 co_yield 来在调用之间挂起自己。这意味着 task 必须是一个协程句柄(见下文)。
这是一个故意设计得非常简单的协程。它所做的只是生成一系列值并输出它们。在严肃的模拟中,该输出将直接或间接地成为其他协程的输入。
char_seq 非常相似,但类型不同,以锻炼运行时多态性:
task char_seq(char start)
{
auto value = start;
while (true) {
cout << "value: " << value << '\n';// 传递结果
co_yield 0;
++value;
}
}
“‘魔法’”在于返回类型 task ;它在调用之间保存协程的状态(实际上是函数的栈帧),并确定 co_yield 的含义。从用户的角度来看, task 是微不足道的,它只提供了一个运算符来调用协程:
struct task {
void operator()();
// ... 实现细节 ...
};
如果 task 在一个库中,最好是标准库中,那么我们就只需要知道这些,但它不在,所以这里有一点如何实现这种协程句柄类型的提示。不过,确实有提案,并且网络搜索可以找到很好的实现; [Cppcoro] 库就是一个例子。
我的 task 结构尽可能简洁,以实现我的关键示例:
struct task {
void operator()() { coro.resume(); }
struct promise_type { // 映射到语言特性
suspend_always initial_suspend() { return {}; }
suspend_always final_suspend() noexcept { return {}; } // co_return
suspend_always yield_value(int) { return {}; } // co_yield
auto get_return_object() { return task{ handle_type::from_promise(*this) }; }
void return_void() {}
void unhandled_exception() { exit(1); }
};
};
using handle_type = coroutine_handle<promise_type>;
task(handle_type h) : coro(h) {} // 由 get_return_object() 调用
handle_type coro; // 这里是协程句柄
我强烈建议您不要自己编写此类代码,除非您是库实现者,试图为他人省去麻烦。如果您对此感到好奇,网上有很多解释。
18.7 建议
- 使用并发来提高响应速度或提高吞吐量;§18.1。
- 尽可能在你能够承受的范围内,在最高级别的抽象层次上工作;§18.1。
- 考虑将进程作为线程的替代方案;§18.1。
- 标准库的并发设施是类型安全的;§18.1。
- 内存模型的存在是为了让大多数程序员不必考虑计算机的机器架构层面;§18.1。
- 内存模型使内存的表现大致符合天真的预期;§18.1。
- 原子操作允许进行无锁编程;§18.1。
- 把无锁编程留给专家去做;§18.1。
- 有时,顺序解决方案比并发解决方案更简单、更快;§18.1。
- 避免数据竞争;§18.1,§18.2。
- 优先使用并行算法,而不是直接使用并发;§18.1,§18.5.3。
- 线程 是系统线程的类型安全接口;§18.2。
- 使用 join() 等待线程完成;§18.2。
- 优先使用 jthread 而不是 thread ;§18.2。
- 尽可能避免显式共享数据;§18.2。
- 优先使用RAII(资源获取即初始化)而不是显式的 lock/unlock ;§18.3;[CG: CP.20]。
- 使用 scoped_lock 来管理 互斥锁 ;§18.3。
- 使用 scoped_lock 来获取多个锁;§18.3;[CG: CP.21]。
- 使用 shared_lock 来实现读写锁;§18.3。
- 与受保护的数据一起定义 互斥锁 ;§18.3;[CG: CP.50]。
- 对于非常简单的共享,使用 原子 操作;§18.3.2。
- 使用 condition_variable 来管理线程之间的通信;§18.4。
- 当你需要复制锁或需要更低级别的同步操作时,使用 unique_lock (而不是 scoped_lock );§18.4。
- 与 condition_variable 一起使用 unique_lock (而不是 scoped_lock );§18.4。
- 不要无条件地等待;§18.4;[CG: CP.42]。
- 尽量减少在临界区中的时间;§18.4 [CG: CP.43]。
- 从并发任务的角度来思考,而不是直接从线程的角度来思考;§18.5。
- 重视简洁性;§18.5。
- 优先使用 packaged_task 和 future ,而不是直接使用线程和互斥锁;§18.5。
- 使用 promise 返回一个结果,并使用 future 获取结果;§18.5.1;[CG: CP.60]。
- 使用 packaged_task 来处理任务抛出的异常;§18.5.2。
- 使用 packaged_task 和 future 来表达对外部服务的请求,并等待其响应;§18.5.2。
- 使用 async() 来启动简单的任务;§18.5.3;[CG: CP.61]。
- 使用 stop_token 来实现协作式终止;§18.5.4。
- 协程可能比线程小得多;§18.6。
- 优先使用协程支持库,而不是手工编写的代码;§18.6。