跳转至

Multithreading

简介

在C++11之前,涉及到多线程问题,都是和平台相关的,比如 windows 和 linux 下各有自己的接口,这使得代码的可移植性比较差。C++11中最重要的特性就是对线程进行支持了,使得C++在并行编程时不需要依赖第三方库,而且在原子操作中还引入了原子类的概念。

C++11大致增加了五个库,其中常见的有:线程库(thread)互斥量库(mutex)原子性操作库(atomic)条件变量库(condition_variable)

交替打印
/*
使用条件变量实现交替打印奇数和偶数
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;

int main()
{
    mutex mt;
    thread odd,even;
    condition_variable cv;
    bool f = false;
    auto f0 = [&]()
    {
        int now = 1;
        for(;;)
        {
            unique_lock<mutex> ulk(mt);
            if(!f)
                cv.wait(ulk);
            cout << this_thread::get_id() << ": " << now * 2 << endl;
            now++;
            // 打印后休眠 500 毫秒方便观察
            this_thread::sleep_for(chrono::milliseconds(500));
            f = false;
            cv.notify_one();
        }
    };

    auto f1 = [&]()
    {
        int now = 0;
        for(;;)
        {
            unique_lock<mutex> ulk(mt);
            if(f)
                cv.wait(ulk);
            cout << this_thread::get_id() << ": " << now * 2 + 1<< endl;
            now++;
            this_thread::sleep_for(chrono::milliseconds(500));
            f = true;
            cv.notify_one();
        }
    };

    odd = thread(f1);
    even = thread(f0);

    for(;;);

    return 0;
}

Futures

标准库提供了获取异步操作的获取返回值异常的工具。后续模板类和模板函数被定义在 <future>中。

异步操作

标准库提供了三个模板类来执行一些异步操作:std::asyncstd::packaged_taskstd::promise

  • std::promise:stores a value for asynchronous retrieval,它禁止拷贝,支持移动语义。

  • std::packaged_task:它是一个模板类,它会打包一个函数并且存储该函数的返回值。调用它会执行对应的函数,我们通过成员函数 get_future 获取一个 std::future 对象。它类似于 std::function

  • std::async:它是一个模板函数,运行它(可能会创建一个新线程)会返回一个 std::future 类型。其它的用法类似 std::thread

for asynchronous retrieval 就是后续获取这个异步操作的结果,要实现这个功能就要用到 std::future

std::future

std::future 提供了一种获取异步操作结果的方式。

在通过我们上文说到的方法获得 std::future 对象后,只要在需要的时候,通过 std::futureget 成员函数就可以获取异步操作的运行结果。

要注意的就是这里 get 操作只能进行一次,不然程序会崩。

下面是一些使用这些模板类的例子:

void sum(std::vector<int>::iterator begin
    , std::vector<int>::iterator end
    , std::promise<int> pro) {
    int res = std::accumulate(begin, end, 0);
    pro.set_value(res);    // 设置该异步操作的值
}

int main() {   
    std::vector<int> vec{ 1,2,3,4,5 };
    std::promise<int> pro;
    std::future<int> fut = pro.get_future();    // 获取一个 fut 等待该异步操作的结果
    std::thread worker(sum, vec.begin(), vec.end(), std::move(pro));

    // 在 std::promise 没有 set_value 前会一直阻塞
    std::cout << fut.get() << std::endl;
    worker.join();
    return 0;
}
auto gcd(int x, int y) -> int {
    if (x == 0) return y;
    if (y == 0) return x;
    return gcd(y, x % y);
}

int main() {   
    std::packaged_task<int(int, int)> f(gcd);
    f(16,12);   // 调用函数
    std::cout << f.get_future().get() << std::endl; // 获取函数运行结果
    return 0;
}

std::async 并不一定会开启一个新线程,我们可以通过选项 std::launch 来指定这个函数的具体行为:

  • std::launch::async : 创建一个新线程执行任务;
  • std::launch::deferred: 在 get 的时候才执行函数。

std::launch 是一个位图,当我们将两个选项都设置时,系统会根据情况选择调用方式,当不传递该参数时,效果就是两个选项都设置。

auto gcd(int x, int y) -> int {
    if (x == 0) return y;
    if (y == 0) return x;
    return gcd(y, x % y);
}

int main() {   
    // std::async 会返回一个 std::future 对象
    std::future<int> f = std::async(std::launch::async, gcd, 4, 12); 
    //std::future<int> f = std::async(std::launch::deferred,gcd, 4, 12); 

    std::cout << f.get() << std::endl;
    return 0;
}

std::shared_future

std::shared_future 允许多次 get 异步操作的结果。它可以通过一个 std::future 使用成员函数 share 获取,其他的用法都与 std::future 相同。