协程框架的解释
怎么使用框架?
写了摄像头的驱动,驱动有sdk,用户可以调用sdk,来驱动摄像头。
摄像头是用什么形式的API写的?就不一定了,可能是C语言,可能是C++
。C++
可以按MFC提供,也可以按面向对象提供,也可以按类似于COM组件(接口)的形式提供,也可以按协程库的形式提供。
此协程库框架,是给API厂商使用的。不是一个已经实现了的库,更像是一个中间件。
比如我们想要提供一个读文件的函数给用户使用,readFile,从名字来看其一般是阻塞的,读完后才能返回。如果不想阻塞,则开辟一个线程,在新线程中让其readFile。
非阻塞版本可以称其为readFile_Async
。用户从名字上就能看到,这是非阻塞的,调用后主线程可以立即抽身去处理其他事情,在子线程执行完毕后,以回调的形式通知主线程。
而在协程版本下的实现形式则不像传统线程间的回调形式(被割裂在两端),而是直接地写在了一个函数中,用co_await
连接上下动作,此处的co_await
的成功后的返回“相当于”回调函数。回调之后,就继续做co_await
之后的动作即可。
协程库框架还可以灵活切换协程所在的线程,比如切换到主线程、工作线程等等。也可以配置线程池。
假如我们是一个厂商,现在使用该协程库框架开发一个readFile API:
1 2 3 4 5 6 7 8 9 10 #include "Agave.hpp" #include <iostream> agave::AsyncAction read_file_async (void ) {} agave::AsyncOperation<int > compute_async (void ) {}
协程工作的总体流程
协程的工作流,在形式上,是包装在一个函数中的。
在函数中,间断地进行:work状态和co_await
状态交替。
假如,协程在t1线程上进行work状态。此时,需要co_await
。
co_await
表示等待着某一个其他工作的结果。比如等待网络中传来的文件,等待下载完毕。在co_await
时,此函数就会返回,协程暂停运行。co_await
成功后,才能继续下面的操作。
下面的操作,可能会在t2线程上进行work(也可能继续在t1,只是举个例子)。
随后,可能还会进行co_await
等操作。
看样子co_await
表示的是“等待”的意思,但在协程的工作环境中,其实语义更像是"不等了",“我先去处理别的事了,先走了”。有点儿异步的感觉。因为co_await
语句使用时,会马上退出既有流程,直到被通知,才进行回调,从而返回work状态。
从t1线程到t2线程的转变,相当于调用了一个callback。此callback是被co_await
间隔的。
由上述例子说明,协程和线程不冲突(或者说没关系),因为每次co_await
成功后不一定会切换到另一个线程。
创建项目
先创建一个空项目。(Agave是根据C++
标准协程库编写的,记得设置为20标准)
再copy已有的4个文件(Agave.cpp
、B_Object.hpp
、BJobScheduler.cpp
、BJobScheduler.h
)到该空项目文件目录下。
之后,右键VS项目中的Header Files
,Add,Existing Item,选择.hpp
和.h
文件。
右键Source Files
,Add,Existing Item,选择.cpp
文件。
在Source Files
中新建一个文件demo.cpp
。在其中编写测试代码。
初试1
demo函数返回值是agave::IAsyncAction
,则代表此函数是一个协程。
我们在main主线程中调用demo,即在主线程中创建此协程。
demo协程在其中co_await
,后面接的read_file_async
可以看作是厂家的API。这个API的名称带有async
,意味着它是一个异步的操作,在此co_await XXX_async
语句后,会立即返回。但是不会进行下一步操作(输出读取完毕),而是暂时结束demo“函数”,直到co_await
的操作(read_file_async
)完成后,才继续demo之后的操作,相当于被callback。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include "Agave.hpp" #include <iostream> using namespace std::chrono_literals;agave::AsyncAction read_file_async (void ) { std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id () << '\n' ; std::cout << "Reading..." << std::endl; co_await 5 s; co_return ; } agave::AsyncAction demo () { std::cout << "demo() on thread: " << std::this_thread::get_id () << '\n' ; co_await read_file_async () ; std::cout << "Reading finished on thread: " << std::this_thread::get_id () << '\n' ; } int main (void ) { std::cout << "Main on thread: " << std::this_thread::get_id () << '\n' ; auto action = demo (); std::this_thread::sleep_for (8 s); return 0 ; }
关于为什么不能用sleep_for
经测试,如果用sleep_for
输出是这样:发现没有执行输出demo的第三个语句(Reading finished)
1 2 3 4 5 6 Main on thread: 25912 demo() on thread: 25912 Read File Coroutine started on thread: 25912 Reading... (Program End)
为什么呢?且看下面的正常输出结果中对线程运行的分析。
输出结果
1 2 3 4 5 6 Main on thread: 6260 demo() on thread: 6260 Read File Coroutine started on thread: 6260 Reading... Reading finished on thread: 20516
demo协程在主线程6260启动,另一个read协程也在主线程6260启动。而读取完毕后,demo协程此时却处于20516线程(内容是通知信息,通知read操作完毕)。
因此上面为什么在read协程函数中用sleep_for
就不能正常打印最后一句话,可能和read协程函数调用this_thread
睡眠导致主线程睡眠有关系。
总之,测试的结果就是read协程睡眠后,再也没回到demo中去。
这样的线程模式是不好的:
demo协程在主线程启动无所谓,没事。
但是read协程以及read操作在主线程上就不妥了,应该将read操作另起一个线程
在read完毕callback后,执行demo中的第三句,应该给主线程通知才对,不应该通知给一个无关的线程。我们此例通知给了一个无关的线程,是因为read协程函数中co_await 5s
会导致它另起一个时间线程。导致返回到demo中时,跑到了这个时间线程中去通知了。
初试2
更改模式,应该在read协程函数中另起线程,调用的是co_await agave::resume_background()
。意为唤醒一个后台的线程。默认是即刻构造一个std::jthread
。我们后期也可以拓展功能,将这个接口连接到一个线程池中,从线程池中拿取线程。
在这种情况下,就可以使用sleep_for
了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include "Agave.hpp" #include <iostream> using namespace std::chrono_literals;agave::AsyncAction read_file_async (void ) { co_await agave::resume_background () ; std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id () << '\n' ; std::cout << "Reading..." << std::endl; std::this_thread::sleep_for (5 s); co_return ; } agave::AsyncAction demo () { std::cout << "demo() on thread: " << std::this_thread::get_id () << '\n' ; co_await read_file_async () ; std::cout << "Reading finished on thread: " << std::this_thread::get_id () << '\n' ; } int main (void ) { std::cout << "Main on thread: " << std::this_thread::get_id () << '\n' ; auto action = demo (); std::this_thread::sleep_for (8 s); return 0 ; }
输出结果:
1 2 3 4 5 6 Main on thread: 29580 demo() on thread: 29580 Read File Coroutine started on thread: 15800 Reading... Reading finished on thread: 15800
但是此时demo的read操作完毕消息还是没有发送给主线程,而是发送给了read另起的线程中。
如何回到主线程呢?不同的操作系统具体实现不一样,但是我们可以自己写一个统一的接口resume_mainThread
。在demo中的co_await read_file_async
语句的下一条写下co_await resume_mainThread()
。之后的输出操作便的在主线程中操作了。
返回带值
不返回东西时返回值类型写为agave::IAsyncAction
。
返回东西时,返回值类型写为agave::IAsyncOperation<T>
。模板参数类型是要携带的值类型。
对应地,co_await
不能裸着用了,要用一个变量接收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 agave::AsyncOperation<int > read_file_async (void ) { co_await agave::resume_background () ; std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id () << '\n' ; std::cout << "Reading..." << std::endl; std::this_thread::sleep_for (5 s); co_return 50 ; } agave::AsyncAction demo () { std::cout << "demo() on thread: " << std::this_thread::get_id () << '\n' ; auto result = co_await read_file_async (); std::cout << "Reading finished: " << result << ", on thread: " << std::this_thread::get_id () << '\n' ; }
协程取消
调用co_await agave::get_cancellation_token()
获取一个此协程的取消变量。随时可以查看can.is_canceled()
来判断是否有取消信号。
在其他线程中,通过auto async_var = read_file_async()
拿到协程的标识,提供这个标识async_var.cancel()
,来给该协程发送取消信号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include "Agave.hpp" #include <iostream> using namespace std::chrono_literals;agave::AsyncAction read_file_async (void ) { co_await agave::resume_background () ; auto can = co_await agave::get_cancellation_token (); std::cout << "Read File Coroutine started on thread: " << std::this_thread::get_id () << '\n' ; for (int i = 0 ; i < 5 ; ++i) { std::this_thread::sleep_for (1 s); if (can.is_canceled ()) { std::cout << "Canceled on thread: " << std::this_thread::get_id () << '\n' ; break ; } else { std::cout << "Reading... " << i << std::endl; } } co_return ; } agave::AsyncAction demo () { std::cout << "demo() on thread: " << std::this_thread::get_id () << '\n' ; auto async_var = read_file_async (); std::this_thread::sleep_for (3 s); async_var.cancel (); co_await async_var; std::cout << "Reading finished on thread: " << std::this_thread::get_id () << '\n' ; } int main (void ) { std::cout << "Main on thread: " << std::this_thread::get_id () << '\n' ; auto action = demo (); std::this_thread::sleep_for (8 s); return 0 ; }
输出:
1 2 3 4 5 6 7 8 Main on thread: 18768 demo() on thread: 18768 Read File Coroutine started on thread: 19036 Reading... 0 Reading... 1 Canceled on thread: 19036 Reading finished on thread: 19036
Async的机制怎么实现
Async换句话来说是让函数(或任务)的结果在“成功之后”返回(但实际上函数调用后当即返回了)。如果是在C语言下做,是函数返回一个标号,我们在外部轮询一个范围的标号,如果有某个标号上有消息,再去获取它。
场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #include <iostream> #include <thread> #include <chrono> #include <functional> using namespace std::literals;void call_back () ;int main () { std::wcout << L"Main Thread Id: " << std::this_thread::get_id () << std::endl; std::jthread ([](std::function<void (void )> cb) -> void { std::wcout << L"Thread Id: " << std::this_thread::get_id () << std::endl; std::wcout << L"doing on worker..." << std::endl; std::this_thread::sleep_for (5 s); cb (); }, call_back).join (); return 0 ; } void call_back () { std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; }
输出结果:
1 2 3 4 5 6 7 Main Thread Id: 39288 // Main Thread 是UI线程 Thread Id: 24632 // Worker 线程 doing on worker... update UI on Thread: 24632 // 更新不在UI线程上,这种的表现不好 updating... finish update!
传统回调模式-工作线程自己执行cb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <iostream> #include <thread> #include <chrono> #include <functional> using namespace std::literals;void call_back () ;int main () { std::wcout << L"Main Thread Id: " << std::this_thread::get_id () << std::endl; std::jthread ([](std::function<void (void )> cb) -> void { std::wcout << L"Thread Id: " << std::this_thread::get_id () << std::endl; std::wcout << L"doing on worker..." << std::endl; std::this_thread::sleep_for (5 s); cb (); }, call_back).join (); return 0 ; } void call_back () { std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; }
运行结果:
1 2 3 4 5 6 Main Thread Id: 23092 Thread Id: 11032 doing on worker... update UI on Thread: 11032 updating... finish update!
发现,UI的改变是在工作者线程完成的,而不是UI线程。这是错误的。
传统回调模式改进-消息队列-工作者线程传送cb函数对象到main
现在,加一个消息任务队列,工作者线程可以添加cb函数对象到此队列。
然后,主线程(UI线程)可以循环去任务队列拿。拿出一个cb函数对象,由UI线程亲自调用。
这样就可以达到在UI线程上更改内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #include <iostream> #include <thread> #include <chrono> #include <functional> #include "TaskQueue.h" using namespace std::literals;void call_back () ;TaskQueue queue; void run_on_main (std::function<void (void )> fn) ;int main () { std::wcout << L"Main Thread Id: " << std::this_thread::get_id () << std::endl; std::jthread ([](std::function<void (void )> cb) -> void { std::wcout << L"Thread Id: " << std::this_thread::get_id () << std::endl; std::wcout << L"doing on worker..." << std::endl; std::this_thread::sleep_for (5 s); run_on_main (cb); }, call_back).detach (); queue.run (); return 0 ; } void call_back () { std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; } void run_on_main (std::function<void (void )> fn) { queue.add_task (fn); }
TaskQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #pragma once #include <list> #include <functional> #include <condition_variable> #include <mutex> using Task = std::function<void (void )>;class TaskQueue { public : void add_task (Task task) ; void run (void ) ; private : std::list<Task> _task_queue; std::condition_variable _cv; std::mutex _mx; }; ----------------------------------- #include "TaskQueue.h" void TaskQueue::add_task (Task task) { std::unique_lock lck{ _mx }; _task_queue.push_back (task); _cv.notify_one (); } void TaskQueue::run () { std::list<Task> queue; std::unique_lock lck{ _mx }; while (true ) { _cv.wait (lck, [this ]()->bool { return _task_queue.size () > 0 ; }); if (!_task_queue.empty ()) { _task_queue.swap (queue); } lck.unlock (); while (queue.size () > 0 ) { auto task = queue.front (); queue.pop_front (); task (); } lck.lock (); } }
结果
1 2 3 4 5 6 Main Thread Id: 41388 // Main Thread 是UI线程 Thread Id: 45076 // Worker线程 doing on worker... update UI on Thread: 41388 // 发现,工作在UI线程,成功。 updating... finish update!
使用协程-自动启动新线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 int main () { std::wcout << L"Main Thread Id: " << std::this_thread::get_id () << std::endl; DoWorkAsync ().get (); return 0 ; } agave::AsyncAction DoWorkAsync () { std::wcout << L"DoWorkAsync started on thread: " << std::this_thread::get_id () << std::endl; co_await agave::resume_background () ; std::wcout << L"doing on worker: " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (5 s); std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; }
结果
1 2 3 4 5 Main Thread Id: 1984 doing on worker: 12268 update UI on Thread: 12268 updating... finish update!
以上程序由于在协程函数中,worker线程操作完成后,没有切换到前台UI线程。所以UI更新错误地在worker线程中执行。
应该加一个切换到主线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 agave::AsyncAction DoWorkAsync () { std::wcout << L"DoWorkAsync started on thread: " << std::this_thread::get_id () << std::endl; co_await agave::resume_background () ; std::wcout << L"doing on worker: " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (5 s); co_await agave::resume_foreground () ; std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; }
结果:
1 2 3 4 5 Main Thread Id: 8796 doing on worker: 25564 update UI on Thread: 25564 updating... finish update!
现在,加了一句切换到前台,但是最终UI更新还是在worker线程上。
这是因为,我们是以同步方式完成协程的:DoWorkAsync().get();
。所以无法切换。
结合消息队列
去掉DoWorkAsync()
后面的get()
。异步形式执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 agave::AsyncAction DoWorkAsync () ;int main () { agave::set_fg_entry ([](std::function<void (void )> procedure) { queue.add_task (procedure); }); std::wcout << L"Main Thread Id: " << std::this_thread::get_id () << std::endl; DoWorkAsync (); queue.run (); } agave::AsyncAction DoWorkAsync () { std::wcout << L"DoWorkAsync started on thread: " << std::this_thread::get_id () << std::endl; co_await agave::resume_background () ; std::wcout << L"doing on worker: " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (5 s); co_await agave::resume_foreground () ; std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; }
结果:
1 2 3 4 5 Main Thread Id: 2936 doing on worker: 10572 update UI on Thread: 2936 updating... finish update!
UI和worker协同工作示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 #include <iostream> #include <thread> #include <chrono> #include <functional> #include "TaskQueue.h" #include "../Agave/Agave.hpp" using namespace std::chrono_literals;void call_back (void ) ;void run_on_main (std::function<void (void )> fn) ;TaskQueue queue; agave::AsyncAction DoWorkAsync (void ) ;int main (void ) { agave::set_fg_entry ([](std::function<void (void )> procedure) { queue.add_task (procedure); }); std::wcout << L"Main Thread Id: " << std::this_thread::get_id () << std::endl; std::jthread{ []() { for (int i = 0 ; i < 20 ; ++i) { run_on_main ([]() { std::wcout << L"doing on UI Thread: " << std::this_thread::get_id () << std::endl; }); std::this_thread::sleep_for (350 ms); } } }.detach (); DoWorkAsync (); queue.run (); return 0 ; } void call_back (void ) { std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; } void run_on_main (std::function<void (void )> fn) { queue.add_task (fn); } agave::AsyncAction DoWorkAsync (void ) { co_await agave::resume_background () ; for (int i = 0 ; i < 10 ; ++i) { std::wcout << L"doing on worker: " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (500 ms); } co_await agave::resume_foreground () ; std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; co_return ; }
厂商模拟开发异步SDK示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 #include <iostream> #include <thread> #include <chrono> #include <functional> #include "TaskQueue.h" #include "../Agave/Agave.hpp" #include "manufacturer.h" using namespace std::chrono_literals;void run_on_main (std::function<void (void )> fn) ;TaskQueue queue; agave::AsyncAction DoWorkAsync (void ) ;agave::AsyncAction DoWorkAsync2 (void ) ;int main (void ) { agave::set_fg_entry ([](std::function<void (void )> procedure) { queue.add_task (procedure); }); std::wcout << L"Main Thread Id: " << std::this_thread::get_id () << std::endl; std::jthread{ []() { for (int i = 0 ; i < 20 ; ++i) { run_on_main ([]() { std::wcout << L"doing on UI Thread: " << std::this_thread::get_id () << std::endl; }); std::this_thread::sleep_for (350 ms); } } }.detach (); DoWorkAsync2 (); queue.run (); return 0 ; } agave::AsyncAction DoWorkAsync2 (void ) { int result = co_await ReadFileAsync (L"aa.txt" ); co_await agave::resume_foreground () ; std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"got a result: " << result << std::endl; } void run_on_main (std::function<void (void )> fn) { queue.add_task (fn); } agave::AsyncAction DoWorkAsync (void ) { co_await agave::resume_background () ; for (int i = 0 ; i < 10 ; ++i) { std::wcout << L"doing on worker: " << std::this_thread::get_id () << std::endl; std::this_thread::sleep_for (500 ms); } co_await agave::resume_foreground () ; std::wcout << L"update UI on Thread: " << std::this_thread::get_id () << std::endl; std::wcout << L"updating..." << std::endl; std::this_thread::sleep_for (3 s); std::wcout << L"finish update!" << std::endl; co_return ; }