Skip to the content.

线程间的工作划分

开始处理之前的线程间数据划分

递归划分数据

#include <algorithm>
#include <cassert>
#include <future>
#include <list>

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> v) {
  if (v.empty()) {
    return {};
  }
  std::list<T> res;
  res.splice(res.begin(), v, v.begin());
  auto it = std::partition(v.begin(), v.end(),
                           [&](const T& x) { return x < res.front(); });
  std::list<T> low;
  low.splice(low.end(), v, v.begin(), it);
  std::future<std::list<T>> l(
      std::async(&parallel_quick_sort<T>, std::move(low)));
  auto r(parallel_quick_sort(std::move(v)));
  res.splice(res.end(), r);
  res.splice(res.begin(), l.get());
  return res;
}

int main() {
  std::list<int> input = {5, 1, 9, 2, 9, 100, 8};
  std::list<int> expected = {1, 2, 5, 8, 9, 9, 100};
  assert(parallel_quick_sort(input) == expected);
}
#include <algorithm>
#include <atomic>
#include <future>
#include <list>
#include <memory>
#include <thread>
#include <vector>

#include "concurrent_stack.hpp"

template <typename T>
class Sorter {
 public:
  Sorter() : max_thread_count(std::thread::hardware_concurrency() - 1) {}

  ~Sorter() {
    end_of_data = true;
    for (auto& x : threads) {
      if (x.joinable()) {
        x.join();
      }
    }
  }

  std::list<T> do_sort(std::list<T>& v) {
    if (v.empty()) {
      return {};
    }
    std::list<T> res;
    res.splice(res.begin(), v, v.begin());
    auto it = std::partition(v.begin(), v.end(),
                             [&](const T& x) { return x < res.front(); });
    ChunkToSort low;
    low.data.splice(low.data.end(), v, v.begin(), it);
    std::future<std::list<T>> l = low.promise.get_future();
    chunks.push(std::move(low));
    if (threads.size() < max_thread_count) {
      threads.emplace_back(&Sorter<T>::sort_thread, this);
    }
    auto r{do_sort(v)};
    res.splice(res.end(), r);
    while (l.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
      try_sort_chunk();
    }
    res.splice(res.begin(), l.get());
    return res;
  }

 private:
  struct ChunkToSort {
    std::list<T> data;
    std::promise<std::list<T>> promise;
  };

 private:
  void sort_chunk(const std::shared_ptr<ChunkToSort>& chunk) {
    chunk->promise.set_value(do_sort(chunk->data));
  }

  void try_sort_chunk() {
    std::shared_ptr<ChunkToSort> chunk = chunks.pop();
    if (chunk) {
      sort_chunk(chunk);
    }
  }

  void sort_thread() {
    while (!end_of_data) {
      try_sort_chunk();
      std::this_thread::yield();
    }
  }

 private:
  ConcurrentStack<ChunkToSort> chunks;
  std::vector<std::thread> threads;
  const std::size_t max_thread_count;
  std::atomic<bool> end_of_data = false;
};

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> v) {
  if (v.empty()) {
    return {};
  }
  return Sorter<T>{}.do_sort(v);
}

基于任务划分

// 非 pipeline:每 20 秒 4 个数据(每个数据仍要 20 秒)
线程A-1- -1- -1- -1- -5- -5- -5- -5-
线程B-2- -2- -2- -2- -6- -6- -6- -6-
线程C-3- -3- -3- -3- -7- -7- -7- -7-
线程D-4- -4- -4- -4- -8- -8- -8- -8-

// pipeline:第一个数据 20 秒,之后每个 5 秒
线程A-1- -2- -3- -4- -5- -6- -7- -8-
线程B--- -1- -2- -3- -4- -5- -6- -7-
线程C--- --- -1- -2- -3- -4- -5- -6-
线程D--- --- --- -1- -2- -3- -4- -5-

影响并发代码性能的因素

处理器数量

乒乓缓存(cache ping-pong)

std::atomic<std::size_t> n(0);

void f() {  // 任何线程都能调用
  // 每次n自增,处理器都要确保 cache 中的拷贝是最新的
  // 修改值后再告知其他处理器
  // fetch_add 是读改写操作,每次都要检索最新值
  // 如果另一线程在另一处理器运行此代码
  // n 的数据就要在两个处理器之间来回传递
  // 这样 n 增加时两个处理器的 cache 才能有最新值
  while (n.fetch_add(1, std::memory_order_relaxed) < 100000000) {
    task();  // 如果很快完成或者有很多处理器运行此代码,处理器就要互相等待
    // 一个处理器在更新值,另一个更新值的处理器就要等待
    // 直到第一个更新完成并把改变传过来
    // 这种情况就是 high contention
    // 反之处理器很少要互相等待的情况就是 low contention
    // 在类似这样的循环中,n 的数据在 cache 之间来回传递
    // 这就是 cache ping-pong
  }
}
std::mutex m;

