协程_Cpp协程

协程框架的解释

怎么使用框架?
写了摄像头的驱动,驱动有sdk,用户可以调用sdk,来驱动摄像头。
摄像头是用什么形式的API写的?就不一定了,可能是C语言,可能是C++C++可以按MFC提供,也可以按面向对象提供,也可以按类似于COM组件(接口)的形式提供,也可以按协程库的形式提供。
此协程库框架,是给API厂商使用的。不是一个已经实现了的库,更像是一个中间件。
比如我们想要提供一个读文件的函数给用户使用,readFile,从名字来看其一般是阻塞的,读完后才能返回。如果不想阻塞,则开辟一个线程,在新线程中让其readFile。
非阻塞版本可以称其为readFile_Async。用户从名字上就能看到,这是非阻塞的,调用后主线程可以立即抽身去处理其他事情,在子线程执行完毕后,以回调的形式通知主线程。
而在协程版本下的实现形式则不像传统线程间的回调形式(被割裂在两端),而是直接地写在了一个函数中,用co_await连接上下动作,此处的co_await的成功后的返回“相当于”回调函数。回调之后,就继续做co_await之后的动作即可。
协程库框架还可以灵活切换协程所在的线程,比如切换到主线程、工作线程等等。也可以配置线程池。
假如我们是一个厂商,现在使用该协程库框架开发一个readFile API:

1
2
3
4
5
6
7
8
9
10
#include "Agave.hpp"
#include <iostream>
agave::AsyncAction read_file_async(void)
{

}
agave::AsyncOperation<int> compute_async(void)
{

}

协程工作的总体流程

协程的工作流,在形式上,是包装在一个函数中的。
在函数中,间断地进行:work状态和co_await状态交替。
假如,协程在t1线程上进行work状态。此时,需要co_await
co_await表示等待着某一个其他工作的结果。比如等待网络中传来的文件,等待下载完毕。在co_await时,此函数就会返回,协程暂停运行。co_await成功后,才能继续下面的操作。
下面的操作,可能会在t2线程上进行work(也可能继续在t1,只是举个例子)。
随后,可能还会进行co_await等操作。

看样子co_await表示的是“等待”的意思,但在协程的工作环境中,其实语义更像是"不等了",“我先去处理别的事了,先走了”。有点儿异步的感觉。因为co_await语句使用时,会马上退出既有流程,直到被通知,才进行回调,从而返回work状态。

从t1线程到t2线程的转变,相当于调用了一个callback。此callback是被co_await间隔的。

由上述例子说明,协程和线程不冲突(或者说没关系),因为每次co_await成功后不一定会切换到另一个线程。

创建项目

先创建一个空项目。(Agave是根据C++标准协程库编写的,记得设置为20标准)
再copy已有的4个文件(Agave.cppB_Object.hppBJobScheduler.cppBJobScheduler.h)到该空项目文件目录下。
之后,右键VS项目中的Header Files,Add,Existing Item,选择.hpp.h文件。
右键Source Files,Add,Existing Item,选择.cpp文件。
Source Files中新建一个文件demo.cpp。在其中编写测试代码。

初试1

  1. demo函数返回值是agave::IAsyncAction,则代表此函数是一个协程。
  2. 我们在main主线程中调用demo,即在主线程中创建此协程。
  3. demo协程在其中co_await,后面接的read_file_async可以看作是厂家的API。这个API的名称带有async,意味着它是一个异步的操作,在此co_await XXX_async语句后,会立即返回。但是不会进行下一步操作(输出读取完毕),而是暂时结束demo“函数”,直到co_await的操作(read_file_async)完成后,才继续demo之后的操作,相当于被callback。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include "Agave.hpp"
#include <iostream>

using namespace std::chrono_literals;

agave::AsyncAction read_file_async(void)
{
std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id() << '\n';
std::cout << "Reading..." << std::endl;
// 注意此处不能使用this_thread::sleep_for
// std::this_thread::sleep_for(5s); // error
co_await 5s; // 等待5秒,模拟文件读取
co_return; // 相当于callback
}
agave::AsyncAction demo()
{
std::cout << "demo() on thread: " << std::this_thread::get_id() << '\n';
co_await read_file_async();
std::cout << "Reading finished on thread: " << std::this_thread::get_id() << '\n';
}
int main(void)
{
std::cout << "Main on thread: " << std::this_thread::get_id() << '\n';
auto action = demo();
// 主线程等待协程运行完毕
std::this_thread::sleep_for(8s);
return 0;
}

