线程池_bubo

创建项目

在已有或新建的解决方案里,新建项目,取名“ThreadPool”。
项目配置标准为C++20标准。

ITask

新建头文件ITask.h

1
2
3
4
5
6
7
8
namespace thpool
{
class ITask
{
public:
virtual void run_task(void) = 0;
};
}

基于信号量的ThreadPool

新建头文件“ThreadPool.h”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ThreadPool.h
#include <thread>
#include <mutex>
#include <semaphore>
#include <list>
namespace thpool
{
class ITask;
class ThreadPool
{
public:
ThreadPool(int max_num);
~ThreadPool();
void add_task(std::shared_ptr<ITask> task);
void add_task(std::list<std::shared_ptr<ITask>> task_lst);
private:
// capacity
int _max_num;
// current running task threads
int _alive_num;
std::counting_semaphore<100> _semaphore;
std::mutex _access_mx;
bool _is_exit{ false };
std::list<std::shared_ptr<ITask>> _task_queue;
};
}

实现:

  1. 初始化_max_num,规定线程池最大线程数
  2. 初始化_alive_num,线程池刚创建时,正在执行任务的线程为0。
  3. 初始化_semaphore为0。

共创建_max_num个线程,每个线程基于信号量获取任务,如果成功则_alive_num加1,如果此时_is_exit标志为真则意味着线程池即将析构,_alive_num减1,并退出循环。
如果_is_exit标志不为真,则尝试从任务队列中提取任务,需要用互斥锁同步,在外层先简单判断队列大小是否大于0,然后再用锁去再次获取真实值(这样外层先判断,内层再加锁判断,可以加大条件为真的概率,避免锁太急切地加,降低性能),再去执行提取出的任务。循环,直到任务队列大小为0,退出while循环后_alive_num减1。
退出大的while时(_is_exit标志为真时),_max_num减1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ThreadPool.cpp
#include "ThreadPool.h"
#include "ITask.h"

thpool::ThreadPool::ThreadPool(int max_num)
: _max_num{ max_num }, _alive_num{ 0 }, _semaphore{ 0 }
{
for (int i = 0; i < _max_num; ++i)
{
std::jthread th([this](void)
{
while (true)
{
_semaphore.acquire();
++_alive_num;
if (_is_exit)
{
--_alive_num;
break;
}

while (_task_queue.size() > 0)
{
std::unique_lock lck{ _access_mx };
if (_task_queue.size() > 0)
{
auto task = _task_queue.front();
_task_queue.pop_front();
lck.unlock();
task->run_task();
}
}
--_alive_num;
}
--_max_num;
});
th.detach();
}
}

void thpool::ThreadPool::add_task(std::shared_ptr<ITask> task)
{
if (!task)
return;
std::unique_lock lck{ _access_mx };
_task_queue.push_back(task);
lck.unlock();
// 判断是否有空余的线程,如果有则唤醒一个
if (_max_num - _alive_num > 0)
{
_semaphore.release(1);
}
}

但是以上程序存在一些问题:

  1. _semaphore.acquire()++_alive_num不是同步的。
  2. _is_exit的判断和--_alive_num也不是同步的。
  3. 在每个线程中都进行while循环反复判断任务队列是否为空,可能会导致一个线程一直独占_access_mx互斥量。
  4. 在执行完任务后的--_alive_num步骤,是和执行结束是不同步的,有可能_max_num个线程同时卡在这一步,导致add_task函数中的_max_num - _alive_num > 0的条件不成立,导致_semaphore.release(1)不会执行。这将导致系统实际增加了任务,却没有增加任务的信号量。有小概率死锁(死锁在_semaphore.acquire())。
  5. 综上所述,最好把_semaphore_alive_num融为一体,不要割裂两者。(也就是说,能不能让信号量的量和_alive_num无缝保持一致)
    1. 操作系统的具体实现,如Windows、Linux是可以随时获取信号量的量的,但是我们现在使用的是跨平台C++信号量,没有提供获取量大小的方法,因此必须有一个_alive_num记录。
    2. 可以在_semaphore.acquire()++_alive_num这两个动作整体加锁吗?不可以,因为_semaphore本身就是会阻塞的东西,如果加了锁后,_semaphore也阻塞了,那么锁就不能解开了。
    3. 上面提到的,加了锁后,里面的东西阻塞了,想要把锁解开,有一样东西可以实现:条件变量。但是,条件变量没有像信号量记录数目的功能(要么是notify_one,要么是notify_all),因此不行。
  6. 最简单的彻底解决4死锁的问题的方法是,add_task方法中不再判断_max_num - _alive_num > 0的条件,即无论如何,在添加任务时,都要_semaphore.release(1)。这样做的副作用就是要把std::counting_semaphore _semaphore在声明时,定义其为一个最大值为无限大(最大数)的信号量,可以用std::counting_semaphore<> _semaphore表示。

