SGI-STL和Nginx内存池剖析

内容

内存池是为了更高效地管理小块内存的频繁开辟、释放。

  1. C++ STL:标准模板库。
  2. SGI STL:第三方厂商开发的,后来被纳入C++标准,成为了C++ STL中管理内存的底层实现。
  3. Nginx内存池设计

C++ STL空间配置器

先用vector举个例子

1
2
3
4
5
template<typename T, typename _Alloc = allocator<T>>
class vector
{

};

可以看到,vector第二个模板参数是一个空间配置器。

主要包含了四个方法:

  1. allocate:负责给容器开辟内存空间=>malloc
  2. deallocate:负责释放容器内存空间=>free
  3. construct:负责在容器中构造对象=>定位new:是基于已经开辟好了的容器空间中直接构造的。
  4. destroy:负责析构容器中的对象=>p->~T()

空间配置器的核心作用:

  1. 拆开了new的两个操作——对象的内存开辟、对象构造;
  2. 拆开了delete的两个操作——对象的析构,内存的内存释放。
    把空间和对象本身分开,在容器这个场景下更为适合。

SGI STL 的两级 allocator

提供了两个allocator的实现:

  1. 一级allocator,实际上就是malloc/free
  2. 二级allocator,是基于内存池的内存管理。
    本文主要剖析SGI的二级allocator,即内存池的实现。

通过阅读源码,发现:

SGI STL底层对于容器的对象的构造、析构是通过自定义的全局模板函数Construct和Destroy完成的。
而点进去发现本质上仍是通过定位new调用对象的析构函数完成的。这些都是内存申请、释放工作之外的动作。

因此,可以推断,SGI STL的空间配置器主要工作的区别在于 allocate 和 deallocate,即对容器内存申请、释放的管理

SGI STL 内存池的实现

内存池的粒度信息

1
2
3
enum { _ALIGN = 8 };
enum { _MAX_BYTES = 128 };
enum { _NFREELISTS = 16 };

每一个内存chunk块的头信息

1
2
3
4
5
union _Obj
{
union _Obj* _M_free_list_link;
char _M_client_data[1];
};

组织所有自由链表的指针数组。
这是静态变量,多线程共享,volatile避免了读取缓存的脏数据。

1
static _Obj* __STL_VOLATILE _S_free_list[_NFREELISTS];

图示:

一个数组,第 1 个位置存放的是 8 字节的内存池,第 2 个位置存放的是 16 字节的内存池,…,最后一个位置存放的是 128 字节的内存池。数组的大小和每个位置对应的字节数由内存池的粒度信息决定。

allocate(size_t)

定义在stl_alloc.h

统一的allocate接口。外部申请 n 个字节。如果是大块内存,则普通malloc。
如果是128字节及以下的内存,则会从内存池中分配,这就是二级空间配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  static void* allocate(size_t __n)
{
void* __ret = 0;

if (__n > (size_t) _MAX_BYTES)
{
__ret = malloc_alloc::allocate(__n);
}
else // 核心代码
{
// 返回 freelist 的某下标的指针(二级指针)
_Obj* __STL_VOLATILE* __my_free_list
= _S_free_list + _S_freelist_index(__n);
// Acquire the lock here with a constructor call.
// This ensures that it is released in exit or during stack
// unwinding.

# ifndef _NOTHREADS
/*REFERENCED*/
_Lock __lock_instance;
# endif
// 去看 freelist 这个下标,有没有已开辟且由空闲的块
_Obj* __RESTRICT __result = *__my_free_list;
if (__result == nullptr) // 没有空闲块,或者没开辟。
{
__ret = _S_refill(_S_round_up(__n));
}
else // 有空闲块。
{
// 让 这个下标 指向下一块空闲块。(见_Obj联合体定义)
*__my_free_list = __result -> _M_free_list_link;
// 返回的是 __result 这个空闲块
__ret = __result;
}
}
return __ret;
};

_S_freelist_index(size_t)

用于返回freelist的下标,0下标存放的是8字节的内存池块。1下标存放的是16字节的内存池块。以此类推。
比如,
外部申请 1 字节,则 (1+81)/81=11=0(1 + 8 - 1) / 8 - 1 = 1 - 1 = 0
外部申请 7 字节,则 (7+81)/81=11=0(7 + 8 - 1) / 8 - 1 = 1 - 1 = 0
外部申请 8 字节,则 (8+81)/81=11=0(8 + 8 - 1) / 8 - 1 = 1 - 1 = 0
外部申请 9 字节,则 (9+81)/81=21=1(9 + 8 - 1) / 8 - 1 = 2 - 1 = 1

1
2
3
4
static  size_t _S_freelist_index(size_t __bytes)
{
return (__bytes + (size_t)_ALIGN - 1 ) / (size_t)_ALIGN - 1;
}

_S_round_up(size_t)

外部申请 n 字节,返回的是 n 字节对应的在内存池块中,一小块的实际大小(8的整数倍)。
类似于向上取整(取8的整倍数的最小值),比如输入1到8,输出8。输入9到16,输出16。(输入0,输出0)
比如,
外部申请 1 字节,则 (1+81)&(81)=8&(7)=1000&(0111)=1000&1000=8(1 + 8 - 1) \& \sim(8 - 1) = 8 \& \sim(7) = 1000 \& \sim(0111) = 1000 \& 1000 = 8
外部申请 7 字节,则 (7+81)&(151)=15&(7)=1110&(0111)=1110&1000=8(7 + 8 - 1) \& \sim(15 - 1) = 15 \& \sim(7) = 1110 \& \sim(0111) = 1110 \& 1000 = 8
外部申请 8 字节,则 (8+81)&(161)=15&(7)=1111&(0111)=1111&1000=8(8 + 8 - 1) \& \sim(16 - 1) = 15 \& \sim(7) = 1111 \& \sim(0111) = 1111 \& 1000 = 8
外部申请 9 字节,则 (9+81)&(171)=16&(7)=10000&(00111)=10000&11000=16(9 + 8 - 1) \& \sim(17 - 1) = 16 \& \sim(7) = 10000 \& \sim(00111) = 10000 \& 11000 = 16

1
2
3
4
5
static size_t
_S_round_up(size_t __bytes)
{
return (__bytes + (size_t)_ALIGN - 1) & ~((size_t)_ALIGN - 1);
}

_S_refill(size_t)

调用_S_chunk_alloc(__n, __nobjs),在内存池块中尽量找一个合适的小字节块。
_S_chunk_alloc内部会帮你处理底层的开辟内存池,处理内存碎片,管理内存池的指示信息等等。
由于_S_chunk_alloc第二个参数__nobjs传入的是引用,有可能__nobjs会被改变。
调用之前,__nobjs是我们想要申请的小字节块的个数。
调用结束后,__nobjs更新为了实际分配到的小字节块的个数。
如果是 1 ,此次分配完之后,这个内存池块正好用完了,不构建freelist下标的链表。
其他情况,构建相应的freelist下标的链表。
for循环中做的是遍历内存池大块中的每个单元小块,联合体_Obj*_M_free_list_link指向紧挨着的下一个单元小块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* Returns an object of size __n, and optionally adds to size __n free list.*/
/* We assume that __n is properly aligned. */
/* We hold the allocation lock. */
template <bool __threads, int __inst>
void*
__default_alloc_template<__threads, __inst>::_S_refill(size_t __n)
{
int __nobjs = 20;
char* __chunk = _S_chunk_alloc(__n, __nobjs);

_Obj* __STL_VOLATILE* __my_free_list;
_Obj* __result;
_Obj* __current_obj;
_Obj* __next_obj;
int __i;

if (1 == __nobjs) return(__chunk);

__my_free_list = _S_free_list + _S_freelist_index(__n);

/* Build free list in chunk */
__result = (_Obj*)__chunk;
*__my_free_list = __next_obj = (_Obj*)(__chunk + __n);
for (__i = 1; ; __i++)
{
__current_obj = __next_obj;
__next_obj = (_Obj*)((char*)__next_obj + __n);
if (__nobjs - 1 == __i)
{
__current_obj -> _M_free_list_link = 0;
break;
}
else
{
__current_obj -> _M_free_list_link = __next_obj;
}
}
return(__result);
}

图示:

_S_chunk_alloc(size_t, int& nobjs)

在内存池块中尽量找一个合适的小字节块。
期间,可能会改变外部传入的__nobjs。(因为是引用,外部会受影响)

__total_bytes记录的是欲开辟的内存池大小(根据__nobjs,这个是小字节块数)。
__bytes_left指的是_S_end_free - _S_start_free,指目前未被开发的大小。

__bytes_left还足够,返回,无需开辟新空间

如果__bytes_left还足够(至少是 1 个小字节块),则返回_S_start_free。同时移动_S_start_free到新的位置。
成功返回。无需额外操作。


__bytes_left不足,开辟新的更大的内存池块

如果__bytes_left不足,则将要开辟新的更大的内存池块。__bytes_to_get = 2 * __total_bytes + _S_round_up(_S_heap_size >> 4)

__bytes_left > 0,头插到合适的一个链表,管理这个内存碎片

如果__bytes_left > 0。这时,为了不浪费__bytes_left而造成内存碎片,把这部分尚余的小内存给其对应的freelist下标的链头插。比如,剩余 32 字节,则找下标 3 ,头插进去。(见下文图示)

开辟新的大块内存_malloc正常

以下是__bytes_left不足时,都需要做的操作,目的是开辟新的大块内存。返回的地址赋给_S_start_free


如果malloc正常(malloc返回 != 0),更新_S_heap_size += __bytes_to_get;,更新_S_end_free = _S_start_free + __bytes_to_get;

malloc成功,递归调用return(_S_chunk_alloc(__size, __nobjs));

开辟新的大块内存_malloc失败:找内存池中已开辟的大单元链表的空间


如果malloc失败(malloc返回 == 0)。则处理:
遍历freelist的下标各个内存池块。从size_t __n对应的freelist下标,依次往后找有没有还有空闲的。

__i初始化为__n,循环,每次__i += (size_t) _ALIGN(即加8)。比如,__n等于 40 字节,我们依次去找40、48、56等等的freelist下标的内存池块,看看有没有能分配出来空间的。

如果有,则_S_start_free指向第一个空闲块。更新_S_end_free = _S_start_free + __i;
好了,成功在更大单元的内存池块找到,递归调用return(_S_chunk_alloc(__size, __nobjs));

开辟新的大块内存_malloc失败:异常处理


如果以上的for循环找了后面更大单元的内存池块,仍没有可用空间,则是系统内存不足的迹象:
需要调用malloc_alloc::allocate(__bytes_to_get);
内部最后一次进行普通malloc的挣扎。
如果malloc仍然返回 0 ,则进行异常处理(绑定的回调)。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* We allocate memory in large chunks in order to avoid fragmenting     */
/* the malloc heap too much. */
/* We assume that size is properly aligned. */
/* We hold the allocation lock. */
template <bool __threads, int __inst>
char*
__default_alloc_template<__threads, __inst>::_S_chunk_alloc(size_t __size,
int& __nobjs)
{
char* __result;
size_t __total_bytes = __size * __nobjs;
size_t __bytes_left = _S_end_free - _S_start_free;

if (__bytes_left >= __total_bytes)
{
__result = _S_start_free;
_S_start_free += __total_bytes;
return(__result);
}
else if (__bytes_left >= __size)
{
__nobjs = (int)(__bytes_left / __size);
__total_bytes = __size * __nobjs;
__result = _S_start_free;
_S_start_free += __total_bytes;
return(__result);
}
else
{
size_t __bytes_to_get = 2 * __total_bytes + _S_round_up(_S_heap_size >> 4);
// Try to make use of the left-over piece.
if (__bytes_left > 0)
{
_Obj* __STL_VOLATILE* __my_free_list =
_S_free_list + _S_freelist_index(__bytes_left);

((_Obj*)_S_start_free) -> _M_free_list_link = *__my_free_list;
*__my_free_list = (_Obj*)_S_start_free;
}
_S_start_free = (char*)malloc(__bytes_to_get);
if (0 == _S_start_free)
{
size_t __i;
_Obj* __STL_VOLATILE* __my_free_list;
_Obj* __p;
// Try to make do with what we have. That can't
// hurt. We do not try smaller requests, since that tends
// to result in disaster on multi-process machines.
for (__i = __size; __i <= (size_t) _MAX_BYTES; __i += (size_t) _ALIGN)
{
__my_free_list = _S_free_list + _S_freelist_index(__i);
__p = *__my_free_list;
if (0 != __p)
{
*__my_free_list = __p -> _M_free_list_link;
_S_start_free = (char*)__p;
_S_end_free = _S_start_free + __i;
return(_S_chunk_alloc(__size, __nobjs));
// Any leftover piece will eventually make it to the
// right free list.
}
}
_S_end_free = 0; // In case of exception.
_S_start_free = (char*)malloc_alloc::allocate(__bytes_to_get);
// This should either throw an
// exception or remedy the situation. Thus we assume it
// succeeded.
}
_S_heap_size += __bytes_to_get;
_S_end_free = _S_start_free + __bytes_to_get;
return(_S_chunk_alloc(__size, __nobjs));
}
}

