Skip to the content.

条件变量(condition variable)

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Demonstrates sequencing three steps with one mutex, one condition variable
// and two flags. The use of notify_one() is deliberately fragile: a single
// notification may be delivered to a waiter whose predicate is still false.
class A {
 public:
  // Sets step1_done_ under the lock, then prints 1 and notifies one waiter.
  void step1() {
    {
      std::lock_guard<std::mutex> l(m_);
      step1_done_ = true;
    }
    // The lock has already been released here, so step2() can observe the flag
    // and print 2 before this 1 is written.
    std::cout << 1;
    // Only one waiter is woken. If that waiter is step3(), its predicate is
    // still false and it goes back to waiting.
    cv_.notify_one();
  }

  // Waits until step1_done_ is set, then sets step2_done_, prints 2 and
  // notifies one waiter. The mutex is held until the function returns.
  void step2() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return step1_done_; });
    step2_done_ = true;
    std::cout << 2;
    cv_.notify_one();
  }

  // Waits until step2_done_ is set, then prints 3.
  void step3() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return step2_done_; });
    std::cout << 3;
  }

 private:
  std::mutex m_;                // guards both flags
  std::condition_variable cv_;  // shared by step2() and step3()
  bool step1_done_ = false;
  bool step2_done_ = false;
};

// Runs the three steps on separate threads and joins them all.
int main() {
  A a;
  std::thread t1(&A::step1, &a);
  std::thread t2(&A::step2, &a);
  std::thread t3(&A::step3, &a);
  t1.join();
  t2.join();
  t3.join();
}  // possible outputs: 123, 213, 231, or 1 followed by blocking forever
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Two waiters block on the same condition until signal() sets done_ and wakes
// every waiter with notify_all().
class A {
 public:
  // Blocks until done_ is true, then prints 1.
  void wait1() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return done_; });
    std::cout << 1;
  }

  // Blocks until done_ is true, then prints 2.
  void wait2() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return done_; });
    std::cout << 2;
  }

  // Sets done_ under the lock, releases the lock, then wakes all waiters.
  void signal() {
    {
      std::lock_guard<std::mutex> l(m_);
      done_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex m_;  // guards done_
  std::condition_variable cv_;
  bool done_ = false;
};

// Starts both waiters and the signalling thread, then joins all three.
int main() {
  A a;
  std::thread t1(&A::wait1, &a);
  std::thread t2(&A::wait2, &a);
  std::thread t3(&A::signal, &a);
  t1.join();
  t2.join();
  t3.join();
}  // prints 12 or 21
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Do-nothing lockable type: provides the lock()/unlock() members that
// std::condition_variable_any::wait requires, without any real locking.
struct Mutex {
  void lock() {
  }

  void unlock() {
  }
};

// Shows that std::condition_variable_any accepts any type with lock() and
// unlock(), here the no-op Mutex.
class A {
 public:
  // Prints 1 and wakes one waiter.
  void signal() {
    std::cout << 1;
    cv_.notify_one();
  }

  // Waits for a notification, then prints 2.
  // NOTE(review): wait(m) has no predicate and Mutex locks nothing, so a
  // notification sent before this thread starts waiting is lost (the call then
  // never returns), and a spurious wakeup could print 2 before 1.
  void wait() {
    Mutex m;
    cv_.wait(m);
    std::cout << 2;
  }

 private:
  std::condition_variable_any cv_;
};

// Starts the signalling thread first, then the waiting thread, and joins both.
int main() {
  A a;
  std::thread t1(&A::signal, &a);
  std::thread t2(&A::wait, &a);
  t1.join();
  t2.join();
}  // intended output: 12 (only when the waiter is already waiting at notify time)
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

// FIFO queue whose operations are serialized by an internal mutex. Blocking
// pops sleep on a condition variable until an element is available.
template <typename T>
class ConcurrentQueue {
 public:
  ConcurrentQueue() = default;

  // Copies the elements of rhs while holding rhs's mutex.
  ConcurrentQueue(const ConcurrentQueue& rhs) {
    std::scoped_lock guard(rhs.m_);
    q_ = rhs.q_;
  }

  // Appends x and wakes one thread blocked in wait_and_pop().
  void push(T x) {
    std::scoped_lock guard(m_);
    q_.push(std::move(x));
    cv_.notify_one();
  }

  // Blocks until the queue is non-empty, then moves the front element into res.
  void wait_and_pop(T& res) {
    std::unique_lock<std::mutex> guard(m_);
    while (q_.empty()) {
      cv_.wait(guard);
    }
    res = std::move(q_.front());
    q_.pop();
  }

  // Blocks until the queue is non-empty, then returns the front element in a
  // freshly allocated shared_ptr.
  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> guard(m_);
    while (q_.empty()) {
      cv_.wait(guard);
    }
    std::shared_ptr<T> head = std::make_shared<T>(std::move(q_.front()));
    q_.pop();
    return head;
  }

  // Non-blocking pop: returns false when the queue is empty, otherwise moves
  // the front element into res and returns true.
  bool try_pop(T& res) {
    std::scoped_lock guard(m_);
    if (!q_.empty()) {
      res = std::move(q_.front());
      q_.pop();
      return true;
    }
    return false;
  }

  // Non-blocking pop: returns a null pointer when the queue is empty.
  std::shared_ptr<T> try_pop() {
    std::scoped_lock guard(m_);
    std::shared_ptr<T> head;
    if (!q_.empty()) {
      head = std::make_shared<T>(std::move(q_.front()));
      q_.pop();
    }
    return head;
  }

  // Reports whether the queue is empty at the moment of the call.
  bool empty() const {
    // Locking is needed even in a const member: another thread may hold this
    // object (e.g. as the source of a copy construction) and be using it.
    std::scoped_lock guard(m_);
    return q_.empty();
  }

 private:
  mutable std::mutex m_;  // mutable so const members can lock it
  std::condition_variable cv_;
  std::queue<T> q_;
};
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Thread-safe FIFO queue that stores every element behind a std::shared_ptr.
// The element is allocated in push() before the lock is taken, so the pops
// that return a shared_ptr never allocate while holding the mutex.
template <typename T>
class ConcurrentQueue {
 public:
  ConcurrentQueue() = default;

