Linux_libevent_Reactor模式

内容

  1. IO框架库
  2. Reactor模式的IO框架库包含哪些组件
  3. libevent是一个轻量级的I/O框架库。

I/O框架库

I/O框架库以库函数的形式,封装了较为底层的系统调用。
各种I/O框架库的实现原理基本相似,要么以Reactor模式实现,要么以Proactor模式实现,要么同时两种模式实现。

Reactor模式

Reactor模式要求主线程(IO处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即向工作线程(逻辑单元)通知该事件。除此之外,主线程不做其他实质性的工作。即读写数据、接受新的连接、处理客户请求均在工作线程中完成。

工作流程

使用同步I/O模型(以epoll_wait为例)实现的Reactor模式的工作流程是:

  1. 主线程往epoll内核事件表中注册socket上的读就绪事件
  2. 主线程调用epoll_wait等待socket上有数据可读。
  3. socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列
  4. 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后工作线程往epoll内核事件表中注册该socket上的写就绪事件
  5. 主线程调用epoll_wait等待socket可写。
  6. socket可写时,epoll_wait通知主线程主线程将socket可写事件放入请求队列
  7. 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果。

总结:主线程注册读事件,可读时,主线程放入请求队列;工作线程读数据,处理请求,工作线程注册写事件;可写时,主线程放入请求队列;工作线程写数据。

image-20220524104612105

组件框架

基于Reactor模式的I/O框架库包含如下几个组件:

  1. 句柄(Handle)
  2. 事件多路分发器(Event Demultiplexer)
  3. 事件处理器(Event Handler)和具体的事件处理器(Concrete EventHandler)
  4. Reactor。

这些组件的关系如下图所示。

image-20220423142048988

  • 句柄
    • 说白了就是文件描述符,句柄在windows上某个资源的id,因为libevent库是跨平台的,所以叫法容易混用。
  • 事件多路分发器
    • 事件的到来是随机的、异步的。比如我们无法预知程序何时收到一个客户连接请求,又亦或收到一个暂停信号,所以程序需要循环地等待判断有无事件产生,这就是事件循环
    • 在事件循环中,等待事件一般使用I/O复用技术来实现。I/O框架库一般将系统支持的各种I/O复用系统调用封装成统一的接口,称为事件多路分发器。因此事件多路分发器可以理解为封装了IO复用,提供了一个更便于使用的接口。
    • 事件多路分发器的demultiplex方法是等待事件的核心函数,其内部调用的是select、poll、epoll_wait等函数。
    • 事件多路分发器还需要实现register_eventremove_event方法,以供调用者给事件多路分发器中添加事件和从中删除事件。
  • 事件处理器和具体事件处理器
    • 事件处理器执行事件对应的业务逻辑。它通常包含一个或多个handle_event回调函数,这些回调函数在事件循环中被执行。
    • I/O框架库提供的事件处理器通常是一个接口,用户需要继承它来实现自己的事件处理器,即具体事件处理器。因此,事件处理器中的回调函数一般被声明为虚函数,以支持用户的扩展
    • 此外,事件处理器一般还提供一个get_handle方法,它返回与该事件处理器关联的句柄。那么,事件处理器和句柄有什么关系?当事件多路分发器检测到有事件发生时,它是通过句柄来通知应用程序的。因此,我们必须将事件处理器和句柄绑定,才能在事件发生时获取到正确的事件处理器。
  • Reactor是I/O框架库的核心。它提供的几个主要方法是:
    • handle_events,该方法执行事件循环。重复过程:等待事件,然后依次处理所有就绪事件对应的事件处理器。
    • register_handler,该方法调用事件多路分发器的register_event方法来给事件多路分发器中注册一个事件。
    • remove_handler,该方法调用事件多路分发器的remove_event方法来删除事件多路分发器中的一个事件。

libevent

libevent支持的事件类型

1
2
3
4
5
6
#define EV_TIMEOUT		0x01	/* 定时事件 */
#define EV_READ 0x02 /* 可读事件 */
#define EV_WRITE 0x04 /* 可写事件 */
#define EV_SIGNAL 0x08 /* 信号事件 */
#define EV_PERSIST 0x10 /* 永久事件 */
#define EV_ET 0x20 /*边沿触发事件,需要IO复用系统调用支持,如epoll*/

编程流程

  1. 定义、创建框架示例
  2. 向框架示例注册、注销事件:指定具体哪个base、哪个描述符,哪种事件,绑定回调函数参数
    1. 有哪些事件:IO事件(fdEV_READfun_cb)、信号事件(sigEV_SIGNALsig_cb)、定时器事件(-1EV_TIMEOUTtv_cb
  3. 开启事件循环,实际上就是框架底层调用select/poll/epoll
  4. 事件发生之后,调用回调函数如fun_cb

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include<sys/signal.h>	//SIGINT
#include<event.h>
void signal_cb(int fd, short event, void * argc)
{
struct event_base* base = (event_base*)argc;
struct timeval delay = {2, 0};
printf("Caught an interrupt signal; exiting cleanly in 2 seconds...\n");
event_base_loopexit(base, &delay);
}
void timeout_cb(int fd, short event, void * argc)
{
printf("timeout\n");
}
int main()
{
struct event_base* base = event_init();
struct event* signal_event = evsignal_new(base, SIGINT, signal_cb, base);
event_add(signal_event, NULL);

timeval tv = {1, 0};
struct event* timeout_event = evtimer_new(base, timeout_cb, NULL);
event_add(timeout_event, &tv);

event_base_dispatch(base);

event_free(timeout_event);
event_free(signal_event);
event_base_free(base);
}

上面的代码描述了使用Libevent库的主要逻辑:

  1. 调用event_init函数创建event_base对象。一个event_base相当于一个Reactor实例。
  2. 创建具体的事件处理器,并设置他们所从属的Reactor实例。本例中的**evsignal_new用于创建信号事件处理器,evtimer_new**用于创建定时事件处理器,它们是定义在/include/event2/event.h文件中的宏,代码如下。其中evtimer_new的原型event_new的第二个参数默认赋-1,第三个参数默认赋0
1
2
3
4
#define evsignal_new(b, x, cb, arg) \
event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))
#define evtimer_new(b, cb, arg) \
event_new((b), -1, 0, (cb), (arg))
  1. 回调函数的格式需要统一:void fun_cb(int fd, short event, void* argc)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include<sys/signal.h>	//SIGINT