void f() {
  while (true) {
    std::lock_guard<std::mutex> l(m);  // 现在需要来回传递的是 m
    if (done_processing(data)) {
      break;
    }
  }
}

伪共享(false sharing)

data proximity

oversubscription

适用多线程性能的数据结构

struct ProtectedData {
  std::mutex m;
  // 使用超过一个 cache line 字节数的填充即可
  char padding[std::hardware_destructive_interference_size];
  // 不支持 C++17 则可以 padding[65536];
  Data data_to_protect;
};
struct Data {
  data_item1 d1;
  data_item2 d2;
  char padding[std::hardware_destructive_interference_size];
};

Data some_array[256];

并发设计的其他注意事项

并发算法的异常安全

#include <algorithm>
#include <functional>
#include <numeric>
#include <thread>
#include <vector>

template <typename Iterator, typename T>
struct accumulate_block {
  void operator()(Iterator first, Iterator last, T& res) {
    res = std::accumulate(first, last, res);  // 可能抛异常
  }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
  std::size_t len = std::distance(first, last);  // 此时没做任何事,抛异常无影响
  if (!len) {
    return init;
  }
  std::size_t min_per_thread = 25;
  std::size_t max_threads = (len + min_per_thread - 1) / min_per_thread;
  std::size_t hardware_threads = std::thread::hardware_concurrency();
  std::size_t num_threads =
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  std::size_t block_size = len / num_threads;
  std::vector<T> res(num_threads);  // 仍未做任何事,抛异常无影响
  std::vector<std::thread> threads(num_threads - 1);  // 同上
  Iterator block_start = first;                       // 同上
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    Iterator block_end = block_start;  // 同上
    std::advance(block_end, block_size);
    // 下面创建 std::thread,抛异常就导致析构对象,并调用 std::terminate
    // 终止程序
    threads[i] = std::thread(accumulate_block<Iterator, T>{}, block_start,
                             block_end, std::ref(res[i]));
    block_start = block_end;
  }
  // accumulate_block::operator() 调用的 std::accumulate
  // 可能抛异常,此时抛异常造成问题同上
  accumulate_block<Iterator, T>()(block_start, last, res[num_threads - 1]);
  std::for_each(threads.begin(), threads.end(),
                std::mem_fn(&std::thread::join));
  // 最后调用 std::accumulate 可能抛异常,但不引发大问题,因为所有线程已 join
  return std::accumulate(res.begin(), res.end(), init);
}
#include <algorithm>
#include <functional>
#include <future>
#include <numeric>
#include <thread>
#include <vector>

template <typename Iterator, typename T>
struct accumulate_block {
  T operator()(Iterator first, Iterator last) {
    return std::accumulate(first, last, T{});
  }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
  std::size_t len = std::distance(first, last);
  if (!len) {
    return init;
  }
  std::size_t min_per_thread = 25;
  std::size_t max_threads = (len + min_per_thread - 1) / min_per_thread;
  std::size_t hardware_threads = std::thread::hardware_concurrency();
  std::size_t num_threads =
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  std::size_t block_size = len / num_threads;
  std::vector<std::future<T>> fts(num_threads - 1);  // 改用 std::future 获取值
  std::vector<std::thread> threads(num_threads - 1);
  Iterator block_start = first;
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    Iterator block_end = block_start;
    std::advance(block_end, block_size);
    // 用 std::packaged_task 替代直接创建 std::thread
    std::packaged_task<T(Iterator, Iterator)> pt(
        accumulate_block<Iterator, T>{});
    fts[i] = pt.get_future();
    threads[i] = std::thread(std::move(pt), block_start, block_end);
    block_start = block_end;
  }
  T last_res = accumulate_block<Iterator, T>{}(block_start, last);
  std::for_each(threads.begin(), threads.end(),
                std::mem_fn(&std::thread::join));
  T res = init;
  try {
    for (std::size_t i = 0; i < num_threads - 1; ++i) {
      res += fts[i].get();
    }
    res += last_res;
  } catch (...) {
    for (auto& x : threads) {
      if (x.joinable()) {
        x.join();
      }
    }
    throw;
  }
  return res;
}
#include <algorithm>
#include <functional>
#include <future>
#include <numeric>
#include <thread>
#include <vector>