头插小内存碎片,图示

oom异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
template <int __inst>
class __malloc_alloc_template
{
private:
#ifndef __STL_STATIC_TEMPLATE_MEMBER_BUG
static void (* __malloc_alloc_oom_handler)();
#endif
public:
static void* allocate(size_t __n)
{
void* __result = malloc(__n);
if (0 == __result)
{
__result = _S_oom_malloc(__n);
}
return __result;
}
// ...
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
template <int __inst>
void*
__malloc_alloc_template<__inst>::_S_oom_malloc(size_t __n)
{
void (* __my_malloc_handler)();
void* __result;

for (;;)
{
__my_malloc_handler = __malloc_alloc_oom_handler;
if (0 == __my_malloc_handler)
{
__THROW_BAD_ALLOC;
}

(*__my_malloc_handler)();

__result = malloc(__n);

if (__result)
{
return(__result);
}
}
}

_S_start_free_S_end_free_S_heap_size

这三个变量,只会在_S_chunk_alloc(size_t, int&)函数执行中改变。

1
2
3
4
5
6
7
8
template <bool __threads, int __inst>
char* __default_alloc_template<__threads, __inst>::_S_start_free = 0;

template <bool __threads, int __inst>
char* __default_alloc_template<__threads, __inst>::_S_end_free = 0;

template <bool __threads, int __inst>
size_t __default_alloc_template<__threads, __inst>::_S_heap_size = 0;

deallocate(void* p, size_t)

定义于stl_alloc.h

头插归还,看图示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template <bool threads, int inst>
class __default_alloc_template {
// ...
public:
// ...

/* __p may not be 0 */
static void deallocate(void* __p, size_t __n)
{
if (__n > (size_t) _MAX_BYTES)
{
malloc_alloc::deallocate(__p, __n);
}
else
{
_Obj* __STL_VOLATILE* __my_free_list =
_S_free_list + _S_freelist_index(__n);
_Obj* __q = (_Obj*)__p;

// acquire lock
# ifndef _NOTHREADS
/*REFERENCED*/
_Lock __lock_instance;
# endif /* _NOTHREADS */

__q -> _M_free_list_link = *__my_free_list;
*__my_free_list = __q;
// lock is released here
}
}

//...
};

reallocate(void* p, old_sz, new_sz)

对已开辟的内存池块的扩容、缩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template <bool threads, int inst>
void*
__default_alloc_template<threads, inst>::reallocate(void* __p,
size_t __old_sz,
size_t __new_sz)
{
void* __result;
size_t __copy_sz;

if (__old_sz > (size_t)_MAX_BYTES && __new_sz > (size_t)_MAX_BYTES)
{
return(realloc(__p, __new_sz));
}
if (_S_round_up(__old_sz) == _S_round_up(__new_sz))
{
return(__p);
}
__result = allocate(__new_sz);
__copy_sz = __new_sz > __old_sz ? __old_sz : __new_sz;
memcpy(__result, __p, __copy_sz);
deallocate(__p, __old_sz);
return(__result);
}

SGI STL内存池总结

SGI STL 二级空间配置器内存池的实现优点:

  1. 对于每一个字节数的chunk块分配,都是给出一部分进行使用,另一部分作为备用,这个备用可以给当前字节数使用,也可以给其它字节数使用
  2. 对于备用内存池划分完chunk块以后,如果还有剩余的很小的内存块,再次分配的时候,会把这些小的内存块再次分配出去,备用内存池使用的干干净净!防止小块内存频繁的分配,释放,造成内存很多的碎片出来,内存没有更多的连续的大内存块。所以应用对于小块内存的操作,一般都会使用内存池来进行管理。
  3. malloc内存分配失败,还会调用oom_malloc这么一个预先设置好的以后的回调函数,如果没设置,则throw bad_alloc。设置了则for(;;)(*oom_malloc_handler)();

Nginx内存池设计和实现

区分大小内存块的申请和释放,大于池尺寸的定义为大内存块,使用单独的大内存块链表保存,即时分配和释放;小于等于池尺寸的定义为小内存块,直接从预先分配的内存块中提取,不够就扩充池中的内存,在生命周期内对小块内存不做释放,直到最后统一销毁。

Nginx内存池结构图

Nginx源码

本次分析的是Nginx-release-1.13.1的源码。
src目录下,有好多模块,其中内存池的模块位于/src/core目录下。使用的是C语言。
对于不同的操作系统,有不同的实现,位于/src/os/unix/src/os/win32下。

指标

1
2
3
4
5
6
7
8
9
10
11
12
13
// 位于 ngx_palloc.h
/*
* NGX_MAX_ALLOC_FROM_POOL should be (ngx_pagesize - 1), i.e. 4095 on x86.
* On Windows NT it decreases a number of locked pages in a kernel.
*/
#define NGX_MAX_ALLOC_FROM_POOL (ngx_pagesize - 1)

#define NGX_DEFAULT_POOL_SIZE (16 * 1024)

#define NGX_POOL_ALIGNMENT 16
#define NGX_MIN_POOL_SIZE \
ngx_align((sizeof(ngx_pool_t) + 2 * sizeof(ngx_pool_large_t)), \
NGX_POOL_ALIGNMENT)
  1. NGX_MAX_ALLOC_FROM_POOL定义了可以从内存池中申请的最大内存。默认是Nginx页面大小减1。x86系统下是4096字节减1
  2. NGX_DEFAULT_POOL_SIZE定义了Nginx内存池默认大小。是16 * 1024B16KB
  3. NGX_POOL_ALIGNMENT,内存池,分配内存时的对齐大小。默认是16。
  4. NGX_MIN_POOL_SIZE定义了内存池的最小大小。
    1. 其需要通过ngx_align计算,定义如下:发现和STL的round_up一样,向上取 d 的整 a 倍数。比如,a等于16、d等于7的话,那d取整后就是16,d等于17的话,取整后就是32。
    2. 其中d是(sizeof(ngx_pool_t) + 2 * sizeof(ngx_pool_large_t))。a是NGX_POOL_ALIGNMENT,默认是16。
1
2
// 位于 ngx_config.h
#define ngx_align(d, a)     ( ( (d) + (a - 1) ) & ~(a - 1))

关键数据结构

定义于ngx_palloc.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typdef strcut
{
u_char *last; // 当前数据块中内存分配指针的当前位置
u_char *end; // 内存块的结束位置
ngx_pool_t *next; // 内存池由多块内存块组成,指向下一个数据块的位置
ngx_uint_t failed; // 当前数据块内存不足引起分配失败的次数
} ngx_pool_data_t;

struct ngx_pool_s
{
ngx_pool_data_t d; // 上面那个结构体:内存池当前的数据区指针的结构体

size_t max; // max 指的是 Nginx 一次分配小块内存大小 的最大值。
ngx_pool_t *current; // 当前正在使用的数据块的指针
ngx_chain_t *chain; // 把内存池链接起来
ngx_pool_large_t *large; // 指向大数据块的指针(大数据块是指size > max的数据)
ngx_pool_cleanup_t *cleanup; // 类似于析构函数,在 内存free 之前,对内存上的数据进行处理,比如释放指针对应的外部资源
ngx_log_t *log;
}; // in ngx_core.h: typedef struct ngx_pool_s ngx_pool_t;

ngx_core.h中,

1
typedef struct ngx_pool_s ngx_pool_t;

npx_pool_tstruct ngx_pool_s的别名。

在Nginx内存池中,npx_pool_t这个结构只出现在第一个内存池块的头部上,后续链接的内存池块头部只有ngx_pool_data_t

struct ngx_pool_s(别名ngx_pool_t)结构示意图(大小为1024的池)

npx_create_pool创建内存池

声明于ngx_palloc.h,定义于ngx_palloc.c

1
ngx_pool_t * ngx_create_pool(size_t size, ngx_log_t *log);

返回npx_pool_t *

开辟指定大小的内存池。根据不同系统、不同的对齐方法,调用不同的API。
一般是普通的malloc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ngx_pool_t * ngx_create_pool(size_t size, ngx_log_t *log)
{
ngx_pool_t *p;
// 如果不用内存对齐,则实际就是普通的 malloc
// 如果需要内存对齐,则调用posix_memalign(void ** p, size_t alignment, size_t size)
// 或 memalign(size_t alignment, size_t size)
p = ngx_memalign(NGX_POOL_ALIGNMENT, size, log);
if (p == NULL)
{
return NULL;
}
// 最后一个内存池大块,指向这个新开辟的内存池大块的头信息之后。
p->d.last = (u_char *) p + sizeof(ngx_pool_t);
// 内存池的结束位置,整个大块的尾部。
p->d.end = (u_char *) p + size;
p->d.next = NULL;
p->d.failed = 0;

// size 更新为 除去头信息大小的 实际存储数据的大小
size = size - sizeof(ngx_pool_t);
// 若 size 小于 MAX_ALLOC,则 max为size,size 大于 MAX_ALLOC 则 max 为 MAX_ALLOC
// max 指的是 Nginx 一次分配小块内存大小 的最大值。如果用户申请大于max,则按大块数据块处理。
p->max = (size < NGX_MAX_ALLOC_FROM_POOL) ? size : NGX_MAX_ALLOC_FROM_POOL;
// 指向自己,首地址(包含头信息)
p->current = p;
p->chain = NULL;
p->large = NULL;
p->cleanup = NULL;
p->log = log;

return p;
}

ngx_memalign(POOL_ALIGNMENT, size, log)ngx_alloc(size, log)

这是个宏定义,定义于/src/os/unix/src/os/win32下的ngx_alloc.h

在Unix下,有对齐的区别:如果要做内存对齐,则size_t alignment参数生效,否则忽略对齐参数

1
2
3
4
5
#if (NGX_HAVE_POSIX_MEMALIGN || NGX_HAVE_MEMALIGN)
void *ngx_memalign(size_t alignment, size_t size, ngx_log_t *log);
#else
#define ngx_memalign(alignment, size, log) ngx_alloc(size, log)
#endif

在Win32下,没有对齐限制。#define ngx_memalign(alignment, size, log)  ngx_alloc(size, log)

名字上,暂时一样。但是相应的ngx_alloc(size, log)函数,在两个操作系统上就是不同的实现了。
我们看Unix的:实际就是套了个普通malloc。然后输出了一些日志信息。
定义于/src/os/unix/ngx_alloc.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void * ngx_alloc(size_t size, ngx_log_t *log)
{
void *p;

p = malloc(size);
if (p == NULL)
{
ngx_log_error(NGX_LOG_EMERG, log, ngx_errno,
"malloc(%uz) failed", size);
}

ngx_log_debug2(NGX_LOG_DEBUG_ALLOC, log, 0, "malloc: %p:%uz", p, size);

return p;
}

从创建好的内存池中申请内存

定义于ngx_palloc.c
3个接口:ngx_pallocngx_pnallocngx_pcalloc