#include<event.h>
void sig_fun(int fd, short event, void * argc)
{
printf("sig=%d\n", fd);
}
void timeout_fun(int fd, short event, void * argc)
{
if(ev & EV_TIMEOUT)
{
printf("timeout\n");
}
}
int main()
{
struct event_base* base = event_init();
assert(base != NULL);

struct event* sig_ev = evsignal_new(base, SIGINT, sig_fun, NULL);
event_add(sig_ev, NULL);

struct timeval tv = {5, 0};
//定时器不需要fd描述符、也不需要信号代号。
//所以,相应地: evtimer_new对应的timeout_fun回调函数中的fd参数默认赋-1
// evtimer_new对应的event_new函数的信号代号参数默认赋-1
struct event* timeout_ev = evtimer_new(base, timeout_fun, NULL);
event_add(timeout_ev, &tv);

event_base_dispatch(base); //开启事件循环

event_free(sig_ev);
event_free(timeout_ev);
event_base_free(base);
}

ctrl+c终止进程的信号代号是2,但是信号事件并没有fd描述符,而是巧妙地复用了fd,写入信号代号。

编译测试

gcc编译链接命令后需要加后缀-levent

MainServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class TcpServer;
class ThreadPool;
class Reactor;

class MainServer
{
private:
TcpServer * m_server;
ThreadPool * m_pool;
Reactor * m_reactor;
public:
MainServer();
~MainServer();
static void ListenEventCallBack(int fd, short events, void * arg);

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include"mainServer.h"
#include"tcpServer.h"
MainServer::Mainserver()
{
m_server = new TcpServer;
}
MainServer::~MainServer()
{
m_server = new TcpServer(IpAddressPort{"127.0.0.1", 8000});
m_pool = new ThreadPool(3);
m_reactor = new Reactor();
m_reactor->AddEventAndHandler(m_server->GetLfd, EV_READ | EV_PERSIST, MainServer::ListenEventCallBack)
}
MainServer::ListenEventCallBack(int fd, short events, void * arg)
{

}

Linux_IO复用_select_poll

内容

  1. 基本概念
  2. 了解接口socket
  3. 了解协议tcp/udp
  4. io复用
  5. select
  6. poll/epoll

可以参考学习的文章:

  1. 深入浅出理解select、poll、epoll的实现
  2. linux在系统调用进入内核时,为什么要将参数从用户空间拷贝到内核空间?不能直接访问,或是使用memcpy吗?非要使用copy_from_user才行吗? - 针对为什么select/poll调用一次拷贝一次数组到内核态空间。

I/O复用

