半同步半异步线程池模型

内容

  1. 半同步半异步线程池介绍

线程池介绍

在处理大量并发任务的时候,如果按照传统的方式:来一个任务请求,对应一个线程来处理请求任务。那么大量的线程创建和销毁将消耗过多的系统资源,还增加了线程上下文(运行环境)切换的开销,而通过线程池技术就可以很好地解决这些问题。

线程池技术通过在系统中预先创建一定数量的线程,当任务请求到来时从线程池中分配一个预先创建的线程去处理任务,线程在处理完任务之后还可以重用,不会销毁,而是等待下次任务的到来。这样,通过线程池能避免大量的线程创建和销毁动作,从而节省系统资源

这样做的一个好处是,对于多核处理器,由于线程会被分配到多个CPU,会提高并行处理的效率;另一个好处是每个线程独立阻塞,可以防止主线程被阻塞而使主流程被阻塞,导致其他的请求得不到响应的问题。

线程池分为半同步半异步线程池和领导者追随者线程池,本文将主要介绍半同步半异步线程池,这种线程池在实现上更简单,使用得比较多。

半同步半异步线程池结构

半同步半异步线程池分成三层,如图所示。

image-20220330153004372

第一层是同步服务层,它处理来自上层的任务请求,上层的请求可能是并发的,这些请求不是马上就会被处理,而是将这些任务放到一个同步排队层中,等待处理。

第二层是同步排队层,来自上层的任务请求都会加到排队层中等待处理。

第三层是异步服务层,这一层中会有多个线程同时处理排队层中的任务,异步服务层从同步排队层中取出任务并行的处理。

这种三层的结构可以最大程度处理上层的并发请求。对于上层来说只要将任务丢到同步队列中就行了,至于谁去处理,什么时候处理都不用关心,主线程也不会阻塞,还能继续发起新的请求。至于任务具体怎么处理,这些细节都是靠异步服务层的多线程异步并行来完成的,这些线程是一开始就创建的,不会因为大量的任务到来而创建新的线程,避免了频繁创建和销毁线程导致的系统开销,而且通过多核处理能大幅提高处理效率。

关键技术分析

同步排队层居于核心地位,因为上层会将任务加到排队层中,异步服务层同时也会取出任务,这里有一个同步的过程。

在实现时,排队层就是一个同步队列,允许多个线程同时添加或取出任务,并且要保证操作过程是安全的。

线程池有两个活动过程,一个是往同步队列中添加任务,另一个是从同步队列中取任务。活动图如图所示。

image-20220330154305073

从活动图中可以看到线程池的活动过程,一开始线程池会启动一定数量的线程,这些线程属于异步层,主要用来并行处理排队层中的任务。

如果排队层中的任务数为空,则这些线程等待任务的到来,如果发现排队层中有任务了,线程池则会从等待的这些线程中唤醒一个来处理新任务。同步服务层则会不断地将新的任务添加到同步排队层中这里有个问题值得注意,有可能上层的任务非常多,而任务又是非常耗时的,这时,异步层中的线程处理不过来,则同步排队层中的任务会不断增加,如果同步排队层不加上限控制,则可能会导致排队层中的任务过多、内存暴涨的问题。因此,排队层需要加上限的控制,当排队层中的任务数达到上限时,就不让上层的任务添加进来,起到限制和保护的作用。

同步队列

同步队列即为线程中三层结构中的中间那一层,它的主要作用是保证队列中共享数据线程安全,还为上一层同步服务层提供添加新任务的接口,以及为下一层异步服务层提供取任务的接口。同时,还要限制任务数的上限,避免任务过多导致内存暴涨的问题。

同步队列的实现比较简单,我们会用到C++11的锁、条件变量、右值引用、std::move以及std::forward。move是为了实现移动语义,forward是为了实现完美转发。

同步队列的锁是用来线程同步的。

条件变量是用来实现线程通信的,即线程池空了就要等待,不为空就通知一个线程去处理;线程池满了就等待,直到没有满的时候才通知上层添加新任务。