修改后的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ThreadPool.h
#include <thread>
#include <mutex>
#include <semaphore>
#include <list>
namespace thpool
{
class ITask;
class ThreadPool
{
public:
ThreadPool(int max_num);
~ThreadPool();
void add_task(std::shared_ptr<ITask> task);
void add_task(std::list<std::shared_ptr<ITask>> task_lst);
private:
// capacity
int _max_num;
// current running task threads
int _alive_num;
std::counting_semaphore<> _semaphore;
std::mutex _access_mx;
bool _is_exit{ false };
std::list<std::shared_ptr<ITask>> _task_queue;
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// ThreadPool.cpp
#include "ThreadPool.h"
#include "ITask.h"

thpool::ThreadPool::ThreadPool(int max_num)
: _max_num{ max_num }, _alive_num{ 0 }, _semaphore{ 0 }
{
for (int i = 0; i < _max_num; ++i)
{
std::jthread th([this](void)
{
while (true)
{
_semaphore.acquire();
++_alive_num;
if (_is_exit)
{
--_alive_num;
break;
}

std::unique_lock lck{ _access_mx };
if (_task_queue.size() > 0)
{
auto task = _task_queue.front();
_task_queue.pop_front();
lck.unlock();
task->run_task();
}
--_alive_num;
}
--_max_num;
});
th.detach();
}
}

void thpool::ThreadPool::add_task(std::shared_ptr<ITask> task)
{
if (!task)
return;
std::unique_lock lck{ _access_mx };
_task_queue.push_back(task);
lck.unlock();

_semaphore.release(1);
}

线程池的析构——latch的运用

latch是C++20引入的标准。
实际上是对操作系统同步量的操作的封装,比如在Windows下,latch就是对事件的封装,或者是对WaitFor Single/Mutiple Object的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ThreadPool.h
#include <thread>
#include <mutex>
#include <semaphore>
#include <list>
#include <latch>
namespace thpool
{
class ITask;
class ThreadPool
{
public:
// ...
private:
// ...
std::latch _latch;
// ...
};
}

在ThreadPool构造时初始化_latchmax_num,表示需要等待max_num个线程的结束,latch才放行。
同时,要在退出大的while循环时,_max_num减1之后,进行_latch.count_down(),参数默认为1,意为对latch值减1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ThreadPool.cpp
#include "ThreadPool.h"
#include "ITask.h"

thpool::ThreadPool::ThreadPool(int max_num)
: _max_num{ max_num }, _alive_num{ 0 }, _semaphore{ 0 }, _latch{ max_num }
{
for (int i = 0; i < _max_num; ++i)
{
std::jthread th([this](void)
{
while (true)
{
_semaphore.acquire();
++_alive_num;
if (_is_exit)
{
--_alive_num;
break;
}

// ...
}
--_max_num;
_latch.count_down();
});
th.detach();
}
}

析构函数,则可以利用latch,让其在析构函数设置_is_exit标志为true且释放_max_num信号量后,wait直到latch值为0时,代表所有线程都结束了,就可以返回了。

1
2
3
4
5
6
7
8
// ThreadPool.cpp
thpool::ThreadPool::~ThreadPool()
{
_is_exit = true;
_semaphore.release(_max_num);
// 等待线程结束,对应latch其值为0时
_latch.wait();
}

测试

新建main_entry.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// main_entry.cpp
#include "ThreadPool.h"
#include "ITask.h"
#include <iostream>
using namespace std::chrono_literals;
class Task : public thpool::ITask
{
public:
virtual void run_task(void) override
{
std::wcout << L"task" << std::endl;
}
};
int main(void)
{
thpool::ThreadPool thread_pool{ 10 };
thread_pool.add_task(std::shared_ptr<thpool::ITask>(new Task));
thread_pool.add_task(std::shared_ptr<thpool::ITask>(new Task));
std::this_thread::sleep_for(10s);
return 0;
}

测试结果:

两个task黏在一起,说明多线程输出的。

在Debug-Windows-Threads中,可以看到目前程序中的线程状况:

可以看到有10个子线程在其中:

7z_常用命令

命令格式

1
Usage: 7zz <command> [<switches>...] <archive_name> [<file_names>...] [@listfile]

安装

将7zz文件移动到系统的bin目录
sudo cp 7zz /usr/local/bin/7zz
并且确保/usr/local/bin在系统PATH中
打开或创建.zshrc(如果你使用zsh)或.bash_profile(如果你使用bash)文件:
确认有如下内容(如果没有,请添加):
export PATH="/usr/local/bin:$PATH"
保存并关闭文件,然后执行以下命令使更改生效:
source ~/.zshrc~/.bash_profile

command(必须)

Command Description
a Add files to archive
b Benchmark
d Delete files from archive
e Extract files from archive (without using directory names)
h Calculate hash values for files
i Show information about supported formats
l List contents of archive
rn Rename files in archive
t Test integrity of archive
u Update files to archive
x eXtract files with full paths
  1. a是代表压缩文件到压缩包
  2. e中的“without using directory names”表示的是解压缩后的文件将不会有文件夹一级的目录,而是全部为零散的文件。
  3. x代表解压缩文件,解压缩后有压缩包中原来的文件夹。

switches(可选)

下面只列出了常用的。

Switches Description
-m{Parameters} set compression Method
-mmt[N] set number of CPU threads
-mx[N] set compression level: -mx1 (fastest) … -mx9 (ultra)
-o{Directory} set Output directory
-p{Password} set Password
-slt show technical information for l (List) command
-snh store hard links as links
-snl store symbolic links as links
-ssp do not change Last Access Time of source files while archiving
-stx{Type} exclude archive type
-t{Type} Set type of archive
-y assume Yes on all queries
注意,它给出的形式是形如这种的:-o{Directory}-o和后面的参数名是没有空格间隙的,否则无法识别。

现在比如说,我想要解压缩QuickGeometryUI.zip到当前目录下的dir文件夹(未创建):

1
7zz x QuickGeometryUI.zip -odir

我想要压缩当前目录所有文件(a、b、.test)到my7zip

1
7zz a my7zip *

结果是:

  1. 只压缩了a、b,隐藏文件没有,因为*不会通配隐藏文件。
  2. 压缩包名字自动带上了后缀my7zip.7z
    所以,如果想要压缩包含隐藏文件,建议将其作为整个文件夹(放到了Test7zip目录中)压缩。
1
7zz a my7zip Test7zip