  1. select
  2. poll
  3. epoll

select

先观察其API

1
2
#include<sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
  1. 参数
    1. nfds参数通常被设置为select监听的所有文件描述符中的最大值加1,表示在fd_set集合中,我们关心的描述符的总数。为什么加1呢?因为文件描述符是从0开始计数的。nfdsfd_set容量大小不一样,容量大小指的是FD_SETSIZE,即fd_set容量大小fd_set可容纳描述符的最大大小。
    2. readfds参数是select关心的读事件的集合;
    3. writefds参数是select关心的写事件的集合;
    4. exceptfds参数select关心的异常事件的集合;
    5. timeout参数设置select的超时时间。
  2. 返回值
    1. 集合中有事件就绪的描述符的个数
    2. 但是并没有告诉你具体是哪一个描述符就绪

fd_set结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<typesizes.h>
#define __FD_SETSIZE 1024

#include<sys/select.h>
#define FD_SETSIZE __FD_SETSIZE
typedef long int __fd_mask;
#undef __NFDBITS
#define __NFDBITS (8*(int)sizeof(__fd_mask))
typedef struct
{
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
#define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
#define __FDS_BITS(set) ((set)->__fds_bits)
#endif
}fd_set;
-------------------
#define FD_SETSIZE 1024
#define NFDBITS (8*(int)sizeof(long int))
typedef struct
{
long int fds_bits[FD_SETSIZE / NFDBITS];
}fd_set;

其中,__FD_SETSIZE指出select可以关注的最大文件描述符个数,默认为1024。

__fd_mask被定义为long int的类型别称,long int在32位机占8个字节。

__NFDBITS计算的是1个__fd_mask元素所占用的位数,一个字节占8位,sizeof算出__fd_mask的字节数,相乘得其占用的bit大小。

接着,定义fds_bits,其是long int型的数组,数组大小为__FD_SETSIZE除以__NFDBITS。比如SETSIZE为1024位,NFDBITS是64位,则数组大小位1024/64=16。这里的计算主要是为了计算出数组的大小,以确定多大的数组可以正好容纳1024个位数,来记录文件描述符信息。

用到的宏函数

fd_set集合对于文件描述符的管理是按位进行的,而位只有0和1两种状态。

假如SETSIZE=1024,则可管理1024个文件描述符,如果文件描述符7有效,我们需要对位操作,使其位变为1

由于位操作过于繁琐,select API中提供了一系列宏函数来方便我们访问、操作fd_set集合状态。

1
2
3
4
5
#include<sys/select.h>
FD_ZERO(fd_set *fdset); /*清除fdset的所有位*/
FD_SET(int fd, fd_set *fdset); /*设置fdset的位fd*/
FD_CLR(int fd, fd_set *fdset); /*清除fdset的位fd*/
int FD_ISSET(int fd, fd_set *fdset);/*测试fdset的位fd是否被设置*/

select编程思路

最好另外定义一个整型数组,其大小为我们预测将要出现的描述符的最多数目。用作我们存放描述符的容器。初始化时将数组值一律设为-1,表示容器中该位置还没有存放描述符。如果在某一时刻有一个描述符有了消息,我们就将该描述符数值覆盖到这个容器中第一个为-1的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#define MAX 10
void fds_init(int fds[])
{
for(int i = 0;i<MAX;++i)
{
fds[i] = -1;
}
}
void fds_add(int fd, int fds[])//向fds容器中添加描述符fd
{
if(fd<0)
{
printf("无效的描述符\n");
return;
}
for(int i = 0;i<MAX;++i)
{
if(fds[i]==-1)
{
fds[i] = fd;
return;
}
}
printf("容器已满,无法添加该描述符\n");
}
void fds_del(int fd, int fds[])
{
for(int i = 0;i<MAX;++i)
{
if(fds[i]==fd)
{
fds[i] = -1;
return;
}
}
printf("没有找到该描述符\n");
}
int main()
{
int fds[MAX];
fds_init(fds);
}

示例-TCP服务使用select处理多个套接字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
int main()
{
int sockfd = socket_init();//socket_init封装了bind(ip:port)的操作,还封装了对sockfd进行listen的操作,并设置了监听队列大小。
assert(sockfd!=-1);

int fds[MAX];
fds_init(fds);

fds_add(sockfd, fds);

fd_set fdset;//此处的fd_set即<sys/select.h>库中API提供的fd_set结构体,实际上是一个有1024个“二进制位”的数组。
while(1)//将fds[MAX]中所有有效的(即>=0)描述符全部“加入”fdset中,
//即把fdset中与某有效描述符对应的[位]的状态设为1。
{
FD_ZERO(&fdset); //把1024位全清零。
int maxfd = -1;//记录当前最大的描述符数值是多少,方便过后调用select传入参数nfds。
for(int i = 0; i<MAX; ++i)
{
if(fds[i] == -1)
{
continue;
}
FD_SET(fds[i],fdset); //fds[i] != -1, 说明有效
if(maxfd<fds[i])//寻找最大描述符数值
{
maxfd = fds[i];
}
}//end for

//此时,我们已经把开始创建的sockfd添加到了fdset中。
//下面就可以用select来监测该套接字是否有消息了。
//比如,sockfd监听到了客户端的connect信息,
//则select就可以探测到fdset中对应的sockfd位处于消息就绪态,
//则select就可以不再阻塞,立马返回。
struct timeval tv = {5,0};
//返回在fdset集合中有信息的描述符的个数。
int n = select(maxfd+1, &fdset, NULL, NULL, &tv);
if(n < 0)
{
printf("select err\n");
}
else if(n == 0)
{
printf("time out\n");
}
else
{
for(int i = 0; i<MAX; ++i)//依然需要根据fdset查询:
//目前是哪个描述符有事件产生,
//fdset的过滤又需要根据fds的记录进行遍历。
{
if(fds[i] == -1)continue;
if(FD_ISSET(fds[i], &fdset))//此处判断ISSET
//即是判断我们关注的描述符是否有事件产生。
//为什么此时标志位为1一定有事件产生?
//因为在这之前我们进行了select,
//select不仅说明有事件产生,它还做了更多的工作:
//将我们关心的描述符却在其上没有事件产生的标志位置0。
//因此目前所有标志位为1的描述符均有事件。
{
//以下才是核心业务代码,抓住了有事件产生的描述符,
//对这些描述符我们的处理流程,对于不同类型的描述符
//需要不同的处理流程。比如sockfd用accept处理,
//accept返回1个新描述符c,则先将其加入fds容器,
//下一轮再用recv处理描述符c的消息。
if(fds[i] == sockfd)//处理监听套接字sockfd
{
struct sockaddr_in caddr;
int len = sizeof(caddr);
int c = accept(sockfd, (struct sockaddr*)&caddr, &len);
if(c < 0)
{
continue;
}
printf("accept c = %d\n", c);
//此处的fds_add是用户自定义的函数,添加的是fds自定义数组
fds_add(c, fds);//只是收到fds容器,下次while扫描才将c加到fdset
}
else//处理收发套接字,在此程序,除了sockfd皆为收发套接字c
{
char buff[128] = {0};
int num = recv(fds[i], buff, 127, 0);
if(num <= 0)
{
printf("client close\n");
close(fds[i]);
fds_del(fds[i], fds);
}
else//num > 0,读到了数据
{
printf("recv(c = %d) = %s\n",fds[i],buff);
send(fds[i], "ok", 2, 0);
}
}
}//end if(ISSET(fds[i], &fdset))
}//end for(int i = 0; i<MAX; ++i)
}//end if(n > 0)
}//while end
}

场景情况:如果客户端与select服务端已建立连接,而客户端进程结束,select会一直阻塞、未感知吗?–不会。

因为客户端的进程结束,也算是一种读事件,相当于通知服务端该套接字连接结束了。那么服务端recv会返回0,达到关闭该套接字的条件,关闭后,别忘了在fds容器中删除掉该描述符。

如果忘记了close该套接字,且忘了fds_del该描述符,那么如果客户端结束进程,服务端就会一直打印"client close",因为select一直在探测此描述符有无读事件,若该套接字连接关闭,那么此描述符一直有读事件,recv返回0,由于没有fds_del,每次都会关注,所以每次都会打印"client close"。

poll

可以理解为加强版的select。

先观察其API

1
2
#include<poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
  1. fds参数是一个pollfd结构类型的指针,可指向一段连续空间(数组),因此很灵活,大小可按需声明。它可以指定我们感兴趣的文件描述符上发生的可读、可写和异常等事件。定义如下