  1. ngx_pallocngx_pnalloc的区别在于在申请小块内存时,前者考虑对齐,后者不考虑对齐
  2. ngx_pcalloc是调用ngx_palloc,之后的额外操作是清零申请的区域

ngx_palloc(pool, size)

如果用户申请小于等于pool头信息中max大小的内存,则按小块管理。

1
2
3
4
5
6
7
8
9
10
11
void * ngx_palloc(ngx_pool_t *pool, size_t size)
{
#if !(NGX_DEBUG_PALLOC)
if (size <= pool->max)
{
// 第3个参数是标志位,1表示考虑对齐,0表示不考虑对齐。
return ngx_palloc_small(pool, size, 1);
}
#endif
return ngx_palloc_large(pool, size);
}
1
2
3
4
5
6
7
8
9
10
11
void * ngx_pnalloc(ngx_pool_t *pool, size_t size)
{
#if !(NGX_DEBUG_PALLOC)
if (size <= pool->max)
{
// 第3个参数是标志位,1表示考虑对齐,0表示不考虑对齐。
return ngx_palloc_small(pool, size, 0);
}
#endif
return ngx_palloc_large(pool, size);
}
1
2
3
4
5
6
7
8
9
10
void * ngx_pcalloc(ngx_pool_t *pool, size_t size)
{
void *p;
p = ngx_palloc(pool, size);
if (p)
{
ngx_memzero(p, size);
}
return p;
}

ngx_palloc_small(pool, size, align)

第3个参数是标志位,1表示考虑对齐,0表示不考虑对齐。

找current,即从该内存池大块之中分配内存。

ngx_align_ptr,将p->d.last指向的可用数据地址按NGX_ALIGNMENT对齐到指定数(默认是32)的倍数,比如如果地址是8,则舍弃一部分内存,对齐到32。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static ngx_inline void *
ngx_palloc_small(ngx_pool_t *pool, size_t size, ngx_uint_t align)
{
u_char *m;
ngx_pool_t *p;
// 从 pool 头信息找到当前可用的内存池大块
p = pool->current;

do {
// 当前数据块中可分配的内存的开始位置
m = p->d.last;

if (align)
{
m = ngx_align_ptr(m, NGX_ALIGNMENT);
}
// end 是当前数据块中可分配的内存的结束位置,与开始位置相减,如果大于等于用户申请的大小
// 则 直接返回 m ,即开始位置,把 last 位置 下移 size,表示分配了 size 大小
if ((size_t) (p->d.end - m) >= size)
{
p->d.last = m + size;

return m;
}
// 如果 目前的数据块 大小 不够 size,则遍历找下一个数据块。
p = p->d.next;

} while (p);
// 直到所有数据块都不够用,则
return ngx_palloc_block(pool, size);
}

ngx_align_ptr(p, a)内存对齐

定义于ngx_config.h

如果没有定义NGX_ALIGNMENT,则默认是sizeof(unsigned long),按照32位对齐。这是内存单元对齐。

注意要和ngx_palloc.h中定义的#define NGX_POOL_ALIGNMENT 16区分。那是内存池对齐。

即向定义的对齐大小(32)向上取整32倍。

操作是:传入的指针地址,加上指针大小32(4字节,32位)减1,和32减1取反后,相与。
比如:传入的指针地址是8:0000 1000(前面补0),则加32减1:0010 1000
0010 0000 - 1 = 0001 1111取反1110 0000(前面补1)。0010 10001110 0000相与后:0010 0000,由原先的8,对齐到了32。

1
2
3
4
5
6
#ifndef NGX_ALIGNMENT
#define NGX_ALIGNMENT   sizeof(unsigned long)    /* platform word */
#endif

#define ngx_align_ptr(p, a) \
(u_char *) (((uintptr_t) (p) + ((uintptr_t) a - 1)) & ~((uintptr_t) a - 1))

ngx_palloc_block(pool, size)再开辟一大块内存池

从pool的头块找到end - pool的大小,这是内存池每一个大块的大小(包含头信息的大小)

这是为了再开辟一大块内存池做准备。

ngx_memalign和第一次创建内存池一样,如果没有对齐要求,则普通malloc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void *
ngx_palloc_block(ngx_pool_t *pool, size_t size)
{
u_char *m;
size_t psize;
ngx_pool_t *p, *new;
// 一整个内存池大块(包含头信息)的大小。
psize = (size_t) (pool->d.end - (u_char *) pool);

m = ngx_memalign(NGX_POOL_ALIGNMENT, psize, pool->log);
if (m == NULL)
{
return NULL;
}

new = (ngx_pool_t *) m;
// psize是整个内存池块的大小。m是开始,则end指向这个新开辟的内存池块的末尾
new->d.end = m + psize;
// next暂时指向NULL。
new->d.next = NULL;
new->d.failed = 0;

// 存放普通内存池块的头信息
m += sizeof(ngx_pool_data_t);
// 内存对齐 m ,存储实际数据的开始地址
m = ngx_align_ptr(m, NGX_ALIGNMENT);
// 分配出去 size 大小的内存,last(下一个空闲内存的开始地址)更新到新位置
new->d.last = m + size;

// 从整个内存池的current到后续一长串,给每一个内存池块的failed计数加1。
// 如果发现当前这个内存池块的failed是4,则current指向这个failed次数过多内存池块的下一个内存池块。
for (p = pool->current; p->d.next; p = p->d.next)
{
if (p->d.failed++ > 4)
{
pool->current = p->d.next;
}
}
// 退出for 循环后,p 更新到了 最后一个内存池块。同时,当前pool的current指向的是第一个failed不为4的内存池块
// 尾插。链接起来新开辟的这个内存池块。
p->d.next = new;

return m;
}

ngx_palloc_large(pool, size)大块内存分配管理

用户申请比pool->max大的内存时,按大块内存分配管理。
ngx_alloc分配size大小的内存(和分配内存池块的方法一样)见ngx_memalign(POOL_ALIGNMENT, size, log)ngx_alloc(size, log)
遍历找large链的前5个,看是否有large的alloc为空的,直接让alloc指向一开始malloc得到的p。
如果连续找了5个,发现large的alloc都不为空,则跳出循环,不再找了。
这个for循环是为了快速在前5个large中,找到一个之前开辟的,但已经空闲了的大内存块头信息,其alloc管理的大内存块已经释放了,所以为NULL,直接让alloc指向一开始malloc得到的p即可。

如果没有进入循环,说明large一开始就是空的,内存池没有申请过大块内存;或者是找了5个发现large的alloc都不空:执行下面的操作(头插大内存块的头信息到pool的large链):

记录大内存块(large管理)的 头信息ngx_pool_large_t,是按照ngx_palloc_small方法,存放到了内存池块的小块内存中。
ngx_palloc_small返回空说明系统内存不够用了,失败,释放一开始malloc得到的大块内存,返回NULL。

如果正常,则在这个大块内存头信息填写:
alloc,即这个大块内存的地址,为一开始malloc得到的地址。
next,指向pool的large链头,pool的large指向这个新大块内存的头信息。相当于头插!只不过插的是大内存块的头信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static void *
ngx_palloc_large(ngx_pool_t *pool, size_t size)
{
void *p;
ngx_uint_t n;
ngx_pool_large_t *large;

p = ngx_alloc(size, pool->log);
if (p == NULL)
{
return NULL;
}

n = 0;

for (large = pool->large; large; large = large->next)
{
if (large->alloc == NULL)
{
large->alloc = p;
return p;
}

if (n++ > 3)
{
break;
}
}
// 记录 大块 内存(large管理)的 头信息,存放到内存池块管理的小块内存中。
// 返回空说明 系统 内存不够用了,失败。
large = ngx_palloc_small(pool, sizeof(ngx_pool_large_t), 1);
if (large == NULL)
{
ngx_free(p);
return NULL;
}

large->alloc = p;
large->next = pool->large;
pool->large = large;

return p;
}

图示:

如图,大块内存的头信息,是按小块内存管理,分配到了内存池块中。

下面要提到的外部资源所绑定的清理的头信息,也像是大块内存头信息一样,按小块内存管理,分配到了内存池块中。

ngx_pool_cleanup_add分配一个需要管理外部资源的数据(比如指针、fd)

按小块内存分配ngx_pool_cleanup_t,这是清理的头信息块。
有:handler、data、next。

之后才是分配size。

之后,像头插大内存块的头信息链表一样,头插清理类的头信息链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ngx_pool_cleanup_t * ngx_pool_cleanup_add(ngx_pool_t *p, size_t size)
{
ngx_pool_cleanup_t *c;

c = ngx_palloc(p, sizeof(ngx_pool_cleanup_t));
if (c == NULL)
{
return NULL;
}

if (size)
{
c->data = ngx_palloc(p, size);
if (c->data == NULL)
{
return NULL;
}
}
else
{
c->data = NULL;
}

c->handler = NULL;
c->next = p->cleanup;

p->cleanup = c;

ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, p->log, 0, "add cleanup: %p", c);

return c;
}

通过返回的ngx_pool_cleanup_t *绑定清理回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct Data
{
char * p;
// ...
};
struct Data *pData = ngx_alloc(512);
pData->p = (char*)malloc(128);
strcpy(pData->p, "hello world");

void my_release(void *p)
{
free(p);
}

ngx_pool_cleanup_t *pclean = ngx_pool_cleanup_add(pool, sizeof(char*));
pclean->handler = &my_release;
pclean->data = pData->p;

ngx_pfree(pool, void *p)大块内存释放

用于释放大块内存。先free,后把large头信息中的alloc置空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ngx_int_t
ngx_pfree(ngx_pool_t *pool, void *p)
{
ngx_pool_large_t *l;

for (l = pool->large; l; l = l->next)
{
if (p == l->alloc)
{
ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0,
"free: %p", l->alloc);

ngx_free(l->alloc);
l->alloc = NULL;

return NGX_OK;
}
}
return NGX_DECLINED;
}

小块内存释放:无

Nginx的小块内存一旦分配了之后,就无法精确地回收。

不像SGI STL一样,假设A、B、C三个小内存单元相邻,A、C空闲,A是freelist的第一个空闲块,A的next是C。现在释放B,则B头插到freelist,现在是B连着A连着C。这样可以精确地释放,并下次还能重新分配(可以发现,由于是头插到了freelist的第一个空闲区域,所以最后释放的最先分配)。

Nginx呢,每个内存池块只是由lastend两个指针管理,只能指示当前内存池块的未分配的部分。

当已分配的部分中,有 1 个小块内存要释放,无法精确管理。所以Nginx只能连续地释放一整段空间,与last相连。

ngx_reset_pool(pool)

遍历large链表,释放每个大块内存。最后large置空。
遍历每个内存池块,把last拉到头信息的末尾即可,相当于释放了内存池的数据。最后current置为pool。
注意,只是释放了大块内存,所有内存池块都没有free,只是更新了last。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void ngx_reset_pool(ngx_pool_t *pool)
{
ngx_pool_t *p;
ngx_pool_large_t *l;

for (l = pool->large; l; l = l->next)
{
if (l->alloc)
{
ngx_free(l->alloc);
}
}

for (p = pool; p; p = p->d.next)
{
if (p == pool) // 第一个内存池,有pool头信息
{
p->d.last = (u_char*)p + sizeof(ngx_pool_t);
}
else // 其余的内存池,只有小头信息,没有pool大头
{
p->d.last = (u_char *) p + sizeof(ngx_pool_data_t);
}
p->d.failed = 0;
}

pool->current = pool;
pool->chain = NULL;
pool->large = NULL;
}

ngx_destroy_pool(pool)

  1. 遍历“清理”链表,每个都按照绑定的handler释放其data外部资源。
  2. 遍历large链表,释放每个大块内存。
  3. 遍历每个内存池块,释放每个内存池块。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void
ngx_destroy_pool(ngx_pool_t *pool)
{
ngx_pool_t *p, *n;
ngx_pool_large_t *l;
ngx_pool_cleanup_t *c;

for (c = pool->cleanup; c; c = c->next)
{
if (c->handler)
{
ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0,
"run cleanup: %p", c);
c->handler(c->data);
}
}
#if (NGX_DEBUG)
/*
* we could allocate the pool->log from this pool
* so we cannot use this log while free()ing the pool
*/
for (l = pool->large; l; l = l->next)
{
ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0, "free: %p", l->alloc);
}
for (p = pool, n = pool->d.next; /* void */; p = n, n = n->d.next)
{
ngx_log_debug2(NGX_LOG_DEBUG_ALLOC, pool->log, 0,
"free: %p, unused: %uz", p, p->d.end - p->d.last);
if (n == NULL)
{
break;
}
}
#endif
for (l = pool->large; l; l = l->next)
{
if (l->alloc)
{
ngx_free(l->alloc);
}
}
for (p = pool, n = pool->d.next; /* void */; p = n, n = n->d.next)
{
ngx_free(p);
if (n == NULL)
{
break;
}
}
}

Nginx和STL释放内存的策略适用的场景

Nginx大块内存分配=》内存释放ngx_free函数
Nginx小块内存分配=》没有提供任何的内存释放函数。

实际上,从小块内存的分配方式来看(直接通过last指针偏移来分配内存),根本没法进行中间部分的小块内存的回收。

Nginx本质:HTTP服务器是一个短链接的服务器,客户端(浏览器)发起一个request请求,到达Nginx服务器以后,处理完成,Nginx给客户端返回一个response响应,HTTP服务器就主动断开tcp连接。
假设HTTP 1.1 keep-alive:60s,HTTP服务器(nginx)返回响应以后,需要等待60s,60s之内客户端又发来请求,重置这个时间;
否则60s之内没有客户端发来的响应,Nginx也是最终会主动断开连接,此时Nginx可以调用ngx_reset_pool重置内存池了,等待下一次客户端的请求。

因此,Nginx内存池的设计适用于间歇性、短连接的服务。虽然有内存泄漏,但效率高,空间换时间。

如果是长连接,且小块内存分配、释放较多,最好用STL二级空间配置器,避免内存泄漏过多。

设计并实现线程池_future原理

知识储备

并发和并行

  • 并发

