Skip to the content.

35 用 std::async 替代 std::thread

int f();
std::thread t{f};
int f();
std::future<int> ft = std::async(f);
int f() { return 1; }
auto ft = std::async(f);
int res = ft.get();
int f() noexcept;
std::thread t{f};  // 若无线程可用,仍会抛出异常
auto ft = std::async(f);  // 由标准库的实现者负责线程管理
auto ft = std::async(std::launch::async, f);

36 用 std::launch::async 指定异步求值

auto ft1 = std::async(f);  // 意义同下
auto ft2 = std::async(std::launch::async | std::launch::deferred, f);
auto ft = std::async(f);
/*
 * f 的 TLS 可能和一个独立线程相关
 * 但也可能与对 ft 调用 get 或 wait 的线程相关
 */
auto ft = std::async(f);
using namespace std::literals;

void f() { std::this_thread::sleep_for(1s); }

auto ft = std::async(f);
while (ft.wait_for(100ms) != std::future_status::ready) {
  // 循环至 f 运行完成,但这可能永远不会发生
}
auto ft = std::async(f);
if (ft.wait_for(0s) == std::future_status::deferred) {  // 任务被推迟
  // 使用 ft 的 wait 或 get 异步调用 f
} else {  // 任务未被推迟
  while (ft.wait_for(100ms) != std::future_status::ready) {
    // 任务未被推迟也未就绪,则做并发工作直至结束
  }
  // ft 准备就绪
}
auto ft = std::async(std::launch::async, f);  // 异步执行 f
template <typename F, typename... Args>
auto really_async(F&& f, Args&&... args)
    -> std::future<std::invoke_result_t<F, Args...>> {
  return std::async(std::launch::async, std::forward<F>(f),
                    std::forward<Args>(args)...);
}
/*
 * 异步运行 f
 * 如果 std::async 抛出异常则 really_async 也抛出异常
 */
auto ft = really_async(f);

37 RAII 线程管理

void f() {}

void g() {
  std::thread t{f};  // t.joinable() == true
}

int main() {
  g();  // g 运行结束时析构 t,但 t 未 join,导致程序终止
  do_something();  // 调用前程序已被终止
}
void f(int&) {}

void g() {
  int i = 1;
  std::thread t(f, i);
}  // 如果隐式 detach,局部变量 i 被销毁,但 f 仍在使用局部变量的引用
#include <thread>
#include <utility>

class ThreadGuard {
 public:
  enum class DtorAction { kJoin, kDetach };
  ThreadGuard(std::thread&& t, DtorAction a) : action_(a), t_(std::move(t)) {}
  ~ThreadGuard() {
    if (t_.joinable()) {
      if (action_ == DtorAction::kJoin) {
        t_.join();
      } else {
        t_.detach();
      }
    }
  }
  ThreadGuard(ThreadGuard&&) noexcept = default;
  ThreadGuard& operator=(ThreadGuard&&) = default;
  std::thread& get() { return t_; }

 private:
  DtorAction action_;
  std::thread t_;
};

void f() {
  ThreadGuard t{std::thread([] {}), ThreadGuard::DtorAction::kJoin};
}

int main() {
  f();
  /*
   * g 运行结束时将内部的 std::thread 置为 join,变为不可合并状态
   * 析构不可合并的 std::thread 不会导致程序终止
   * 这种手法带来了隐式 join 和隐式 detach 的问题,但可以调试
   */
}

38 std::future 的析构行为

std::promise<int> ps;
std::future<int> ft = ps.get_future();

std::shared_future<int> sf{std::move(ft)};
// 更简洁的写法是用 std::future::share() 返回 std::shared_future
// auto sf = ft.share();
auto sf2 = sf;
auto sf3 = sf;

std::vector<std::future<void>> v;  // 该容器可能在析构函数中阻塞

struct A {  // 该类型对象可能会在析构函数中阻塞
  std::shared_future<int> ft;
};
int f() { return 1; }
std::packaged_task<int()> pt(f);
auto ft = pt.get_future();     // ft 可以正常析构
std::thread t(std::move(pt));  // 创建一个线程来执行任务
int res = ft.get();
{
  std::packaged_task<int()> pt(f);
  auto ft = pt.get_future();  // ft 可以正常析构
  std::thread t(std::move(pt));
  ...  // t.join() 或 t.detach() 或无操作
}  // 如果t 不 join 不 detach,则此处 t 的析构程序终止
   // 如果 t 已 join,则 ft 析构时就无需阻塞
   // 如果 t 已 detach,则 ft 析构时就无需 detach
   // 因此 std::packaged_task 生成的 ft 一定可以正常析构