关于为什么不能用sleep_for

经测试,如果用sleep_for
输出是这样:发现没有执行输出demo的第三个语句(Reading finished)

1
2
3
4
5
6
Main on thread: 25912
demo() on thread: 25912
Read File Coroutine started on thread: 25912
Reading...

(Program End)

为什么呢?且看下面的正常输出结果中对线程运行的分析。

输出结果

1
2
3
4
5
6
Main on thread: 6260 
demo() on thread: 6260
Read File Coroutine started on thread: 6260
Reading...
Reading finished on thread: 20516

demo协程在主线程6260启动,另一个read协程也在主线程6260启动。而读取完毕后,demo协程此时却处于20516线程(内容是通知信息,通知read操作完毕)。

因此上面为什么在read协程函数中用sleep_for就不能正常打印最后一句话,可能和read协程函数调用this_thread睡眠导致主线程睡眠有关系。

总之,测试的结果就是read协程睡眠后,再也没回到demo中去。

这样的线程模式是不好的:

  1. demo协程在主线程启动无所谓,没事。
  2. 但是read协程以及read操作在主线程上就不妥了,应该将read操作另起一个线程
  3. 在read完毕callback后,执行demo中的第三句,应该给主线程通知才对,不应该通知给一个无关的线程。我们此例通知给了一个无关的线程,是因为read协程函数中co_await 5s会导致它另起一个时间线程。导致返回到demo中时,跑到了这个时间线程中去通知了。

初试2

更改模式,应该在read协程函数中另起线程,调用的是co_await agave::resume_background()。意为唤醒一个后台的线程。默认是即刻构造一个std::jthread。我们后期也可以拓展功能,将这个接口连接到一个线程池中,从线程池中拿取线程。
在这种情况下,就可以使用sleep_for了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "Agave.hpp"
#include <iostream>

using namespace std::chrono_literals;

agave::AsyncAction read_file_async(void)
{
co_await agave::resume_background(); // 另起线程
std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id() << '\n';
std::cout << "Reading..." << std::endl;

std::this_thread::sleep_for(5s);

co_return; // 相当于callback
}
agave::AsyncAction demo()
{
std::cout << "demo() on thread: " << std::this_thread::get_id() << '\n';
co_await read_file_async();
std::cout << "Reading finished on thread: " << std::this_thread::get_id() << '\n';
}
int main(void)
{
std::cout << "Main on thread: " << std::this_thread::get_id() << '\n';
auto action = demo();
// 主线程等待协程运行完毕
std::this_thread::sleep_for(8s);
return 0;
}

输出结果:

1
2
3
4
5
6
Main on thread: 29580
demo() on thread: 29580
Read File Coroutine started on thread: 15800
Reading...
Reading finished on thread: 15800

但是此时demo的read操作完毕消息还是没有发送给主线程,而是发送给了read另起的线程中。
如何回到主线程呢?不同的操作系统具体实现不一样,但是我们可以自己写一个统一的接口resume_mainThread。在demo中的co_await read_file_async语句的下一条写下co_await resume_mainThread()。之后的输出操作便的在主线程中操作了。

返回带值

不返回东西时返回值类型写为agave::IAsyncAction
返回东西时,返回值类型写为agave::IAsyncOperation<T>。模板参数类型是要携带的值类型。
对应地,co_await不能裸着用了,要用一个变量接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
agave::AsyncOperation<int> read_file_async(void)
{
co_await agave::resume_background(); // 另起线程
std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id() << '\n';
std::cout << "Reading..." << std::endl;

std::this_thread::sleep_for(5s);

co_return 50; // callback + 携带值
}

agave::AsyncAction demo()
{
std::cout << "demo() on thread: " << std::this_thread::get_id() << '\n';

auto result = co_await read_file_async();

std::cout << "Reading finished: " << result << ", on thread: " << std::this_thread::get_id() << '\n';
}