单核上,CPU时间片轮转,多个线程轮流执行。物理上是串行执行的,但是由于每个线程占用的CPU时间片非常短(比如10ms),宏观上看就像是多个线程在共同执行,这样的场景称作并发(concurrent)。

  • 并行

在多核或者多CPU上,多个线程在同一时刻执行,这样的场景才是真正的并行(parallel)。

两种密集型程序

多线程程序一定好吗?不一定,要看具体的应用场景。

  • IO密集型:程序里面指令的执行,涉及IO操作较多,比如设备、文件、网络操作(等待客户端的连接),可能会把程序阻塞。如果CPU时间片再分配给这种线程,相当于浪费了CPU资源。
  • CPU密集型:程序里面的指令都是做计算用的,不会阻塞。

  • 多核情况下
    1. IO密集型和CPU密集型虽然对并行计算有不同的需求,IO密集型更适合设计成多线程程序。但是在多核情况下两种密集型程序都是有必要用多线程来处理的。
    2. 线程进行调度时,内核中有这样两个队列:runnable,就绪的或正在调度的队列。如果因IO操作有线程阻塞了,则将会进入阻塞队列,blocking queue。
  • 单核情况下
    1. IO密集型的程序依然适合设计为多线程程序。
    2. CPU密集型程序不再适合!这就相当于只有一个计算器,却让多个人分段算。线程的调度有额外的花费:线程的上下文切换。CPU寄存器信息会保存在线程栈上,下次还要再恢复到CPU中,实属麻烦。

线程的代价

为了完成任务,创建很多线程可以吗?线程越多越好吗?

  • 线程的创建和销毁都是非常“重”的操作,需要进入内核态。在执行任务的过程中,没有集中资源去干正事,而是去花费很大力度创建/销毁?
    • 需要给线程创建PCB(task_struct)、线程的内核栈、页目录、页表
    • 描述地址空间相应的数据结构:vm_area_struct
    • 内核创建完后,还要返回用户态
    • 线程执行完业务,还要销毁线程,又要切换一次
  • 线程栈本身占用大量内存,每一个线程都需要线程栈,栈几乎都被占用完了,还怎么做事情?
    • 32位地址空间,共4G,用户空间有3G。
    • 线程共享进程的地址空间。
    • 可在linux下执行ulimt -a命令观察stack size默认栈大小,为8192字节即8M。
    • 31024M=3072M,3072M/8M=3843*1024M=3072M, 3072M/8M=384个。这说明32位环境下,最多创建384个线程。
  • 线程的上下文切换要占用大量时间
    • 线程过多,线程的调度是需要进行上下文切换的,上下文切换花费CPU时间也特别多,CPU的利用率就不高了。
  • 大量线程同时唤醒会使系统经常出现锯齿状负载或者瞬间负载量很大导致宕机
    • 如果在某一时刻,大量的IO操作准备好了,那么一时间线程是来不及处理的。

线程同步

线程互斥

某段代码能不能多线程环境下执行?看这段代码是否存在竞态条件,即有无临界区代码段。(代码片段在多线程环境下执行,随着线程的调度顺序不同而得到不同的执行结果)。如果有,则要通过线程同步来保证它的原子操作。

如果在多线程环境下不存在竞态条件,则称之为可重入的

互斥锁

  • lock
  • try_lock
  • lock_guard
  • unique_lock

atomic原子类型

  • CAS操作(无锁机制)
    • 无锁队列、无锁链表、无锁数组
    • 实际上使用的是轻量级、效率高的锁,不是没用锁。

线程通信

GDB调试C++11多线程死锁

条件变量

信号量

看作资源计数没有限制的mutex互斥锁。mutex互斥锁的资源计数只能是0或者1。

区别

  • 二元信号量和互斥锁的区别
    • mutex只能是哪个线程获取锁,由哪个线程释放锁。
    • sem.wait()sem.post()则可以处在不同的线程中调用。

线程池

线程池的优势:

服务进程启动之初,事先创建好线程池里面的线程,当业务到来需要分配线程时直接从线程池中获取一个空闲线程执行task任务即可,task执行完成之后把线程归还到线程池中继续给后续task提供服务,而不用释放线程。

项目介绍

本项目所实现的线程池和对象池、内存池、STL库的意义一样,只能称作一个库,而不能作为一个独立运行的中间件,必须镶嵌在应用程序中。最终项目表现形式为一种提供给他人的动态库,比如用到了mysql.h头文件,libmysqlclient.so动态库。动态库需要编译出来。

使用方式

  • 启动线程池

如果你想在应用程序中或者代码中使用本项目的线程池,你可以

  1. 直接ThreadPool pool;定义一个pool对象;
  2. 而后则可以调用pool.sexMode(fixed(default) | cached);接口设置线程池的运行模式,默认为固定模式。
  3. 然后pool.start();启动线程池。start不会阻塞。

启动线程池意味着线程池开始创建若干线程,就绪,等待任务过来执行任务。

  • 提交任务

调用方只要按以下形式调用API即可:Result result = pool.submitTask(concreteTask);

调用方无需关心内部操作,包括线程分配、执行过程。

有时调用方需要获取任务执行的结果,可用T res = result.get().Cast<T>();获得任务结果。任务结果的返回值是任意类型,具体类型T由用户指出。(此处用到了C++17中的Any类型)

线程池的设计

类成员

首先说一说抽象出的类:线程池类、线程池中的线程类。

  1. 既然是线程池,就要有一个存放线程的容器
    1. 我们最好能实时监控线程池中线程的数量、以及上限阈值,避免线程数量走向极端从而影响性能(线程数量不是越多越好,坏处:1、线程栈空间冗余;2、上下文切换过程时间多于执行操作)
  2. 还要有一个存放待完成任务的容器,即任务队列
    1. 考不考虑线程安全问题?必须考虑,外层用户提交任务要放数据,下层线程执行任务要取数据。
    2. 任务不能堆积过多。对于任务队列,也要有一个上限阈值。

通用化的实现 - Task设计 - 继承多态思想

任务类型需要达到通用性,所以要用到继承、多态的思想。用基类指针可以指向各种各样的派生类对象。

所以需要设计一个抽象类Task。内部提供一个纯虚函数virtual void run() = 0。如果要设计特定的任务,则继承(实现)之,重写run函数即可。

线程通信的保证 - mtx+cv

因为涉及到放任务、取任务,所以很明显是个生产者消费者模型。

必然用到互斥锁+条件变量,从而对任务队列进行互斥保护、达到正确的线程通信。

线程池的模式

  • fixed模式
    • 线程池里面的线程个数是固定不变的,一般是ThreadPool创建时根据当前机器的CPU核心数量进行指定。
  • cached模式
    • 线程池里面的线程个数是可动态增长的,根据任务的数量动态地增加线程的数量,但是会设置一个线程数量的阈值。任务处理完成后,如果动态增长的线程空闲60s而没有其他任务待处理,那么就关闭线程,保持池中线程的最初数量。

代码形式

1
2
3
4
5
6
int main()
{
ThreadPool pool;
pool.setMode(fixed(default) | cached);
pool.start();
}

提交任务API

1
2
Result result = pool.submitTask(concreteTask);
result.get().Cast<T>();

线程池类(ThreadPool)

线程池,不光要管理线程,而且要集成一个任务队列。要对外提供提交任务的接口。

其中的线程去取任务,做,然后返回任务结果。(这是线程类里的事情)

管理的线程,用 vector 管理。
管理的任务,用 queue 管理。任务队列要保证线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
enum class PoolMode
{
MODE_FIXED, // 固定数量的线程
MODE_CACHED, // 线程数量可动态增长
};
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();
/* 设置线程池的工作模式 */
void setMode(PoolMode mode);
/* 设置初始的线程数量 */
void setInitThreadSize(int size);
/* 设置task队列任务数量最大阈值 */
void setTaskQueMaxThreshHold(int threshHold);
/* 启动线程池 */
void start();
/* 给线程池提交任务 */
void submitTask(std::shared_ptr<Task> task);
private:
PoolMode m_poolMode; //当前线程池工作模式
std::vector<Thread*> m_threads; //线程列表
int m_initThreadSize; //初始的线程数量
/**
* 任务队列容器。
* 特别要注意,需要用 shared_ptr 强引用用户传来的 task ,
* 以保证任务对象的生命期。
*/
std::queue<std::shared_ptr<Task>> m_taskQueue;
/* 目前任务队列中的任务数量 */
std::atomic_int m_taskNum;
/* 任务队列最大上限阈值 */
int m_taskQueMaxThreshHold;
/* 保护安全地操作任务队列 */
std::mutex m_taskQueMtx;
/* 表示任务队列不满 */
std::condition_variable m_taskQueNotFull;
/* 表示任务队列不空 */
std::condition_variable m_taskQueNotEmpty;
private:
ThreadPool(const ThreadPool &) = delete;
ThreadPool& operator=(const ThreadPool &) = delete;
};

ThreadPool构造、析构

1
2
3
4
5
6
7
8
9
10
11
12
13
/* 线程池构造 */
ThreadPool::ThreadPool()
: m_initThreadSize(4),
m_taskNum(0),
m_taskQueMaxThreshHold(TASK_MAX_THRESHHOLD),
m_poolMode(PoolMode::MODE_FIXED)
{

}
ThreadPool::~ThreadPool()
{

}

设置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* 设置线程池的工作模式 */
void ThreadPool::setMode(PoolMode mode)
{
m_poolMode = mode;
}
/* 设置初始的线程数量 */
void ThreadPool::setInitThreadSize(int size)
{
m_initThreadSize = size;
}
/* 设置task队列任务数量最大阈值 */
void ThreadPool::setTaskQueMaxThreshHold(int threshHold)
{
m_taskQueMaxThreshHold = threshHold;
}

启动线程池

1
2
/* 启动线程池 */
void ThreadPool::start(int initThreadSize);

给线程池提交任务

1
2
/* 给线程池提交任务 */
void ThreadPool::submitTask(std::shared_ptr<Task> task);

Thread类

线程函数定义在哪个位置呢?

  • 思考:线程函数定义在哪个位置呢?
    • 如果写在Thread类中,那么定义在 ThreadPool 的变量则不容易被函数所访问。
    • 定义为全局函数呢?线程池里的变量都是私有的,也不易访问。
    • 结论:OOP的手法,写在ThreadPool中。
1
2
3
4
5
6
7
/* threadpool.h */
class ThreadPool
{
private:
/* 定义线程函数 */
void threadFunc();
};

怎么把线程函数扔给Thread对象

  • 线程对象是在线程池里构建的,线程启动执行也是在线程池里做的,
  • 那么创建 thread 线程对象时需要把线程函数给到 thread 线程对象。怎么把函数扔给对象?怎么解决这个技术问题?
    • 引入<functional>,用std::bind()把函数对象绑定。在线程池 start 时,构造 thread 时传入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ThreadPool::start(int initThreadSize)
{
m_initThreadSize = initThreadSize;
for(int i = 0; i < m_initThreadSize; ++i)
{
m_threads.emplace_back(
new Thread(std::bind(&ThreadPool::threadFunc, this))
);
}
for(int i = 0; i < m_initThreadSize; ++i)
{
m_threads[i]->start();
}
}

头文件

由上述 Thread 类构造时对函数对象的处理,可以得到 Thread 类的大致属性需求、构造参数。

1
2
3
4
5
6
7
8
9
10
11
class Thread
{
public:
/* 线程函数对象类型别名 */
using ThreadFunc = std::function<void()>;
Thread(ThreadFunc func);
~Thread();
void start();
private:
ThreadFunc m_func;
};

start 函数(创建线程后,分离线程)

  • start 函数 - 启动线程,创建一个线程来执行一个线程函数
    • 需要注意:出了start函数作用域之后线程对象会析构,但是线程函数不能消失,他还要去消费任务队列上的任务。所以线程对象需要设置为分离线程,否则程序会挂掉
    • 分离的效果就是:线程对象和它所启动的线程(实质的线程)分离开了。独立存在,互不关心对方的生命期。
1
2
3
4
5
6
7
8
/* threadpool.cpp */
#include<thread>
/* 启动线程 */
void Thread::start()
{
std::thread t(m_func); //线程对象t,线程函数m_func
t.detach();
}

简单测试

  • 简单测试,默认启动 4 个线程。启动后在不同的线程分别执行 threadFunc 函数。
    • 注意:创建的线程分离之后,执行完毕后会自动回收。但是可能存在主线程启动后看不到打印结果的情况,那是因为主线程结束地太快,导致没能看到(实际中的服务器主线程不会很快结束,而是保持)。为了看到执行结果,可以睡眠一段时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void ThreadPool::threadFunc()
{
std::cout << "begin threadFunc tid:"
<< std::this_thread::get_id() << std::endl;
std::cout << "end threadFunc"
<< std::this_thread::get_id() << std::endl;
}
/* 线程池项目测试.cpp */
#include"threadpool.h"
#include<iostream>
#include<chrono>
#include<thread>
int main()
{
ThreadPool pool;
pool.start();
std::this_thread::sleep_for(std::chrono::seconds(5));
}