    1
    2
    3
    4
    5
    6
    struct pollfd
    {
    int fd; //文件描述符
    short events; //注册的事件类型,按位标志
    short revents; //实际发生的事件,按位标志,由内核填充
    }
    1. 其中,fd成员指定文件描述符。
    2. events成员告诉poll监听fd上的哪些事件类型,他可以是一系列事件类型的按位或。常见的事件类型有:POLLIN(数据可读)、POLLOUT(数据可写)。
    3. revents成员由内核修改,以通知应用程序fd上实际发生了哪些事件。

poll编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
void poll_fds_init(struct pollfd* fds)
{
for(int i = 0;i<MAX;++i)
{
fds[i].fd = -1;
fds[i].events = 0;
fds[i].revents = 0;
}
}
void poll_fds_add(int fd, struct pollfd* fds)
{
for(int i = 0;i<MAX;++i)
{
if(fds[i].fd == -1)
{
fds[i].fd = fd;
fds[i].events = POLLIN;//只关注读事件
fds[i].revents = 0;
break;
}
}
}
void poll_fds_del(int fd, struct pollfd* fds)
{
for(int i = 0;i<MAX;++i)
{
if(fds[i].fd == fd)
{
fds[i].fd = -1;
fds[i].events = 0;
fds[i].revents = 0;
break;
}
}
}
#define MAX 10
int main()
{
int sockfd = socket_init();
assert(sockfd != -1);
struct pollfd poll_fds[MAX];
poll_fds_init(poll_fds);
poll_fds_add(sockfd,poll_fds);
while(1)
{
int n = poll(poll_fds,MAX,5000);//5000ms timeout
if(n < 0)printf("poll error\n");
else if(n == 0)printf("time out \n");
else
{
for(int i = 0;i<MAX;++i)
{
if(poll_fds[i].fd == -1)continue;
//short has 16bits,POLLIN is 10000000 ...,
//when revents is 10000000 ...,then the read event is going
if(poll_fds[i].revents & POLLIN)//revents & POLLIN 不为0 则代表有读事件产生
{
if(poll_fds[i].fd == sockfd)
{
struct sockaddr_in caddr;
int len = sizeof(caddr);
int c = accept(sockfd, (struct sockaddr*)&caddr, &len);
if(c < 0)
{
continue;
}
printf("accept:%d\n", c);
poll_fds_add(c, poll_fds);
}
else
{
char buff[128] = {0};
int num = recv(poll_fds[i].fd, buff, 127, 0);
if(num <= 0)
{
close(poll_fds[i].fd);
poll_fds_del(poll_fds[i].fd, poll_fds);
printf("client close\n");
}
else
{
printf("recv(%d):%s\n", poll_fds[i].fd, buff);
send(poll_fds[i].fd, "ok", 2, 0);
}
}

}
if(poll_fds[i].revents & POLLOUT)
{
//...
}
}
}
}
}

与select的一处细节区别:

每次select除了监测fd_set有效描述符上有无事件,其次还将没有事件的描述符从fd_set移除(将该描述符对应在fd_set上的位进行置0操作)(这样就得每次select之前都要重新注册一遍我们关注的描述符(即用户和内核共同操作FD_SET、FD_CLR等)),然后下面过滤有事件的描述符时,只要找到fd_set集合哪个位是1状态即就找到了有事件产生的描述符。

而poll的用法是:用户只管注册events,实际上的有无事件由内核来进行对revents的填充,以此来更好地区别该描述符是否有事件产生。这样,就不用在每次poll之前重新注册一遍我们关注的描述符的结构体里的events。我们只要把要关心的描述符的fd置成非-1,以及管理好要关心的哪些事件类型events即可。

与select相比的优点

  1. 可以监听的描述符的最大数目可以超过1024个,大小按需自拟。
  2. 可以监听的事件类型数目变多、变细了,更强大了。
  3. 不用在每次poll之前重新注册一遍我们关注的描述符的结构体里的events

总结

select和poll的总体实现流程:

  • 用户程序

select用fd_set结构体(默认是1024个bit位的数组);poll用struct pollfd fds[MAX]结构体。

select和poll通常被循环调用。每调用一次,就拷贝一次结构体数组给内核。

linux在系统调用进入内核时,为什么要将参数从用户空间拷贝到内核空间?不能直接访问,或是使用memcpy吗?非要使用copy_from_user才行吗? - 针对为什么select/poll调用一次拷贝一次数组到内核态空间。

  • 内核轮询
    内核对若干个文件描述符进行轮询扫描。查看之上是否有事件发生。
    时间复杂度,为O(n)
  • select/poll返回后
    返回值仅仅告诉用户程序发生了事件的描述符个数,并未告诉具体哪个描述符。
    用户程序还需要再次轮询一遍。O(n)
    select