协程取消

  1. 调用co_await agave::get_cancellation_token()获取一个此协程的取消变量。随时可以查看can.is_canceled()来判断是否有取消信号。
  2. 在其他线程中,通过auto async_var = read_file_async()拿到协程的标识,提供这个标识async_var.cancel(),来给该协程发送取消信号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include "Agave.hpp"
#include <iostream>

using namespace std::chrono_literals;

agave::AsyncAction read_file_async(void)
{
co_await agave::resume_background(); // 另起线程
auto can = co_await agave::get_cancellation_token();
std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id() << '\n';
for (int i = 0; i < 5; ++i)
{
std::this_thread::sleep_for(1s);
if (can.is_canceled())
{
std::cout << "Canceled on thread: " << std::this_thread::get_id() << '\n';
break;
}
else
{
std::cout << "Reading... " << i << std::endl;
}
}

co_return; // 相当于callback
}
agave::AsyncAction demo()
{
std::cout << "demo() on thread: " << std::this_thread::get_id() << '\n';
auto async_var = read_file_async();
std::this_thread::sleep_for(3s);
async_var.cancel();
co_await async_var;

std::cout << "Reading finished on thread: " << std::this_thread::get_id() << '\n';

}
int main(void)
{
std::cout << "Main on thread: " << std::this_thread::get_id() << '\n';
auto action = demo();
// 主线程等待协程运行完毕
std::this_thread::sleep_for(8s);
return 0;
}

输出:

1
2
3
4
5
6
7
8
Main on thread: 18768
demo() on thread: 18768
Read File Coroutine started on thread: 19036
Reading... 0
Reading... 1
Canceled on thread: 19036
Reading finished on thread: 19036

Async的机制怎么实现

Async换句话来说是让函数(或任务)的结果在“成功之后”返回(但实际上函数调用后当即返回了)。如果是在C语言下做,是函数返回一个标号,我们在外部轮询一个范围的标号,如果有某个标号上有消息,再去获取它。

场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <thread>
#include <chrono>
#include <functional>
using namespace std::literals;
void call_back();
int main()
{
std::wcout << L"Main Thread Id: " << std::this_thread::get_id() << std::endl;
std::jthread([](std::function<void(void)> cb) -> void
{
std::wcout << L"Thread Id: " << std::this_thread::get_id() << std::endl;
std::wcout << L"doing on worker..." << std::endl;
std::this_thread::sleep_for(5s);
cb();
}, call_back).join();
return 0;
}
void call_back()
{
std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);
std::wcout << L"finish update!" << std::endl;
}

输出结果:

1
2
3
4
5
6
7
Main Thread Id: 39288           // Main Thread 是UI线程
Thread Id: 24632 // Worker 线程
doing on worker...
update UI on Thread: 24632 // 更新不在UI线程上,这种的表现不好
updating...
finish update!

传统回调模式-工作线程自己执行cb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// main.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <functional>
using namespace std::literals;
void call_back();
int main()
{
std::wcout << L"Main Thread Id: " << std::this_thread::get_id() << std::endl;
std::jthread([](std::function<void(void)> cb) -> void
{
std::wcout << L"Thread Id: " << std::this_thread::get_id() << std::endl;
std::wcout << L"doing on worker..." << std::endl;
std::this_thread::sleep_for(5s);
cb();
}, call_back).join();
return 0;
}
void call_back()
{
std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);
std::wcout << L"finish update!" << std::endl;
}

运行结果:

1
2
3
4
5
6
Main Thread Id: 23092
Thread Id: 11032
doing on worker...
update UI on Thread: 11032
updating...
finish update!

发现,UI的改变是在工作者线程完成的,而不是UI线程。这是错误的。

传统回调模式改进-消息队列-工作者线程传送cb函数对象到main

现在,加一个消息任务队列,工作者线程可以添加cb函数对象到此队列。
然后,主线程(UI线程)可以循环去任务队列拿。拿出一个cb函数对象,由UI线程亲自调用。
这样就可以达到在UI线程上更改内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// main.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <functional>
#include "TaskQueue.h"
using namespace std::literals;
void call_back();
TaskQueue queue;