智能指针解决避免手动释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void ThreadPool::start(int initThreadSize)
{
m_initThreadSize = initThreadSize;
for(int i = 0; i < m_initThreadSize; ++i)
{
auto ptr = std::make_unique<Thread>(
std::bind(&ThreadPool::threadFunc, this)
);
m_threads.emplace_back(ptr);
}
for(int i = 0; i < m_initThreadSize; ++i)
{
m_threads[i]->start();
}
}

但是这样会出现编译不过的问题,为什么呢?因为报错发现unique_ptr的拷贝构造已经删除,这是唯一性智能指针的语义决定的。而移动构造没有删除,意味可以用右值进行资源转移,所以我们需要在ptr前加std::move

1
m_threads.emplace_back(std::move(ptr));

submitTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* 给线程池提交任务 */
void ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
/* 获取锁 */
std::unique_lock<std::mutex> lock(m_taskQueMtx);
/* 线程的通信,等待任务队列有空余 */
//while(taskQue_.size() == taskQueMaxThreshHold_){ notFull_.wait(); }
m_taskQueNotFull.wait(
lock,
[&]()->bool {
return m_taskQueue.size() < m_taskQueMaxThreshHold;
}
);
/* 如果有空余,把任务放到任务队列中 */
m_taskQueue.emplace(sp);
++m_taskNum;
/* 因为新放了任务,任务队列肯定不空了,通知notEmpty_上的等待线程 */
m_taskQueNotEmpty.notify_all();
}

服务降级(wait_for(time)

为了性能更加优化,我们限制用户提交任务的最长阻塞时间是1s,如果提交任务超过了 1s 说明目前线程池的任务队列压力比较大,防止短时间内积压很多任务,则规定为提交任务失败,返回。称为服务降级

需要用到wait的两个延伸,wait_for(time)wait_until(endtime)。返回值为bool值,false表示到时间后条件依然没满足。

以下说的是:如果等了超过 1 秒,说明满的状态已经超过了 1 秒,wait_for返回 falseif 条件成立,提交失败。

1
2
3
4
5
6
7
8
9
10
if(!m_taskQueNotFull.wait_for(
lock,
std::chrono::seconds(1),
[&]()->bool {
return taskQue_.size() < (size_t)taskQueMaxThreshHold_;
}))
{
std::cerr<<"task queue is full, submit out of time failed."<<endl;
return;
}

ThreadFunc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void ThreadPool::threadFunc()
{
for(;;)
{
std::shared_ptr<Task> task;
/* 块作用域 */
{
/* 先获取锁 */
std::unique_lock<std::mutex> lock(m_taskQueMtx);
std::cout << "tid:" << std::this_thread::get_id()
<< "尝试获取任务..." << std::endl;
/* 等待notEmpty条件 */
m_taskQueNotEmpty.wait(
lock,
[&]()->bool {
return m_taskQueue.size() > 0;
}
);
std::cout << "tid:" << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
/* 从任务队列中取一个任务出来 */
task = m_taskQueue.front();
m_taskQueue.pop();
--m_taskNum;
/* 如果有剩余任务,继续通知其他线程来取任务 */
if (m_taskQueue.size() > 0)
{
m_taskQueNotEmpty.notify_all();
}
/* 取出了一个任务,任务队列此时肯定不满了,对等待提交任务的人进行通知 */
m_taskQueNotFull.notify_one();
}
/* 当前线程负责执行这个任务, 没必要拥有锁, 脱离块作用域, 释放锁 */
if(task != nullptr)
task->run();
}
}

测试1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MyTask : public Task
{
public:
void run()
{
std::cout << "tid:" << std::this_thread::get_id()
<< "begin!" << std::endl;

std::this_thread::sleep_for(std::chrono::seconds(2));

std::cout << "tid:" << std::this_thread::get_id()
<< "end!" << std::endl;
}
};
-------------------------------------------------------------------
test example:
/* 以下为提交3个任务,预估结果为3个线程获取任务成功,1个没有获取到,一直阻塞 */
int main()
{
ThreadPool pool;
pool.start(4);
pool.submitTask(std::make_shared<MyTask>());
pool.submitTask(std::make_shared<MyTask>());
pool.submitTask(std::make_shared<MyTask>());
getchar();
}
/* 若4个线程,提交5个任务,则最先完成并抢到锁的线程能再次获得第5个任务 */
/* 若4个线程,提交10个任务,并把队列容量设为4,则可能有2个任务提交失败 */

线程执行的返回值(future和packaged_task的实现原理)

比如,计算1到30000的和。线程1计算1到10000,2计算10001到20000,3计算20001到30000。
主线程给每一个线程分配计算的区间,并等待他们算完之后返回结果,合并最终的结果即可。
但是,怎么能拿到线程的返回值呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MyTask : public Task
{
public:
MyTask(int begin, int end)
: begin_(begin), end_(end)
{

}
void run()
{
std::cout << "tid:" << std::this_thread::get_id()
<< "begin!" << std::endl;
int sum = 0;
for(int i = begin_; i <= end_; ++i)
{
sum += i;
}

std::cout << "tid:" << std::this_thread::get_id()
<< "end!" << std::endl;

return sum;
}
};
  • 问题1:怎么设计run函数的返回值,可以表示任意的类型?
  • 问题2:如何设计这里的Result机制?
1
2
3
4
5
6
Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 10000));
Result res2 = pool.submitTask(std::make_shared<MyTask>(10001, 20000));
Result res3 = pool.submitTask(std::make_shared<MyTask>(20001, 30000));
res1.get();
res2.get();
res3.get();

Any类型 - 按需返回具体类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class Any
{
public:
Any() = default;
~Any() = default;
Any(const Any&) = delete;
Any& operator=(const Any&) = delete;
Any(Any &&) = default;
Any& operator=(Any &&) = default;

/* 这个构造函数可以让 Any 类型接收任意其他的数据 */
template<typename T>
Any(T data) : base_(std::make_unique<Derived<T>>(data))
{}
/* 这个方法能把 Any 对象里面存储的 data 数据提取出来 */
template<typename T>
T cast_()
{
// base_ 是定义在 Any 类中的一个 std::unique_ptr<Base>
// get() 可以获得 其实际指针
Derive<T>* pd = dynamic_cast<Derive<T>*>(base_.get());
if(pd == nullptr)
{
throw "type is unmatch!";
}
return pd->data_;
}
private:
class Base
{
public:
virtual ~Base() = default;
};

template<typename T>
class Derived : public Base
{
public:
Derived(T data) : data_(data)
{}
private:
// 实际的 data 存在 Derived 类中。
T data_;
};

private:
/* 定义一个基类的指针 */
std::unique_ptr<Base> base_;
};
int main()
{
ThreadPool pool;
pool.start(4);

Result res = pool.submitTask(std::make_shared<MyTask>(1, 10000));
int sum = res.get().cast_<int>();
}

如此一来:
res调用get,等待 task 执行完毕,get()就能返回一个Any。这个 Any 的模板参数指明了里面存放的数据是 int 型, Any 对象调用其 cast_, 取出了里面存放的数据。(通过dynamic_cast<Derive<int>*>)如果类型不匹配,则抛出异常。

自实现信号量类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Semaphore
{
public:
Semaphore(int limit = 0)
: resLimit_(limit)
{
}
~Semaphore() = default;
/* 获取一个信号量资源 */
void wait()
{
std::unique_lock<std::mutex> lock(mtx_);
/* 等待信号量有资源,没有资源的话,阻塞当前线程 */
cond_.wait(lock, [&]()->bool {return resLimit_ > 0;});
--resLimit_;
}
/* 增加一个信号量资源 */
void post()
{
std::unique_lock<std::mutex> lock(mtx_);
++resLimit_;
cond_.notify_one();
}
private:
// 信号量目前拥有的资源数
int resLimit_;
std::mutex mtx_;
std::condition_variable cond_;
};

(⭐)Result设计

先分析一下局势:

  1. Task里面肯定有结果
  2. 外部有一个Result要接收结果
  3. Task里面的结果怎么巧妙地转移到Result上面?
  4. 其实Task不用专门有一个Any成员变量保存。
  5. 可以直接在Task中保存一个Result指针。
  6. 外部Result也绑定一个Task作为成员对象。为了让Task延长生命期,Result需要用shared_ptr<Task>构造
  7. Result构造函数中,只需要执行:task_->setResult(this);,便可以移花接木,把Task成员变量result_指向外部的Result。
  8. 这样,Task内部,run完之后,返回了Any,Task便可以主动调用result_->setVal(run())。把结果写回外部的Result内部。

submitTask接口返回类型需要让用户能接收到线程任务的返回值,并且要求可以是任意类型,所以改为Result。相应的,我们需要设计这样的Result类型。

  • 思考,return时用下面哪种方式?
    • task->getResult();还是Result(task);
    • 要执行的task从队列中taskQue_.pop(),接着调用完毕后Task就会析构(注意,submitTask传入的是shared_ptr,引用计数减1,如果此时没有其他人引用该Task,将会析构,里面的any存储的结果就失效了),即task生命期现在只存在于threadFunc函数中。
    • 如果是task->getResult();
      • 若task中的Result是以值形式存的,则肯定不行,因为task析构之后,Result也会析构。
      • 若task中的Result是以指针形式存的,则必须指定到一个外部资源保存。比如存到堆上。
        • 如果是在堆上保存,还必须提供一个getResult接口,返回Result的指针,即result_成员变量。这样,可以做到把Result安全地保存下来。
    • 但是,可以以更为巧妙的方式!Result(task);
      • Result绑定了这个task(用shared智能指针管理,让task对象的生命期延至和Result对等),在Result的构造函数中,调用task->setResult(this);!!!居然巧妙地把“外部资源地址”指向了外部待接收结果的Result自己!避免了堆上建立的烦恼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ThreadPool
{
/* ... */
public:
Result submitTask(std::shared_ptr<Task> sp);
/* ... */
};
--------------------------
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
/* ... */
if(...)
{
...
/* 思考,return时用下面哪种方式? */
/* return task->getResult(); */
// 此时是提交任务失败的返回
return Result(task);
}

/* ... */

/* 思考,return时用下面哪种方式? */
/* return task->getResult(); */
// 此时是提交任务成功后的返回
return Result(task);
}

经过上面的讨论,Result类成员里需要绑定一个Task对象。即下面的task_。下面是Result成员和其构造。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* 实现接收提交到线程池的task任务执行完成后的返回值类型Result */
class Result
{
public:
Result(std::shared_ptr<Task> task, bool isValid = true);
~Result() = default;
private:
/* 存储任务的返回值 */
Any any_;
/* 线程通信信号量 */
Semaphore sem_;
/* 指向对应获取返回值的任务对象 */
std::shared_ptr<Task> task_;
/* 任务执行是否有效/成功 */
std::atomic_bool isValid_;
}
------------------
//threadpool.cpp
Result::Result(std::shared_ptr<Task> task, bool isValid)
: isValid_(isValid), task_(task)
{
}

成员函数

  • 问题1:setVal函数,获取任务执行完的返回值,记录在any成员。
  • 问题2:get函数,用户调用这个方法获取task的返回值(没执行完需要阻塞)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Result
{
public:
void setVal(Any any);
Any get();
private:
Any any_;
Semaphore sem_;
std::shared_ptr<Task> task_;
std::atomic_bool isValid_;
};
-------------
/* 用户调用的 */
Any Result::get()
{
if(!isValid_)
{
return "";
}
/* 阻塞用户的线程直到task通知其执行完毕 */
sem_.wait();
return std::move(any_);
}
/* 谁调用?答案是 Task 的 run 结束之后,通过Result * result_ 间接调用次函数,通知 get() 唤醒拿结果 */
void Result::setVal(Any any)
{
/* 存储task的返回值 */
this->any_ = std::move(any);
sem_.post();
}

Task增加方法,把结果写入Result