class threads_guard {
 public:
  explicit threads_guard(std::vector<std::thread>& threads)
      : threads_(threads) {}

  ~threads_guard() {
    for (auto& x : threads_) {
      if (x.joinable()) {
        x.join();
      }
    }
  }

 private:
  std::vector<std::thread>& threads_;
};

template <typename Iterator, typename T>
struct accumulate_block {
  T operator()(Iterator first, Iterator last) {
    return std::accumulate(first, last, T{});
  }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
  std::size_t len = std::distance(first, last);
  if (!len) {
    return init;
  }
  std::size_t min_per_thread = 25;
  std::size_t max_threads = (len + min_per_thread - 1) / min_per_thread;
  std::size_t hardware_threads = std::thread::hardware_concurrency();
  std::size_t num_threads =
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  std::size_t block_size = len / num_threads;
  std::vector<std::future<T>> fts(num_threads - 1);
  std::vector<std::thread> threads(num_threads - 1);
  threads_guard g{threads};  // threads 元素析构时自动 join
  Iterator block_start = first;
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    Iterator block_end = block_start;
    std::advance(block_end, block_size);
    std::packaged_task<T(Iterator, Iterator)> pt(
        accumulate_block<Iterator, T>{});
    fts[i] = pt.get_future();
    threads[i] = std::thread(std::move(pt), block_start, block_end);
    block_start = block_end;
  }
  T last_res = accumulate_block<Iterator, T>{}(block_start, last);
  std::for_each(threads.begin(), threads.end(),
                std::mem_fn(&std::thread::join));
  T res = init;
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    res += fts[i].get();
  }
  res += last_res;
  return res;
}
#include <algorithm>
#include <functional>
#include <future>
#include <numeric>
#include <thread>
#include <vector>

template <typename Iterator, typename T>
struct accumulate_block {
  T operator()(Iterator first, Iterator last) {
    return std::accumulate(first, last, T{});
  }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
  std::size_t len = std::distance(first, last);
  if (!len) {
    return init;
  }
  std::size_t min_per_thread = 25;
  std::size_t max_threads = (len + min_per_thread - 1) / min_per_thread;
  std::size_t hardware_threads = std::thread::hardware_concurrency();
  std::size_t num_threads =
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  std::size_t block_size = len / num_threads;
  std::vector<std::future<T>> fts(num_threads - 1);
  std::vector<std::jthread> threads(num_threads - 1);
  Iterator block_start = first;
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    Iterator block_end = block_start;
    std::advance(block_end, block_size);
    std::packaged_task<T(Iterator, Iterator)> pt(
        accumulate_block<Iterator, T>{});
    fts[i] = pt.get_future();
    threads[i] = std::jthread(std::move(pt), block_start, block_end);
    block_start = block_end;
  }
  T last_res = accumulate_block<Iterator, T>{}(block_start, last);
  std::for_each(threads.begin(), threads.end(),
                std::mem_fn(&std::jthread::join));
  T res = init;
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    res += fts[i].get();
  }
  res += last_res;
  return res;
}
#include <future>
#include <numeric>

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
  std::size_t len = std::distance(first, last);
  std::size_t max_chunk_size = 25;
  if (len <= max_chunk_size) {
    return std::accumulate(first, last, init);
  }
  Iterator mid_point = first;
  std::advance(mid_point, len / 2);
  std::future<T> l =
      std::async(parallel_accumulate<Iterator, T>, first, mid_point, init);
  // 递归调用如果抛出异常,std::async 创建的 std::future 将在异常传播时被析构
  T r = parallel_accumulate(mid_point, last, T{});
  // 如果异步任务抛出异常,get 就会捕获异常并重新抛出
  return l.get() + r;
}

可扩展性与阿姆达尔定律(Amdahl’s law)

S = 1 / (a + ( 1 - a ) / N) // a 为串行部分占比,N 为处理器倍数,S 为性能倍数
// 正常情况下 S < 1 /a,最理想的情况是 a 为 0,S = N

用多线程隐藏延迟(lantency)

用并发提高响应度(responsiveness)

while (true) {
  event_data event = get_event();
  if (event.type == quit) {
    break;
  }
  process(event);
}
std::thread task_thread;
std::atomic<bool> task_cancelled(false);

void gui_thread() {
  while (true) {
    event_data event = get_event();
    if (event.type == quit) {
      break;
    }
    process(event);
  }
}