  // Deep-copies rhs while holding rhs's mutex. Copying only the shared_ptrs
  // would make both queues alias the same elements, and a pop-by-reference on
  // one queue (which moves from the element) would then leave a moved-from
  // value inside the other queue.
  ConcurrentQueue(const ConcurrentQueue& rhs) {
    std::lock_guard<std::mutex> l(rhs.m_);
    std::queue<std::shared_ptr<T>> snapshot = rhs.q_;  // pointer copies only
    while (!snapshot.empty()) {
      q_.push(std::make_shared<T>(*snapshot.front()));
      snapshot.pop();
    }
  }

  // Appends x and wakes one thread blocked in wait_and_pop().
  void push(T x) {
    // Allocate outside the critical section to keep the lock short.
    auto data = std::make_shared<T>(std::move(x));
    std::lock_guard<std::mutex> l(m_);
    q_.push(data);
    cv_.notify_one();
  }

  // Blocks until the queue is non-empty, then moves the front element into res.
  void wait_and_pop(T& res) {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !q_.empty(); });
    res = std::move(*q_.front());
    q_.pop();
  }

  // Blocks until the queue is non-empty, then returns the stored pointer to
  // the front element.
  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !q_.empty(); });
    auto res = q_.front();
    q_.pop();
    return res;
  }

  // Non-blocking pop: returns false when empty, otherwise moves the front
  // element into res and returns true.
  bool try_pop(T& res) {
    std::lock_guard<std::mutex> l(m_);
    if (q_.empty()) {
      return false;
    }
    res = std::move(*q_.front());
    q_.pop();
    return true;
  }

  // Non-blocking pop: returns nullptr when the queue is empty.
  std::shared_ptr<T> try_pop() {
    std::lock_guard<std::mutex> l(m_);
    if (q_.empty()) {
      return nullptr;
    }
    auto res = q_.front();
    q_.pop();
    return res;
  }

  // Reports whether the queue is empty at the moment of the call.
  bool empty() const {
    std::lock_guard<std::mutex> l(m_);
    return q_.empty();
  }

 private:
  mutable std::mutex m_;  // mutable so const members can lock it
  std::condition_variable cv_;
  std::queue<std::shared_ptr<T>> q_;
};

信号量(semaphore)

#include <iostream>
#include <semaphore>
#include <thread>

// Two waiters each acquire one permit from a counting semaphore; signal()
// releases both permits at once.
class A {
 public:
  // Blocks until a permit is available, then prints 1.
  void wait1() {
    sem_.acquire();
    std::cout << 1;
  }

  // Blocks until a permit is available, then prints 2.
  void wait2() {
    sem_.acquire();
    std::cout << 2;
  }

  // Adds two permits, allowing both waiters to proceed.
  void signal() { sem_.release(2); }

 private:
  std::counting_semaphore<2> sem_{0};  // initial count 0, maximum count 2
};

// Starts both waiters and the releasing thread, then joins all three.
int main() {
  A a;
  std::thread t1(&A::wait1, &a);
  std::thread t2(&A::wait2, &a);
  std::thread t3(&A::signal, &a);
  t1.join();
  t2.join();
  t3.join();
}  // prints 12 or 21
#include <iostream>
#include <semaphore>
#include <thread>

// Orders two threads with a binary semaphore: wait() cannot pass acquire()
// until signal() has printed 1 and released the permit.
class A {
 public:
  // Blocks until the permit is released, then prints 2.
  void wait() {
    sem_.acquire();
    std::cout << 2;
  }

