C++ Concurrency in Action [9] | CH09 Advanced Thread Management

警告
本文最后更新于 2023-12-19,文中内容可能已过时。

线程池

  • 线程池一般会用一个表示线程数的参数来初始化,内部需要一个队列来存储任务。下面是一个最简单的线程池实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// A minimal fixed-size thread pool: `n` worker threads drain a shared task
// queue guarded by a mutex and woken via a condition variable.
//
// Workers are kept joinable (instead of detached) so the destructor can wait
// for them: a detached worker could otherwise touch the pool's mutex, queue,
// or condition variable after the pool object has been destroyed
// (use-after-free).
class ThreadPool {
 public:
  // Spawns `n` worker threads that run until the pool is destroyed.
  explicit ThreadPool(std::size_t n) {
    workers_.reserve(n);
    for (std::size_t i = 0; i < n; ++i) {
      workers_.emplace_back([this] {
        std::unique_lock<std::mutex> l(m_);
        while (true) {
          if (!q_.empty()) {
            auto task = std::move(q_.front());
            q_.pop();
            l.unlock();  // run the task without holding the lock
            task();
            l.lock();
          } else if (done_) {
            break;  // queue drained and shutdown requested
          } else {
            cv_.wait(l);  // spurious wakeups are fine: the loop re-checks
          }
        }
      });
    }
  }

  // Signals shutdown and joins every worker. Workers keep executing queued
  // tasks until the queue is empty, so all submitted work completes.
  ~ThreadPool() {
    {
      std::lock_guard<std::mutex> l(m_);
      done_ = true;  // done_ is read under the lock by the worker loop
    }
    cv_.notify_all();
    for (auto& t : workers_) {
      t.join();
    }
  }