void task() {
  while (!task_complete() && !task_cancelled) do_next_operation();
  if (task_cancelled) {
    perform_cleanup();
  } else {
    post_gui_event(task_complete);
  }
}

void process(const event_data& event) {
  switch (event.type) {
    case start_task:
      task_cancelled = false;
      task_thread = std::thread(task);
      break;
    case stop_task:
      task_cancelled = true;
      task_thread.join();
      break;
    case task_complete:
      task_thread.join();
      display_results();
      break;
    default:
      ...
  }
}

实践

并行版 std::for_each

#include <algorithm>
#include <future>
#include <thread>
#include <vector>

template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
  std::size_t len = std::distance(first, last);
  if (!len) {
    return;
  }
  std::size_t min_per_thread = 25;
  std::size_t max_threads = (len + min_per_thread - 1) / min_per_thread;
  std::size_t hardware_threads = std::thread::hardware_concurrency();
  std::size_t num_threads =
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  std::size_t block_size = len / num_threads;
  std::vector<std::future<void>> fts(num_threads - 1);
  std::vector<std::jthread> threads(num_threads - 1);
  Iterator block_start = first;
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    Iterator block_end = block_start;
    std::advance(block_end, block_size);
    std::packaged_task<void(void)> pt(
        [=] { std::for_each(block_start, block_end, f); });
    fts[i] = pt.get_future();
    threads[i] = std::jthread(std::move(pt));
    block_start = block_end;
  }
  std::for_each(block_start, last, f);
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    fts[i].get();  // 只是为了传递异常
  }
}
#include <algorithm>
#include <future>

template <typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f) {
  std::size_t len = std::distance(first, last);
  if (!len) {
    return;
  }
  std::size_t min_per_thread = 25;
  if (len < 2 * min_per_thread) {
    std::for_each(first, last, f);
    return;
  }
  const Iterator mid_point = first + len / 2;
  std::future<void> l =
      std::async(&parallel_for_each<Iterator, Func>, first, mid_point, f);
  parallel_for_each(mid_point, last, f);
  l.get();
}

并行版 std::find

#include <algorithm>
#include <atomic>
#include <functional>
#include <future>
#include <numeric>
#include <thread>
#include <vector>

template <typename Iterator, typename T>
Iterator parallel_find(Iterator first, Iterator last, T match) {
  struct find_element {
    void operator()(Iterator begin, Iterator end, T match,
                    std::promise<Iterator>* res, std::atomic<bool>* done_flag) {
      try {
        for (; begin != end && !done_flag->load(); ++begin) {
          if (*begin == match) {
            res->set_value(begin);
            done_flag->store(true);
            return;
          }
        }
      } catch (...) {
        try {
          res->set_exception(std::current_exception());
          done_flag->store(true);
        } catch (...) {
        }
      }
    }
  };

  std::size_t len = std::distance(first, last);
  if (!len) {
    return last;
  }
  std::size_t min_per_thread = 25;
  std::size_t max_threads = (len + min_per_thread - 1) / min_per_thread;
  std::size_t hardware_threads = std::thread::hardware_concurrency();
  std::size_t num_threads =
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  std::size_t block_size = len / num_threads;
  std::promise<Iterator> res;
  std::atomic<bool> done_flag(false);
  {
    std::vector<std::jthread> threads(num_threads - 1);
    Iterator block_start = first;
    for (auto& x : threads) {
      Iterator block_end = block_start;
      std::advance(block_end, block_size);
      x = std::jthread(find_element{}, block_start, block_end, match, &res,
                       &done_flag);
      block_start = block_end;
    }
    find_element{}(block_start, last, match, &res, &done_flag);
  }
  if (!done_flag.load()) {
    return last;
  }
  return res.get_future().get();
}
#include <atomic>
#include <future>

template <typename Iterator, typename T>
Iterator parallel_find_impl(Iterator first, Iterator last, T match,
                            std::atomic<bool>& done_flag) {
  try {
    std::size_t len = std::distance(first, last);
    std::size_t min_per_thread = 25;
    if (len < (2 * min_per_thread)) {
      for (; first != last && !done_flag.load(); ++first) {
        if (*first == match) {
          done_flag = true;
          return first;
        }
      }
      return last;
    }
    const Iterator mid_point = first + len / 2;
    std::future<Iterator> async_res =
        std::async(&parallel_find_impl<Iterator, T>, mid_point, last, match,
                   std::ref(done_flag));
    const Iterator direct_res =
        parallel_find_impl(first, mid_point, match, done_flag);
    return direct_res == mid_point ? async_res.get() : direct_res;
  } catch (...) {
    done_flag = true;
    throw;
  }
}