  // Prints 1 first, then releases the permit.
  void signal() {
    std::cout << 1;
    sem_.release();
  }

 private:
  std::binary_semaphore sem_{0};  // starts with no permit available
};

// Starts the waiting thread and the signalling thread, then joins both.
int main() {
  A a;
  std::thread t1(&A::wait, &a);
  std::thread t2(&A::signal, &a);
  t1.join();
  t2.join();
}  // prints 12

屏障(barrier)

#include <barrier>
#include <cassert>
#include <iostream>
#include <thread>

// Runs three threads through two phases separated by a std::barrier whose
// completion function increments i_ once per completed phase.
class A {
 public:
  // Spawns the three worker threads, each printing 1, 2, 3 with a barrier
  // rendezvous between consecutive prints, then joins them.
  void f() {
    // Expected count is 3; the completion function runs once per phase.
    std::barrier sync_point{3, [&]() noexcept { ++i_; }};
    for (auto& x : tasks_) {
      x = std::thread([&] {
        std::cout << 1;
        sync_point.arrive_and_wait();
        assert(i_ == 1);  // first phase completed exactly once
        std::cout << 2;
        sync_point.arrive_and_wait();
        assert(i_ == 2);  // second phase completed
        std::cout << 3;
      });
    }
    for (auto& x : tasks_) {
      x.join();  // join every thread that uses the barrier before it is destroyed
    }  // calling a barrier member function while the barrier is being destroyed is undefined behavior
  }

 private:
  std::thread tasks_[3] = {};
  int i_ = 0;  // number of completed barrier phases
};

// Runs the barrier demonstration once.
int main() {
  A a;
  a.f();
}
#include <iostream>
#include <latch>
#include <string>
#include <thread>

// Uses a std::latch to wait until three worker threads have each doubled
// their own string, then prints the results.
class A {
 public:
  // Starts one jthread per data_ entry, waits on the latch, then prints every
  // string on its own line.
  void f() {
    for (auto& x : data_) {
      x.t = std::jthread([&] {
        x.s += x.s;  // each thread touches only its own entry
        done_.count_down();
      });
    }
    // Returns once all three workers have counted down.
    done_.wait();
    for (auto& x : data_) {
      std::cout << x.s << std::endl;
    }
  }

 private:
  // One string plus the thread that processes it.
  struct {
    std::string s;
    std::jthread t;
  } data_[3] = {
      {"hello"},
      {"down"},
      {"demo"},
  };

  std::latch done_{3};  // one count per worker
};

// Runs the latch demonstration once.
int main() {
  A a;
  a.f();
}

期值(future)

#include <future>
#include <iostream>

// Minimal type with a member function to hand to std::async.
struct A {
  // Returns its argument unchanged.
  int f(int i) {
    return i;
  }
};

// Launches A::f(1) through std::async and prints the result.
int main() {
  A a;
  std::future<int> res = std::async(&A::f, &a, 1);
  std::cout << res.get();  // 1; blocks until the task has produced its result
}
#include <future>
#include <iostream>

// Shows that a future's result can be retrieved only once.
int main() {
  std::future<void> res = std::async([] {});
  res.get();  // after this call the future no longer refers to a shared state
  try {
    // NOTE(review): the standard leaves get() on an invalid future undefined;
    // implementations are encouraged to throw std::future_error, which this
    // demo relies on.
    res.get();
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // no state
  }
}
// Illustrative sketch of the standard launch-policy enumeration.
namespace std {
enum class launch {  // names for launch options passed to async
  async = 0x1,       // run the task on a new thread
  deferred = 0x2     // lazy evaluation: run the task only when its result is requested
};
}

// By default std::async creates the task with both policies enabled.
std::async([] {
});  // equivalent to std::async(std::launch::async | std::launch::deferred, [] {})
#include <future>
#include <iostream>

// Runs a packaged_task directly and reads its result through the future.
int main() {
  auto identity = [](int value) { return value; };
  std::packaged_task<int(int)> job{identity};
  // Invoking the task computes the result and stores it in the shared state.
  job(1);
  // The task has already run, so get() below returns immediately.
  auto outcome = job.get_future();
  std::cout << outcome.get();  // 1
}
#include <future>
#include <iostream>

// Stores a value in a promise and reads it back through the matching future.
int main() {
  std::promise<int> producer;
  // The value is placed in the shared state before the future is requested.
  producer.set_value(1);
  auto consumer = producer.get_future();
  const int value = consumer.get();
  std::cout << value;  // 1
}
#include <chrono>
#include <future>
#include <iostream>

// Uses a std::promise<void> as a one-shot signal between two threads.
class A {
 public:
  // Prints 1, then makes the promise's shared state ready.
  void signal() {
    std::cout << 1;
    ps_.set_value();
  }

  // Obtains the future, blocks until the promise is satisfied, then prints 2.
  void wait() {
    std::future<void> res = ps_.get_future();
    res.wait();
    std::cout << 2;
  }