  // Enqueues a callable taking no arguments; it will run on some worker.
  template <typename F>
  void submit(F&& f) {
    {
      std::lock_guard<std::mutex> l(m_);
      q_.emplace(std::forward<F>(f));
    }
    cv_.notify_one();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  bool done_ = false;
  std::queue<std::function<void()>> q_;
  std::vector<std::thread> workers_;  // joined in the destructor
};
  • 如果想让提交的任务带参数会麻烦很多
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Variant of submit that forwards arguments to the task and returns a
// std::future for the task's eventual result.
template <class F, class... Args>
auto ThreadPool::submit(F&& f, Args&&... args) {
  using RT = std::invoke_result_t<F, Args...>;
  // std::packaged_task is move-only, but the queue element type
  // std::function requires a copy-constructible callable, so the task is
  // held behind a std::shared_ptr and the queued lambda copies the pointer.
  auto task = std::make_shared<std::packaged_task<RT()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  // Note: std::bind copies its bound arguments by value, so this
  // implementation does not support move-only task arguments.
  {
    std::lock_guard<std::mutex> l(m_);
    q_.emplace([task]() { (*task)(); });  // copyable wrapper around the packaged_task
  }
  cv_.notify_one();
  return task->get_future();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

#include "concurrent_queue.hpp"

// Thread pool backed by a concurrent queue; workers busy-poll the queue and
// yield their time slice when it is empty (which burns CPU while idle).
class ThreadPool {
 public:
  // Starts one worker per hardware thread. If thread creation throws,
  // already-started workers are signalled to stop and joined before the
  // exception propagates.
  ThreadPool() {
    std::size_t n = std::thread::hardware_concurrency();
    try {
      for (std::size_t i = 0; i < n; ++i) {
        threads_.emplace_back(&ThreadPool::worker_thread, this);
      }
    } catch (...) {
      done_ = true;
      for (auto& x : threads_) {
        if (x.joinable()) {
          x.join();
        }
      }
      throw;
    }
  }

  // Signals shutdown and joins the workers. Tasks still queued are dropped.
  ~ThreadPool() {
    done_ = true;
    for (auto& x : threads_) {
      if (x.joinable()) {
        x.join();
      }
    }
  }

  // Enqueues a no-argument callable. The callable is moved (not copied)
  // into the type-erased std::function wrapper.
  template <typename F>
  void submit(F f) {
    q_.push(std::function<void()>(std::move(f)));
  }

 private:
  // Worker loop: run tasks while available, otherwise yield the time slice.
  void worker_thread() {
    while (!done_) {
      std::function<void()> task;
      if (q_.try_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 private:
  std::atomic<bool> done_ = false;
  ConcurrentQueue<std::function<void()>> q_;
  std::vector<std::thread> threads_;  // declared after done_ and q_ so the
                                      // workers outlive neither member
};
  • 这样做的问题是,如果线程池处于空闲状态,就会无限转让时间片,导致 CPU 使用率达 100%,下面是对书中的线程池的 CPU 使用率测试结果

  • 对相同任务用之前实现的线程池的测试结果

  • 这里还是把书上的内容列出来,下文均为书中内容
  • 这个线程池只能执行无参数无返回值的函数,并且可能出现死锁,下面希望能执行无参数但有返回值的函数。为了得到返回值,就应该把函数传递给 std::packaged_task 再加入队列,并返回 std::packaged_task 中的 std::future。由于 std::packaged_task 是 move-only 类型,而 std::function 要求存储的函数实例可以拷贝构造,因此这里需要实现一个支持 move-only 类型的函数包裹类,即一个带 call 操作的类型擦除(type-erasure)类
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <memory>
#include <utility>

// Move-only type-erasing callable wrapper (a move-only std::function<void()>):
// std::packaged_task cannot be stored in std::function because std::function
// requires a copy-constructible callable, so the callable type is erased
// behind a move-only unique_ptr instead.
class FunctionWrapper {
 public:
  FunctionWrapper() = default;

  FunctionWrapper(const FunctionWrapper&) = delete;

  FunctionWrapper& operator=(const FunctionWrapper&) = delete;

  FunctionWrapper(FunctionWrapper&& rhs) noexcept
      : impl_(std::move(rhs.impl_)) {}

  FunctionWrapper& operator=(FunctionWrapper&& rhs) noexcept {
    impl_ = std::move(rhs.impl_);
    return *this;
  }

  // Takes the callable by value so template deduction decays F. The previous
  // forwarding-reference version deduced F as an lvalue reference when given
  // an lvalue callable, so ImplType<F> tried to hold a reference member and
  // `std::move` an lvalue into it (ill-formed / dangling). By-value + move
  // still accepts move-only callables such as std::packaged_task.
  template <typename F>
  FunctionWrapper(F f) : impl_(std::make_unique<ImplType<F>>(std::move(f))) {}

  // Invokes the wrapped callable. Calling an empty wrapper dereferences a
  // null impl_ (undefined behavior), matching the original contract.
  void operator()() const { impl_->call(); }

 private:
  struct ImplBase {
    virtual void call() = 0;
    virtual ~ImplBase() = default;
  };

  template <typename F>
  struct ImplType : ImplBase {
    ImplType(F&& f) noexcept : f_(std::move(f)) {}
    void call() override { f_(); }

    F f_;
  };

 private:
  std::unique_ptr<ImplBase> impl_;
};
  • 用这个包裹类替代 std::function<void()>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <atomic>
#include <future>
#include <thread>
#include <type_traits>
#include <vector>

#include "concurrent_queue.hpp"
#include "function_wrapper.hpp"

// Thread pool whose queue stores FunctionWrapper (a move-only callable
// wrapper), which lets submit() hand back a std::future for each task.
class ThreadPool {
 public:
  // Launch one worker per hardware thread; on failure, stop and join the
  // workers that were already started before rethrowing.
  ThreadPool() {
    const std::size_t worker_count = std::thread::hardware_concurrency();
    try {
      for (std::size_t i = 0; i < worker_count; ++i) {
        threads_.emplace_back(&ThreadPool::run_worker, this);
      }
    } catch (...) {
      done_ = true;
      join_all();
      throw;
    }
  }

  // Request shutdown and wait for every worker to exit.
  ~ThreadPool() {
    done_ = true;
    join_all();
  }

  // Wrap the callable in a packaged_task, queue it, and return the future
  // associated with its eventual result.
  template <typename F>
  std::future<std::invoke_result_t<F>> submit(F f) {
    using Result = std::invoke_result_t<F>;
    std::packaged_task<Result()> task(std::move(f));
    std::future<Result> result(task.get_future());
    q_.push(std::move(task));
    return result;
  }

 private:
  // Join every thread that is still joinable.
  void join_all() {
    for (auto& worker : threads_) {
      if (worker.joinable()) {
        worker.join();
      }
    }
  }

  // Worker loop: execute queued tasks, yielding the time slice when idle.
  void run_worker() {
    while (!done_) {
      FunctionWrapper task;
      if (q_.try_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 private:
  std::atomic<bool> done_ = false;
  ConcurrentQueue<FunctionWrapper> q_;
  std::vector<std::thread> threads_;  // must be declared after done_ and q_
};
  • 往线程池添加任务会增加任务队列的竞争,lock-free 队列可以避免这点但存在乒乓缓存的问题。为此需要把任务队列拆分为线程独立的本地队列和全局队列,当线程队列无任务时就去全局队列取任务
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <atomic>
#include <future>
#include <memory>
#include <queue>
#include <thread>
#include <type_traits>
#include <vector>

#include "concurrent_queue.hpp"
#include "function_wrapper.hpp"

// Thread pool with a per-worker (thread_local) task queue plus a shared pool
// queue, reducing contention on the shared queue for tasks submitted from
// within worker threads.
class ThreadPool {
 public:
  // Starts one worker per hardware thread; on failure, signals shutdown and
  // joins the workers already started before rethrowing.
  ThreadPool() {
    std::size_t n = std::thread::hardware_concurrency();
    try {
      for (std::size_t i = 0; i < n; ++i) {
        threads_.emplace_back(&ThreadPool::worker_thread, this);
      }
    } catch (...) {
      done_ = true;
      for (auto& x : threads_) {
        if (x.joinable()) {
          x.join();
        }
      }
      throw;
    }
  }

  // Signals shutdown and joins the workers. Tasks remaining in either the
  // local queues or the pool queue are discarded.
  ~ThreadPool() {
    done_ = true;
    for (auto& x : threads_) {
      if (x.joinable()) {
        x.join();
      }
    }
  }

  // Queues a callable and returns a future for its result. When called from
  // a worker thread, the task goes to that worker's local queue; when called
  // from any other thread (local_queue_ is null there), it goes to the
  // shared pool queue.
  template <typename F>
  std::future<std::invoke_result_t<F>> submit(F f) {
    std::packaged_task<std::invoke_result_t<F>()> task(std::move(f));
    std::future<std::invoke_result_t<F>> res(task.get_future());
    if (local_queue_) {
      local_queue_->push(std::move(task));
    } else {
      pool_queue_.push(std::move(task));
    }
    return res;
  }

 private:
  // Worker loop: allocate this thread's local queue, then prefer local tasks
  // over pool tasks, yielding the time slice when both are empty.
  void worker_thread() {
    local_queue_.reset(new std::queue<FunctionWrapper>);
    while (!done_) {
      FunctionWrapper task;
      if (local_queue_ && !local_queue_->empty()) {
        // Local queue is only touched by this thread, so no locking needed.
        task = std::move(local_queue_->front());
        local_queue_->pop();
        task();
      } else if (pool_queue_.try_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 private:
  std::atomic<bool> done_ = false;
  ConcurrentQueue<FunctionWrapper> pool_queue_;
  // Per-thread local queue; non-null only on worker threads of some pool.
  inline static thread_local std::unique_ptr<std::queue<FunctionWrapper>>
      local_queue_;
  std::vector<std::thread> threads_;  // declared after done_ and pool_queue_
};
  • 这可以避免数据竞争,但如果任务分配不均,就会导致某个线程的本地队列中有很多任务,而其他线程无事可做,为此应该让没有工作的线程可以从其他线程获取任务
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#include <atomic>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <vector>

#include "concurrent_queue.hpp"
#include "function_wrapper.hpp"

// Mutex-protected deque used as a per-worker task queue: the owning thread
// pushes and pops at the front (LIFO), while thieves take work from the
// back, so owner and thieves rarely contend for the same element.
class WorkStealingQueue {
 public:
  WorkStealingQueue() = default;

  WorkStealingQueue(const WorkStealingQueue&) = delete;

  WorkStealingQueue& operator=(const WorkStealingQueue&) = delete;

  // Owner-side push: newest work goes to the front.
  void push(FunctionWrapper f) {
    std::lock_guard<std::mutex> guard(m_);
    q_.push_front(std::move(f));
  }

  bool empty() const {
    std::lock_guard<std::mutex> guard(m_);
    return q_.empty();
  }

  // Owner-side pop: take the most recently pushed task from the front.
  bool try_pop(FunctionWrapper& res) {
    std::lock_guard<std::mutex> guard(m_);
    if (!q_.empty()) {
      res = std::move(q_.front());
      q_.pop_front();
      return true;
    }
    return false;
  }

  // Thief-side pop: take the oldest task from the back.
  bool try_steal(FunctionWrapper& res) {
    std::lock_guard<std::mutex> guard(m_);
    if (!q_.empty()) {
      res = std::move(q_.back());
      q_.pop_back();
      return true;
    }
    return false;
  }

 private:
  std::deque<FunctionWrapper> q_;
  mutable std::mutex m_;
};

// Work-stealing thread pool: each worker owns a WorkStealingQueue; tasks go
// to the submitting worker's local queue (or the shared pool queue when
// submitted from outside threads), and idle workers steal from each other.
class ThreadPool {
 public:
  // Starts one worker per hardware thread, creating each worker's stealable
  // queue before its thread; on failure, stops and joins the workers that
  // were already started, then rethrows.
  ThreadPool() {
    std::size_t n = std::thread::hardware_concurrency();
    try {
      for (std::size_t i = 0; i < n; ++i) {
        work_stealing_queue_.emplace_back(
            std::make_unique<WorkStealingQueue>());
        threads_.emplace_back(&ThreadPool::worker_thread, this, i);
      }
    } catch (...) {
      done_ = true;
      for (auto& x : threads_) {
        if (x.joinable()) {
          x.join();
        }
      }
      throw;
    }
  }

  // Signals shutdown and joins all workers. Pending tasks are discarded.
  ~ThreadPool() {
    done_ = true;
    for (auto& x : threads_) {
      if (x.joinable()) {
        x.join();
      }
    }
  }

  // Queues a callable and returns a future for its result. Worker threads
  // push to their own local queue; other threads use the shared pool queue.
  template <typename F>
  std::future<std::invoke_result_t<F>> submit(F f) {
    std::packaged_task<std::invoke_result_t<F>()> task(std::move(f));
    std::future<std::invoke_result_t<F>> res(task.get_future());
    if (local_queue_) {
      local_queue_->push(std::move(task));
    } else {
      pool_queue_.push(std::move(task));
    }
    return res;
  }

 private:
  bool pop_task_from_local_queue(FunctionWrapper& task) {
    return local_queue_ && local_queue_->try_pop(task);
  }

  bool pop_task_from_pool_queue(FunctionWrapper& task) {
    return pool_queue_.try_pop(task);
  }

  // Tries to steal from the other workers' queues, starting just after this
  // worker's own index so all workers don't hammer queue 0 first.
  bool pop_task_from_other_thread_queue(FunctionWrapper& task) {
    for (std::size_t i = 0; i < work_stealing_queue_.size(); ++i) {
      std::size_t index = (index_ + i + 1) % work_stealing_queue_.size();
      if (work_stealing_queue_[index]->try_steal(task)) {
        return true;
      }
    }
    return false;
  }

  // Worker loop: prefer local tasks, then the shared pool queue, then
  // stealing; yield the time slice when no work is found anywhere.
  void worker_thread(std::size_t index) {
    index_ = index;
    local_queue_ = work_stealing_queue_[index_].get();
    while (!done_) {
      FunctionWrapper task;
      if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) ||
          pop_task_from_other_thread_queue(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 private:
  std::atomic<bool> done_ = false;
  ConcurrentQueue<FunctionWrapper> pool_queue_;
  std::vector<std::unique_ptr<WorkStealingQueue>> work_stealing_queue_;
  std::vector<std::thread> threads_;

  // Per-thread state: this worker's local queue (null on non-worker
  // threads) and its index into work_stealing_queue_.
  static thread_local WorkStealingQueue* local_queue_;
  static thread_local std::size_t index_;
};

thread_local WorkStealingQueue* ThreadPool::local_queue_;
thread_local std::size_t ThreadPool::index_;

中断

  • 可中断线程的简单实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Per-thread interruption flag (interface sketch; a full definition with
// condition-variable support appears later in these notes).
class InterruptFlag {
 public:
  void set();
  bool is_set() const;
};

// Each thread owns one flag; interruption_point() consults it.
thread_local InterruptFlag this_thread_interrupt_flag;

// std::thread wrapper whose owner can request interruption of the running
// function via interrupt().
class InterruptibleThread {
 public:
  template <typename F>
  InterruptibleThread(F f) {
    // The promise publishes the new thread's flag address back to the
    // constructing thread; get() below blocks until it is set, so `p` is
    // safe to capture by reference.
    std::promise<InterruptFlag*> p;
    t = std::thread([f, &p] {
      p.set_value(&this_thread_interrupt_flag);
      f();
    });
    flag = p.get_future().get();
  }

  // Requests interruption; takes effect at the target thread's next
  // interruption point.
  void interrupt() {
    if (flag) {
      flag->set();
    }
  }

 private:
  std::thread t;
  InterruptFlag* flag;
};

// Call at safe points inside interruptible code: throws thread_interrupted
// if interruption has been requested for this thread.
void interruption_point() {
  if (this_thread_interrupt_flag.is_set()) {
    throw thread_interrupted();
  }
}
  • 在函数中使用
1
2
3
4
5
6
// Example: a loop made interruptible by placing an interruption point at
// the top of each iteration.
void f() {
  while (!done) {
    interruption_point();
    process_next_item();
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class InterruptFlag {
 public:
  // Raise the flag, then (under the lock) wake any condition variable the
  // owning thread is currently registered as waiting on.
  void set() {
    b_.store(true, std::memory_order_relaxed);
    std::lock_guard<std::mutex> l(m_);
    if (cv_) {
      cv_->notify_all();
    }
  }

  bool is_set() const { return b_.load(std::memory_order_relaxed); }

  // Record which condition variable the owning thread is about to wait on,
  // so set() can wake it.
  void set_condition_variable(std::condition_variable& cv) {
    std::lock_guard<std::mutex> l(m_);
    cv_ = &cv;
  }

  void clear_condition_variable() {
    std::lock_guard<std::mutex> l(m_);
    cv_ = nullptr;
  }

  // RAII helper: clears the registered condition variable even when the
  // wait exits via an exception.
  struct ClearConditionVariableOnDestruct {
    ~ClearConditionVariableOnDestruct() {
      this_thread_interrupt_flag.clear_condition_variable();
    }
  };

 private:
  std::atomic<bool> b_;
  std::condition_variable* cv_ = nullptr;
  std::mutex m_;
};

// Interruptible wait on a std::condition_variable: register the cv with
// this thread's interrupt flag, wait with a short timeout, and check for
// interruption before and after the wait.
void interruptible_wait(std::condition_variable& cv,
                        std::unique_lock<std::mutex>& l) {
  interruption_point();
  this_thread_interrupt_flag.set_condition_variable(cv);
  // wait_for below may throw, so RAII is used to clear the registration
  InterruptFlag::ClearConditionVariableOnDestruct guard;
  interruption_point();
  // Upper bound on how long the thread waits before noticing an interrupt
  cv.wait_for(l, std::chrono::milliseconds(1));
  interruption_point();
}

// Predicate overload: loop until the predicate holds or an interruption is
// requested, polling in 1 ms slices (this also absorbs spurious wakeups).
template <typename Predicate>
void interruptible_wait(std::condition_variable& cv,
                        std::unique_lock<std::mutex>& l, Predicate pred) {
  interruption_point();
  this_thread_interrupt_flag.set_condition_variable(cv);
  InterruptFlag::ClearConditionVariableOnDestruct guard;
  while (!this_thread_interrupt_flag.is_set() && !pred()) {
    cv.wait_for(l, std::chrono::milliseconds(1));
  }
  interruption_point();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <atomic>
#include <condition_variable>
#include <mutex>

// InterruptFlag extended to support std::condition_variable_any, so waits
// on arbitrary Lockable types can also be interrupted.
class InterruptFlag {
 public:
  // Raise the flag and wake whichever condition variable (plain or _any)
  // the owning thread is currently registered on.
  void set() {
    b_.store(true, std::memory_order_relaxed);
    std::lock_guard<std::mutex> l(m_);
    if (cv_) {
      cv_->notify_all();
    } else if (cv_any_) {
      cv_any_->notify_all();
    }
  }

  // Interruptible wait on a condition_variable_any. The custom Lockable
  // below couples the caller's lock with the flag's internal mutex so that
  // a set() from another thread cannot slip its notification in between
  // the interruption check and the start of the wait.
  template <typename Lockable>
  void wait(std::condition_variable_any& cv, Lockable& l) {
    class Mutex {
     public:
      // Registers the cv while holding the internal mutex; the internal
      // mutex stays locked until cv.wait() calls unlock().
      Mutex(InterruptFlag* self, std::condition_variable_any& cv, Lockable& l)
          : self_(self), lock_(l) {
        self_->m_.lock();
        self_->cv_any_ = &cv;
      }

      // Deregisters the cv and releases the internal mutex (the caller's
      // lock is reacquired by cv.wait() before this runs).
      ~Mutex() {
        self_->cv_any_ = nullptr;
        self_->m_.unlock();
      }

      // Reacquire both mutexes together (deadlock-free) on wakeup.
      void lock() { std::lock(self_->m_, lock_); }

      // Release both before blocking inside cv.wait().
      void unlock() {
        lock_.unlock();
        self_->m_.unlock();
      }

     private:
      InterruptFlag* self_;
      Lockable& lock_;
    };

    Mutex m(this, cv, l);
    interruption_point();
    cv.wait(m);
    interruption_point();
  }
  // rest as before

 private:
  std::atomic<bool> b_;
  std::condition_variable* cv_ = nullptr;
  std::condition_variable_any* cv_any_ = nullptr;
  std::mutex m_;
};

// Interruptible wait over any Lockable, delegating to this thread's flag.
template <typename Lockable>
void interruptible_wait(std::condition_variable_any& cv, Lockable& l) {
  this_thread_interrupt_flag.wait(cv, l);
}
  • 对于其他阻塞调用(比如 mutex、future)的中断,一般也可以像对 std::condition_variable 一样设置超时时间,因为不访问内部 mutex 或 future 无法在未满足等待的条件时中断等待
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Interruptible wait on a std::future: poll the future with a short timeout
// so the interrupt flag is rechecked periodically, then surface any pending
// interruption as an exception.
template <typename T>
void interruptible_wait(std::future<T>& ft) {
  for (;;) {
    if (this_thread_interrupt_flag.is_set()) {
      break;
    }
    if (ft.wait_for(std::chrono::milliseconds(1)) ==
        std::future_status::ready) {
      break;
    }
  }
  interruption_point();
}
  • 从被中断的线程角度来看,中断就是一个 thread_interrupted 异常。因此检查出中断后,可以像异常一样对其进行处理
1
2
3
4
5
6
7
8
9
// Run the user's function on the internal thread; an interruption surfaces
// as a thread_interrupted exception and must be caught here.
internal_thread = std::thread{[f, &p] {
  p.set_value(&this_thread_interrupt_flag);
  try {
    f();
  } catch (const thread_interrupted&) {
    // An exception escaping into std::thread's destruction path would call
    // std::terminate, so swallow the interruption to end the thread cleanly.
  }
}};
  • 假如有一个桌面搜索程序,除了与用户交互,程序还需要监控文件系统的状态,以识别任何更改并更新其索引。为了避免影响 GUI 的响应性,这个处理通常会交给一个后台线程,后台线程需要运行于程序的整个生命周期。这样的程序通常只在机器关闭时退出,而在其他情况下关闭程序,就需要井然有序地关闭后台线程,一个关闭方式就是中断
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
std::mutex config_mutex;
std::vector<InterruptibleThread> background_threads;

// Background worker: watch one disk for filesystem changes and update the
// search index; the interruption point lets shutdown break the endless loop.
void background_thread(int disk_id) {
  while (true) {
    interruption_point();
    fs_change fsc = get_fs_changes(disk_id);
    if (fsc.has_changes()) {
      update_index(fsc);
    }
  }
}

void start_background_processing() {
  background_threads.emplace_back(background_thread, disk_1);
  background_threads.emplace_back(background_thread, disk_2);
}

int main() {
  start_background_processing();
  process_gui_until_exit();
  std::unique_lock<std::mutex> l(config_mutex);
  for (auto& x : background_threads) {
    x.interrupt();
  }
  // Join only after all threads have been interrupted
  for (auto& x : background_threads) {
    if (x.joinable()) {
      x.join();
    }
  }
  // Interrupting and joining in separate loops lets the interruptions
  // proceed concurrently: an interrupt only takes effect at the target's
  // next interruption point (plus any destructors / exception handling run
  // before exit), so interrupt-then-join per thread would block waiting on
  // each thread in turn even though useful work (interrupting the other
  // threads) could still be done.
}
Buy me a coffee~
支付宝
微信
0%