Warning
This article was last updated on 2023-12-19; its contents may be out of date.
Condition variable
```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
class A {
 public:
  void step1() {
    {
      std::lock_guard<std::mutex> l(m_);
      step1_done_ = true;
    }
    std::cout << 1;
    cv_.notify_one();
  }
  void step2() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return step1_done_; });
    step2_done_ = true;
    std::cout << 2;
    cv_.notify_one();
  }
  void step3() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return step2_done_; });
    std::cout << 3;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  bool step1_done_ = false;
  bool step2_done_ = false;
};
int main() {
  A a;
  std::thread t1(&A::step1, &a);
  std::thread t2(&A::step2, &a);
  std::thread t3(&A::step3, &a);
  t1.join();
  t2.join();
  t3.join();
}  // 123
```
```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
class A {
 public:
  void wait1() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return done_; });
    std::cout << 1;
  }
  void wait2() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return done_; });
    std::cout << 2;
  }
  void signal() {
    {
      std::lock_guard<std::mutex> l(m_);
      done_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  bool done_ = false;
};
int main() {
  A a;
  std::thread t1(&A::wait1, &a);
  std::thread t2(&A::wait2, &a);
  std::thread t3(&A::signal, &a);
  t1.join();
  t2.join();
  t3.join();
}  // 12 or 21
```
```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
class Mutex {
 public:
  void lock() {}
  void unlock() {}
};
class A {
 public:
  void signal() {
    std::cout << 1;
    cv_.notify_one();
  }
  void wait() {
    Mutex m;
    cv_.wait(m);
    std::cout << 2;
  }

 private:
  std::condition_variable_any cv_;
};
int main() {
  A a;
  std::thread t1(&A::signal, &a);
  std::thread t2(&A::wait, &a);
  t1.join();
  t2.join();
}  // 12
```
```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
template <typename T>
class ConcurrentQueue {
 public:
  ConcurrentQueue() = default;
  ConcurrentQueue(const ConcurrentQueue& rhs) {
    std::lock_guard<std::mutex> l(rhs.m_);
    q_ = rhs.q_;
  }
  void push(T x) {
    std::lock_guard<std::mutex> l(m_);
    q_.push(std::move(x));
    cv_.notify_one();
  }
  void wait_and_pop(T& res) {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !q_.empty(); });
    res = std::move(q_.front());
    q_.pop();
  }
  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !q_.empty(); });
    auto res = std::make_shared<T>(std::move(q_.front()));
    q_.pop();
    return res;
  }
  bool try_pop(T& res) {
    std::lock_guard<std::mutex> l(m_);
    if (q_.empty()) {
      return false;
    }
    res = std::move(q_.front());
    q_.pop();
    return true;
  }
  std::shared_ptr<T> try_pop() {
    std::lock_guard<std::mutex> l(m_);
    if (q_.empty()) {
      return nullptr;
    }
    auto res = std::make_shared<T>(std::move(q_.front()));
    q_.pop();
    return res;
  }
  bool empty() const {
    std::lock_guard<std::mutex> l(m_);
    // other threads may access this object (e.g. via the copy constructor),
    // so locking is still required
    return q_.empty();
  }

 private:
  mutable std::mutex m_;
  std::condition_variable cv_;
  std::queue<T> q_;
};
```
```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
template <typename T>
class ConcurrentQueue {
 public:
  ConcurrentQueue() = default;
  ConcurrentQueue(const ConcurrentQueue& rhs) {
    std::lock_guard<std::mutex> l(rhs.m_);
    q_ = rhs.q_;
  }
  void push(T x) {
    auto data = std::make_shared<T>(std::move(x));
    std::lock_guard<std::mutex> l(m_);
    q_.push(data);
    cv_.notify_one();
  }
  void wait_and_pop(T& res) {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !q_.empty(); });
    res = std::move(*q_.front());
    q_.pop();
  }
  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !q_.empty(); });
    auto res = q_.front();
    q_.pop();
    return res;
  }
  bool try_pop(T& res) {
    std::lock_guard<std::mutex> l(m_);
    if (q_.empty()) {
      return false;
    }
    res = std::move(*q_.front());
    q_.pop();
    return true;
  }
  std::shared_ptr<T> try_pop() {
    std::lock_guard<std::mutex> l(m_);
    if (q_.empty()) {
      return nullptr;
    }
    auto res = q_.front();
    q_.pop();
    return res;
  }
  bool empty() const {
    std::lock_guard<std::mutex> l(m_);
    return q_.empty();
  }

 private:
  mutable std::mutex m_;
  std::condition_variable cv_;
  std::queue<std::shared_ptr<T>> q_;
};
```
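A minimal producer/consumer sketch of how the queue above can be used (my addition; the element type, counts, and thread roles are arbitrary choices for illustration):

```cpp
#include <iostream>
#include <thread>
// assumes the ConcurrentQueue from the listing above is visible here

int main() {
  ConcurrentQueue<int> q;
  std::thread producer([&] {
    for (int i = 0; i < 5; ++i) {
      q.push(i);  // each push wakes at most one waiting consumer via notify_one
    }
  });
  std::thread consumer([&] {
    for (int i = 0; i < 5; ++i) {
      int x;
      q.wait_and_pop(x);  // blocks until an element is available
      std::cout << x << ' ';
    }
  });
  producer.join();
  consumer.join();
}
```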
Semaphore
- A semaphore is used to signal a specified number of events between threads: the P operation decrements the semaphore by 1 and the V operation increments it by 1; if a P operation would make the value negative, it blocks until the decrement becomes possible. C++20 provides std::counting_semaphore, whose maximum value is set by a template parameter and whose initial value is set by a constructor argument. acquire() is the P operation: it decrements the value when it is greater than 0, otherwise it blocks until the decrement is possible. release() is the V operation: it adds the given amount to the value (1 if unspecified) and wakes up a corresponding number of threads blocked in acquire(). A small counting sketch follows this bullet; the signaling examples come after it.
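As a hedged sketch of the counting (P/V) usage described above, distinct from the signaling examples that follow, a counting semaphore can bound how many threads enter a region at once (my addition; the limit of 2 and the number of workers are arbitrary):

```cpp
#include <iostream>
#include <semaphore>
#include <thread>
#include <vector>

int main() {
  std::counting_semaphore<2> slots{2};  // at most 2 workers inside the region
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([&, i] {
      slots.acquire();  // P: take a slot, block if none is free
      std::cout << i;   // at most two threads execute here concurrently
      slots.release();  // V: give the slot back
    });
  }
  for (auto& t : workers) {
    t.join();
  }
}
```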
```cpp
#include <iostream>
#include <semaphore>
#include <thread>
class A {
 public:
  void wait1() {
    sem_.acquire();
    std::cout << 1;
  }
  void wait2() {
    sem_.acquire();
    std::cout << 2;
  }
  void signal() { sem_.release(2); }

 private:
  std::counting_semaphore<2> sem_{0};  // initial value 0, maximum value 2
};
int main() {
  A a;
  std::thread t1(&A::wait1, &a);
  std::thread t2(&A::wait2, &a);
  std::thread t3(&A::signal, &a);
  t1.join();
  t2.join();
  t3.join();
}  // 12 or 21
```
```cpp
#include <iostream>
#include <semaphore>
#include <thread>
class A {
 public:
  void wait() {
    sem_.acquire();
    std::cout << 2;
  }
  void signal() {
    std::cout << 1;
    sem_.release();
  }

 private:
  std::binary_semaphore sem_{0};
};
int main() {
  A a;
  std::thread t1(&A::wait, &a);
  std::thread t2(&A::signal, &a);
  t1.join();
  t2.join();
}  // 12
```
Barrier
```cpp
#include <barrier>
#include <cassert>
#include <iostream>
#include <thread>
class A {
 public:
  void f() {
    std::barrier sync_point{3, [&]() noexcept { ++i_; }};
    for (auto& x : tasks_) {
      x = std::thread([&] {
        std::cout << 1;
        sync_point.arrive_and_wait();
        assert(i_ == 1);
        std::cout << 2;
        sync_point.arrive_and_wait();
        assert(i_ == 2);
        std::cout << 3;
      });
    }
    for (auto& x : tasks_) {
      x.join();  // join every thread that uses the barrier before the barrier is destroyed
    }  // once the barrier has been destroyed, calling its member functions is undefined behavior
  }

 private:
  std::thread tasks_[3] = {};
  int i_ = 0;
};
int main() {
  A a;
  a.f();
}
```
```cpp
#include <iostream>
#include <latch>
#include <string>
#include <thread>
class A {
 public:
  void f() {
    for (auto& x : data_) {
      x.t = std::jthread([&] {
        x.s += x.s;
        done_.count_down();
      });
    }
    done_.wait();
    for (auto& x : data_) {
      std::cout << x.s << std::endl;
    }
  }

 private:
  struct {
    std::string s;
    std::jthread t;
  } data_[3] = {
      {"hello"},
      {"down"},
      {"demo"},
  };
  std::latch done_{3};
};
int main() {
  A a;
  a.f();
}
```
Future
```cpp
#include <future>
#include <iostream>
class A {
 public:
  int f(int i) { return i; }
};
int main() {
  A a;
  std::future<int> res = std::async(&A::f, &a, 1);
  std::cout << res.get();  // 1, blocks until the thread produces the result
}
```
```cpp
#include <future>
#include <iostream>
int main() {
  std::future<void> res = std::async([] {});
  res.get();
  try {
    res.get();
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // no state
  }
}
```
```cpp
namespace std {
enum class launch {  // names for launch options passed to async
  async = 0x1,     // run the task on a new thread
  deferred = 0x2   // lazy evaluation: run the task only when the result is requested
};
}  // namespace std

// by default std::async uses both policies
std::async([] {});  // equivalent to std::async(std::launch::async | std::launch::deferred, [] {})
```
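A minimal sketch (my addition, not from the original) of what std::launch::deferred means in practice: the task does not run until get() or wait() is called, and wait_for reports std::future_status::deferred until then.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <iostream>

int main() {
  auto ft = std::async(std::launch::deferred, [] {
    std::cout << "run";  // printed only when get()/wait() is finally called
    return 1;
  });
  // the deferred task has not started yet
  assert(ft.wait_for(std::chrono::seconds(0)) == std::future_status::deferred);
  std::cout << ft.get();  // runs the task now, output: run1
}
```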
```cpp
#include <future>
#include <iostream>
int main() {
  std::packaged_task<int(int)> task([](int i) { return i; });
  task(1);  // run the task; the internal future will be set to the result value
  std::future<int> res = task.get_future();
  std::cout << res.get();  // 1
}
```
```cpp
#include <future>
#include <iostream>
int main() {
  std::promise<int> ps;
  ps.set_value(1);  // the internal future will be set to the result value
  std::future<int> res = ps.get_future();
  std::cout << res.get();  // 1
}
```
```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
class A {
 public:
  void signal() {
    std::cout << 1;
    ps_.set_value();
  }
  void wait() {
    std::future<void> res = ps_.get_future();
    res.wait();
    std::cout << 2;
  }

 private:
  std::promise<void> ps_;
};
int main() {
  A a;
  std::thread t1{&A::signal, &a};
  std::thread t2{&A::wait, &a};
  t1.join();
  t2.join();
}
```
```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
class A {
 public:
  void task() { std::cout << 1; }
  void wait_for_task() {
    ps_.get_future().wait();
    task();
  }
  void signal() { ps_.set_value(); }

 private:
  std::promise<void> ps_;
};
void task() { std::cout << 1; }
int main() {
  A a;
  std::thread t(&A::wait_for_task, &a);
  a.signal();
  t.join();
}
```
```cpp
#include <future>
#include <iostream>
int main() {
  std::promise<void> ps;
  auto a = ps.get_future();
  try {
    auto b = ps.get_future();
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // future already retrieved
  }
}
```
```cpp
#include <future>
#include <iostream>
#include <stdexcept>
int main() {
  std::future<void> res = std::async([] { throw std::logic_error("error"); });
  try {
    res.get();
  } catch (const std::exception& e) {
    std::cout << e.what() << std::endl;
  }
}
```
```cpp
#include <future>
#include <iostream>
#include <stdexcept>
int main() {
  std::promise<void> ps;
  try {
    throw std::logic_error("error");
  } catch (...) {
    ps.set_exception(std::current_exception());
  }
  auto res = ps.get_future();
  try {
    res.get();
  } catch (const std::exception& e) {
    std::cout << e.what() << std::endl;
  }
}
```
```cpp
#include <future>
#include <iostream>
#include <stdexcept>
int main() {
  std::promise<int> ps;
  try {
    ps.set_value([] {
      throw std::logic_error("error");
      return 0;
    }());
  } catch (const std::exception& e) {
    std::cout << e.what() << std::endl;
  }
  ps.set_value(1);
  auto res = ps.get_future();
  std::cout << res.get();  // 1
}
```
```cpp
#include <future>
#include <iostream>
int main() {
  std::future<void> ft1;
  std::future<void> ft2;
  {
    std::packaged_task<void()> task([] {});
    std::promise<void> ps;
    ft1 = task.get_future();
    ft2 = ps.get_future();
  }
  try {
    ft1.get();
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // broken promise
  }
  try {
    ft2.get();
  } catch (const std::future_error& e) {
    std::cout << e.what() << std::endl;  // broken promise
  }
}
```
```cpp
#include <future>
int main() {
  std::promise<void> ps;
  std::future<void> ft = ps.get_future();
  std::shared_future<void> sf(std::move(ft));
  // or directly: std::shared_future<void> sf{ps.get_future()};
  ps.set_value();
  sf.get();
  sf.get();
}
```
```cpp
#include <future>
int main() {
  std::promise<void> ps;
  auto sf = ps.get_future().share();
  ps.set_value();
  sf.get();
  sf.get();
}
```
Clock
- For the standard library, a clock is a class that provides four pieces of information: the current time (now()), the types used to represent times obtained from the clock, the clock's tick period, and whether the clock ticks at a uniform rate (is_steady). A small sketch after the timestamp helper below illustrates these members.
- Getting the current UNIX timestamp in nanoseconds:
```cpp
#ifdef _WIN32
#include <chrono>
#elif defined __GNUC__
#include <time.h>
#endif
long long now_in_ns() {
#ifdef _WIN32
  return std::chrono::duration_cast<std::chrono::nanoseconds>(
             std::chrono::system_clock::now().time_since_epoch())
      .count();
#elif defined __GNUC__
  struct timespec t;
  clockid_t clk_id = CLOCK_REALTIME;
  clock_gettime(clk_id, &t);
  // use integer arithmetic: going through a double (1e9) would lose precision
  return t.tv_sec * 1000000000LL + t.tv_nsec;
#endif
}
```
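To make the four pieces of information listed above concrete, here is a minimal sketch (my addition) inspecting std::chrono::system_clock and std::chrono::steady_clock:

```cpp
#include <chrono>
#include <iostream>

int main() {
  // 1. the current time
  std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
  // 2. the types used to represent times from this clock (time_point / duration / rep)
  std::chrono::system_clock::duration since_epoch = now.time_since_epoch();
  // 3. the tick period, a compile-time ratio expressed in seconds
  using period = std::chrono::system_clock::period;
  std::cout << period::num << '/' << period::den << '\n';
  // 4. whether the clock ticks at a uniform rate and is never adjusted
  std::cout << std::boolalpha << std::chrono::system_clock::is_steady << ' '
            << std::chrono::steady_clock::is_steady << '\n';  // usually: false true
  std::cout << since_epoch.count() << '\n';
}
```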
```cpp
#include <chrono>
#include <ctime>
#include <iomanip>
#include <iostream>
int main() {
  std::chrono::system_clock::time_point t = std::chrono::system_clock::now();
  std::time_t c = std::chrono::system_clock::to_time_t(t);  // UNIX timestamp in seconds
  // %F is %Y-%m-%d, %T is %H:%M:%S, e.g. 2011-11-11 11:11:11
  std::cout << std::put_time(std::localtime(&c), "%F %T");
}
```
```cpp
namespace std {
namespace chrono {
using nanoseconds = duration<long long, nano>;
using microseconds = duration<long long, micro>;
using milliseconds = duration<long long, milli>;
using seconds = duration<long long>;
using minutes = duration<int, ratio<60>>;
using hours = duration<int, ratio<3600>>;
// C++20
using days = duration<int, ratio_multiply<ratio<24>, hours::period>>;
using weeks = duration<int, ratio_multiply<ratio<7>, days::period>>;
using years = duration<int, ratio_multiply<ratio<146097, 400>, days::period>>;
using months = duration<int, ratio_divide<years::period, ratio<12>>>;
}  // namespace chrono
}  // namespace std
```
```cpp
#include <cassert>
#include <chrono>
using namespace std::literals::chrono_literals;
int main() {
  auto a = 45min;
  assert(a.count() == 45);
  auto b = std::chrono::duration_cast<std::chrono::seconds>(a);
  assert(b.count() == 2700);
  auto c = std::chrono::duration_cast<std::chrono::hours>(a);
  assert(c.count() == 0);  // the conversion truncates
}
```
```cpp
#include <cassert>
#include <chrono>
using namespace std::literals::chrono_literals;
int main() {
  assert((1h - 2 * 15min).count() == 30);
  assert((0.5h + 2 * 15min + 60s).count() == 3660);
}
```
```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
int f() {
  std::this_thread::sleep_for(std::chrono::seconds(1));
  return 1;
}
int main() {
  auto res = std::async(f);
  if (res.wait_for(std::chrono::seconds(5)) == std::future_status::ready) {
    std::cout << res.get();
  }
}
```
```cpp
// the first template parameter is the clock the time point is measured against,
// the second is the duration type used as the time unit
std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>
```
```cpp
#include <cassert>
#include <chrono>
int main() {
  std::chrono::system_clock::time_point a = std::chrono::system_clock::now();
  std::chrono::system_clock::time_point b = a + std::chrono::hours(1);
  long long diff =
      std::chrono::duration_cast<std::chrono::seconds>(b - a).count();
  assert(diff == 3600);
}
```
```cpp
#include <cstdint>
#ifdef _WIN32
#include <intrin.h>
#endif

// read the time-stamp counter (TSC)
static inline std::uint64_t read_tsc() {
#ifdef _WIN32
  return __rdtsc();
#elif defined __GNUC__
  std::uint64_t res;
  __asm__ __volatile__(
      "rdtsc;"
      "shl $32, %%rdx;"
      "or %%rdx, %%rax"
      : "=a"(res)
      :
      : "%rcx", "%rdx");
  return res;
#endif
}

// rdtscp waits for all previous instructions to finish before reading the TSC
static inline std::uint64_t read_tscp() {
#ifdef _WIN32
  std::uint32_t t;
  return __rdtscp(&t);
#elif defined __GNUC__
  std::uint64_t res;
  __asm__ __volatile__(
      "rdtscp;"
      "shl $32, %%rdx;"
      "or %%rdx, %%rax"
      : "=a"(res)
      :
      : "%rcx", "%rdx");
  return res;
#endif
}

// full memory fence: keeps later memory accesses from being reordered before it
static inline void fence() {
#ifdef _WIN32
  __faststorefence();
#elif defined __GNUC__
  __asm__ __volatile__("mfence" : : : "memory");
#endif
}

inline std::uint64_t tsc_begin() {
  std::uint64_t res = read_tsc();
  fence();
  return res;
}

inline std::uint64_t tsc_mid() {
  std::uint64_t res = read_tscp();
  fence();
  return res;
}

inline std::uint64_t tsc_end() { return read_tscp(); }
```
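A minimal usage sketch (my addition) of the helpers above, timing a small loop in TSC cycles; the loop body and iteration count are arbitrary:

```cpp
#include <cstdint>
#include <iostream>
// assumes tsc_begin/tsc_end from the listing above are visible here

int main() {
  volatile std::uint64_t sum = 0;      // volatile so the loop is not optimized away
  std::uint64_t start = tsc_begin();   // read the counter, then fence
  for (int i = 0; i < 1000; ++i) {
    sum = sum + i;
  }
  std::uint64_t end = tsc_end();       // read the counter again
  std::cout << (end - start) << " cycles\n";
}
```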
Functional programming
- Functional programming is a paradigm built on pure functions: a pure function always returns the same result for the same arguments and does not modify external state, so a program written only with pure functions inherently has no race conditions. Haskell is a well-known functional language; taking quicksort as an example, the Haskell implementation looks like this.
```haskell
quickSort :: Ord a => [a] -> [a]
quickSort [] = []
quickSort (x : xs) = l ++ [x] ++ r
  where
    l = quickSort (filter (<= x) xs)
    r = quickSort (filter (> x) xs)

main :: IO ()
main = print (quickSort "downdemo") -- "ddemnoow"
```
```cpp
#include <algorithm>
#include <iostream>
#include <list>
#include <utility>
template <typename T>
std::list<T> quick_sort(std::list<T> v) {
  if (v.empty()) {
    return v;
  }
  std::list<T> res;
  res.splice(res.begin(), v, v.begin());  // move the first element of v into res
  // partition v by the predicate and return an iterator to the first element
  // that does not satisfy it
  auto it = std::partition(v.begin(), v.end(),
                           [&](const T& x) { return x < res.front(); });
  std::list<T> low;
  low.splice(low.end(), v, v.begin(), it);  // move the left half into low
  auto l(quick_sort(std::move(low)));  // recursively quick-sort the left half
  auto r(quick_sort(std::move(v)));    // recursively quick-sort the right half
  res.splice(res.end(), r);    // append the right half after the result
  res.splice(res.begin(), l);  // prepend the left half before the result
  return res;
}
int main() {
  for (auto& x : quick_sort(std::list<int>{1, 3, 2, 4, 5})) {
    std::cout << x;  // 12345
  }
}
```
```cpp
#include <algorithm>
#include <future>
#include <iostream>
#include <list>
#include <utility>
template <typename T>
std::list<T> quick_sort(std::list<T> v) {
  if (v.empty()) {
    return v;
  }
  std::list<T> res;
  res.splice(res.begin(), v, v.begin());
  auto it = std::partition(v.begin(), v.end(),
                           [&](const T& x) { return x < res.front(); });
  std::list<T> low;
  low.splice(low.end(), v, v.begin(), it);
  // sort the left half on another thread
  std::future<std::list<T>> l(std::async(&quick_sort<T>, std::move(low)));
  auto r(quick_sort(std::move(v)));
  res.splice(res.end(), r);
  res.splice(res.begin(), l.get());
  return res;
}
int main() {
  for (auto& x : quick_sort(std::list<int>{1, 3, 2, 4, 5})) {
    std::cout << x;  // 12345
  }
}
```
Chained calls
```ts
import { interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const source1$ = interval(500);
const source2$ = interval(1000);
source1$.pipe(withLatestFrom(source2$, (x, y) => `${x}${y}`)); // 10 20 31 41 52 62---
```
```cpp
int f(std::experimental::future<int>);
// an std::experimental::future must come from a source that produces one,
// e.g. a function declared to return it
std::experimental::future<int> eft();
auto ft1 = eft();
// unlike std::async, no argument for f can be passed in here: the argument is
// supplied by the library as a ready future, and since the previous task
// produces an int, the parameter type is std::experimental::future<int>
auto ft2 = ft1.then(f);
// after then() the original future is no longer valid
assert(!ft1.valid());
assert(ft2.valid());
```
```cpp
// a substitute for std::async that returns std::experimental::future
template <typename F>
std::experimental::future<decltype(std::declval<F>()())> new_async(F&& func) {
  std::experimental::promise<decltype(std::declval<F>()())> p;
  auto ft = p.get_future();
  std::thread t([p = std::move(p), f = std::decay_t<F>(func)]() mutable {
    try {
      p.set_value_at_thread_exit(f());
    } catch (...) {
      p.set_exception_at_thread_exit(std::current_exception());
    }
  });
  t.detach();
  return ft;
}
```
- Suppose we want to implement a login flow: send the username and password to the backend for verification, and once the user's information has been retrieved, update the display. A serial implementation looks like this:
```cpp
void process_login(const std::string& username, const std::string& password) {
  try {
    const user_id id = backend.authenticate_user(username, password);
    const user_data info_to_display = backend.request_current_info(id);
    update_display(info_to_display);
  } catch (std::exception& e) {
    display_error(e);
  }
}
```
```cpp
std::future<void> process_login(const std::string& username,
                                const std::string& password) {
  return std::async(std::launch::async, [=]() {
    try {
      const user_id id = backend.authenticate_user(username, password);
      const user_data info_to_display = backend.request_current_info(id);
      update_display(info_to_display);
    } catch (std::exception& e) {
      display_error(e);
    }
  });
}
```
- But this implementation can still block the UI thread. What is needed is a chaining mechanism, where each task is attached to the previous one and runs when it completes:
```cpp
std::experimental::future<void> process_login(const std::string& username,
                                              const std::string& password) {
  return new_async(
             [=]() { return backend.authenticate_user(username, password); })
      .then([](std::experimental::future<user_id> id) {
        return backend.request_current_info(id.get());
      })
      .then([](std::experimental::future<user_data> info_to_display) {
        try {
          update_display(info_to_display.get());
        } catch (std::exception& e) {
          display_error(e);
        }
      });
}
```
- If the backend calls block internally, perhaps waiting for a message over the network or for a database operation to complete, then even after splitting the work into independent parts the calls still block, and we still end up with blocked threads. What the backend calls really need is to return a future that becomes ready when the data is available, without blocking any thread. So here backend.authenticate_user, which returns user_id, is replaced by backend.async_authenticate_user, which returns std::experimental::future<user_id>:
```cpp
std::experimental::future<void> process_login(const std::string& username,
                                              const std::string& password) {
  return backend.async_authenticate_user(username, password)
      .then([](std::experimental::future<user_id> id) {
        return backend.async_request_current_info(id.get());
      })
      .then([](std::experimental::future<user_data> info_to_display) {
        try {
          update_display(info_to_display.get());
        } catch (std::exception& e) {
          display_error(e);
        }
      });
}
```
- Now nothing on the asynchronous call chain blocks. Finally, generic lambdas can be used to simplify the code:
```cpp
std::experimental::future<void> process_login(const std::string& username,
                                              const std::string& password) {
  return backend.async_authenticate_user(username, password)
      .then(
          [](auto id) { return backend.async_request_current_info(id.get()); })
      .then([](auto info_to_display) {
        try {
          update_display(info_to_display.get());
        } catch (std::exception& e) {
          display_error(e);
        }
      });
}
```
```cpp
auto ft1 = new_async(some_function).share();
auto ft2 = ft1.then(
    [](std::experimental::shared_future<some_data> data) { do_stuff(data); });
auto ft3 = ft1.then([](std::experimental::shared_future<some_data> data) {
  return do_other_stuff(data);
});
```
```cpp
std::future<FinalResult> process_data(std::vector<MyData>& vec) {
  const size_t chunk_size = whatever;
  std::vector<std::future<ChunkResult>> res;
  for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
    const size_t remaining_size = end - begin;
    const size_t this_chunk_size = std::min(remaining_size, chunk_size);
    res.push_back(std::async(process_chunk, begin, begin + this_chunk_size));
    begin += this_chunk_size;
  }
  return std::async([all_results = std::move(res)]() mutable {
    std::vector<ChunkResult> v;
    v.reserve(all_results.size());
    for (auto& f : all_results) {
      v.push_back(f.get());  // waking up once per future adds a lot of overhead
    }
    return gather_results(v);
  });
}
```
```cpp
std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec) {
  const size_t chunk_size = whatever;
  std::vector<std::experimental::future<ChunkResult>> res;
  for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
    const size_t remaining_size = end - begin;
    const size_t this_chunk_size = std::min(remaining_size, chunk_size);
    res.push_back(new_async(process_chunk, begin, begin + this_chunk_size));
    begin += this_chunk_size;
  }
  return std::experimental::when_all(res.begin(), res.end())
      .then([](std::future<std::vector<std::experimental::future<ChunkResult>>>
                   ready_results) {
        std::vector<std::experimental::future<ChunkResult>> all_results =
            ready_results.get();
        std::vector<ChunkResult> v;
        v.reserve(all_results.size());
        for (auto& f : all_results) {
          v.push_back(f.get());
        }
        return gather_results(v);
      });
}
```
```cpp
std::experimental::future<FinalResult> find_and_process_value(
    std::vector<MyData>& data) {
  const unsigned concurrency = std::thread::hardware_concurrency();
  const unsigned num_tasks = (concurrency > 0) ? concurrency : 2;
  std::vector<std::experimental::future<MyData*>> res;
  const auto chunk_size = (data.size() + num_tasks - 1) / num_tasks;
  auto chunk_begin = data.begin();
  std::shared_ptr<std::atomic<bool>> done_flag =
      std::make_shared<std::atomic<bool>>(false);
  for (unsigned i = 0; i < num_tasks; ++i) {  // spawn the asynchronous search tasks into res
    auto chunk_end =
        (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
    res.push_back(new_async([=] {
      for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end);
           ++entry) {
        if (matches_find_criteria(*entry)) {
          *done_flag = true;
          return &*entry;
        }
      }
      return (MyData*)nullptr;
    }));
    chunk_begin = chunk_end;
  }
  std::shared_ptr<std::experimental::promise<FinalResult>> final_result =
      std::make_shared<std::experimental::promise<FinalResult>>();
  struct DoneCheck {
    std::shared_ptr<std::experimental::promise<FinalResult>> final_result;
    DoneCheck(
        std::shared_ptr<std::experimental::promise<FinalResult>> final_result_)
        : final_result(std::move(final_result_)) {}
    void operator()(
        std::experimental::future<std::experimental::when_any_result<
            std::vector<std::experimental::future<MyData*>>>>
            res_param) {
      auto res = res_param.get();
      MyData* const ready_result =
          res.futures[res.index].get();  // take the value from the future that became ready
      // if a matching value was found, process it and set the final result
      if (ready_result) {
        final_result->set_value(process_found_value(*ready_result));
      } else {
        res.futures.erase(res.futures.begin() + res.index);  // otherwise discard this future
        if (!res.futures.empty()) {  // if futures remain to be checked, call when_any again
          std::experimental::when_any(res.futures.begin(), res.futures.end())
              .then(std::move(*this));
        } else {  // if no futures are left, set an exception on the promise
          final_result->set_exception(
              std::make_exception_ptr(std::runtime_error("Not found")));
        }
      }
    }
  };
  std::experimental::when_any(res.begin(), res.end())
      .then(DoneCheck(final_result));
  return final_result->get_future();
}
```
- Besides taking a pair of iterators, when_all and when_any can also accept futures directly:
```cpp
std::experimental::future<int> ft1 = new_async(f1);
std::experimental::future<std::string> ft2 = new_async(f2);
std::experimental::future<double> ft3 = new_async(f3);
std::experimental::future<std::tuple<std::experimental::future<int>,
                                     std::experimental::future<std::string>,
                                     std::experimental::future<double>>>
    res = std::experimental::when_all(std::move(ft1), std::move(ft2),
                                      std::move(ft3));
```
CSP(Communicating Sequential Processes)
- CSP is a programming model for describing interaction in concurrent systems. Threads are conceptually completely separate, with no shared data, and each thread can be reasoned about entirely on its own. Messages are passed between threads over communication channels, and a thread's behavior depends only on the messages it receives, so each thread is effectively a state machine: when it receives a message it updates its state in some way, and it may in turn send messages to other threads. Erlang adopts this model, and MPI uses it for high-performance computing in C and C++. True CSP has no shared data and all communication goes through message queues, but since C++ threads share an address space this cannot be enforced by the language, so it is up to the application or library author to ensure that threads do not share data.
- Consider implementing an ATM application. It has to handle the interaction with the bank when money is withdrawn, and it has to control the physical machine's response to the card. One approach is to use three threads that handle the physical machine, the ATM logic, and the interaction with the bank respectively, and to have the threads communicate purely by messages rather than shared data: for example, when a card is inserted the machine thread sends a message to the logic thread, and the logic thread eventually sends back a message telling the machine thread how much money to dispense.
- The ATM logic can be modeled as a simple state machine (a minimal actor-style sketch follows this list).
- This ATM-logic state machine and the other parts of the system each run on their own thread; there is no need to think about synchronization or concurrency, only about which messages may be received and sent at a given point. This design style is called the actor model: the system consists of several independent actors that can send messages to each other but share no state, which greatly simplifies the design of concurrent systems. The complete implementation can be found in the reference repository below.
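As a rough illustration of the message-passing style described above (my own minimal sketch, not the blog's ATM implementation): each actor owns a mailbox, other threads only push messages into it, and the actor thread pops messages and reacts. The Mailbox type, the string messages, and the single card_inserted/dispense_50 exchange are invented for this example.

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// a tiny mailbox: the only way to talk to an actor is to push a message here
class Mailbox {
 public:
  void send(std::string msg) {
    {
      std::lock_guard<std::mutex> l(m_);
      q_.push(std::move(msg));
    }
    cv_.notify_one();
  }
  std::string receive() {
    std::unique_lock<std::mutex> l(m_);
    cv_.wait(l, [this] { return !q_.empty(); });
    std::string msg = std::move(q_.front());
    q_.pop();
    return msg;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<std::string> q_;
};

int main() {
  Mailbox atm_logic;  // mailbox owned by the "ATM logic" actor
  Mailbox machine;    // mailbox owned by the "machine" actor
  std::thread logic([&] {
    // the logic actor is a state machine driven purely by received messages
    if (atm_logic.receive() == "card_inserted") {
      machine.send("dispense_50");
    }
  });
  std::thread hardware([&] {
    atm_logic.send("card_inserted");         // event from the physical machine
    std::cout << machine.receive() << '\n';  // react to the logic's reply
  });
  logic.join();
  hardware.join();
}
```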
ref:
https://github.com/downdemo/Cpp-Concurrency-in-Action-2ed