 private:
  std::promise<void> ps_;
};

// Runs signal() and wait() on separate threads and joins both.
int main() {
  A a;
  std::thread t1{&A::signal, &a};
  std::thread t2{&A::wait, &a};
  t1.join();
  t2.join();
}
#include <chrono>
#include <future>
#include <iostream>

// Holds a task back until another thread gives the go-ahead through a promise.
class A {
 public:
  // The work to perform once signalled: prints 1.
  void task() { std::cout << 1; }
  // Blocks until signal() has been called, then runs task().
  void wait_for_task() {
    ps_.get_future().wait();
    task();
  }
  // Makes the promise ready, releasing wait_for_task().
  void signal() { ps_.set_value(); }

 private:
  std::promise<void> ps_;
};

// Free-function counterpart of A::task(): writes 1 to standard output.
void task() {
  std::cout << 1;
}

// Starts the waiting thread, signals it from the main thread, then joins it.
int main() {
  A a;
  std::thread t(&A::wait_for_task, &a);
  a.signal();
  t.join();
}
#include <future>
#include <iostream>

// Shows that get_future() may be called only once per promise.
int main() {
  std::promise<void> ps;
  auto a = ps.get_future();
  try {
    auto b = ps.get_future();  // second retrieval throws std::future_error
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // future already retrieved
  }
}
#include <future>
#include <iostream>
#include <stdexcept>

// Shows that an exception thrown inside a std::async task is stored in the
// future and rethrown by get().
int main() {
  std::future<void> res = std::async([] { throw std::logic_error("error"); });
  try {
    res.get();  // rethrows the stored std::logic_error
  } catch (const std::exception& e) {
    std::cout << e.what() << std::endl;
  }
}
#include <future>
#include <iostream>
#include <stdexcept>

// Shows how to store an exception in a promise explicitly so that the future's
// get() rethrows it.
int main() {
  std::promise<void> ps;
  try {
    throw std::logic_error("error");
  } catch (...) {
    // Capture the in-flight exception and place it in the shared state.
    ps.set_exception(std::current_exception());
  }
  auto res = ps.get_future();
  try {
    res.get();  // rethrows the stored exception
  } catch (const std::exception& e) {
    std::cout << e.what() << std::endl;
  }
}
#include <future>
#include <iostream>
#include <stdexcept>

// Shows that an exception thrown while evaluating set_value()'s argument
// leaves the promise unset, so a later set_value() still succeeds.
int main() {
  std::promise<int> ps;
  try {
    // The lambda throws before set_value() is entered, so nothing is stored.
    ps.set_value([] {
      throw std::logic_error("error");
      return 0;
    }());
  } catch (const std::exception& e) {
    std::cout << e.what() << std::endl;
  }
  ps.set_value(1);  // the promise has no value yet, so this is the first set
  auto res = ps.get_future();
  std::cout << res.get();  // 1
}
#include <future>
#include <iostream>

// Shows that destroying a packaged_task or promise before it supplied a result
// makes the associated future report a broken promise.
int main() {
  std::future<void> ft1;
  std::future<void> ft2;
  {
    std::packaged_task<void()> task([] {});
    std::promise<void> ps;
    ft1 = task.get_future();
    ft2 = ps.get_future();
  }  // task and ps are destroyed here without ever setting a result
  try {
    ft1.get();
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // broken promise
  }
  try {
    ft2.get();
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // broken promise
  }
}
#include <future>

// Converts a future into a shared_future, whose get() may be called repeatedly.
int main() {
  std::promise<void> producer;
  auto single = producer.get_future();
  // Shortcut for the two lines around this comment:
  //   std::shared_future<void> shared{producer.get_future()};
  std::shared_future<void> shared{std::move(single)};
  producer.set_value();
  for (int call = 0; call < 2; ++call) {
    shared.get();
  }
}
#include <future>

// Obtains a shared_future directly through future::share().
int main() {
  std::promise<void> producer;
  std::shared_future<void> shared = producer.get_future().share();
  producer.set_value();
  // A shared_future stays valid after get(), so repeated calls are fine.
  shared.get();
  shared.get();
}

时钟

#ifdef _WIN32
#include <chrono>
#elif defined __GNUC__
#include <time.h>
#endif

// Returns the current wall-clock time in nanoseconds since the clock's epoch:
// std::chrono::system_clock on Windows, CLOCK_REALTIME elsewhere.
long long now_in_ns() {
#ifdef _WIN32
  return std::chrono::duration_cast<std::chrono::nanoseconds>(
             std::chrono::system_clock::now().time_since_epoch())
      .count();
#elif defined __GNUC__
  struct timespec t = {};
  clock_gettime(CLOCK_REALTIME, &t);
  // Stay in 64-bit integer arithmetic: multiplying by the double literal 1e9
  // would round today's ~1.7e18 ns timestamps to multiples of 256 ns. The cast
  // also keeps the product from overflowing where time_t is 32 bits wide.
  return static_cast<long long>(t.tv_sec) * 1000000000LL + t.tv_nsec;
#else
#error "now_in_ns: unsupported platform"
#endif
}
#include <chrono>
#include <iomanip>
#include <iostream>