void run_on_main(std::function<void(void)> fn);
int main()
{
std::wcout << L"Main Thread Id: " << std::this_thread::get_id() << std::endl;
std::jthread([](std::function<void(void)> cb) -> void
{
std::wcout << L"Thread Id: " << std::this_thread::get_id() << std::endl;
std::wcout << L"doing on worker..." << std::endl;
std::this_thread::sleep_for(5s);
run_on_main(cb);
}, call_back).detach();
queue.run();
return 0;
}
void call_back()
{
std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);
std::wcout << L"finish update!" << std::endl;
}
void run_on_main(std::function<void(void)> fn)
{
queue.add_task(fn);
}

TaskQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// TaskQueue.h
#pragma once
#include <list>
#include <functional>
#include <condition_variable>
#include <mutex>
using Task = std::function<void(void)>;
class TaskQueue
{
public:
void add_task(Task task);
void run(void);
private:
std::list<Task> _task_queue;
std::condition_variable _cv;
std::mutex _mx;
};
-----------------------------------
// TaskQueue.cpp
#include "TaskQueue.h"
void TaskQueue::add_task(Task task)
{
std::unique_lock lck{ _mx };
_task_queue.push_back(task);
_cv.notify_one();
}
void TaskQueue::run()
{
std::list<Task> queue;
std::unique_lock lck{ _mx };
while (true)
{
_cv.wait(lck, [this]()->bool { return _task_queue.size() > 0; });
if (!_task_queue.empty())
{
_task_queue.swap(queue);
}
lck.unlock();
while (queue.size() > 0)
{
auto task = queue.front();
queue.pop_front();
task();
}
lck.lock();
}
}

结果

1
2
3
4
5
6
Main Thread Id: 41388         // Main Thread 是UI线程
Thread Id: 45076 // Worker线程
doing on worker...
update UI on Thread: 41388 // 发现,工作在UI线程,成功。
updating...
finish update!

使用协程-自动启动新线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int main()
{
std::wcout << L"Main Thread Id: " << std::this_thread::get_id() << std::endl;
DoWorkAsync().get();
return 0;
}
agave::AsyncAction DoWorkAsync()
{
std::wcout << L"DoWorkAsync started on thread: " << std::this_thread::get_id() << std::endl;

// 切换到一个新线程,后台执行下面的语句
co_await agave::resume_background();

std::wcout << L"doing on worker: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(5s);

std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);

std::wcout << L"finish update!" << std::endl;

}

结果

1
2
3
4
5
Main Thread Id: 1984
doing on worker: 12268
update UI on Thread: 12268
updating...
finish update!

以上程序由于在协程函数中,worker线程操作完成后,没有切换到前台UI线程。所以UI更新错误地在worker线程中执行。
应该加一个切换到主线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
agave::AsyncAction DoWorkAsync()
{
std::wcout << L"DoWorkAsync started on thread: " << std::this_thread::get_id() << std::endl;

// 切换到一个新线程,后台执行下面的语句
co_await agave::resume_background();

std::wcout << L"doing on worker: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(5s);

co_await agave::resume_foreground();

std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);

std::wcout << L"finish update!" << std::endl;
}

结果:

1
2
3
4
5
Main Thread Id: 8796
doing on worker: 25564
update UI on Thread: 25564
updating...
finish update!

现在,加了一句切换到前台,但是最终UI更新还是在worker线程上。
这是因为,我们是以同步方式完成协程的:DoWorkAsync().get();。所以无法切换。

结合消息队列

去掉DoWorkAsync()后面的get()。异步形式执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
agave::AsyncAction DoWorkAsync();
int main()
{
agave::set_fg_entry([](std::function<void(void)> procedure)
{
queue.add_task(procedure);
});
std::wcout << L"Main Thread Id: " << std::this_thread::get_id() << std::endl;

DoWorkAsync();
queue.run();
}
agave::AsyncAction DoWorkAsync()
{
std::wcout << L"DoWorkAsync started on thread: " << std::this_thread::get_id() << std::endl;

// 切换到一个新线程,后台执行下面的语句
co_await agave::resume_background();

std::wcout << L"doing on worker: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(5s);

// main函数中,设置了前台fg入口,下面的内容就会打包成一个函数对象,传给前台。
co_await agave::resume_foreground();

std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);