同步队列的具体实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>
using namespace std;
template<class T>
class SyncQueue
{
public:
SyncQueue(int maxSize) : m_maxSize(maxSize), m_needStop(false){}
void Put(const T& x)
{
Add(x);
}
void Put(T && x)
{
Add(std::forward<T>(x));
}
void Take(std::list<T>& list)
{
std::unique_lock<std::mutex> locker(m_mutex);
//如果队列没有停止运行,则继续看任务队列是否空,如果空则阻塞,不空则继续
m_notEmpty.wait(locker,
[this]()->bool{return m_needStop || NotEmpty();});
if(m_needStop)return;
list = std::move(m_queue); //move对于list来说只移动容器内部对象
m_notFull.notify_one();
}
void Take(T & t)
{
std::unique_lock<std::mutex> locker(m_mutex);
//如果队列没有停止运行,则继续看任务队列是否空,如果空则阻塞,不空则继续
m_notEmpty.wait(locker,
[this]()->bool{return m_needStop || NotEmpty();});
if(m_needStop)return;
t = m_queue.front(); //赋值拷贝
m_queue.pop_front();
m_notFull.notify_one();
}
private:
std::list<T> m_queue; //任务
std::mutex m_mutex;
std::condition_variable m_notEmpty; //异步任务层
std::condition_variable m_notFull; //同步任务层
int m_maxSize;
bool m_needStop;
};

其中Take函数取的方法是“钓鱼”的方式。

如果想通过函数的return直接返回_Ty是不现实的,因为如果栈空的话会产生返回值混淆。

所以在外部先定义一个对象名,通过传入引用去接收对象值(通过赋值拷贝)。

1
2
3
4
5
6
7
8
9
10
11
12
_Ty Top();
_Ty GetTop()
{
/* if(Empty())return -1; */
}
bool GetPop(_Ty &v)
{
if(Empty())return false;
v = data[top];
top -= 1;
return true;
}