// Prints the current local date and time, e.g. 2011-11-11 11:11:11.
int main() {
  const auto now = std::chrono::system_clock::now();
  // Convert to a UNIX timestamp with one-second resolution.
  const std::time_t stamp = std::chrono::system_clock::to_time_t(now);
  const std::tm* local = std::localtime(&stamp);
  // %F is shorthand for %Y-%m-%d and %T for %H:%M:%S.
  std::cout << std::put_time(local, "%F %T");
}
// Illustrative sketch of the duration aliases in <chrono>: each alias pairs a
// representation type with a ratio giving the tick length in seconds.
namespace std {
namespace chrono {
using nanoseconds = duration<long long, nano>;
using microseconds = duration<long long, micro>;
using milliseconds = duration<long long, milli>;
using seconds = duration<long long>;  // default period is one second
using minutes = duration<int, ratio<60>>;
using hours = duration<int, ratio<3600>>;
// C++20
using days = duration<int, ratio_multiply<ratio<24>, hours::period>>;
using weeks = duration<int, ratio_multiply<ratio<7>, days::period>>;
// 146097 / 400 days per year
using years = duration<int, ratio_multiply<ratio<146097, 400>, days::period>>;
// one twelfth of a year
using months = duration<int, ratio_divide<years::period, ratio<12>>>;
}  // namespace chrono
}  // namespace std
#include <cassert>
#include <chrono>

using namespace std::literals::chrono_literals;

// Checks duration_cast results for a 45-minute duration.
int main() {
  const auto span = 45min;
  assert(span.count() == 45);
  // Converting to a finer unit is exact.
  const auto as_seconds =
      std::chrono::duration_cast<std::chrono::seconds>(span);
  assert(as_seconds.count() == 2700);
  // Converting to a coarser unit truncates toward zero.
  const auto as_hours = std::chrono::duration_cast<std::chrono::hours>(span);
  assert(as_hours.count() == 0);
}
#include <cassert>
#include <chrono>

using namespace std::literals::chrono_literals;

// Checks arithmetic on durations with different units.
int main() {
  // hours minus minutes yields a count in minutes.
  const auto half_hour = 1h - 2 * 15min;
  assert(half_hour.count() == 30);
  // Adding seconds brings the whole sum to a count in seconds.
  const auto total = 0.5h + 2 * 15min + 60s;
  assert(total.count() == 3660);
}
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// Simulates a slow task: sleeps for one second, then returns 1.
int f() {
  std::this_thread::sleep_for(std::chrono::seconds(1));
  return 1;
}

// Waits up to five seconds for the asynchronous result and prints it only if
// it became ready in time.
int main() {
  auto res = std::async(f);
  if (res.wait_for(std::chrono::seconds(5)) == std::future_status::ready) {
    std::cout << res.get();
  }
}
// The first template argument is the clock the time point is measured on;
// the second is the duration type used as its unit.
std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>
#include <cassert>
#include <chrono>

// Checks that two time points one hour apart differ by 3600 seconds.
int main() {
  const auto start = std::chrono::system_clock::now();
  const auto later = start + std::chrono::hours(1);
  const auto gap =
      std::chrono::duration_cast<std::chrono::seconds>(later - start);
  assert(gap.count() == 3600);
}
#include <cstdint>

#ifdef _WIN32
#include <intrin.h>
#endif

// Reads the time-stamp counter: the __rdtsc() intrinsic on Windows, inline
// assembly on GCC-compatible compilers (x86-64 only).
static inline std::uint64_t read_tsc() {
#ifdef _WIN32
  return __rdtsc();
#elif defined __GNUC__
  std::uint64_t res;
  // rdtsc delivers the counter split across rdx (high half) and rax (low
  // half); shift the high half up and OR it into rax, which is the output.
  __asm__ __volatile__(
      "rdtsc;"
      "shl $32, %%rdx;"
      "or %%rdx, %%rax"
      : "=a"(res)
      :
      : "%rcx", "%rdx");
  return res;
#endif
}

// Reads the time-stamp counter with rdtscp: the __rdtscp() intrinsic on
// Windows, inline assembly on GCC-compatible compilers (x86-64 only).
static inline std::uint64_t read_tscp() {
#ifdef _WIN32
  std::uint32_t t;  // receives the auxiliary value; not used afterwards
  return __rdtscp(&t);
#elif defined __GNUC__
  std::uint64_t res;
  // Same combination as read_tsc(): high half in rdx, low half in rax. rcx is
  // declared clobbered; its value is discarded here.
  __asm__ __volatile__(
      "rdtscp;"
      "shl $32, %%rdx;"
      "or %%rdx, %%rax"
      : "=a"(res)
      :
      : "%rcx", "%rdx");
  return res;
#endif
}