std::wcout << L"finish update!" << std::endl;
}

结果:

1
2
3
4
5
Main Thread Id: 2936
doing on worker: 10572
update UI on Thread: 2936
updating...
finish update!

UI和worker协同工作示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <iostream>
#include <thread>
#include <chrono>
#include <functional>
#include "TaskQueue.h"
#include "../Agave/Agave.hpp"

using namespace std::chrono_literals;

void call_back(void);
void run_on_main(std::function<void(void)> fn);
TaskQueue queue;
agave::AsyncAction DoWorkAsync(void);

int main(void)
{
agave::set_fg_entry([](std::function<void(void)> procedure)
{ queue.add_task(procedure); });

/*agave::set_bg_entry([](std::function<void(void)> procedure)
{ std::jthread{ procedure }.detach(); });*/

std::wcout << L"Main Thread Id: " << std::this_thread::get_id() << std::endl;

std::jthread{ []()
{
for (int i = 0; i < 20; ++i)
{
run_on_main([]()
{
std::wcout << L"doing on UI Thread: " << std::this_thread::get_id() << std::endl;
});
std::this_thread::sleep_for(350ms);
}
} }.detach();


/*std::jthread([](std::function<void(void)> cb) -> void
{
std::wcout << L"doing on worker: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(5s);
run_on_main(cb);
}, call_back).detach();*/
DoWorkAsync();

queue.run();

return 0;
}

void call_back(void)
{
std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);
std::wcout << L"finish update!" << std::endl;
}

void run_on_main(std::function<void(void)> fn)
{
queue.add_task(fn);
}

agave::AsyncAction DoWorkAsync(void)
{
co_await agave::resume_background();
for (int i = 0; i < 10; ++i)
{
std::wcout << L"doing on worker: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(500ms);
}
//co_await 5s;
//std::wcout << L"doing on worker2: " << std::this_thread::get_id() << std::endl;
co_await agave::resume_foreground();
std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);
std::wcout << L"finish update!" << std::endl;

co_return;
}

厂商模拟开发异步SDK示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <iostream>
#include <thread>
#include <chrono>
#include <functional>
#include "TaskQueue.h"
#include "../Agave/Agave.hpp"
#include "manufacturer.h"

using namespace std::chrono_literals;

void run_on_main(std::function<void(void)> fn);
TaskQueue queue;
agave::AsyncAction DoWorkAsync(void);
agave::AsyncAction DoWorkAsync2(void);

int main(void)
{
agave::set_fg_entry([](std::function<void(void)> procedure)
{ queue.add_task(procedure); });

std::wcout << L"Main Thread Id: " << std::this_thread::get_id() << std::endl;

std::jthread{ []()
{
for (int i = 0; i < 20; ++i)
{
run_on_main([]()
{
std::wcout << L"doing on UI Thread: " << std::this_thread::get_id() << std::endl;
});
std::this_thread::sleep_for(350ms);
}
} }.detach();

DoWorkAsync2();

queue.run();

return 0;
}

agave::AsyncAction DoWorkAsync2(void)
{
int result = co_await ReadFileAsync(L"aa.txt");
co_await agave::resume_foreground();
std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"got a result: " << result << std::endl;
}

void run_on_main(std::function<void(void)> fn)
{
queue.add_task(fn);
}

agave::AsyncAction DoWorkAsync(void)
{
co_await agave::resume_background();
for (int i = 0; i < 10; ++i)
{
std::wcout << L"doing on worker: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(500ms);
}
//co_await 5s;
//std::wcout << L"doing on worker2: " << std::this_thread::get_id() << std::endl;
co_await agave::resume_foreground();
std::wcout << L"update UI on Thread: " << std::this_thread::get_id() << std::endl;
std::wcout << L"updating..." << std::endl;
std::this_thread::sleep_for(3s);
std::wcout << L"finish update!" << std::endl;

co_return;
}