template <typename Iterator, typename T>
Iterator parallel_find(Iterator first, Iterator last, T match) {
  std::atomic<bool> done_flag(false);
  return parallel_find_impl(first, last, match, done_flag);
}

并行版 std::partial_sum

#include <numeric>
#include <vector>

int main() {
  std::vector<int> v{1, 2, 3, 4};
  std::partial_sum(
      v.begin(), v.end(),
      std::ostream_iterator<int>(std::cout << "hi"),  // 输出到的迭代器起始位置
      std::plus<int>{});  // 使用的二元运算符,不指定则默认累加
}  // 输出 hi13610
template <class InputIt, class OutputIt, class BinaryOperation>
OutputIt partial_sum(InputIt first, InputIt last, OutputIt d_first,
                     BinaryOperation op) {
  if (first == last) {
    return d_first;
  }
  typename std::iterator_traits<InputIt>::value_type sum = *first;
  *d_first = sum;
  while (++first != last) {
    sum = op(std::move(sum), *first);
    *++d_first = sum;
  }
  return ++d_first;
}
1 1 1 1 1 1 1 1 1 // 输入 9 个 1
// 划分为三部分
1 1 1
1 1 1
1 1 1
// 得到三个部分的结果
1 2 3
1 2 3
1 2 3
// 将第一部分的尾元素(即 3)加到第二部分
1 2 3
4 5 6
1 2 3
// 再将第二部分的尾元素(即 6)加到第三部分
1 2 3
4 5 6
7 8 9
#include <algorithm>
#include <future>
#include <numeric>

template <typename Iterator>
void parallel_partial_sum(Iterator first, Iterator last) {
  using value_type = typename Iterator::value_type;
  struct process_chunk {
    void operator()(Iterator begin, Iterator last,
                    std::future<value_type>* previous_end_value,
                    std::promise<value_type>* end_value) {
      try {
        Iterator end = last;
        ++end;
        std::partial_sum(begin, end, begin);
        if (previous_end_value) {  // 不是第一个块
          value_type addend = previous_end_value->get();
          *last += addend;
          if (end_value) {
            end_value->set_value(*last);
          }
          std::for_each(begin, last,
                        [addend](value_type& item) { item += addend; });
        } else if (end_value) {
          end_value->set_value(*last);  // 是第一个块则可以为下个块更新尾元素
        }
      } catch (...) {
        // 如果抛出异常则存储到
        // std::promise,异常会传播给下一个块(获取这个块的尾元素时)
        if (end_value) {
          end_value->set_exception(std::current_exception());
        } else {
          throw;  // 异常最终传给最后一个块,此时再抛出异常
        }
      }
    }
  };

  std::size_t len = std::distance(first, last);
  if (!len) {
    return;
  }
  std::size_t min_per_thread = 25;
  std::size_t max_threads = (len + min_per_thread - 1) / min_per_thread;
  std::size_t hardware_threads = std::thread::hardware_concurrency();
  std::size_t num_threads =
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  std::size_t block_size = len / num_threads;
  // end_values 存储块内尾元素值
  std::vector<std::promise<value_type>> end_values(num_threads - 1);
  // prev_end_values 检索前一个块的尾元素
  std::vector<std::future<value_type>> prev_end_values;
  prev_end_values.reserve(num_threads - 1);
  Iterator block_start = first;
  std::vector<std::jthread> threads(num_threads - 1);
  for (std::size_t i = 0; i < num_threads - 1; ++i) {
    Iterator block_last = block_start;
    std::advance(block_last, block_size - 1);  // 指向尾元素
    threads[i] = std::jthread(process_chunk{}, block_start, block_last,
                              i != 0 ? &prev_end_values[i - 1] : nullptr,
                              &end_values[i]);
    block_start = block_last;
    ++block_start;
    prev_end_values.emplace_back(end_values[i].get_future());
  }
  Iterator final_element = block_start;
  std::advance(final_element, std::distance(block_start, last) - 1);
  process_chunk{}(block_start, final_element,
                  num_threads > 1 ? &prev_end_values.back() : nullptr, nullptr);
}
1 1 1 1 1 1 1 1 1 // 输入 9 个 1
// 先让距离为 1 的元素相加
1 2 2 2 2 2 2 2 2
// 再让距离为 2 的元素相加
1 2 3 4 4 4 4 4 4
// 再让距离为 4 的元素相加
1 2 3 4 5 6 7 8 8
// 再让距离为 8 的元素相加
1 2 3 4 5 6 7 8 9