思想:在threadFun函数中,不仅要靠task对象Task类中的run方法执行具体哪种任务,还要把任务的返回值给到result,可以用exec来封装,exec没有多态,run有多态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void Task::exec()
{
if(result_ != nullptr)
{
// run()结束之后,setVal才能被调用成功,此时影响到了 result_ 中保存的信号量 +1,因此 外部的result.get()不再阻塞
result_->setVal(run()); //如此可知,task中需要封装一个result对象。
}
}
----------------
class Task
{
public:
Task();
~Task() = default;
void exec();
void setResult(Result *res);
virtual Any run() = 0;
private:
/* 没必要用shared智能指针,否则就会出现:Result中有shared_ptr<Task>,Task中有shared_ptr<Result>。循环引用,无法释放!*/
Result *result_; //result对象的生命周期长于task,不怕。
};
----------------
void Task::setResult(Result *res)
{
result_ = res;
}
Task::Task()
: result_(nullptr)
{
}
----------------
/* 谁调用setResult?此 Result 不是 Result 里的 val , 而是保存 Val 的一个地方,不要搞混了 */
// 外部可以通过 task 来 指定 Result 要保存到哪个地方。
// 比如可以传入 外部 Result 的 this 指针;也可以 new 一个 Result 在 堆上,set 到堆上地址
// 如果 Result 绑定了一个 task 就可以 通过这个 task 设置 Result 为 自己(this)
Result::Result(std::std::shared_ptr<Task> task, bool isValid)
: isValid_(isValid), task_(task)
{
task_->setResult(this);
}

测试2(Master - Slave模型)

Master - Slave模型,Master线程用来分解任务,然后给各个Slave线程分配任务,等待各个Slave线程执行完任务,返回结果。最后Master线程合并各个任务结果,输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class MyTask : public Task
{
public:
MyTask(int begin, int end)
: begin_(begin), end_(end)
{
}
Any run()
{
std::cout << "tid:" << std::this_thread::get_id()
<< "begin!" << std::endl;
uLong sum = 0;
for(uLong i = begin_; i <= end_; ++i)
sum += i;
std::cout << "tid:" << std::this_thread::get_id()
<< "end!" << std::endl;
return sum;
}
};
using uLong = unsigned long long;
int main()
{
ThreadPool pool;
pool.start(4);
Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));
Result res2 = pool.submitTask(std::make_shared<MyTask>(100000001, 200000000));
Result res3 = pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));
uLong sum1 = res1.get().cast_<uLong>();
uLong sum2 = res2.get().cast_<uLong>();
uLong sum3 = res3.get().cast_<uLong>();
cout << sum1 + sum2 + sum3 << endl;
}

测试结果:

image-20220407095057418

cached模式线程池

主要的使用点:submitTask函数中,可能需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程。

  • 需要处理的问题
    • 问题1,用户自己如何设置线程池的工作模式
    • 问题2,submitTask函数中,根据任务数量和空闲线程的数量,判断是否需要创建新的线程
    • 问题3,threadFunc函数中,有可能已经创建了很多的线程,如果空闲时间超过60s,需要结束、回收。
1
2
3
4
5
6
7
8
int main()
{
ThreadPool pool;
/* 用户自己设置线程池的工作模式 */
pool.setMode(PoolMode::MODE_CACHED);
/* 开始启动线程池 */
pool.start(4);
}

切换工作模式

为了防止用户在线程池启动后再去切换线程池的工作模式,我们需要给线程池添加一个状态变量,以控制用户能否对线程池的工作模式进行切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class ThreadPool
{
public:
/* ... */
private:
/* ... */
/* 检查pool的运行状态 */
bool checkRunningState() const;
private:
/* ... */

/* 表示当前线程池的启动 */
std::atomic_bool isPoolRunning_;
};
-------------------------------------------
ThreadPool::ThreadPool()
: initThreadSize_(0),
taskSize_(0),
taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD),
poolMode_(PoolMode::MODE_FIXED),
isPoolRunning_(false)
{
}
void ThreadPool::start(int initThreadSize)
{
/* 设置线程池的状态为运行态 */
isPoolRunning_ = true;

/* ... */
}
bool ThreadPool::checkRunningState() const
{
return isPoolRunning_;
}
------------------------------------------
void ThreadPool::setMode(PoolMode mode)
{
if(checkRunningState())
return;
poolMode_ = mode;
}

创建更多线程

cached模式:任务处理比较紧急,场景是小而快的任务。

ThreadPool需要添加记录一个空闲线程数量的变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class ThreadPool
{
public:
/* ... */
private:
/* ... */
private:
/* ... */

/* 记录空闲线程的数量 */
std::atomic_int idleThreadSize_;
}
-------------------------------------------
ThreadPool::ThreadPool()
: initThreadSize_(0),
taskSize_(0),
idleThreadSize_(0),
taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD),
poolMode_(PoolMode::MODE_FIXED),
isPoolRunning_(false)
{
}
/* 空闲线程数量需要加1 */
void ThreadPool::start(int initThreadSize)
{
/* ... */
/* 每启动一个线程就给idleThreadSize_加1,表示多了一个空闲线程 */
for(int i = 0; i < initThreadSize_; ++i)
{
threads_[i]->start();
++idleThreadSize_;
}
}
/* 空闲线程数量需要减1 */
void ThreadPool::threadFunc()
{
for(;;)
{
/* ... */
std::shared_ptr<Task> task;
{
notEmpty_.wait(lock, [&]()->bool {return taskQue_.size()>0;});
--idleThreadSize_;
}
/* ... */
notFull_.notify_all();
}
if(task != nullptr) task -> exec();
++idleThreadSize_;
}

现在来增加 submitTask 函数对 cached 模式处理的细节。
有一点要注意,就是尽管任务非常多,但是我们要对线程的数量设一定的上限值。
即需要给 ThreadPool 类增加一个线程数量阈值变量。

然后为了比较线程池当前线程的数量状况,也要添加一个记录线程总数量的变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
const int THREAD_MAX_THRESHHOLD = 10;
class ThreadPool
{
public:
/* ... */
/* 设置线程池cached模式下的线程阈值 */
void setThreadSizeThreshHold(int threshhold);
/* ... */
private:
/* ... */
private:
/* ... */
/* 线程数量上限阈值 */
int threadSizeThreshHold_;
/* 记录当前线程池里面线程的总数量 */
std::atomic_int curThreadSize_;
/* ... */
};
ThreadPool::ThreadPool()
: initThreadSize_(0),
taskSize_(0),
idleThreadSize_(0),
curThreadSize_(0),
taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD),
threadSizeThreshHold_(THREAD_MAX_THRESHHOLD),
poolMode_(PoolMode::MODE_FIXED),
isPoolRunning_(false)
{
}
void ThreadPool::start(int initThreadSize)
{
/* ... */
/* 记录初始线程个数 */
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;
/* 创建线程对象 */
/* ... */
/* 启动所有线程 */
/* ... */
}
void ThreadPool::setThreadSizeThreshHold(int threshhold)
{
if(checkRunningState() || poolMode_==PoolMode::MODE_FIXED)
return;
threadSizeThreshHold_ = threshhold;
}
-------------------------------------------------------
/* 关注三个状态:
* 1.线程池的工作模式是否是cached;
* 2.任务数量是否已经大于空闲线程数量;
* 3.线程总数量是否没有超过线程数量上限;
*/
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
/* ... */
if(poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
/* 创建新线程 */
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this));
threads_.emplace_back(std::move(ptr));
threads_[threadId]->start(); //启动线程
/* 修改线程个数相关的变量 */
++curThreadSize_;
++idleThreadSize_;
}

return Result(sp);
}

任务处理完毕,回收多余线程

超过initThreadSize_数量的线程要进行回收。

当前时间 比 上一次线程执行完毕的时间 大于 60s 后回收。

C++11中提供了高精度时间API - std::chrono::high_resolution_clock().now();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
const int THREAD_IDLE_TIME = 60;	//单位s
void ThreadPool::threadFunc(int threadid)
{
/* 上一次线程执行完任务的时间 */
auto lastTime = std::chrono::high_resolution_clock().now();
for(;;)
{
Task task;
{
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid: " << std::this_thread::get_id()
<< "尝试获取任务..." << std::endl;

// cached模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程
// 结束回收掉(超过initThreadSize_数量的线程要进行回收)
// 当前时间 - 上一次线程执行的时间 > 60s

// 每一秒中返回一次 怎么区分:超时返回?还是有任务待执行返回
// 锁 + 双重判断
while (taskQueue_.size() == 0)
{
// 线程池要结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid: " << std::this_thread::get_id() << " exit!"
<< std::endl;
exitCond_.notify_all();
return;
}

if (poolMode_ == PoolMode::MODE_CACHED)
{
//while(taskQue_.size() > 0) // 不再去判断 taskQue_.size() > 0,即使size是0,也一直等待60s
//{

/* 超时 1s 返回 */
if(std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now(); //返回的是time_point类型
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if(dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
/* 开始回收当前线程 */
/* 修改记录线程数量的相关变量 */
/* 把线程从线程列表容器中删除 */
/* 问题:怎么知道线程函数对应的是线程列表容器中的哪一个线程对象 */
/* 我们需要有一个映射关系来记录:threadid => thread对象 => 删除 */
thread_.erase(threadid);
--curThreadSize_;
--idleThreadSize_;
std::cout << "threadid: " << std::this_thread::get_id() << " exit!"
<< std::endl;
return;
}
}
//} // while(taskQue_.size() > 0) 结束
}
else // poolMode_ != PoolMode::MODE_CACHED
{
notEmpty_.wait(lock /* , [&]()->bool { return taskQue_.size() > 0; } */);
}
} // end while (taskQueue_.size() == 0)

// 此时 taskQueue_.size() != 0 而 我们又拿到了 任务队列的锁,可以直接取走任务
--idleThreadSize_;
std::cout << "tid: " << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
task = taskQue_.front();
taskQue_.pop();
--taskSize_;

if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}

notFull_.notify_all();
} // end std::unique_lock<std::mutex> lock(taskQueMtx_);
if(task != nullptr)
{
task();
}
++idleThreadSize_;
/* 线程执行完任务后更新lastTime */
lastTime = std::chrono::high_resolution_clock().now();
}
}

问题:怎么知道线程函数对应的是线程列表容器中的哪一个线程对象?

我们需要有一个映射关系来记录:threadid - thread对象

1
std::unordered_map<int, std::unique_ptr<Thread>> threads_;

所以,Thread对象需要封装一个id值。
然后,在线程函数threadFunc传入int threadid参数。
这样,该 id 的Thread在线程函数中计时自己的空闲时间,若连续 wait 了 60s(每wait一次1s超时返回),说明任务过少,空闲线程过多。
只有知道了 id 号,才能让 ThreadPool 记录的 线程map 精确地删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Thread
{
public:
using ThreadFunc = std::function<void()>;
Thread(ThreadFunc func);
~Thread();
void start();
/* 启动线程 */
int getId() const;
private:
ThreadFunc func_;
// 这是属于 Thread类 共享的计数器。表示曾经已生成了多少个线程对象
static int generateId;
// 当前Thread 对象的ID
int threadId_;
}
----------------------------------------------
int Thread::generateId_ = 0;
Thread::Thread(ThreadFunc func)
: func_(func), threadId(++generateId_)
{

}
int Thread::getId() const
{
return threadId_;
}

然后,最大的变化来了,把原先的线程列表的vector容器变成了unordered_map。

1
2
3
4
5
6
7
class ThreadPool
{
// ...
private:
    std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程列表
// ...
};

因此,在 start 启动线程池的操作中,生成线程的动作就要有所变化

要预留一个占位符_1,这是给线程函数的传入参数的位置,threadid

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 开启线程池
void ThreadPool::start(int initThreadSize = std::thread::hardware_concurrency())
{
// 设置线程池的运行状态
isPoolRunning_ = true;

// 记录初始线程个数
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;

// 创建线程对象
for (int i = 0; i < initThreadSize_; i++)
{
// 创建thread线程对象的时候,把线程函数给到thread线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
// threads_.emplace_back(std::move(ptr));
}

// 启动所有线程 std::vector<Thread*> threads_;
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start(); // 需要去执行一个线程函数
idleThreadSize_++; // 记录初始空闲线程的数量
}
}

改为无序哈希表存储id、Thread对象映射关系,后的代码调整

submitTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
/* ... */
if(poolMode == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
auto ptr = std::make_unique<Thead>(std::bind(&ThreadPool::threadFunc, this));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start();
/* 修改线程个数相关的变量 */
++curThreadSize_;
++idleThreadSize_;
}
return Result(sp);
}

ThreadPool的threadFunc(int threadid)接口添加int参数

1
2
3
4
5
6
7
8
9
10
11
class ThreadPool
{
public:
...
private:
/* 定义线程函数,加了一个参数threadid */
void threadFunc(int threadid);
...
private:
...
}

Thread中using ThreadFuncstd::function<void(int)>

1
2
3
4
5
6
class Thread
{
public:
/* 要和threadFunc函数参数一致 */
using ThreadFunc = std::function<void(int)>;
}

Thread的start,多传入一个threadId_

1
2
3
4
5
void Thread::Start()
{
std::thread t(func_, threadId_);
t.detach();
}