// Issues a memory fence: __faststorefence() on Windows, the mfence instruction
// with a "memory" clobber (which also stops compiler reordering across it) on
// GCC-compatible compilers.
static inline void fence() {
#ifdef _WIN32
  __faststorefence();
#elif defined __GNUC__
  __asm__ __volatile__("mfence" : : : "memory");
#endif
}

// Timestamp for the start of a measured region: reads the counter with
// read_tsc(), then issues a fence before returning the value.
inline std::uint64_t tsc_begin() {
  const std::uint64_t stamp = read_tsc();
  fence();
  return stamp;
}

// Timestamp for a point inside a measured region: reads the counter with
// read_tscp(), then issues a fence before returning the value.
inline std::uint64_t tsc_mid() {
  const std::uint64_t stamp = read_tscp();
  fence();
  return stamp;
}

// Timestamp for the end of a measured region: a plain read_tscp() with no
// trailing fence.
inline std::uint64_t tsc_end() {
  const std::uint64_t stamp = read_tscp();
  return stamp;
}

函数式编程(functional programming)

-- Quicksort on lists: the head is the pivot, the tail is split into the
-- elements <= pivot and the elements > pivot, and both parts are sorted
-- recursively.
quickSort :: Ord a => [a] -> [a]
quickSort [] = []
quickSort (x : xs) = l ++ [x] ++ r
  where
    l = quickSort (filter (<= x) xs) -- elements not greater than the pivot
    r = quickSort (filter (> x) xs) -- elements greater than the pivot

-- Sorts the characters of a sample string and prints the result.
main :: IO ()
main = print (quickSort "downdemo") -- "ddemnoow"
#include <algorithm>
#include <iostream>
#include <list>
#include <utility>

// Functional-style quicksort: takes the list by value and returns a new sorted
// list, moving nodes between lists with splice() instead of copying elements.
template <typename T>
std::list<T> quick_sort(std::list<T> v) {
  if (v.empty()) {
    return v;
  }
  // Move the first node of v into the result; it serves as the pivot.
  std::list<T> sorted;
  sorted.splice(sorted.end(), v, v.begin());
  const T& pivot = sorted.front();
  // Reorder v so that elements less than the pivot come first; `split` points
  // at the first element that is not less than the pivot.
  const auto split = std::partition(
      v.begin(), v.end(), [&pivot](const T& e) { return e < pivot; });
  // Detach the lower part; v keeps the upper part.
  std::list<T> smaller;
  smaller.splice(smaller.begin(), v, v.begin(), split);
  std::list<T> left = quick_sort(std::move(smaller));  // sort the lower part
  std::list<T> right = quick_sort(std::move(v));       // sort the upper part
  sorted.splice(sorted.begin(), left);  // lower part goes before the pivot
  sorted.splice(sorted.end(), right);   // upper part goes after the pivot
  return sorted;
}

// Sorts a small list and prints its elements back to back.
int main() {
  const std::list<int> sorted = quick_sort(std::list<int>{1, 3, 2, 4, 5});
  for (const int value : sorted) {
    std::cout << value;  // 12345
  }
}
#include <algorithm>
#include <future>
#include <iostream>
#include <list>
#include <utility>

// Quicksort that sorts the lower partition through std::async while the
// calling thread sorts the upper partition.
template <typename T>
std::list<T> quick_sort(std::list<T> v) {
  if (v.empty()) {
    return v;
  }
  // Move the first node of v into the result; it serves as the pivot.
  std::list<T> sorted;
  sorted.splice(sorted.end(), v, v.begin());
  const T& pivot = sorted.front();
  const auto split = std::partition(
      v.begin(), v.end(), [&pivot](const T& e) { return e < pivot; });
  // Detach the lower part; v keeps the upper part.
  std::list<T> smaller;
  smaller.splice(smaller.begin(), v, v.begin(), split);
  // Hand the lower part to std::async (default launch policy).
  std::future<std::list<T>> left =
      std::async(&quick_sort<T>, std::move(smaller));
  // Meanwhile sort the upper part on this thread.
  std::list<T> right = quick_sort(std::move(v));
  sorted.splice(sorted.end(), right);
  // get() blocks until the lower part is sorted.
  sorted.splice(sorted.begin(), left.get());
  return sorted;
}

// Sorts a small list with the asynchronous quicksort and prints its elements.
int main() {
  const std::list<int> sorted = quick_sort(std::list<int>{1, 3, 2, 4, 5});
  for (const int value : sorted) {
    std::cout << value;  // 12345
  }
}

链式调用

import { interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Two interval sources with periods of 500 ms and 1000 ms.
const source1$ = interval(500);
const source2$ = interval(1000);
// For each value x from source1$, concatenate it with the most recent value y
// from source2$ into a string.
source1$.pipe(withLatestFrom(source2$, (x, y) => `${x}${y}`));  // 10 20 31 41 52 62---
// Continuation to run once the first future is ready; it receives that future.
int f(std::experimental::future<int>);

// Declared as a function so that the call below is well-formed; it stands for
// whatever operation produces the initial future.
std::experimental::future<int> eft();
auto ft1 = eft();
// Unlike std::async, then() accepts no extra arguments for f: the library
// invokes the continuation with the now-ready future as its only argument.
// ft1 holds an int, so f takes std::experimental::future<int>.
auto ft2 = ft1.then(f);
// After then(), the original future is no longer valid.
assert(!ft1.valid());
assert(ft2.valid());
// std::async-like helper that returns a std::experimental::future: runs a copy
// of func on a detached thread and publishes its result (or the exception it
// throws) through a promise when that thread exits.
template <typename F>
std::experimental::future<decltype(std::declval<F>()())> new_async(F&& func) {
  using Result = decltype(std::declval<F>()());
  std::experimental::promise<Result> promise;
  auto result = promise.get_future();
  // The promise is moved into the closure and the callable is copied, so the
  // detached thread owns everything it uses.
  auto worker = [p = std::move(promise), f = std::decay_t<F>(func)]() mutable {
    try {
      p.set_value_at_thread_exit(f());
    } catch (...) {
      p.set_exception_at_thread_exit(std::current_exception());
    }
  };
  std::thread(std::move(worker)).detach();
  return result;
}
void process_login(const std::string& username, const std::string& password) {
  try {
    const user_id id = backend.authenticate_user(username, password);
    const user_data info_to_display = backend.request_current_info(id);
    update_display(info_to_display);
  } catch (std::exception& e) {
    display_error(e);
  }
}
std::future<void> process_login(const std::string& username,
                                const std::string& password) {
  return std::async(std::launch::async, [=]() {
    try {
      const user_id id = backend.authenticate_user(username, password);
      const user_data info_to_display = backend.request_current_info(id);
      update_display(info_to_display);
    } catch (std::exception& e) {
      display_error(e);
    }
  });
}
// Login flow as a chain of continuations: each step starts when the previous
// future becomes ready, and errors surface in the last step, where get()
// rethrows them.
std::experimental::future<void> process_login(const std::string& username,
                                              const std::string& password) {
  auto authenticated = new_async(
      [=]() { return backend.authenticate_user(username, password); });
  auto fetched =
      authenticated.then([](std::experimental::future<user_id> uid) {
        return backend.request_current_info(uid.get());
      });
  return fetched.then([](std::experimental::future<user_data> profile) {
    try {
      update_display(profile.get());
    } catch (std::exception& err) {
      display_error(err);
    }
  });
}
// Login flow built on the backend's asynchronous API: every step returns a
// future and the steps are linked with then().
std::experimental::future<void> process_login(const std::string& username,
                                              const std::string& password) {
  auto authenticated = backend.async_authenticate_user(username, password);
  auto fetched =
      authenticated.then([](std::experimental::future<user_id> uid) {
        return backend.async_request_current_info(uid.get());
      });
  return fetched.then([](std::experimental::future<user_data> profile) {
    try {
      update_display(profile.get());
    } catch (std::exception& err) {
      display_error(err);
    }
  });
}
// Same continuation chain as the explicit-type version, with generic lambdas
// so the future types need not be spelled out.
std::experimental::future<void> process_login(const std::string& username,
                                              const std::string& password) {
  auto authenticated = backend.async_authenticate_user(username, password);
  auto fetched = authenticated.then(
      [](auto uid) { return backend.async_request_current_info(uid.get()); });
  return fetched.then([](auto profile) {
    try {
      update_display(profile.get());
    } catch (std::exception& err) {
      display_error(err);
    }
  });
}
// share() turns the future into a shared future, so more than one continuation
// can be attached to it.
auto ft1 = new_async(some_function).share();
// Each continuation receives its own copy of the shared future.
auto ft2 = ft1.then(
    [](std::experimental::shared_future<some_data> data) { do_stuff(data); });
auto ft3 = ft1.then([](std::experimental::shared_future<some_data> data) {
  return do_other_stuff(data);
});
std::future<FinalResult> process_data(std::vector<MyData>& vec) {
  const size_t chunk_size = whatever;
  std::vector<std::future<ChunkResult>> res;
  for (auto begin = vec.begin(), end = vec.end(); beg ! = end;) {
    const size_t remaining_size = end - begin;
    const size_t this_chunk_size = std::min(remaining_size, chunk_size);
    res.push_back(std::async(process_chunk, begin, begin + this_chunk_size));
    begin += this_chunk_size;
  }
  return std::async([all_results = std::move(res)]() {
    std::vector<ChunkResult> v;
    v.reserve(all_results.size());
    for (auto& f : all_results) {
      v.push_back(f.get());  // 这里会导致反复唤醒,增加了很多开销
    }
    return gather_results(v);
  });
}
// Splits vec into chunks, processes each chunk asynchronously, and gathers the
// results in a continuation attached to when_all, so nothing blocks on the
// individual chunk futures.
std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec) {
  const size_t chunk_size = whatever;
  std::vector<std::experimental::future<ChunkResult>> res;
  for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
    const size_t remaining_size = end - begin;
    // The last chunk may be shorter than chunk_size.
    const size_t this_chunk_size = std::min(remaining_size, chunk_size);
    // new_async takes a single nullary callable, so bind the range in a lambda.
    res.push_back(new_async(
        [begin, chunk_end = begin + this_chunk_size] {
          return process_chunk(begin, chunk_end);
        }));
    begin += this_chunk_size;
  }
  // The continuation runs only after every chunk future is ready, so the get()
  // calls inside it do not block.
  return std::experimental::when_all(res.begin(), res.end())
      .then([](std::experimental::future<
                std::vector<std::experimental::future<ChunkResult>>>
                   ready_results) {
        std::vector<std::experimental::future<ChunkResult>> all_results =
            ready_results.get();
        std::vector<ChunkResult> v;
        v.reserve(all_results.size());
        for (auto& f : all_results) {
          v.push_back(f.get());
        }
        return gather_results(v);
      });
}
// Searches data in parallel for the first element accepted by
// matches_find_criteria and returns a future for the processed result. The
// future holds a std::runtime_error("Not found") when no task finds a match.
std::experimental::future<FinalResult> find_and_process_value(
    std::vector<MyData>& data) {
  const unsigned concurrency = std::thread::hardware_concurrency();
  // hardware_concurrency() may report 0; fall back to two tasks.
  const unsigned num_tasks = (concurrency > 0) ? concurrency : 2;
  std::vector<std::experimental::future<MyData*>> res;
  // Round up so that num_tasks chunks cover the whole vector.
  const auto chunk_size = (data.size() + num_tasks - 1) / num_tasks;
  auto chunk_begin = data.begin();
  // Shared by all tasks so that the first match stops the other searches.
  std::shared_ptr<std::atomic<bool>> done_flag =
      std::make_shared<std::atomic<bool>>(false);
  for (unsigned i = 0; i < num_tasks; ++i) {  // queue one search task per chunk
    auto chunk_end =
        (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
    res.push_back(new_async([=] {
      for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end);
           ++entry) {
        if (matches_find_criteria(*entry)) {
          *done_flag = true;
          return &*entry;
        }
      }
      // Must have the same type as the other return (MyData*), otherwise the
      // lambda's return type cannot be deduced.
      return static_cast<MyData*>(nullptr);
    }));
    chunk_begin = chunk_end;
  }
  std::shared_ptr<std::experimental::promise<FinalResult>> final_result =
      std::make_shared<std::experimental::promise<FinalResult>>();

  // Continuation for when_any: inspects the future that became ready and
  // either fulfils the promise or re-arms itself on the remaining futures.
  struct DoneCheck {
    std::shared_ptr<std::experimental::promise<FinalResult>> final_result;

    DoneCheck(
        std::shared_ptr<std::experimental::promise<FinalResult>> final_result_)
        : final_result(std::move(final_result_)) {}

    void operator()(
        std::experimental::future<std::experimental::when_any_result<
            std::vector<std::experimental::future<MyData*>>>>
            res_param) {
      auto res = res_param.get();
      MyData* const ready_result =
          res.futures[res.index].get();  // value of the future that is ready
      // A non-null pointer is a match: process it and fulfil the promise.
      if (ready_result) {
        final_result->set_value(process_found_value(*ready_result));
      } else {
        res.futures.erase(res.futures.begin() + res.index);  // drop the miss
        if (!res.futures.empty()) {  // more futures to check: wait again
          std::experimental::when_any(res.futures.begin(), res.futures.end())
              .then(std::move(*this));
        } else {  // nothing left: report failure through the promise
          final_result->set_exception(
              std::make_exception_ptr(std::runtime_error("Not found")));
        }
      }
    }
  };
  std::experimental::when_any(res.begin(), res.end())
      .then(DoneCheck(final_result));
  return final_result->get_future();
}
// Three independent asynchronous operations with different result types.
std::experimental::future<int> ft1 = new_async(f1);
std::experimental::future<std::string> ft2 = new_async(f2);
std::experimental::future<double> ft3 = new_async(f3);
// The variadic when_all takes the futures by move and yields a future holding
// a tuple of the individual futures.
std::experimental::future<std::tuple<std::experimental::future<int>,
                                     std::experimental::future<std::string>,
                                     std::experimental::future<double>>>
    res = std::experimental::when_all(std::move(ft1), std::move(ft2),
                                      std::move(ft3));

CSP(Communicating Sequential Processes)