Stop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class SyncQueue
{
public:
void Stop()
{
{
std::unique_lock<std::mutex> locker(m_mutex);
m_needStop = true;
}
//需要唤醒所有阻塞的线程,让他们尽快退出业务操作。
m_notFull.notify_all();
m_notEmpty.notify_all();
}
bool Empty()/* 此方法需要修改成员锁的属性,不能设置为const方法 */
{
std::unique_lock<std::mutex> locker(m_mutex);
return m_queue.empty();
}
bool Full()
{
std::unique_lock<std::mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
size_t Size()
{
std::unique_lock<std::mutex> locker(m_mutex);
return m_queue.size();
}
int Count()
{
return m_queue.size();
}
private:
bool NotFull() const
{
bool full = m_queue.size() >= m_maxSize;
if(full)
{
cout << "m_queue 满了" << endl;
}
return !full;
}
bool NotEmpty() const
{
bool empty = m_queue.empty();
if(empty)
{
cout << "m_queue 空了" << endl;
}
return !empty;
}
};

Add

1
2
3
4
5
6
7
8
9
10
11
12
13
class SyncQueue
{
private:
template<class F>
void Add(F&& x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notFull.wait(locker,
[this]()->bool {return m_needStop||NotFull();});
m_queue.push_back(std::forward<F>(x));
m_notEmpty.notify_one();
}
};

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const int MaxTaskCount = 100;
class ThreadPool
{
public:
using Task = std::function<void()>;
ThreadPool(int numThreads = std::thread::hardware_concurrency)
: m_queue(MaxTaskCount) /* 设置m_queue的最大容纳任务量 */
{
Start(numThreads);
}
~ThreadPool()
{
Stop();
}
private:
std::list<std::shared_ptr<std::thread> > m_threadgroup;
SyncQueue<Task> m_queue;
atomic_bool m_running; //初始化为假
std::once_flag m_flag; //确保函数执行一次的标志
}
  • Start、RunInThread、StopThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class ThreadPool
{
public:
void Stop()
{
std::call_once(m_flag, [this]() {StopThreadGroup();});
}

void Start(int numThreads) //创建若干线程,放到线程组
{
m_running = true;
for(int i = 0; i < numThreads; ++i)
{
//注意,RunInThread是类成员函数,需要指明其是哪个类中的,而且要在前面加&
m_threadgroup.push_back(
make_shared<thread>(&ThreadPool::RunInThread, this)
);

}
}
private:
void RunInThread() //主线程对应的函数,属于类成员函数,需传入this
{
while(m_running) //Start后running为true
{
std::list<Task> list;
m_queue.Take(list);
for(auto & task : list)
{
if(!m_running)return;
task();
}
}
}
void StopThreadGroup()
{
m_queue.Stop();
m_running = false;
for(auto & th : m_threadgroup)
{
if(th)
{
th->join();
}
}
m_threadgroup.clear();
}
};
  • AddTask
1
2
3
4
5
6
7
8
9
10
11
12
class ThreadPool
{
public:
void AddTask(Task && task)
{
m_queue.Put(task);
}
void AddTask(const Task & task)
{
m_queue.Put(task);
}
};

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void funa(ThreadPool &pool)
{
for(int i = 0;i < 10; ++i)
{
auto thdid = this_thread::get_id();
pool.AddTask();
}
}
void TestThPool()
{
ThreadPool pool;
std::thread tha(fun, 1);
}
int main()
{
TestThPool();
return 0;
}

下去思考的问题

  1. 还有一个概念是协程。是在用户态的模拟并发,因而比线程速度更快,但是协程本质上不是并行、高并发,因为实际上它相对于CPU来说是处于串行的状态。
  2. 下去思考线程池如何与对象池连接,对象中可以连接MySQL数据库,如何使用线程池调用对象的查询、更新数据方法。
  3. 创建一个线程对象与某个线程挂接后,能不能在中间暂停?暂停时改变线程的属性,然后再继续跑动?
  4. 能不能精准地创建一个线程,准确地挂在某个核上?
  5. 下去看看谷歌的线程池
  6. 下去看看protobuf

Cpp_functional

内容

  1. std::function
  2. std::bind

可调用对象

Cpp中,存在“可调用对象(Callable Objects)”这一概念。准确来说,可调用对象有如下几种定义:

  1. 是一个函数指针
  2. 是一个具有operator()成员函数的类对象(仿函数)。
  3. 是一个可被转换为函数指针的类对象。
  4. 是一个类成员(函数)指针。
  • 可调用对象的使用示例

    • 函数指针

      1
      2
      3
      4
      void func(void)
      {
      // ...
      }
    • 仿函数

      1
      2
      3
      4
      5
      6
      7
      struct Foo
      {
      void operator()(void)
      {
      // ...
      }
      };
    • 可转换为函数指针的类对象

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      struct Bar
      {
      using fr_t = void(*)(void);
      static void func(void)
      {
      // ...
      }
      operator fr_t(void)
      {
      return func;
      }
      }
    • 类成员函数指针

      1
      2
      3
      4
      5
      6
      7
      8
      struct A
      {
      int a_;
      void mem_func(void)
      {
      // ...
      }
      };
    • 测试

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      int main(void)
      {
      void (* func_ptr)(void) = &func;
      func_ptr();

      Foo foo;
      foo();

      Bar bar;
      bar();

      void (A::*mem_func_ptr)(void) = &A::mem_func;
      int A::*mem_obj_ptr = &A::a_;
      A aa;
      (aa.*mem_func_ptr)();
      aa.*mem_obj_ptr = 123;

      return 0;
      }

从上述可看到,除了类成员指针之外,上面定义涉及的对象均可以像一个函数那样做调用操作。Cpp11中,这些对象(func_ptr、foo、bar、mem_func_ptr、mem_obj_ptr)都被称作可调用对象。相对应地,这些对象的类型被统称为“可调用类型”。

上面对可调用类型的定义里并没有包括函数类型,这是因为函数类型并不能直接用来定义对象;也没有包括函数引用,因为引用从某种意义来说,可以看作一个const的函数指针。

Cpp中的可调用对象具有统一的操作形式,即后面加括号进行调用(除了类成员函数指针),但是定义方法却五花八门。我们试图使用统一的方式进行保存,或传递一个可调用对象时。于是Cpp11通过提供std::funciton和std::bind统一了可调用对象的各种操作。

可调用对象包装器std::function

std::function是可调用对象的包装器,是一个类模板,可以容纳除了类成员函数指针之外的所有可调用对象。通过指定它的模板参数,它可以用统一的方式处理函数、函数对象、函数指针,并允许保存和延迟执行它们。

  • std::function的基本用法示例

    • 绑定一个普通函数

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      #include<iostream>
      #include<functional>
      void func(void)
      {
      std::cout << __FUNCTION__ << std::endl;
      }
      int main()
      {
      std::function<void(void)> fr1 = func;
      fr1();
      return 0;
      }
      /* func */
    • 绑定一个类的静态成员函数

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      #include<iostream>
      #include<functional>
      class Foo
      {
      public:
      static int foo_func(int a)
      {
      std::cout << __FUNCTION__ << "(" << a << ") ->: ";
      return a;
      }
      };
      int main()
      {
      std::function<int(int)> fr2 = Foo::foo_func;
      std::cout << fr2(123) << std::endl;
      return 0;
      }
      /* foo_func(123) ->: 123 */
    • 绑定一个仿函数

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      class Bar
      {
      public:
      int operator()(int a)
      {
      std::cout << __FUNCTION__ << "(" << a << ") ->: ";
      return a;
      }
      };
      int main()
      {
      Bar bar;
      fr2 = bar;
      std::cout << fr2(123) << std::endl;
      return 0;
      }
      /* operator()(123) ->: 123 */

从上面我们可以看到std::function的使用方法,当我们给std::function填入合适的函数签名(即一个函数类型,只需要包括返回值和参数表)之后,它就变成了一个可以容纳所有这一类调用方式的“函数包装器”。

  • std::function作为回调函数的示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    #include<iostream>
    #include<functional>
    class A
    {
    std::function<void()> callback_;
    public:
    A(const std::function<void()> & f) : callback_(f)
    {}
    void notify(void)
    {
    callback_(); //回调到上层
    }
    };
    class Foo
    {
    public:
    void operator()(void)
    {
    std::cout << __FUNCTION__ << std::endl;
    }
    };
    int main()
    {
    Foo foo;
    A aa(foo);
    aa.notify();
    return 0;
    }

从上面例子中可以看到,std::function可以取代函数指针的作用。因为它可以保存函数延迟执行,所以比较适合作为回调函数。

  • std::function还可以作为函数入参

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    #include<iostream>
    #include<functional>
    void call_when_even(int x, const std::funciton<void(int)> & f)
    {
    if(!(x&1)) //x % 2 == 0
    {
    f(x);
    }
    }
    void output(int x)
    {
    std::cout << x << " ";
    }
    int main()
    {
    for(int i = 0; i < 10; ++i)
    {
    call_when_even(i, output);
    }
    std::cout << std::endl;
    return 0;
    }
    /* 0 2 4 6 8 */

从上例可以看到,std::function比普通函数指针更灵活、便利。

std::bind绑定器

当std::function和std::bind配合起来使用时,所有的可调用对象(包括类成员函数指针和类成员指针)都将具有统一的调用方式。

std::bind用来将可调用对象与其参数一起进行绑定,绑定后的结果可以使用std::function进行保存,并延迟调用到任何我们需要的时候。

通俗来讲,bind主要有两大作用:

  1. 将可调用对象与其参数一起绑定成一个仿函数。
  2. 将多元(参数个数为n, n>1)可调用对象转成一元或者(n-1)元可调用对象,即只绑定部分参数。
  • 实际使用示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    #include<iostream>
    #include<functional>
    void call_when_even(int x, const std::function<void(int)> & f)
    {
    if(!(x&1)) //x % 2 == 0
    {
    f(x);
    }
    }
    void output(int x)
    {
    std::cout << x << " ";
    }
    void output_add_2(int x)
    {
    std::cout << x + 2 << " ";
    }
    int main()
    {
    {
    auto fr = std::bind(output, std::placeholders::_1);
    for(int i = 0; i < 10; ++i)
    {
    call_when_even(i, fr);
    }
    std::cout << std::endl;
    }
    {
    auto fr = std::bind(output_add_2, std::placeholders::_1);
    for(int i = 0; i < 10; ++i)
    {
    call_when_even(i, fr);
    }
    std::cout << std::endl;
    }
    return 0;
    }
    /*
    0 2 4 6 8
    2 4 6 8 10
    */

同样还是上面std::function中最后的一个例子,只是在这里我们使用了std::bind,在函数外部通过绑定不同的函数,控制了最后的执行结果。

我们使用auto fr保存std::bind的返回结果,是因为我们并不关心std::bind真正的返回类型(实际上std::bind的返回类型是一个stl内部定义的仿函数类型),只需要知道它是一个仿函数,可以直接赋值给一个std::function。当然,这里直接使用std::function类型来保存std::bind的返回值也是可以的。

std::placeholders::_1是一个占位符,代表这个位置将在函数调用时,被传入的第一个参数所替代。

因为有了占位符的概念,std::bind的使用非常灵活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include<iostream>
#include<functional>
void output(int x, int y)
{
std::cout << x << " " << y << std::endl;
}
int main()
{
std::bind(output, 1, 2)(); //输出1 2
std::bind(output, std::placeholders::_1, 2)(1); //输出1 2
std::bind(output, 2, std::placeholders::_1)(1); //输出2 1
/* std::bind(output, 2, std::placeholders::_2)(1);*/ //error, 缺少第二个参数
std::bind(output, 2, std::placeholders::_2)(1, 2); //输出2 2, 相当于第1个参数仅仅是标志
std::bind(output, std::placeholders::_1,
std::placeholders::_2)(1, 2); //输出1 2
std::bind(output, std::placeholders::_2,
std::placeholders::_1)(1, 2); //输出2 1
return 0;
}

上面例子对std::bind的返回结果直接施以调用,可以看到,std::bind可以直接绑定函数的所有参数,也可以仅绑定部分参数

在绑定部分参数的时候,通过使用std::placeholders,来决定空位参数将会属于调用发生时的第几个参数。

  • 下面来看bind和function配合使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    #include<iostream>
    #include<functional>
    class A
    {
    public:
    int i_ = 0;
    void output(int x, int y)
    {
    std::cout << x << " " << y << std::endl;
    }
    };
    int main()
    {
    A a;
    std::functional<void(int, int)> fr =
    std::bind(&A::output, &a, std::placeholders::_1,
    std::placeholders::_2);//目标函数A::output, 第一个参数是this指针, 还有其他两个参数。
    fr(1, 2); //输出1 2
    std::function<int&(void)> fr_i = std::bind(&A::i_, &a);
    fr_i() = 123;
    std::cout << a.i_ << std::endl; //输出123
    return 0;
    }

fr的类型是std::function<void(int, int)>。我们通过使用std::bind,将A的成员函数output的指针和a绑定,并转换为一个仿函数放入fr中存储。

之后,std::bind将A的成员i_的指针和a绑定,返回的结果被放入std::function<int&(void)>中存储,并可以在需要时修改访问这个成员。

实质

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Alloc
{
size_t _sz;
int *& _ptr;
public:
Alloc(size_t sz, int *& p) : _sz(sz), _ptr(p) {}
void operator()()
{
_ptr = (int*)malloc(_sz);
}
}
void alloc(size_t sz, int *& p)
{
p = (int*)malloc(sz);
}
int main()
{
int * p = nullptr;
int sz = sizeof(int) * 10;
auto fr = std::bind(alloc, sz, std::ref(p));
fr();
if(p != nullptr)
{
cout << p << endl;
}
}