测试3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class MyTask : public Task
{
public:
MyTask(int begin, int end)
: begin_(begin), end_(end)
{
}
Any run()
{
std::cout << "tid:" << std::this_thread::get_id()
<< "begin!" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
uLong sum = 0;
for(uLong i = begin_; i <= end_; ++i)
sum += i;
std::cout << "tid:" << std::this_thread::get_id()
<< "end!" << std::endl;
return sum;
}
};
---------------------------------------------------------------------------------------
/* 在submitTask函数中 加打印线程的相关信息 */
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{
/* ... */
if(poolMode == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
std::cout << ">>> create new thread ..." << std::endl;
auto ptr = std::make_unique<Thead>(
std::bind(&ThreadPool::threadFunc,
this, std::placeholders::_1)
);
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start(); //启动线程
/* 修改线程个数相关的变量 */
++curThreadSize_;
++idleThreadSize_;
}
return Result(sp);
}
--------------------------------------------------------------------------------------
/* 暂时修改超时时间,方便测试观察 */
const int THREAD_MAX_IDLE_TIME = 10;
void ThreadPool::threadFunc()
{
/* ... */
if(dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
...
}
/* ... */
}
--------------------------------------------------------------------------------------
/* 默认初始四线程,六任务。
* 每个任务都至少消耗3秒,
* 开始时会把四个线程都占住,会创建新的线程来完成后两个任务
* 全部任务完成后,超过10秒后,会把多余的2个线程回收。
*/
const int TASK_MAX_THRESHHOLD = INT32_MAX;
const int THREAD_MAX_THRESHHOLD = 10;
int main()
{
{
ThreadPool pool;
pool.setMode(PoolMode::MODE_CACHED);
pool.start(4);
Result res1 = pool.submitTask(std::make_shared<MyTask>(1,100000000)); //1
Result res2 = pool.submitTask(std::make_shared<MyTask>(100000001,200000000)); //2
Result res3 = pool.submitTask(std::make_shared<MyTask>(200000001,300000000)); //3
pool.submitTask(std::make_shared<MyTask>(200000001,300000000)); //4

pool.submitTask(std::make_shared<MyTask>(200000001,300000000)); //5
pool.submitTask(std::make_shared<MyTask>(200000001,300000000)); //6
uLong sum1 = res1.get().cast_<uLong>();
uLong sum2 = res2.get().cast_<uLong>();
uLong sum3 = res3.get().cast_<uLong>();
}
getchar();
}

测试结果

image-20220407172417250

image-20220407172451064

image-20220407172642102

问题:ThreadPool对象析构以后,怎么样把线程池相关的线程资源全部回收

在析构函数中,用户线程需要等待线程池线程,这是两类不同的线程,需要通过线程间通信来达到等待完成。

线程间通信可以用信号量、条件变量,都可以,我们在ThreadPool类中使用一个条件变量exitCond_

1
2
3
4
5
6
7
8
9
10
11
12
class ThreadPool
{
public:
...
private:
...
private:
...
/* 等待线程资源全部回收 */
std::condition_variable exitCond_;
...
}

在析构函数中,

  1. isPoolRunning_为false,如此线程函数进去之后发现线程池要关闭了,
    1. 如果任务队列为0,则就直接退出。
    2. 如果任务队列不为0则取任务。
  2. notEmpty_.notify_all();,最后一波唤醒,让所有线程醒来,看有没有任务,没任务就收工。
  3. ThreadPool关注exitCond_,若有人通知了,则说明是线程陆陆续续在退出了,直到threads_.size() == 0,说明所有线程都退出了。这时候,线程池就可以放心析构了。
1
2
3
4
5
6
7
8
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
notEmpty_.notify_all();
/* 等待线程池中所有的线程返回,有两种状态:阻塞/执行中 */
std::unique_lock<std::mutex> lock(taskQueMtx_);
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0;});
}

项目推进时遇到了什么问题?

  1. 实现核心功能时的问题:如何通用地获取提交任务后的返回值,即Any,Result的设计。
  2. 设计线程池资源回收的,是以…的方式实现的,测试时,有时会出现死锁的现象

调试方法

gdb调试,attach到正在死锁的进程,把线程栈打印出来,在哪一个函数的哪一句话不动了。

问题要素:
1、线程池要结束,要释放整个池子的资源了。
2、线程池的成员isPoolRunning_状态置为了false

以上两个要素

1,不受线程池内的线程目前的状态而影响,线程的状态:在等待任务、在执行任务。
2,isPoolRunning_必将影响线程的代码路径。

isPoolRunning_在线程池start时置为true

我们设计的ThreadPool不用考虑成员的析构问题,最主要的两个成员容器:
1、无序map中的线程对象是unique_ptr管理的。
2、queue中的任务队列是shared_ptr管理的。
3、其余的变量都是非指针变量。

ThreadPool析构时仅仅只是把isPoolRunning_false就可以了吗?
当然不可以。线程池要等待线程池里面所有的线程返回。
目前线程可能在threadFunc函数中阻塞在notEmpty上,另一种是正在执行任务中。

此时就需要不同线程的通信。即用户线程和线程池中的线程之间进行通信。
用信号量、条件变量都可以。
我们用条件变量。
在ThreadPool中定义一个成员,std::condition_variable exitCond_; 等待线程资源全部回收

1
2
3
4
5
6
7
8
9
10
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
/* 我们暂且在这个位置 唤醒notEmpty_上的所有线程 */ //实际上,是留了一个坑
notEmpty_.notify_all();

std::unique_lock<std::mutex> lock(taskQueMtx_);
/* 表明:要等待到threads_.size()等于0 */
exitCond_.wait(lock, [&]()->bool {return threads_.size()==0;});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//可能会产生死锁的情况:
int main()
{
ThreadPool pool;
pool.start(4);
Result res1 = pool.submitTask(std::make_shared<MyTask>(1,10000000));
uLong sum1 = res1.get().cast_<uLong>();
cout << sum1 << endl;
cout << "main over!" << endl;
}
/* 运行结果
* tid: 9624尝试获取任务..
* tid: 9624获取任务成功...
* tid:tid: 1664尝试获取任务...
* 9624begin!
* tid: 15228尝试获取任务...
* tid:6796尝试获取任务...
* tid:9624end!
* tid:9624尝试获取任务...5000000050000000
* main over!
*
* threadid:6796 exit!
* threadid:s1664 exit!
* threadid:15228 exit!
* //发现,抢到任务的线程回不来了。
*/

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/* main函数所在的主线程,即用户创建线程池的线程 */
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
notEmpty_.notify_all();
std::unique_lock<std::mutex> lock(taskQueMtx_);
exitCond_.wait(lock, [&]()->bool {return threads_.size()==0;});
}
/* 线程池中的每个线程的线程函数 */
void ThreadPool::threadFunc(int threadid)
{
auto lastTime = ...;
while(isPoolRunning_)
{
std::shared_ptr<Task> task;
{
std::unique_lock<std::mutex> lock(taskQueMtx_); //抢锁动作
std::cout << "tid: " << std::this_thread::get_id()
<< "尝试获取任务... " << std::endl;
while(taskQue_.size() == 0)
{
if(poolMode_ == PoolMode::MODE_CACHED)
{
/* timeout==wait_for的返回值 意味着等待超时了 */
if(std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
//回收长时间空闲的线程
return;
}
/* 否则,说明被唤醒了,有任务了,且抢到锁了,继续往下 */
}
else /* 若不是cached模式,则按部就班老实等待 */
{
notEmpty_.wait(lock);
}
}
if(!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid:" << std::this_thread::get_id()
<< " exit:" << std::endl;
/* 每一个线程删除后,都通知exitCond_一下,唤醒用户主线程 */
exitCond_.notify_all();
return;
}
}
/* 若被唤醒了,有任务了,且抢到锁了,而且确认了isPoolRunning,则可以取得任务 */
--idleThreadSize_;
std::cout << "tid:" << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
}
}

第一种情况:线程是固定模式,任务队列空时,等待在notEmpty_上。这种情况我们不怕,因为线程池析构函数已经写了notEmpty_.notify_all();

第二种情况:task->exec()中,即线程执行任务中。这种也没事,线程执行完任务再次进入while循环判断时发现isPoolRunning_为false了,则删除线程,退出。

第三种情况才是关键,线程task->exec() 执行完后进入了while (isPoolRunning_)循环,到了获取锁语句之前的位置。这时,线程池关闭了,也就是说在Running状态切换为false!现在,线程池、子线程的下一个动作都是对taskQueMtx_进行加锁!而最关键的,如果能够“阴差阳错地”进入第二个while循环,那么就会在notEmpty_上等死,因为此时线程池在语义上是已经关闭了,没人再去唤醒它。

第三种情况中的第一种情况:线程池抢到锁,又在exitCond_wait阻塞,而子线程虽然能抢到锁,但是会死在notEmpty_上。

第三种情况中的第二种情况:子线程抢到锁,往下执行,则没有任务时,在notEmpty_上等待会放弃锁,阻塞自己,线程池之后会抢到锁,等待线程结束。这时没有人去唤醒notEmpty_上的子线程了。死锁。这是问题之关键

那么,第三种情况怎么解决死锁问题呢?

我们注意到,第三种情况中第一种情况的问题是因为线程池exitCond_.wait()后,进入了第一个while后争抢lock的线程可能拿到锁后,顺理成章进入第二个while中,若恰逢没有任务,则死在了notEmpty_上。所以为了防止错误的时机进入到第二个while循环中,在条件处首先再判断一次isPoolRunning_

3.1的解决方案:锁+双重判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void ThreadPool::threadFunc(int threadid)
{
auto lastTime = ...;
while(isPoolRunning_)
{
std::shared_ptr<Task> task;
{
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << ... << std::endl;
while(isPoolRunning_ && taskQue_.size()==0)
{
if(poolMode_ == PoolMode::MODE_CACHED)
{
if(std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
//...
return;
}
}
else
{
notEmpty_.wait(lock);
}
}//退出while的两种情况:1.!Running, 2.Running && taskQue_.size()!=0
if(!isPoolRunning_)
{
break;
}
--idleThreadSize_;
std::cout << "获取任务成功" << std::endl;
task = taskQue_.front();taskQue_.pop();
--taskSize_;
notFull_.notify_all();
}
if(task != nullptr)task->exec();
++idleThreadSize_;
lastTime = ...;
}
threads_.erase(threadid);
std::cout << "线程退出" << std::endl;
exitCond_.notify_all();
return;
}

如此一来,线程池先抢到锁,再在exitCond_wait阻塞释放锁后,子线程得到锁向下走到了第二个while语句,由于再次判断isPoolRunning_,这时发现改变为false了,就不会走到notEmpty_.wait()了。灰溜溜去做删除动作了。

3.2的解决方案:调整加锁位置

虽然3.1解决了isPoolRunning状态脏读这个漏洞问题,但是依旧不能解决“子线程先抢到锁”从而在notEmpty_上等待这种情况发生。即使子线程释放了锁,但是没有人再去唤醒notEmpty_,因为原来的语序是先唤醒,再抢锁。所以我们要让用户线程的加锁、唤醒放在子线程wait之后,要让子线程wait释放了锁之后,才让用户线程唤醒notEmpty_!于是:

1
2
3
4
5
6
7
ThreadPool::~ThreadPool()
{
isPoolRunning_ = false;
std::unique_lock<std::mutex> lock(taskQueMtx_);
notEmpty_.notify_all();
exitCond_.wait(lock, [&]()->bool {return threads_.size()==0;});
}

调换了一下第二、第三句,即子线程先抢到锁后wait在notEmpty_上后释放锁,用户线程再抢到锁之后才去notify_all()它,那么阻塞的子线程被唤醒了,往下执行,发现isPoolRunning_变为false了,灰溜溜去做删除动作了。完美解决问题。

编译为动态库

直接在命令行使用g++编译

1
g++ -o libxcg-threadpool.so -fPIC -shared [源文件如: threadpool.cpp] -std=c++17

用cmake构建编译

根目录

1
2
3
4
5
6
7
8
9
10
cmake_minimum_required(VERSION 3.0.0)
project(xcg-threadpool)
#配置编译选项
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g)
#配置最终的库文件输出的路径
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)
#配置头文件的搜索路径
include_directories(${PROJECT_SOURCE_DIR}/include)
#加载子目录
add_subdirectory(src)

src目录

1
2
3
4
5
6
#定义了SRC_LIST变量,包含了该目录下所有的源文件
aux_source_directory(. SRC_LIST)
#指定生成可执行文件
add_library(${PROJECT_NAME} SHARED ${SRC_LIST})
#指定可执行文件链接时 需要依赖的库文件
target_link_libraries(xcg-threadpool pthread)