39 用 std::promisestd::future 之间的通信实现一次性通知

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

std::condition_variable cv;
std::mutex m;
bool flag = false;
std::string s = "hello";

void f() {
  std::unique_lock<std::mutex> lk{m};
  cv.wait(lk, [] {
    return flag;
  });  // lambda 返回 false 则阻塞,并在收到通知后重新检测
  std::cout << s;  // 若返回 true 则继续执行
}

int main() {
  std::thread t{f};
  {
    std::lock_guard<std::mutex> l{m};
    s += " world";
    flag = true;
    cv.notify_one();  // 发出通知
  }
  t.join();
}
#include <future>
#include <iostream>
#include <thread>

std::promise<void> p;

void f() {
  p.get_future().wait();  // 阻塞至 p.set_value()
  std::cout << 1;
}

int main() {
  std::thread t{f};
  p.set_value();  // 解除阻塞
  t.join();
}
#include <future>
#include <iostream>
#include <thread>

std::promise<void> p;

void f() { std::cout << 1; }

int main() {
  std::thread t([] {
    p.get_future().wait();
    f();
  });
  p.set_value();
  t.join();
}
#include <future>
#include <iostream>
#include <thread>

std::promise<void> p;

void f() { std::cout << 1; }

int main() {
  std::jthread t{[&] {
    p.get_future().wait();
    f();
  }};
  /*
   * 如果此处抛异常,则 set_value 不会被调用,wait 将永远不返回
   * 而 RAII 会在析构时调用 join,join 将一直等待线程完成
   * 但 wait 使线程永不完成
   * 因此如果此处抛出异常,析构函数永远不会完成,程序将失去效应
   */
  p.set_value();
}
#include <future>
#include <iostream>
#include <thread>
#include <vector>

std::promise<void> p;

void f(int x) { std::cout << x; }

int main() {
  std::vector<std::thread> v;
  auto sf = p.get_future().share();
  for (int i = 0; i < 10; ++i) {
    v.emplace_back([sf, i] {
      sf.wait();
      f(i);
    });
  }
  p.set_value();
  for (auto& x : v) {
    x.join();
  }
}

40 std::atomic 提供原子操作,volatile 禁止优化内存

#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> i{0};

void f() {
  ++i;  // 原子自增
  ++i;  // 原子自增
}

void g() { std::cout << i; }

int main() {
  std::thread t1{f};
  std::thread t2{g};  // 结果只能是 0 或 1 或 2
  t1.join();
  t2.join();
}
#include <atomic>
#include <iostream>
#include <thread>

volatile int i{0};

void f() {
  ++i;  // 读改写操作,非原子操作
  ++i;  // 读改写操作,非原子操作
}

void g() { std::cout << i; }

int main() {
  std::thread t1{f};
  std::thread t2{g};  // 存在数据竞争,值未定义
  t1.join();
  t2.join();
}
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<bool> a{false};
int x = 0;

void f() {
  x = 1;  // 一定在 a 赋值为 true 之前执行
  a = true;
}

void g() {
  if (a) {
    std::cout << x;
  }
}

int main() {
  std::thread t1{f};
  std::thread t2{g};
  t1.join();
  t2.join();  // 不打印,或打印 1
}
#include <atomic>
#include <iostream>
#include <thread>

volatile bool a{false};
int x = 0;

void f() {
  x = 1;  // 可能被重排在 a 赋值为 true 之后
  a = true;
}

void g() {
  if (a) {
    std::cout << x;
  }
}

int main() {
  std::thread t1{f};
  std::thread t2{g};
  t1.join();
  t2.join();  // 不打印,或打印 0 或 1
}
int x = 42;
int y = x;
y = x;  // 冗余的初始化

// 优化为
int x = 42;
int y = x;
int x;
x = 10;
x = 20;

// 优化为
int x;
x = 20
int x = 42;
int y = x;
y = x;
x = 10;
x = 20;

// 优化为
int x = 42;
int y = x;
x = 20;
std::atomic<int> y{x.load()};
y.store(x.load());

// 优化为
register = x.load();           // 将 x 读入寄存器
std::atomic<int> y{register};  // 用寄存器值初始化 y
y.store(register);             // 将寄存器值存入 y
int current_temperature;   // 传感器中记录当前温度的变量
current_temperature = 25;  // 更新当前温度,这条语句不应该被消除
current_temperature = 26;