使用动态库编译可执行文件

首先需要把动态库libxcg-threadpool.so移动到/usr/lib/usr/local/lib;把动态库对应的头文件threadpool.h移动到/usr/include/usr/local/include下。

1
g++ -o main main.cpp -std=c++17 -lxcg-threadpool -lpthread

重构简洁版的线程池

要对外提供一个简单易用的submitTask接口。
让用户直接传一个:future result = submitTask(func, 1, 2);
而不用再去继承一个抽象的Task了,甚至还得make_shared<MyTask()>(func, 1, 2);,这样太繁琐了。

主要用到的技巧就是:可变参模板,万能引用,函数绑定器,完美转发。
submitTask的返回类型无法写出,需要用到auto占位,之后用decltype进行推导。

ThreadPool成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private:
std::unordered_map<int, std::unique_ptr<Thread>> threads_;
int initThreadSize_;
int threadSizeThreshHold_;
std::atomic_int curThreadSize_;
std::atomic_int idleThreadSize_;

using Task = std::function<void()>;
// 此处之前的版本是智能指针管理的task,目前不需要了。
// 因为我们把sharedptr对Task的包装都在submitTask内部进行封装了。
std::queue<Task> taskQue_;

std::atomic_int taskSize_;
int taskQueMaxThreshHold_;

std::mutex taskQueMtx_;
std::condition_variable notFull_;
std::condition_variable notEmpty_;
std::condition_variable exitCond_;

PoolMode poolMode_;
std::atomic_bool isPoolRunning_;
};

优化submitTask

以下的关键点:任务队列中存储的任务(lambda)捕获了shared_ptr,因此当这个任务被执行时,它仍然持有一个shared_ptr,这样packaged_task就不会在执行前被销毁。
当任务执行完毕,这个lambda就会被销毁,那么它持有的shared_ptr也会被销毁(引用计数减一)。如果这时没有其他shared_ptr引用(比如在submitTask函数中,除了任务队列中的那个shared_ptr,还有submitTask函数中的局部变量task),那么packaged_task就会在任务执行完毕后被销毁。

但是,在submitTask函数中,我们返回了一个future,这个future是与packaged_task关联的。值得注意的是,std::packaged_task的析构不会影响其关联的future吗?不会。因为std::future是通过共享状态(shared state)与std::packaged_task关联的,这个共享状态独立于packaged_task对象。也就是说,即使packaged_task对象销毁了,只要共享状态还存在(因为future还持有),那么future仍然可以正常工作。所以,packaged_task对象销毁后,futureget()仍然可以正常返回结果(或者异常)。

我们要明确的:
future只是负责持有结果。但
future 不会延长 task 的生命周期​​,

  • std::future 只管理​​共享状态​​(shared state)的生命周期(存储结果的内存区域)
  • ​不会​​延长关联的 packaged_task 对象的生命周期
  • future 只依赖共享状态,不依赖 packaged_task 对象本身。即共享状态的生命周期独立于 packaged_task
    • 由最后一个引用它的 future 或 shared_future 管理
    • 与 packaged_task 对象的销毁无关
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
classDiagram
class PackagedTask {
+operator()()
+get_future()
-shared_state*
}

class SharedState {
+result
+ready_flag
}

class Future {
+shared_state*
}

PackagedTask --> SharedState : 独占拥有
Future --> SharedState : 只读引用

关键点:

  1. packaged_task ​​独占拥有​​共享状态的生命周期
  2. future ​​只持有对共享状态的弱引用​
  3. 移动、销毁 packaged_task 会​​转移共享状态的所有权​。但是不会销毁SharedState,因为外部还有Future在观察。
    1. 因此,其实也可以在之前就保存下来task的future,然后再去把task转移给任务队列。这样无所谓task是否在外部获得结果之前销毁了。
      1. 但是这样会限制task在移动之后的操作,无法在移动之后获得future了。

关键点:

  1. ​共享所有权​​:
    • 任务队列和工作线程共同持有 shared_ptr
      • (⭐)当 lambda 被放入队列时,shared_ptr 的引用计数增加
    • 最后一个持有者释放时 packaged_task 才会销毁
      • 当队列中的 lambda 被销毁时,shared_ptr 引用计数减少
  2. ​生命周期安全​​:
    • 即使 submitTask 返回,只要队列或线程持有 shared_ptr
    • packaged_task 对象会一直存活到任务执行完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
template<typename Func, typename... Args>
auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>
{
using RType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<RType> result = task->get_future();

std::unique_lock<std::mutex> lock(taskQueMtx_);
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]() -> bool { return taskQue_.size() < taskQueMaxThreshHold_; }))
{
std::cerr << "task queue is full, submit task failed." << std::endl;
// 这里,用 lambda 表达式 充当 线程函数 func,函数中只返回一个空RType()
// RType()即一个默认构造的 future
// 但是可能会出现两个问题:
// 1. RType 没有默认构造函数(如 std::atomic),
// 2. 语义不准确,应该显式指出 提交失败的原因
// auto task = std::make_shared<std::packaged_task<RType()>>(
// []() -> RType { return RType(); });
// task();
// return task->get_future();

// 改进方案一:
// return std::async(std::launch::deferred,
// [] { throw std::runtime_error("Task queue full"); }).share().get_future();
// 改进方案二:
std::promise<RType> p;
p.set_exception(std::make_exception_ptr(std::runtime_error("Task queue full")));
return p.get_future();
}

// 关键:task这个shared_ptr,由这个lambda表达式捕获了,之后这个task的生命周期就和任务队列一致了。
taskQue_.emplace([task]() { (*task)(); });
++taskSize_;

notEmpty_.notify_all();

if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshHold_)
{
std::cout << ">>> create new thread..." << std::endl;
// 创建新的线程对象
auto ptr = std::make_unique<Thread(std::bind(&ThreadPool::threadFunc, this, std::placeholder::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
threads_[threadId]->start();
++curThreadSize_;
++idleThreadSize_;
}
return result;
}

再次优化submitTask

  • future 只依赖共享状态,不依赖 packaged_task 对象本身。即共享状态的生命周期独立于 packaged_task
    • 由最后一个引用它的 future 或 shared_future 管理
    • 与 packaged_task 对象的销毁无关
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
classDiagram
class PackagedTask {
+operator()()
+get_future()
-shared_state*
}

class SharedState {
+result
+ready_flag
}

class Future {
+shared_state*
}

PackagedTask --> SharedState : 独占拥有
Future --> SharedState : 只读引用

关键点:

  1. packaged_task ​​独占拥有​​共享状态的生命周期
  2. future ​​只持有对共享状态的弱引用​
  3. 移动、销毁 packaged_task 会​​转移共享状态的所有权​。但是不会销毁SharedState,因为外部还有Future在观察。
    1. 因此,其实也可以在之前就保存下来task的future,然后再去把task转移给任务队列。这样无所谓task是否在外部获得结果之前销毁了。
      1. 移动task的缺点:但是这样会限制task在移动之后的操作,无法在移动之后获得future了。

优化点:

  1. 完美转发捕获(C++20)代替 std::bind,更高效且避免额外开销
  2. 移动task到lambda中,而不是建立shared_ptr来管理它。比shared_ptr版本更轻量高效(减少一次堆分配)。
    注意点:
    为什么需要mutable?

因为packaged_taskoperator()调用会改变packaged_task对象(它内部会执行任务并将结果存储到共享状态,所以会改变packaged_task的状态),因此lambda的调用运算符必须声明为mutable,否则编译器会认为你在一个constlambda中试图修改捕获的对象。
所以,这里mutable是必须的。

在完美转发捕获可变参时,为什么也需要加mutable?
我们通过完美转发捕获了funcargs。在lambda体内,我们调用了func,并使用了std::forward<Args>(args)...来转发参数。由于转发可能意味着移动(当参数是右值时),这会修改捕获的args(移动后,对象的状态被改变,通常为空或有效状态被转移)。因此,我们需要mutable来允许对捕获的变量进行修改(包括移动操作)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
template<typename Func, typename... Args>
auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>
{
using RType = decltype(func(args...));

// 1. 创建任务和future
auto task = std::packaged_task<RType()>(
[func = std::forward<Func>(func), ...args = std::forward<Args>(args)] () mutable
{
return func(std::forward<Args>(args)...);
}); // ✅ 完美转发捕获(C++20)

std::future<RType> result = task.get_future();

// 2. 锁保护队列操作
std::unique_lock<std::mutex> lock(taskQueMtx_);

// 3. 检查队列容量
if (!notFull_.wait_for(lock, 1s,
[&] { return taskQue_.size() < taskQueMaxThreshHold_; }))
{
// 优雅的错误处理
std::promise<RType> p;
p.set_exception(
std::make_exception_ptr(std::runtime_error("Task queue full")));
return p.get_future(); // ⏱️ 立即就绪的异常future
}

// 4. 提交任务到队列
taskQue_.emplace([task = std::move(task)]() mutable {
task(); // ✅ 执行移动后的任务
});
++taskSize_;

// 5. 通知工作线程
notEmpty_.notify_all();

// 6. 动态扩缩容逻辑 (CACHED模式)
if (poolMode_ == PoolMode::MODE_CACHED &&
taskSize_ > idleThreadSize_ &&
curThreadSize_ < threadSizeThreshHold_)
{
// ... 创建新线程 (省略细节)
}

return result; // 🎯 返回有效的future
}

threadFunc的重构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
const int THREAD_IDLE_TIME = 60;	//单位s
void ThreadPool::threadFunc(int threadid)
{
/* 上一次线程执行完任务的时间 */
auto lastTime = std::chrono::high_resolution_clock().now();
for(;;)
{
// 此处之前的版本是智能指针管理的task,目前不需要了。
// 因为我们把sharedptr对Task的包装都在submitTask内部进行封装了。
Task task;
{
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid: " << std::this_thread::get_id()
<< "尝试获取任务..." << std::endl;

// cached模式下,有可能已经创建了很多的线程,但是空闲时间超过60s,应该把多余的线程
// 结束回收掉(超过initThreadSize_数量的线程要进行回收)
// 当前时间 - 上一次线程执行的时间 > 60s

// 每一秒中返回一次 怎么区分:超时返回?还是有任务待执行返回
// 锁 + 双重判断
while (taskQueue_.size() == 0)
{
// 线程池要结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid: " << std::this_thread::get_id() << " exit!"
<< std::endl;
exitCond_.notify_all();
return;
}

if (poolMode_ == PoolMode::MODE_CACHED)
{
//while(taskQue_.size() > 0) // 不再去判断 taskQue_.size() > 0,即使size是0,也一直等待60s
//{

/* 超时 1s 返回 */
if(std::cv_status::timeout ==
notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now(); //返回的是time_point类型
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if(dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
/* 开始回收当前线程 */
/* 修改记录线程数量的相关变量 */
/* 把线程从线程列表容器中删除 */
/* 问题:怎么知道线程函数对应的是线程列表容器中的哪一个线程对象 */
/* 我们需要有一个映射关系来记录:threadid => thread对象 => 删除 */
thread_.erase(threadid);
--curThreadSize_;
--idleThreadSize_;
std::cout << "threadid: " << std::this_thread::get_id() << " exit!"
<< std::endl;
return;
}
}
//} // while(taskQue_.size() > 0) 结束
}
else // poolMode_ != PoolMode::MODE_CACHED
{
notEmpty_.wait(lock /* , [&]()->bool { return taskQue_.size() > 0; } */);
}
} // end while (taskQueue_.size() == 0)

// 此时 taskQueue_.size() != 0 而 我们又拿到了 任务队列的锁,可以直接取走任务
--idleThreadSize_;
std::cout << "tid: " << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
task = taskQue_.front();
taskQue_.pop();
--taskSize_;

if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}

notFull_.notify_all();
} // end std::unique_lock<std::mutex> lock(taskQueMtx_);
if(task != nullptr)
{
task();
}
++idleThreadSize_;
/* 线程执行完任务后更新lastTime */
lastTime = std::chrono::high_resolution_clock().now();
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int sum(int a, int b)
{
return a + b;
}
int main()
{
ThreadPool pool;
pool.setMode(PoolMode::MODE_CACHED);
pool.start(2);
future<int> res1 = pool.submitTask(sum, 1, 2);
future<int> res2 = pool.submitTask([](int begin, int end) -> int
{
int sum = 0;
for (int i = begin; i <= end; ++i)
{
sum += i;
}
return sum;
}, 1, 100);
future<int> res3 = pool.submitTask(sum, 1, 2);
future<int> res4 = pool.submitTask(sum, 1, 2);

cout << res1.get() << endl;
cout << res2.get() << endl;
cout << res3.get() << endl;
cout << res4.get() << endl;
}