重写muduo库

内容

  1. muduo库的主要板块
    1. base:公共的代码文件
    2. net:网络相关的,如TcpServer、EventLoop、poller、protobuf、protorpc等等
      1. 我们主要写网络模块

目标

主要编写muduo库的网络模块代码,以及改进muduo库在使用上的不便。
muduo库原本属于静态库,且需要依赖boost库。我们改进它,使他与原生C++标准库结合,并把它生成为.so动态库。

muduo库核心组件职责与关系

Reactor模式的核心

  1. EventLoop事件循环
    1. 职责:每个线程一个EventLoop。不断地”询问 - 处理“事件
    2. 关系:
      1. 拥有一个Poller:EventLoop通过Poller来获取当前活跃的事件
      2. 拥有一个Channel:EventLoop管理所有在其上注册的Channel
  2. Poller:I/O多路复用接口
    1. 职责:阻塞地等待文件描述符上的事件,并将活跃的事件返回给EventLoop
    2. 具体实现是EPollPoller。是Linux下基于epoll的具体实现。通过epoll_wait返回活跃的事件。
  3. Channel:通道
    1. 职责:是事件分发器。每个Channel负责一个文件描述符。
    2. 内部保存了该fd关注的事件,以及对应的回调函数
    3. 关系:
      1. 是Poller和回调之间的桥梁。Poller返回一个事件,EventLoop找到对应的Channel,调用Channel的handleEvent()方法

接受新连接:Acceptor

  1. 职责:Acceptor是一个特殊的Channel。专门负责处理监听套接字(listening socket)上的可读事件,即新连接。
  2. 关系:
    1. 继承自Channel。同样需要向EventLoop注册。
    2. 被TcpServer拥有:TcpServer在初始化时会创建Acceptor
    3. 持有newConnectionCallback:当有新连接到来时,最终会调用TcpServer预先设置好的回调函数

表示连接:TcpConnection

  1. 职责:代表了一个已建立的TCP连接。整个连接的生命周期(建立、断开、收发数据)都由该对象管理。
  2. 关系:
    1. 每个TcpConnection对象都有一个自己的Channel。用于监控其描述符上的事件。
    2. 被TcpServer管理:记录在TcpServer的map表中。
    3. 持有各种用户回调:如连接建立回调ConnectionCallback,消息到达回调MessageCallback。这些是由用户通过TcpServer设置的。
    4. 隶属于某个EventLoop。每个TcpConnection对象都只属于一个特定的EventLoop线程。其所有IO操作都在这个线程中进行。保证线程安全。

服务器门面:TcpServer

  1. 职责:提供给用户使用的、易于理解的服务器类。用户只需关注其提供的几个回调函数(如连接回调、消息回调)即可编写网络程序。
  2. 关系:
    • 拥有一个 Acceptor:用于接受新连接。
    • 拥有一个 TcpConnection 的映射表:管理所有活跃的连接。
    • 拥有一个 EventLoopThreadPool:管理线程池。
    • 设置回调:用户通过 TcpServer 设置的各种回调(onConnectiononMessage),最终会“传递”给每一个新创建的 TcpConnection 对象。

线程模型:EventLoopThreadEventLoopThreadPool

  • **EventLoopThread (IO 线程):
    • 职责:封装了一个线程(std::thread),该线程的唯一工作就是运行一个 EventLoop::loop()“one loop per thread” 的理念在此体现。
    • 关系:它创建并拥有一个 EventLoop 对象(在其内部线程中)。
  • **EventLoopThreadPool (线程池):
    • 职责:管理多个 EventLoopThread,即管理一个 EventLoop 池子。它提供了一种轮询(round-robin)算法来为新的 TcpConnection 分配一个 EventLoop
    • 关系
      • 被 TcpServer 所拥有TcpServer 通过线程池来实现多线程 Reactor。
      • 拥有多个 EventLoopThread:管理着多个 IO 线程。
      • 为 Acceptor 提供 getNextLoop():当 Acceptor 接受到一个新连接时,它会从线程池中取出下一个 EventLoop,将这个新连接分配给这个 EventLoop 来监控和处理。

cmake

1
2
3
4
5
6
7
8
9
10
11
cmake_minimum_required(VERSION 2.5)
project(mymuduo)

#mymuduo 最终编译成so动态库,设置动态库的路径
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)#注意不是OUTPUT_DIRECTORY.这两者有区别
#设置为调试模式 以及 声明C++11语言标准
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++11 -fPIC")#在较新的编译器后需要加-fPIC,以示生成的是动态库
#定义参与编译的源文件 起一个别名
aux_source_directory(. SRC_LIST)
#编译生成动态库mymuduo
add_library(mymuduo SHARED ${SRC_LIST})

辅助类

noncopyable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// noncopyable.h
#pragma once
/**
* noncopyable 被继承以后,
* 派生类对象无法拷贝构造、相互赋值。
* 无参构造、析构是默认处理。
*/
class noncopyable
{
public:
noncopyable(const noncopyable&) = delete;
noncopyable & operator=(const noncopyable &) = delete;
protected:
noncopyable() = default;
~noncopyable() = default;
};

copyable

1
2
3
4
5
6
class copyable
{
protected:
copyable() = default;
~copyable() = default;
};

TcpServer概览

需要封装以下属性:

  1. EventLoop对象指针:多路分发器,相当于epoll
  2. InetAddress:打包IP地址和端口号

InetAddress

  1. 允许拷贝
  2. 成员变量是sockaddr_in m_addr,也可选择支持IPv6的sockaddr_in6 m_addr6。可用联合体表示。在本项目中,只使用IPv4的m_addr
1
2
3
4
5
6
#include<netinet/in.h>		//sockaddr_in / sockaddr_in6都在此文件下定义
union
{
struct sockaddr_in m_addr;
struct sockaddr_in6 m_addr6;
};

EventLoop概览

  1. 不允许拷贝
  2. 主要包含的成员
    1. poller(相当于epoll),存储了一个unorderedMap,有sockfd及其上面绑定的事件
    2. channel,属性有fd、events、revents等等

EventLoop就是要完成事件循环,事件循环最重要的几个动作:epoll(由poller负责)、epoll监听的fd及感兴趣的事件、实际epoll_wait后发生的事件。
这些sockfd、感兴趣的事件、发生的事件都记录在channel中

要写EventLoop就要理清楚EventLoop、Channel、Poller之间的关系。Reactor模型中,这三个组件整体对应着Demultiplex。

Channel

通道,封装了sockfd和其感兴趣的event,如EPOLLIN、EPOLLOUT事件。还绑定了poller返回的具体事件。

公有别名

定义通用事件回调函数、只读事件回调函数的函数对象类型别名。

1
2
3
4
5
6
class Channel : noncopyable
{
public:
using EventCallback = std::function<void()>;
using ReadEventCallback = std::function<void(Timestamp)>;
}

成员函数

构造 / 析构函数

1
2
3
public:
Channel(EventLoop * loop, int fd);
~Channel();

handleEvent

fd得到poller的通知后,处理事件

1
2
public:
void handleEvent(Timestamp receiveTime);

setXxxCallback(EventCallback cb)

对外提供的设置回调函数对象的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public:
void setReadCallback(ReadEventCallback cb)
{
m_readCallback = std::move(cb);
}
void setWriteCallback(EventCallback cb)
{
m_writeCallback = std::move(cb);
}
void setCloseCallback(EventCallback cb)
{
m_closeCallback = std::move(cb);
}
void setErrorCallback(EventCallback cb)
{
m_errorCallback = std::move(cb);
}

void tie(const std::shared_ptr<void>&)

防止channel被手动remove后,还在执行回调操作

1
2
public:
void tie(const std::shared_ptr<void>&);

fdeventsrevents相关

  1. int fd()
  2. int events()
  3. void set_revents(int revt):向poller提供的设置revents的接口
1
2
3
4
5
6
7
public:
int fd() const { return m_fd; }
int events() const { return m_events; }
int set_revents(int revt)
{
m_revents = revt;
}

判断函数:判断有没有注册事件等等

1
2
3
4
5
6
7
8
9
10
11
12
13
public:
bool isNoneEvent() const
{
return m_events == kNoneEvent;
}
bool isWriting() const
{
return m_events & kWriteEvent;
}
bool isReading() const
{
return m_events & kReadEvent;
}

使能、使不能函数

设置fd相应的事件状态
m_events进行位操作之后调用update(),即epoll_ctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public:
void enableReading()
{
m_events |= kReadEvent;
update();
}
void disableReading()
{
m_events &= ~kReadEvent;
update();
}
void enableWriting()
{
m_events |= kWriteEvent;
update();
}
void disableWriting()
{
m_events &= ~kWriteEvent;
}
void disableAll()
{
m_events = kNoneEvent;
update();
}

与EventLoop相关

获取所属的Loop

1
2
public:
EventLoop * ownerLoop() {return m_loop;}

删除:remove()

1
2
public:
void remove();

update():相当于调用epoll_ctl

1
2
private:
void update();

handleEventWithGuard

受保护地处理事件

1
2
private:
void HandleEventWithGuard(Timestamp receiveTime);

for poller的index

1
2
3
public:
int index() {return m_index;}
void set_index(int idx) {m_index = idx;}

成员变量

  1. kXxxEvent:以下三个变量描述当前fd的状态,没有感兴趣的事件or对读事件感兴趣or对写事件感兴趣?
1
2
3
4
private:
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
  1. m_xxxCallback:四个函数对象,可以绑定外部传入的相关操作。因为channel知道发生了哪些事情(revents记录),所以channel负责调用具体事件的回调函数。
1
2
3
4
5
private:
ReadEventCallback m_readCallback;
EventCallback m_writeCallback;
EventCallback m_closeCallback;
EventCallback m_errorCallback;
  1. EventLoop *m_loop:事件循环
  2. m_fd:fd,即Poller监听的对象
  3. m_events:fd感兴趣的事件注册信息
  4. m_revents:Poller操作的fd上具体发生的事件
  5. m_index:?
  6. std::weak_ptr<void> m_tie:防止手动调用remove channel后仍使用此channel,用于监听跨线程的对象生存状态。
    1. shared_ptrweak_ptr配合使用可以发挥两个能力:
      1. 解决shread_ptr循环引用问题
      2. weak_ptr在多线程程序中可监听资源的生存状态,方法是尝试提升为强指针,若提升成功,则可以访问;若提升失败说明则资源被释放掉了。
    2. tie的意思是绑定,那么m_tie要和谁绑定呢?——自己。
    3. 绑定自己的工具还可以用另一个工具,shared_from_this,可以尝试得到当前对象的强智能指针。
  7. bool m_tied:配合m_tie使用

Poller

成员变量

成员变量中包含一个存储<int, Channel*>的map。

poller监听的channel从何而来?EventLoop中有ChannelList以及Poller,则poller监听的肯定是EventLoop中所保存的channel。即这些channel在poller中也被保存了。

1
2
3
protected:
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap m_channels;

还有一个成员变量,m_ownerLoop,指明了从属于哪个loop。

1
2
private:
EventLoop * m_ownerLoop;

成员函数

构造 / 析构函数

1
2
3
public:
Poller(EventLoop *loop);
virtual ~Poller() = default;

poll:提供给系统的统一的一个IO复用接口

1
2
3
public:
using ChannelList = std::vector<Channel*>;
virtual Timestamp poll(int timeoutMs, ChannelList * activeChannels) = 0;

参数:

  1. timeoutMs:超时时间,毫秒为单位
  2. activeChannels:当前激活的、对事件注册好的channel列表

与事件的注册、注销有关的

1
2
3
4
5
public:
/* 当fd注册的事件有变更时, channel调用update, 函数内包含updateChannel(this) */
virtual void updateChannel(Channel * channel) = 0;
/* 当fd注册的事件要注销时,channel调用remove,函数内包含removeChannel(this) */
virtual void removeChannel(Channel * channel) = 0;

参数:channel 均为外部channel传入的this指针

newDefaultPoller(EventLoop * loop)

提供给EventLoop的接口,以获取默认的IO复用具体实现。

注意:我们最好不要实现到poller.cc文件中,不大妥当。因为Poller类是基类,而把获取具体实现写到抽象类文件实现中是不好的。可以单独把实现代码写到defaultpoller.cc中。

1
2
public:
static Poller* newDefaultPoller(EventLoop *loop);

hasChannel

判断poller是否拥有某一channel

1
2
public:
virtual bool hasChannel(Channel * channel) const;

EpollPoller

是Poller抽象基类的一个具体实现类。

成员函数

构造/析构

构造相当于epoll_create,记录在m_epollfd成员变量中。析构时close该fd。

1
2
3
public:
EpollPoller(EventLoop *loop);
~EpollPoller() override;

poll:重写Poller基类方法,相当于epoll_wait

1
2
public:
Timestamp poll(int timeoutMs, ChannelList *activeChannels) override;

update/removeChannel:重写Poller基类方法,相当于epoll_ctl add/mod/del

1
2
3
public:
void updateChannel(Channel *channel) override;
void removeChannel(Channel *channel) override;

fillActiveChannels:填写活跃的channels连接

1
2
private:
void fillActiveChannels(int numEvents, ChannelList *activeChannels) const;

update:更新channel通道

1
2
private:
void update(int operation, Channel * channel);

成员属性

  1. m_epollfd - epoll相关的方法都需要用到fd,通过epoll_create来创建。映射的是epoll底层的文件系统红黑树。
  2. m_events - 是一个vector<struct epoll_event容器。
1
2
3
4
private:
int m_epollfd;
using std::vector<struct epoll_event> EventList;
EventList m_events;
  1. kInitEventListSize - EventList初始的长度。
1
2
private:
static const int kInitEventListSize = 16;
  1. 从Poller继承而来,拥有poller包含的ChannelMap m_channels
1
2
3
4
5
6
7
8
class Poller
{
// ...
protected:
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap m_channels;
// ...
}

实现代码

首先声明了三个全局常量,表示channel的状态

1
2
3
const int kNew      = -1;    //从未添加到epoll的channel
const int kAdded = 1; //已经添加到epoll的channel
const int kDeleted = 2; //已把该channel从epoll中删除

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include"logger.h"      //LOG_FATAL
#include<errno.h> //errno
#include<sys/epoll.h>

EpollPoller::EpollPoller(EventLoop * loop)
: Poller(loop),
m_epollfd(epoll_create1(EPOLL_CLOEXEC)), // epoll_create
m_events(kInitEventListSize) // vector<epoll_event>
{
if(m_epollfd < 0)
{
LOG_FATAL("epoll_create error: %d\n", errno);
}
}

析构

1
2
3
4
5
#include<unistd.h>      //close
EpollPoller::~EpollPoller()
{
close(m_epollfd);
}

CurrentThread:主要用于获取tid

__thread相当于C++11标准中的thread_local修饰符。用于修饰全局变量。
修饰之前,全局变量只能被若干线程共享。修饰之后,此全局变量变成每个线程专有的属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#pragma once
#include<unistd.h> //pid_t syscall
#include<sys/syscall.h> //SYS_gettid
namespace CurrentThread
{
/**
* @brief 此变量被__thread修饰, 相当于C++11标准中的thread_local修饰符,
* 用于修饰全局变量。修饰之前, 全局变量只能被若干线程共享;
* 修饰之后, 此全局变量变成每个线程专有的属性。
*/
extern __thread int t_cachedTid;
/* 通过Linux系统调用SYS_gettid, 加载当前线程的tid值到t_cachedTid */
void cacheTid()
{
if(t_cachedTid == 0)
{
t_cachedTid = static_cast<pid_t>(syscall(SYS_gettid));
}
}
/* 返回当前线程的tid, 若加载过则直接返回存储过的值 */
inline int tid()
{
/* 若t_cachedTid为0说明是第一次加载, 需要调用cacheTid */
if(__builtin_expect(t_cachedTid == 0, 0))
{
cacheTid();
}
return t_cachedTid;
}
}

EventLoop

前面的EventLoop概览中提到:

  1. 不允许拷贝
  2. 主要包含的成员之一:poller(相当于epoll),属性有sockfd及其上面绑定的事件
  3. 另一个主要的成员是channel,属性有fd、events、revents等等

EventLoop就是要完成事件循环,事件循环最重要的几个动作:epoll(由poller负责)、epoll监听的fd及感兴趣的事件、实际epoll_wait后发生的事件。
这些sockfd、感兴趣的事件、发生的事件都记录在channel中
要写EventLoop就要理清楚EventLoop、Channel、Poller之间的关系。Reactor模型中,这三个组件整体对应着Demultiplex。

由上述约束,在.h文件中,我们可以首先写出:

1
2
3
4
5
6
7
8
9
#pragma once
#include"noncopyable.h"
class Channel;
class Poller;
class EventLoop : noncopyable
{
public:
private:
}

要用到函数对象。

1
2
3
#include<functional>
public:
using Functor = std::function<void()>;

成员变量

  1. ChannelList m_activeChannels - EventLoop管理的所有的Channel的List;
    Channel * m_currentActiveChannel - 主要用于断言

    1
    2
    3
    4
    5
    private:
    using ChannelList = std::vector<Channel*>;
    ChannelList m_activeChannels;

    Channel * m_currentActiveChannel;
  2. 标志(最好为atomic)

    1. m_looping - 事件循环状态标志 - 真则正在循环,假则将要退出循环

    2. m_quit - 标识退出loop循环

    3. m_eventHandling -

    4. m_callingPendingFunctors - 标识当前loop当前是否有需要执行的回调操作

      1
      2
      3
      4
      private:
      std::atomic_bool m_looping;
      std::atomic_bool m_quit;
      std::atomic_bool m_callingPendingFunctors;
  3. m_threadId - 记录当前Loop所在线程的ID

    1
    2
    private:
    const pid_t m_threadId;
  4. std::unique<Poller> m_poller - EventLoop所管理的poller,去轮询监听channels上发生的事件。用std::unique_ptr管理

    1
    2
    private:
    std::unique_ptr<Poller> m_poller;
  5. Timestamp m_pollReturnTime - poller返回发生事件的channels的时间点

    1
    2
    private:
    Timestamp m_pollReturnTime;
  6. int m_wakeupFd - mainLoop获取到一个新用户的channel后,搭配轮询算法选择一个等待任务的subLoop,通过wakeupFd对其进行唤醒来处理channel。用eventfd创建。eventfd使用线程间的wait/notify事件通知机制,直接在内核唤醒,效率较高。与此处理相似的是,libevent使用的是socketpair的双向通信机制,相当于网络通信层面的机制,效率较低。

    1
    2
    private:
    int m_wakeupFd;
  7. std::unique_ptr<Channel> m_wakeupChannel - 把wakeupFd封装起来和其Channel关联,因为操作的往往不是fd而是其channel

    1
    2
    private:
    std::unique_ptr<Channel> m_wakeupChannel;
  8. std::vector<Functor> m_pendingFunctors - 存储loop需要执行的所有的回调操作。与callingPendingFunctors标识结合使用,如果此标识显示当前loop有需要执行的回调操作,则这些回调操作将在此vector容器中存放。需要用mutex保护其线程安全

    1
    2
    3
    private:
    std::vector<Functor> m_pendingFunctors;
    mutable std::mutex m_mutex;

成员函数

  1. 构造/析构

    1
    2
    3
    public:
    EventLoop();
    ~EventLoop();
  2. loop/quit - 开始/结束事件循环

    1
    2
    3
    public:
    void loop();
    void quit();
  3. Timestamp pollReturnTime() const

    1
    2
    3
    4
    5
    public:
    Timestamp pollReturnTime() const
    {
    return m_pollReturnTime;
    }
  4. runInLoop - 在当前loop中执行cb

    1
    2
    public:
    void runInLoop(Functor cb);
  5. queueInLoop - 把cb放入队列中,唤醒loop所在的线程,执行cb

    1
    2
    public:
    void queueInLoop(Functor cb);
  6. wakeup - mainLoop唤醒subLoop所在的线程

    1
    2
    public:
    void wakeup();
  7. 更新Channel相关 - EventLoop的方法调用Poller的方法

    1
    2
    3
    4
    public:
    void updateChannel(Channel *channel);
    void removeChannel(Channel *channel);
    bool hasChannel(Channel *channel);
  8. isInLoopThread - 判断EventLoop对象是否在自己的线程里面

    1
    2
    3
    4
    5
    6
    #include"currentthread.h"
    public:
    bool isInLoopThread() const
    {
    return m_threadId == CurrentThread::tid();
    }
  9. handleRead - 处理wakeup唤醒相关的逻辑

    1
    2
    private:
    void handleRead();
  10. doPendingFunctors - 执行回调

    1
    2
    private:
    void doPendingFunctors();

实现代码

全局变量

  1. 防止一个线程创建多个loop

    1
    2
    //__thread修饰表示这个全局变量转为了每个线程私有的属性
    __thread EventLoop *t_loopInThisThread = nullptr;
  2. 默认的超时时间

    1
    const int kPollTimeMs = 10000;

全局函数

  1. createEventfd() - 创建wakeupfd,用来通知等待任务的subLoop,处理新的Channel事件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #include<sys/eventfd.h>
    int createEventfd()
    {
    int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if(evtfd < 0)
    {
    LOG_FATAL("Failed in eventfd: %d\n", errno);
    }
    return evtfd;
    }

EventLoopThread

EventLoop组件及其内部的Chennel、Poller已经在上文讨论。要和thread结合达成最终的"one loop per thread"模型,较好的办法就是将EventLoop与thread组合封装。

Thread

1
class Thread : noncopyable

线程函数

线程最主要的组成部分就是线程函数。

1
2
public:
using ThreadFunc = std::function<void()>;

使用无返回值+无参数是为了便于统一线程函数的形式,具体绑定回调则使用函数对象绑定器。

成员变量

用C线程库、智能指针。
Thread对象刚创建不会执行线程函数,而是在成员函数start()被调用时,用智能指针创建C
11的thread线程才开始真正执行。

  1. m_started - 表示
  2. m_joined
  3. std::shared_ptr<std::thread> m_thread
  4. pid_t m_tid
  5. ThreadFunc m_func - 存储线程函数
  6. std::string m_name
  7. static std::atomic_int m_numCreated - 目前产生了线程对象的计数值

成员方法

setDefaultName

构造函数中如果没有传入name则赋"Thread %d ",%d为已创建的线程对象数目。

1
2
private:
void setDefaultName();

构造 / 析构函数

1
2
3
public:
explicit Thread(ThreadFunc, const std::string & name = std::string());
~Thread();

start

1
2
public:
void start();

join

1
2
public:
void join();

获取线程状态相关的标志、信息

  1. started
  2. tid
  3. name
  4. numCreated
1
2
3
4
5
public:
bool started() const {return m_started;}
pid_t tid() const {return m_tid;}
const std::string & name() const {return m_name;}
static int numCreated() {return m_numCreated;}

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include"thread.h"
#include"currentthread.h"
#include<semaphore.h>
std::atomic_int Thread::m_numCreated(0);
void Thread::setDefaultName(int numCreated)
{
char buf[32] = {0};
snprintf(buf, sizeof buf, "Thread%d", numCreated);
m_name = buf;
}
Thread::Thread(ThreadFunc func, const std::string & name)
: m_started(false), m_joined(false), m_tid(0),
m_func(std::move(func)), m_name(name)
{
int num = ++m_numCreated;
if(m_name.empty())
{
setDefaultName(num);
}
}
Thread::~Thread()
{
/* 线程要么join, 要么detach */
if(m_started && !m_joined)
{
m_thread->detach();
}
}
void Thread::start()
{
m_started = true;
/* 为了tid初始化时期的线程安全, 保证tid有效 */
sem_t sem;
sem_init(&sem, false, 0); //地址, 是否进程间共享, 初始值0
//下面这句才是真正去创建一个立即执行的子线程。而且下面这个语句是子线程的生命周期。
//从创建完m_thread后主线程、子线程分离, 主线程需要等待子线程执行完毕, 可以用信号量来控制。
m_thread = std::make_shared<std::thread>([&](){
m_tid = CurrentThread::tid();

sem_post(&sem);

m_func();
});
//这里是主线程的代码,只有sem的值变为>0时才能往下走。
sem_wait(&sem);
}
void Thread::join()
{
m_joined = true;
m_thread->join();
}

EventLoopThread

1
class EventLoopThread : noncopyable

封装的目标:在thread线程对象上运行一个loop。

线程初始化时回调函数

1
2
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

成员变量

  1. m_loop - 存储Eventloop对象指针
1
2
private:
EventLoop * m_loop;
  1. m_thread - 存储线程对象
1
2
private:
Thread m_thread;
  1. bool m_exiting - 线程正在退出的标志
1
2
private:
bool m_exiting;
  1. ThreadInitCallback m_callback - 线程初始化调用的回调操作,在EventLoopThread构造时在第1个参数传入,默认是一个空操作。
1
2
private:
ThreadInitCallback m_callback;

成员函数

构造 / 析构函数

构造函数参数:可以传入一个线程初始化回调函数对象;还有name。其中,回调函数对象默认构造为空操作。

1
2
3
4
public:
EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback(),
const std::string & name = std::string());
~EventLoopThread();

startLoop - 开启循环

1
2
public:
EventLoop * startLoop();

threadFunc - 线程函数

1
2
private:
void threadFunc();

代码实现

  1. 构造函数
    1. 主要的工作就是构造EventLoopThread中的Thread对象即m_thread成员。Thread对象m_thread绑定的线程函数是用std::bind绑定的函数,用的是EventLoopThread类中的threadFunc函数,并且绑定了其this指针。EventLoopThread构造函数的第2个参数name将作为m_thread的名字。
    2. 第1个参数指定的是线程初始化时的回调函数与线程start后执行的线程函数无关。第1个参数的默认值和第2个参数的默认值在.h文件中已指出。ThreadInitCallback()是指创建一个默认函数对象,函数执行空操作。
    3. Thread对象构造完成后,不会立即执行线程函数threadFunc,因为Thread构造并不意味着C++11标准库的thread创建完毕。只有调用m_thread.start()才会真正执行线程函数threadFunc
    4. 构造函数还把传入的线程初始化时回调函数cb保存到了m_callback成员。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include"eventloopthread.h"
#include"eventloop.h"
EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,
const std::string &name)
: m_loop(nullptr), m_exiting(false),
m_thread(std::bind(&EventLoopThread::threadFunc, this), name),
m_mutex(), m_cond(), m_callback(cb)
{

}
EventLoopThread::~EventLoopThread()
{
m_exiting = true;
if(m_loop != nullptr)
{
m_loop->quit();
m_thread.join();
}
}
EventLoop * EventLoopThread::startLoop()
{
m_thread.start(); //启动底层新线程,执行回调函数,

EventLoop * loop = nullptr;
{/* 临界区m_loop */
std::unique_lock<std::mutex> lock(m_mutex);
while(m_loop == nullptr)
{
m_cond.wait(lock);
}
loop = m_loop;
}/* 临界区m_loop */
return loop;
}
/**
* @brief Thread对象实际执行的线程函数,在单独的子线程中执行
*/
void EventLoopThread::threadFunc()
{
EventLoop loop; //构造一个独立的eventloop, 和m_thread一对一, one loop per thread的证据
if(m_callback) //如果m_callback(即ThreadInitCallback)不为空则执行此函数
{
m_callback(&loop);
}
{/* 临界区m_loop */
std::unique_lock<std::mutex> lock(m_mutex);
m_loop = &loop;
m_cond.notify_one(); //通知主线程的startLoop(), loop已经在子线程创建好了。
}/* 临界区m_loop */
loop.loop(); //EventLoop loop => Poller.poll

/* 执行到此处说明loop已经结束 退出循环 */
std::unique_lock<std::mutex> lock(m_mutex);
m_loop = nullptr;
}

EventLoopThreadPool

1
class EventLoopThreadPool : noncopyable

线程初始化时回调函数

1
2
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

成员变量

  1. m_baseLoop - 用户最开始创建的loop
  2. 标志相关
    1. std::string m_name
    2. bool m_started
    3. int m_numThreads
    4. int m_next
  3. std::vector<std::unique_ptr<EventLoopThread>> m_threads - 包含了所有创建的线程
  4. std::vector<EventLoop*> m_loops - 包含了所有管理着的loop的指针,通过m_threads中的某个thread进行startLoop()返回loop的指针。

成员函数

构造/析构函数

1
2
3
public:
EventLoopThreadPool(EventLoop * baseLoop, const std::string &nameArg);
~EventLoopThreadPool();

setThreadNum(int) - 供TcpServer调用

1
2
3
4
5
public:
void setThreadNum(int numThreads)
{
m_numThreads = numThreads;
}

start - 开启事件循环线程

1
2
public:
void start(const ThreadInitCallback &cb = ThreadInitCallback());

getNextLoop - 如果工作在多线程中,baseLoop默认以轮询的方式分配channel给subLoop

1
2
public:
EventLoop * getNextLoop();

getAllLoops - 获取所有管理着的loop,存到vector中,相当于拷贝了m_loops

1
2
public:
std::vector<EventLoop*> getAllLoops();

获取各种状态、信息

  1. started
  2. name
1
2
3
public:
bool started() const {return m_started;}
const std::string name() const {return m_name;}

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include"eventloopthreadpool.h"
#include"eventloopthread.h"
EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
: m_baseLoop(baseLoop),
m_name(nameArg),
m_started(false),
m_numThreads(0),
m_next(0)
{

}
EventLoopThreadPool::~EventLoopThreadPool()
{
/**
* nothing to do, bacause evey loop is on the thread stack,
* that will destruct automatically.
*/
}
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
m_started = true;
for(int i = 0; i < m_numThreads; ++i)
{
char buf[m_name.size() + 32];
/* 以线程池name + 下标序列号 作为thread线程的名字 */
snprintf(buf, sizeof buf, "%s%d", m_name.c_str(), i);
std::string threadName(buf);
m_threads.push_back(std::make_unique<EventLoopThread>(cb, threadName));
m_loops.push_back(m_threads.back()->startLoop());
}
/* m_numThreads == 0时, 上面for循环不会执行, 执行下面的操作 */
if(m_numThreads == 0 && cb != nullptr)
{
cb(m_baseLoop);
}
}
/* 体现了对subLoop的轮询算法 */
EventLoop* EventLoopThreadPool::getNextLoop()
{
EventLoop * loop = m_baseLoop;
if(!m_loops.empty())
{
loop = m_loops[m_next];
++m_next;
if(m_next >= m_loops.size())
{
m_next = 0;
}
}
return loop;
}
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
if(m_loops.empty())
{
return std::vector<EventLoop*>(1, m_baseLoop);
}
else
{
return m_loops;
}
}

Acceptor

mainReactor主要的工作是处理客户端的连接请求,然后把sockfd轮询分配给subReactors。

这个工作由mainReactor中的acceptor处理。处理的流程和TCP socket编程流程基本一致。需要有一个listenfd,即监听套接字,去其中的监听队列取可用的连接。即Acceptor主要就是对若干sockfd的封装。

socket

.h文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#pragma once
#include"noncopyable.h"
class InetAddress;
class Socket : noncopyable
{
public:
explicit Socket(int sockfd)
: m_sockfd(sockfd)
{

}
~Socket();
int fd() const {return m_sockfd;}
void bindAddress(const InetAddress &localAddr);
void listen();
int accept(InetAddress *peerAddr);
public:
void shutdownWrite();
/* 更改TCP选项, 直接交付, 不进行缓存 */
void setTcpNoDelay(bool on);
/* 更改TCP选项 */
void setReuseAddr(bool on);
/* 更改TCP选项 */
void setReusePort(bool on);
/* 更改TCP选项 */
void setKeepAlive(bool on);
private:
const int m_sockfd;
};

.cc文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include"socket.h"
#include"logger.h"
#include"inetaddress.h"
#include<unistd.h> //close
#include<sys/socket.h> //bind
#include<strings.h> //bzero
#include<netinet/tcp.h> //TCP_NODELAY
Socket::~Socket()
{
close(m_sockfd);
}
void Socket::bindAddress(const InetAddress &localAddr)
{
/* bind return 0 when success */
if(0 != ::bind(m_sockfd, (sockaddr*)localAddr.getSockAddr(), sizeof(sockaddr_in)))
{
LOG_FATAL("bind sockfd: %d fail, createNonblocking or Die\n", m_sockfd);
}
}
void Socket::listen()
{
if(0 != ::listen(m_sockfd, 1024))
{
LOG_FATAL("listen sockfd: %d fail\n", m_sockfd);
}
}
int Socket::accept(InetAddress * peerAddr)
{
struct sockaddr_in addr;
socklen_t len;
bzero(&addr, sizeof addr);
int connfd = ::accept(m_sockfd, (sockaddr*)&addr, &len);
if(connfd >= 0)
{
peerAddr->setSockAddr(addr);
}
return connfd;
}
void Socket::shutdownWrite()
{
if(::shutdown(m_sockfd, SHUT_WR) < 0)
{
LOG_ERROR("shutdown Write error\n");
}
}
void Socket::setTcpNoDelay(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof optval);
}
void Socket::setReuseAddr(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);
}
void Socket::setTcpNoDelay(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval);
}
void Socket::setKeepAlive(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(m_sockfd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof optval);
}

Acceptor

1
class Acceptor : noncopyable

收到新连接时的回调

1
2
public:
using NewConnectionCallback = std::function<void(int fd, const InetAddress&)>;

成员变量

  1. m_loop
  2. m_acceptSocket
  3. m_acceptChannel
  4. m_newConnectionCallback - 把fd打包为channel,getNextLoop唤醒一个subLoop,把channel分发给subLoop。
  5. m_listening
1
2
3
4
5
6
private:
EventLoop * m_loop;
Socket m_acceptSocket;
Channel m_acceptChannel;
NewConnectionCallback m_newConnectionCallback;
bool m_listenning;

成员函数

构造/析构

1
2
3
4
public:
/* 此构造的三个参数本身也是TcpServer的三个参数 */
Acceptor(EventLoop * loop, const InetAddress & listenAddr, bool reusePort);
~Acceptor();

listen

1
2
public:
void listen();

get/set

  1. setNewConnectionCallback
  2. listening
1
2
3
4
5
6
public:
void setNewConnectionCallback(const NewConnectionCallback &cb)
{
m_newConnectionCallback = cb;
}
bool listenning() const {return m_listenning;}

handleRead

1
2
private:
void handleRead();

代码实现

构造

  1. 由传入的loopm_loop初始化;创建一个NonBlock的Tcp socketfd并用于构造m_acceptSocket;把loop和刚才创建好的socketfd打包构造m_acceptChannel;设置各种标志。
  2. 根据传入的第2个参数listenAddrbindAddress绑定地址到socket上。
  3. TcpServer调用start()后,意味着acceptor要对listen sockfd进行listen。如果接收到了新用户的连接,需要执行一个回调(具体操作是把connfd->channel->subloop)。所以还要设置一个ReadCallback。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int createNonblockingSocket()
{
int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
if(sockfd < 0)
{
LOG_FATAL("%s:%s:%d listen socketfd create err: %d\n", __FILE__, __FUNCTION__, __LINE__, errno);
}
return sockfd;
}
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reusePort)
: m_loop(loop),
m_acceptSocket(createNonblockingSocket()), //create socket
m_acceptChannel(loop, m_acceptSocket.fd()),
m_listenning(false)
{
m_acceptSocket.setReuseAddr(true);
m_acceptSocket.setReusePort(true);
m_acceptSocket.bindAddress(listenAddr); //bind addr to socket
// TcpServer::start() Acceptor::listen(), 如果有新连接需要执行回调 connfd->channel->subloop
//baseLoop -> m_acceptChannel(listenfd) ->
m_acceptChannel.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

handleRead - listenfd有事件发生, 即有新用户链接时的回调操作

accept一个connfd

把fd和peerAddr交给newConnectionCallback处理。newConnectionCallback是TcpServer中编写的。主要工作是轮询找到subLoop, 唤醒, 分配当前新客户端的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* listenfd有事件发生, 即有新连接 */
void Acceptor::handleRead()
{
InetAddress peerAddr;
int connfd = m_acceptSocket.accept(&peerAddr);
if(connfd > 0)
{
if(m_newConnectionCallback)
{
/* 轮询找到subLoop, 唤醒, 分配当前新客户端的channel */
m_newConnectionCallback(connfd, peerAddr);
}
else
{
close(connfd);
}
}
else
{
LOG_ERROR("%s:%s:%d accept err: %d", __FILE__, __FUNCTION__, __LINE__, errno);
if(errno == EMFILE)
{
LOG_ERROR("(sockfd reached max limit)");
}
LOG_ERROR("\n");
}
}

析构

1
2
3
4
5
Acceptor::~Acceptor()
{
m_acceptChannel.disableAll();
m_acceptChannel.remove();
}

listen

  1. 设置listenning标志
  2. 调用m_acceptSocket的listen
  3. 调用m_acceptChannel的enableReading,即把m_acceptChannel注册到Poller中。
1
2
3
4
5
6
void Acceptor::listen()
{
m_listenning = true;
m_acceptSocket.listen(); //listen
m_acceptChannel.enableReading(); //m_acceptChannel -> poller
}

TcpServer

考虑一个问题:用户使用muduo库编写服务器程序时,为了避免用户再去困惑引入哪些头文件,我们在tcpserver.h中把该引入的头文件全引入进去,而不再只是对类前置声明了。

1
2
3
4
5
6
#pragma once
#include"eventloop.h"
#include"acceptor.h"
#include"inetaddress.h"
#include"noncopyable.h"
class TcpServer : noncopyable

回调

所有的回调,都是用户设置到TcpServer后,TcpServer再自己设置到EventLoop中的。

以下是TcpServer类中包含的回调操作属性。(成员变量)

1
2
3
4
5
6
private:
ThreadInitCallback m_threadInitCallback;
ConnectionCallback m_connectionCallback;
MessageCallback m_messageCallback;
WriteCompleteCallback m_writeCompleteCallback;
HighWaterMarkCallback m_highWaterMarkCallback;

线程初始化时的回调

直接声明在TcpServer class中。

1
2
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

连接、读写事件的回调 - 单独写到callbacks.h文件中

为了对各种回调函数进行管理,写到单独的头文件callbacks.h中。

  1. ConnectionCallback - 有关连接的回调。
  2. CloseCallback - 关闭连接的回调
  3. WriteCompleteCallback - 消息发送完成后的回调
  4. MessageCallback - 已连接用户有读写事件发生时的回调
  5. HighWaterMarkCallback - 高水位回调,为了控制收发流量稳定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#pragma once
#include<memory>
#include<functional>
class Buffer;
class TcpConnection;

using TcpConnectionPtr = std::shared_ptr<TcpConnection>;

using ConnectionCallback = std::function<void(const TcpConnectionPtr&)>;
using CloseCallback = std::function<void(const TcpConnectionPtr&)>;
using WriteCompleteCallback = std::function<void(const TcpConnectionPtr&)>;
using HighWaterMarkCallback = std::function<void(const TcpConnectionPtr&, size_t)>;

using MessageCallback = std::function<void(const TcpConnectionPtr &,
Buffer *, Timestamp)>;

枚举声明

Option枚举类,直接声明在TcpServer类中。

Option.kReusePort/Option.kNoReusePort表示端口是否可重用。

1
2
3
4
5
6
public:
enum Option
{
kNoReusePort,
kReusePort
};

成员变量

其中,回调函数属性已经在上面给出。

  1. ConnectionMap m_connections - 保存所有的连接

    1
    2
    3
    private:
    using ConnectionMap = std::unoredered_map<std::string, TcpConnectionPtr>;
    ConnectionMap m_connections;
  2. EventLoop *m_loop - 用户实现定义的baseLoop

    1
    2
    private:
    EventLoop * m_loop;
  3. m_IPport - 存储IP和端口的字符串

    1
    2
    private:
    const std::string m_IPport;
  4. std::unique_ptr<Acceptor> m_acceptor - 运行在mainLoop,任务是监听新连接事件。

  5. std::unique_ptr<EventLoopThreadPool> m_threadPool -

  6. 标识

    1. std::string m_name - TcpServer的易记忆名字
    2. atomic_bool started - 是否已启动
    1
    2
    3
    private:
    std::string m_name;
    atomic_bool started;
  7. m_nextConnId

    1
    2
    private:
    int m_connId;

成员函数

  1. 构造/析构

    1
    2
    3
    4
    public:
    TcpServer(EventLoop * loop, const InetAddress &listenAddr,
    const std::string &nameArg, Option option = kNoReusePort);
    ~TcpServer();
  2. 设置回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public:
    void setThreadInitCallback(const ThreadInitCallback &cb)
    {
    m_threadInitCallback = cb;
    }
    void setConnectionCallback(const ConnectionCallback &cb)
    {
    m_connectionCallback = cb;
    }
    void setMessageCallback(const MessageCallback &cb)
    {
    m_messageCallback = cb;
    }
    void setWriteCompleteCallback(const WriteCompleteCallback &cb)
    {
    m_writeCompleteCallback = cb;
    }
  3. start - 开启服务器监听,即mainLoop的acceptor的listen

    1
    2
    public:
    void start();
  4. 关于Connection

    1
    2
    3
    4
    private:
    void newConnection(int sockfd, const InetAddress& peerAddr);
    void removeConnection(const TcpConnectionPtr& conn);
    void removeConnectionInLoop(const TcpConnectionPtr& conn);

代码实现

  1. 构造,有三个参数,loop指针,InetAddress,标识名称name,端口是否重用Option

    1. m_loop进行赋值,需要做非空检查

      1
      2
      3
      4
      5
      6
      7
      8
      EventLoop* CheckLoopNotNull(EventLoop* loop)
      {
      if(loop == nullptr)
      {
      LOG_FATAL("%s:%s:%d mainLoop is null\n", __FILE__, __FUNCTION__, __LINE__);
      }
      return loop;
      }
    2. m_IPport进行赋值,值是调用参数listenAddr中的函数toIPport()获得的

    3. m_name赋值

    4. m_acceptor进行构造(unique_ptr)。传入参数looplistenAddroption

      1. 创建socket
      2. socket的fd和loop指针封装为Channel - m_acceptChannel(loop, m_acceptSocket.fd())
      3. 设置channel的ReadCallback回调
      4. 当TcpServer调用start时,acceptor将会调用listen,将调用m_acceptChannelenableReading()函数,往其相应的loop中进而在poller中注册事件。
      5. loop等待事件,如果发生事件,调用channel的handleEvent,进而执行readCallBack。acceptor的readCallBack在构造时绑定为handleRead,工作是对channel的socket进行accept。
    5. m_threadPool进行构造(shared_ptr)。传入参数loopm_name

    6. 设置回调

      1. m_connectionCallBack
      2. m_messageCallBack
    7. m_nextConnId

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #include<functional>
    using namespace std::placeholders;

    TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr,
    const std::string &nameArg, Option option = kNoReusePort)
    : m_loop(CheckLoopNotNull(loop)),
    m_IPport(listenAddr.toIPport()),
    m_name(nameArg),
    m_acceptor(new Acceptor(loop, listenAddr, option == kReusePort)),
    m_threadPool(new EventLoopThreadPool(loop, m_name)),
    m_connectionCallback(),
    m_messageCallback(),
    m_nextConnId(1)
    {
    m_acceptor->setNewConnectionCallback(std::bind(&TcpServer::newConnection,
    this, _1, _2));//connfd, peerAddr
    }
  2. newConnection - 运行在主线程当中,主线程的mainLoop调用此函数,选择了一个ioLoop,在非子loop的线程中(即当前是在mainThread)执行cb,就需要唤醒子loop所在线程(subThread),执行cb,即调用subLoop的queueInLoop(cb)

    1. 根据轮询算法选择一个subLoop,即调用m_threadPool->getNextLoop()
    2. 唤醒subLoop
    3. 把当前connfd封装成channel分发给subloop
  3. setThreadNum - 设置底层subLoop的个数

    1
    2
    3
    4
    void TcpServer::setThreadNum(int numThreads)
    {
    m_threadPool->setThreadNum(numThreads);
    }
  4. start - 开启服务器监听

    1
    2
    3
    4
    5
    6
    7
    8
    9
    void TcpServer::start()
    {
    //防止一个TcpServer对象被start多次;
    if(m_started++ == 0)//即使bool为1,bool++后的值也还是1
    {
    m_threadPool->start(m_threadInitCallback);
    m_loop->runInLoop(std::bind(&Acceptor::listen, m_acceptor.get()));
    }
    }

TcpConnection

顾名思义,此类对象表示的是tcp通信中,客户端和服务器之间成功建立的一条连接。主要封装用户在服务端的数据。

  1. mainLoop通过acceptor接收到新的连接时,将会把fd和loop封装到channel,继而封装到TcpConnection中,再通过轮询算法交给subLoop。
  2. 更重要的是,TcpConnection中存储了一些连接事件、读写事件发生时的回调。
  3. 最后,TcpConnection还还封装了Buffer。

Buffer

基于非阻塞IO的服务端编程,Buffer是必不可少的。比如解决TCP粘包问题。

成员变量

  1. std::vector<char> m_buffer - 数据数组。
  2. size_t m_readerIndex - 数据可读的位置下标
  3. size_t m_writerIndex - 数据可写的位置下标
1
2
3
4
private:
std::vector<char> m_buffer;
size_t m_readerIndex;
size_t m_writerIndex;

除此之外,还有两个静态常量。

  1. kCheapPrepend - 记录数据包的长度
  2. kInitialSize - 数据包的初始大小值。
1
2
3
public:
static const size_t kCheapPrepend = 8;
static const size_t kInitialSize = 1024;

成员函数

  1. 构造/析构

    1
    2
    3
    4
    5
    6
    7
    8
    public:
    explicit Buffer(size_t initialSize = kInitialSize)
    : m_buffer(kCheapPrepend + initialSize),
    m_readerIndex(kCheapPrepend),
    m_writerIndex(kCheapPrepend)
    {
    }
    ~Buffer() = default;
  2. readableBytes、writableBytes、prependableBytes

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public:
    size_t readableBytes() const
    {
    return m_writerIndex - m_readerIndex;
    }
    size_t writableBytes() const
    {
    return m_buffer.size() - m_writerIndex;
    }
    size_t prependableBytes() const
    {
    return m_readerIndex;
    }
  3. 返回指针

    1. begin - 获取buffer实际首部指针
    2. peek - 返回缓冲区数据包中可读数据起始位置
    3. beginWrite - 返回可写的数据起始位置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private:
    char * begin()
    {
    return &*m_buffer.begin();
    }
    const char * begin() const
    {
    return &*m_buffer.begin();
    }
    public:
    const char * peek() const
    {
    return begin() + m_readerIndex;
    }
    char * beginWrite()
    {
    return begin() + m_writerIndex;
    }
    const char * beginWrite() const
    {
    return begin() + m_writerIndex;
    }
  4. retrieve/retrieveAll/retrieveAsString/retrieveAllAsString - 后两个是把buffer中的数据转为string类型,多与onMessage配合使用;前两个是将m_readerIndexm_writerIndex调整位置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public:
    void retrieve(size_t len)
    {
    if(len < readableBytes())//只读取了可读缓冲区数据的一部分
    {
    m_readerIndex += len;
    }
    else
    {
    retrieveAll();
    }
    }
    void retrieveAll()
    {
    m_readerIndex = m_writerIndex = kCheapPrepend;
    }
    std::string retrieveAsString(size_t len)
    {
    std::string result(peek(), len);
    retrieve(len);
    return result;
    }
    std::string retrieveAllAsString()
    {
    return retrieveAsString(readableBytes());
    }
  5. ensureWritableByte - 确保buffer可写空间大小足够len,不足则扩容

    1
    2
    3
    4
    5
    6
    7
    8
    public:
    void ensureWritableBytes(size_t len)
    {
    if(writableBytes() < len)
    {
    makeSpace(len);
    }
    }
  6. makeSpace - 扩容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private:
    void makeSpace(size_t len)
    {
    if(writableBytes()+prependableBytes()-kCheapPrepend < len)
    {
    m_buffer.resize(m_writerIndex + len);
    }
    else//move readable data to the front to make space
    {
    size_t readable = readableBytes();
    //将m_readerIndex到m_writerIndex的内容复制到kCheapPrepend处
    std::copy(begin() + m_readerIndex, begin() + m_writerIndex, begin() + kCheapPrepend);
    m_readerIndex = kCheapPrepend;
    m_writerIndex = m_readerIndex + readable;
    }
    }
  7. append

    1
    2
    3
    4
    5
    6
    7
    public:
    void append(const char * data, size_t len)
    {
    ensureWritableBytes(len);
    std::copy(data, data+len, beginWrite());
    m_writerIndex += len;
    }
  8. readFd - 从fd上读取数据

    1
    2
    public:
    ssize_t readFd(int fd, int * saveErrno); //在.cc文件中实现

代码实现

  1. readFd - 从fd上读取数据

    1. Poller默认工作在LT模式

    2. Buffer缓冲区是有大小的,但从fd上读数据却不清楚数据有多少。

    3. 此函数使用了系统调用readv

    4. struct iovec结构 - iov_base指向缓冲区首址;iov_len是缓冲区的长度。网络编程中可以使用此工具,创建一个struct iovec iov[2],第一个的iov_base指向tcp连接底层的缓冲区,第二个的iov_base指向额外的缓冲区,以备使用。如果使用到额外的缓冲区,在readv完毕后,把额外缓冲区内容拼接到tcp底层缓冲区尾部即可。

      1
      2
      3
      4
      5
      struct iovec
      {
      void * iov_base;
      size_t iov_len;
      }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    #include"buffer.h"
    #include<sys/uio.h>
    ssize_t Buffer::readFd(int fd, int * savedErrno)
    {
    char extrabuf[65536] = {0}; //栈上内存空间
    struct iovec iov[2];
    const size_t writable = writableBytes();//buffer底层缓冲区剩余的可写空间大小
    iov[0].iov_base = begin() + m_writerIndex;
    iov[0].iov_len = writable;
    iov[1].iov_base = extrabuf;
    iov[1].iov_len = sizeof extrabuf;
    const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
    //如果可写空间大小少于64kb则可以按需写到vec[0]/vec[1];
    //如果可写空间大小大于等于64kb则只能写到vec[0]。
    //说明, 可收到的数据大小限制至少为64kb。
    const ssize_t n = ::readv(fd, iov, iovcnt);
    if(n < 0)
    {
    *savedErrno = errno;
    }
    else if(n <= writable)
    {
    m_writerIndex += n;
    }
    else// n > writable, 需要拼接extrabuf
    {
    m_writerIndex = m_buffer.size();
    append(extrabuf, n-writable);
    }
    return n;
    }

TcpConnection

1
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>

成员变量

  1. m_loop - subLoop

    1
    2
    private:
    EventLoop *m_loop;
  2. m_socket/m_channel - unique_ptr管理

    1
    2
    3
    private:
    std::unique_ptr<Socket> m_socket;
    std::unique_ptr<Channel> m_channel;
  3. m_localAddr/m_peerAddr - 本地/对端地址信息

    1
    2
    3
    private:
    const InetAddress m_localAddr;
    const InetAddress m_peerAddr;
  4. m_inputBuffer/m_outputBuffer - 读写缓冲区

    1
    2
    3
    private:
    Buffer m_inputBuffer;
    Buffer m_outputBuffer;
  5. 各种标志

    1. m_name
    2. m_state - atomic,用枚举类变量赋值
    3. m_reading
    4. m_highWaterMark - 高水位阈值
    1
    2
    3
    4
    5
    6
    7
    8
    9
    private:
    const std::string m_name;

    enum StateE {kDisconnected, kConnecting, kConnected, kDisconnecting};
    std::atomic_int m_state;

    bool m_reading;

    size_t m_highWaterMark;
  6. 各种回调

    1
    2
    3
    4
    5
    6
    private:
    ConnectionCallback m_connectionCallback;
    MessageCallback m_messageCallback;
    WriteCompleteCallback m_writeCompleteCallback;
    HighWaterMarkCallback m_highWaterMarkCallback;
    CloseCallback m_closeCallback;

成员函数

  1. 构造/析构

    1
    2
    3
    4
    public:
    TcpConnection(EventLoop *loop, const std::string& name, int sockfd,
    const InetAddress& localAddr, const InetAddress& peerAddr);
    ~TcpConnection();
  2. 建立/销毁连接

    1
    2
    3
    public:
    void connectEstablished();
    void connectDestoryed();
  3. send - 发送数据

    1
    2
    3
    private:
    void send(const std::string &buf);
    void send(const void * data, int len);
  4. shutdown - 关闭连接

    1
    2
    3
    4
    public:
    void shutdown();
    private:
    void shutdownInLoop();
  5. 设置回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public:
    void setConnectionCallback(const ConnectionCallback & cb)
    {
    m_connectionCallback = cb;
    }
    void setMessageCallback(const MessageCallback & cb)
    {
    m_messageCallback = cb;
    }
    void setWriteCompleteCallback(const WriteCompleteCallback & cb)
    {
    m_writeCompleteCallback = cb;
    }
    void setCloseCallback(const CloseCallback &cb)
    {
    m_closeCallback = cb;
    }
    void setHighWaterMarkCallback(const HighWaterMarkCallback & cb, size_t highWaterMark)
    {
    m_highWaterMarkCallback = cb;
    m_highWaterMark = highWaterMark;
    }
  6. 设置、判断标志

    1
    2
    3
    public:
    bool connected() const {return m_state == kConnected;}
    void setState(StateE state) {m_state = state;}
  7. 获取属性

    1
    2
    3
    4
    5
    public:
    EventLoop * getLoop() const {return m_loop;}
    const std::string& name() const {return m_name;}
    const InetAddress& localAddress() const {return m_localAddr;}
    const InetAddress& peerAddress() const {return m_peerAddr;}
  8. handleRead/handleWrite/handleClose/handleError

    1
    2
    3
    4
    5
    private:
    void handleRead(Timestamp receiveTime);
    void handleWrite();
    void handleClose();
    void handleError();

代码实现

  1. 构造:重要参数 - loop、sockfd、localAddr、peerAddr

    1. 给loop赋值,name起名字
    2. 赋state为kConnecting、reading为true
    3. 以sockfd为参数构造socket,new后赋给智能指针m_socket
    4. 以loop、sockfd为参数构造channel,new后赋给智能指针m_channel
    5. 赋值localAddr、peerAddr
    6. 赋高水位阈值m_highWaterMark64*1024*1024(64M)
    7. m_channel设置相应的回调,当poller给channel通知感兴趣的事件发生,则channel会回调相应的操作函数
    8. m_socket调用setKeepAlive,使TCP启动保活机制
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /* 写为static,防止函数名字冲突 */
    static EventLoop* CheckLoopNotNull(EventLoop* loop)
    {
    if(loop == nullptr)
    {
    LOG_FATAL("%s:%s:%d mainLoop is null\n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
    }
    TcpConnection::TcpConnection(EventLoop* loop, const std::string &nameArg, int sockfd,
    const InetAddress &localAddr, const InetAddress &peerAddr)
    : m_loop(CheckLoopNotNull(loop)), m_name(nameArg),
    m_state(kConnecting), m_reading(true),
    m_socket(new Socket(sockfd)),
    m_channel(new Channel(loop, sockfd)),
    m_localAddr(localAddr), m_peerAddr(peerAddr),
    m_highWaterMark(64*1024*1024) //64M
    {
    m_channel->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));
    m_channel->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));
    m_channel->setCloseCallback(std::bind(&TcpConnection::handleClose, this));
    m_channel->setErrorCallback(std::bind(&TcpConnection::handleError, this));
    LOG_INFO("%s [%s] at fd = %d\n", __FUNCTION__, m_name.c_str(), sockfd);
    m_socket->setKeepAlive(true); //启动TCP保活机制
    }
  2. 析构

    1
    2
    3
    4
    5
    TcpConnection::~TcpConnection()
    {
    LOG_INFO("%s [%s] at fd = %d, state = %d\n",
    __FUNCTION__, m_name.c_str(), m_channel->fd(), m_state.load());
    }
  3. handleRead - 调用m_inputBufferreadFd, 读取channel上的消息; 如果有数据则调用m_messageCallback; 如果返回值为0说明客户端断开, 调用handleClose; 如果出错则handleError;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    void TcpConnection::handleRead(Timestamp receiveTime)
    {
    int savedErrno = 0;
    ssize_t n = m_inputBuffer.readFd(m_channel->fd(), &savedErrno);
    if(n > 0)
    {
    //已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessage
    m_messageCallback(shared_from_this(), &m_inputBuffer, receiveTime);
    }
    else if(n == 0) //客户端断开
    {
    handleClose();
    }
    else
    {
    errno = savedErrno;
    LOG_ERROR("%s\n", __FUNCTION__);
    handleError();
    }
    }
  4. handleWrite - 调用m_outputBufferwriteFd, 写到channel上对应fd的缓冲区(调用peek, 找到缓冲区数据包中可读数据起始位置, 把从此位置起共readableBytes()数据写到fd); 如果成功则调用m_loop->queueInLoop(std::bind(m_writeCompleteCallback, shared_from_this())); 最后, 判断连接的状态如果是DisconnectingshutdownInLoop

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    void TcpConnection::handleWrite()
    {
    if(m_channel->isWriting())
    {
    int savedErrno = 0;
    ssize_t n = m_outputBuffer.writeFd(m_channel->fd(), &savedErrno);
    if(n > 0)
    {
    m_outputBuffer.retrieve(n);
    if(m_outputBuffer.readableBytes() == 0)
    {
    m_channel->disableWriting();
    if(m_writeCompleteCallback)
    {
    /* 唤醒loop对应的thread线程, 执行回调 */
    m_loop->queueInLoop(std::bind(m_writeCompleteCallback,
    shared_from_this()));
    }
    if(m_state == kDisconnecting)
    {
    shutdownInLoop();
    }
    }
    }
    else
    {
    LOG_ERROR("%s\n", __FUNCTION__);
    }
    }
    else
    {
    LOG_ERROR("%s: connection fd = %d is down, no more writing.\n",
    __FUNCTION__, m_channel->fd());
    }
    }
  5. handleClose - 调用setState(kDisconnected), m_channel->disableAll(), m_connectionCallback, m_closeCallback

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    void TcpConnection::handleClose()
    {
    LOG_INFO("%s: fd = %d, state: %d\n", __FUNCTION__, m_channel->fd(), m_state.load());
    setState(kDisconnected);
    m_channel->disableAll();

    TcpConnectionPtr connPtr(shared_from_this());
    m_connectionCallback(connPtr);
    m_closeCallback(connPtr);
    }
  6. handleError - 调用getsockopt, 调查fd的错误, 如果连getsockopt也失败则存储全局errno

    • int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen): 操作套接字选项时,必须指定选项所在的级别和选项的名称, SOL_SOCKET表示在套接字API级别, 参见getprotoent(3); 参数optvaloptlen对于getsockopt(), 标识一个缓冲区,请求选项的值将在其中返回。optlen是一个值结果参数,最初包含optval指向的缓冲区的大小,并在返回时修改以指示返回值的实际大小。 如果不提供或返回选项值,则optval可能为NULL。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    void TcpConnection::handleError()
    {
    int optval;
    socklen_t optlen = sizeof optval;
    int err = 0;
    if(::getsockopt(m_channel->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
    {
    err = errno;
    }
    else
    {
    err = optval;
    }
    LOG_ERROR("%s name: %s - SO_ERROR: %d\n", __FUNCTION__, m_name.c_str(), err);
    }
  7. send - 用户会给TcpServer注册onMessageCallback, 已建立连接的用户有读写事件时, 尤其是读事件, onMessage会响应; 处理完客户端发来的事件后(onMessageCallback), 服务端会send给客户端回发消息;

    在TcpConnection的成员中, 有两个Buffer成员;

    1. inputBuffer - 接收数据的缓冲区 - 即recv操作需要暂存的区域
    2. outputBuffer - 发送数据的缓冲区 - 即send操作需要暂存的区域

    其中, outputBuffer存在的意义?

    1. 应用层可能需要处理很多数据, 数据从传输层到网络层到数据链路层的传输往往比应用层发送得快; 需要用缓冲区暂存;
    2. 为了防止应用层与底层传输的数据量差距悬殊导致数据丢失, 设置了一个高水位阈值;

    收发数据的方式: 本项目的数据收发统一使用json或protobuf格式化的字符串进行, 所以此send函数的参数为了方便起见, 直接规定为string类型;

    1. 判断当前连接的状态是否为connected;
    2. 判断此loop是否在本thread中, 如果是则调用sendInLoop; 否则runInLoop, 绑定的函数也是sendInLoop;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    void TcpConnection::send(const std::string &buf)
    {
    if(m_state == kConnected)
    {
    if(m_loop->isInLoopThread())
    {
    sendInLoop(buf.c_str(), buf.size());
    }
    else
    {
    m_loop->runInLoop(std::bind(&TcpConnection::sendInLoop,
    this, buf.c_str(), buf.size()));
    }
    }
    }
  8. sendInLoop - 写数据操作, 结合了sendInLoop和handleWrite

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    void TcpConnection::sendInLoop(const void * data, size_t len)
    {
    ssize_t nwrote = 0;
    size_t remaining = len;
    bool faultError = false; //记录是否产生错误
    if(m_state == kDisconnected)
    {
    LOG_ERROR("Disconnected, give up writing!");
    return;
    }
    /**
    * m_channel->isWriting()为false表示channel第一次开始写数据,
    * readableBytes()为0说明缓冲区没有待发送数据;
    */
    if(!m_channel->isWriting() && m_outputBuffer.readableBytes() == 0)
    {
    nwrote = ::write(m_channel->fd(), data, len);
    if(nwrote >= 0)
    {
    remaining = len - nwrote;
    if(remaining == 0 && m_writeCompleteCallback)
    {
    //如果此时数据全部发送完毕, 不用再给channel设置epollout事件
    m_loop->queueInLoop(std::bind(m_writeCompleteCallback, shared_from_this()));
    }
    }
    else //nwrote < 0
    {
    nwrote = 0;
    if(errno != EWOULDBLOCK)
    {
    LOG_ERROR("%s\n", __FUNCTION__);
    if(errno == EPIPE || errno == ECONNRESET)// SIGPIPE or RESET
    {
    faultError = true;
    }
    }
    }
    }
    if(!faultError && remaining > 0)//没有出错, 没有发送完毕, 剩余数据需要存到缓冲区, 然后给channel注册epollout事件, LT模式, poller发现tcp的发送缓冲区有空间, 会通知相应的sock->channel, 调用writeCallback方法, 即调用handleWrite, 直到把发送缓冲区中数据全部发送
    {
    size_t oldlen = m_outputBuffer.readableBytes();
    if(oldlen + remaining >= m_highWaterMark && oldlen < m_highWaterMark && m_highWaterMarkCallback)
    {
    m_loop->queueInLoop(std::bind(m_highWaterMarkCallback, shared_from_this(), oldlen+remaining));
    }
    m_outputBuffer.append((char*)data+nwrote, remaining);//data+nworte即剩余的位置
    if(!m_channel->isWriting())//m_channel->isWriting()为false表示channel第一次开始写数据, 之前没有注册epollout, 现在需要注册
    {
    m_channel->enableWriting();
    }
    }
    }
  9. connectEstablished

    1. setState为kConnected;
    2. 调用m_channel->tie, 让m_channel绑定一个TcpConnection, 方便后期观察TcpConnection是否还有效, 若已失效将不进行相应的操作, 已然无意义;
    1
    2
    3
    4
    5
    6
    7
    8
    void TcpConnection::connectEstablished()
    {
    setState(kConnected);
    m_channel->tie(shared_from_this());
    m_channel->enableReading(); //向poller注册channel的epollin事件
    //有新连接建立, 调用connectionCallback
    m_connectionCallback(shared_from_this());
    }
  10. connectDestroyed

    1. 判断state是否为connected, 若是则setState为kDisconnected, 并调用m_channel->disableAll(), 调用connectionCallback
    2. 最后m_channel->remove(), 把channel从poller中删除掉
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    void TcpConnection::connectDestoryed()
    {
    if(m_state == kConnected)
    {
    setState(kDisconnected);
    m_channel->disableAll();
    m_connectionCallback(shared_from_this());
    }
    m_channel->remove();
    }
  11. shutdown/shutdownInLoop

    • 关闭写端, 将会触发epollhup, 调用closeCallback, 即TcpConnection中的handleClose方法,
      1. setState(kDisconnected)
      2. m_channel->disableAll()
      3. connectionCallback, closeCallback
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    void TcpConnection::shutdown()
    {
    if(m_state == kConnected)
    {
    setState(kDisconnecting);
    m_loop->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
    }
    }
    /**
    * 关闭写端, 将会触发epollhup,
    * 会调用closeCallback,
    * 即TcpConnection中的handleClose方法,
    * handleClose将会:
    * 1. setState(kDisconnected);
    * 2. m_channel->disableAll();
    * 3. connectionCallback, closeCallback
    */
    void TcpConnection::shutdownInLoop()
    {
    if(!m_channel->isWriting())//说明outputBuffer中的数据已经全部发送完成
    {
    m_socket->shutdownWrite();//关闭写端, 将会触发epollhup, 会调用closeCallback, 即TcpConnection中的handleClose方法
    }
    }

TcpServer收尾

  1. newConnection - 运行在主线程当中,主线程的mainLoop调用此函数,选择了一个ioLoop,在非子loop的线程中(即当前是在mainThread)执行cb,就需要唤醒子loop所在线程(subThread),执行cb,即调用subLoop的queueInLoop(cb)

    1. 根据轮询算法选择一个subLoop,即调用m_threadPool->getNextLoop()
    2. 唤醒subLoop
    3. 把当前connfd封装成channel分发给subloop
    1
    2
    3
    4
    5
    6
    void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
    {
    EventLoop *ioLoop = m_threadPool->getNextLoop();
    char buf[64] = {0};
    snprintf(buf, sizeof buf, "-%s#%d", m_IPport.c_str(), m_nextConnId)
    }
  2. removeConnection/removeConnectionInLoop

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    void TcpServer::removeConnection(const TcpConnectionPtr &conn)
    {
    m_loop->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
    }
    void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
    {
    LOG_INFO("%s [%s] - connection %s\n",
    __FUNCTION__, m_name.c_str(), conn->name().c_str());
    m_connections.erase(conn->name());
    EventLoop *ioLoop = conn->getLoop();
    ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestoryed, conn));
    }

流程

  1. TcpServer -> Acceptor -> 有一个新用户连接,通过accept函数得到connfd
  2. 用户给TcpServer设置回调 -> TcpServer给TcpConnection设置回调 -> TcpConnection把回调传给Channel -> Channel注册到Poller中 -> Poller通知Channel调用回调
  3. mainLoop中的acceptor是一个特殊的socketfd, 它只有一个回调ReadCallback, 负责监听新用户的连接, 返回socket, 将这个socket打包成TcpConnection, 再注册相应的回调;

CAS_原子操作

内容

  1. CAS原理
  2. 原子操作
  3. 汇编层面

CAS

CAS(compare And Swap)也叫比较交换,是一种无锁原子算法,映射到操作系统就是一条cmpxchg硬件汇编指令(保证原子性),其作用是让CPU将内存值更新为新值,但是有个条件,内存值必须与期望值相同,并且CAS操作无需用户态与内核态切换,直接在用户态对内存进行读写操作(意味着不会阻塞/线程上下文切换)。

它包含3个参数CAS(V,E,N) ,V表示待更新的内存值,E表示预期值,N表示新值,当V值等于E值时,才会将V值更新成N值,如果V值和E值不等,不做更新,这就是一次CAS的操作。

image-20220527162619434

链栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include<mutex>
#include<thread>
template<class T>
class Stack
{
private:
struct StackNode
{
T data;
StackNode* next;
};
StackNode* Buynode()
{
StackNode* s = (StackNode*)malloc(sizeof(StackNode));
if (NULL == s)exit(1);
memset(s, 0, sizeof(StackNode));
return s;
}
void Freenode(StackNode* pnode)
{
free(pnode);
}
private:
StackNode* base;
size_t cursize;
//mutable std::mutex mtx;
mutable std::recursive_mutex mtx;
Stack(const Stack&) = delete;
Stack& operator=(const Stack&) = delete;
public:
Stack()
:base(nullptr), cursize(0)
{

}
~Stack()
{
clear();
}
void clear()
{
std::lock_guard<std::recursive_mutex> locker(mtx);
while (base != nullptr)
{
StackNode* q = base;
base = q->next;
(&(q->data))->~T();
Freenode(q);
cursize -= 1;
}
}
void push(const T& x)
{
std::lock_guard<std::recursive_mutex> locker(mtx);
StackNode* s = Buynode();
new(&(s->data)) T(x); //定位new构造对象
s->next = base;
base = s;
cursize += 1;
}
const T& top() const
{
std::lock_guard<std::recursive_mutex> locker(mtx);
return base->data;
}
T& top()
{
std::lock_guard<std::recursive_mutex> locker(mtx);
return base->data;
}
void pop()
{
std::lock_guard<std::recursive_mutex> locker(mtx);
if (base != nullptr)
{
StackNode* q = base;
base = q->next;
(&(q->data))->~T(); //定位析构
Freenode(q);
cursize -= 1;
}
}
public:
size_t get_size() const
{
std::lock_guard<std::recursive_mutex> locker(mtx);
return cursize;
}
bool is_empty() const
{
std::lock_guard<std::recursive_mutex> locker(mtx);
return get_size() == 0;
}
};
#define _TEST1
#ifdef _TEST1
#include<iostream>
#include<thread>
void thread_funa(Stack<int>& s)
{
for (int i = 1; i <= 10; i += 2)
{
std::cout << i << std::endl;
s.push(i);
}
}
void thread_funb(Stack<int>& s)
{
for (int i = 2; i <= 10; i += 2)
{
std::cout << i << std::endl;
s.push(i);
}
}
int main()
{
Stack<int> ist;
std::thread tha(thread_funa, std::ref(ist));
std::thread thb(thread_funb, std::ref(ist));

tha.join();
thb.join();
std::cout << "push over." << std::endl;
while (!ist.is_empty())
{
int val = ist.top();
std::cout << val << std::endl;
ist.pop();
}
}
#endif

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
12

43

65

87

109

push over.
9
10
7
8
5
6
3
4
1
2

标准库原子操作API

构造函数

原型:std::atomic<T>::atomic

1
2
3
4
atomic() noexcept = default;
constexpr atomic() = noexcept(std::is_nothrow_default_constructible_v<T>);
constexpr atomic(T desired) noexcept;
atomic(const atomic&) = delete;

有默认无参构造;拷贝构造删除。

operator=

1
2
3
4
T operator=(T desired) noexcept;
T operator=(T desired) volatile noexcept;
atomic& operator=(const atomic&) = delete;
atomic& operator=(const atomic&) volatile = delete;

两个原子对象之间不可相互赋值。

代码示例

1
2
3
4
5
6
7
8
#include<atomic>
int main()
{
atomic<int> iat;
atomic<int> iat2;
//iat = iat2; //error
iat = 10;
}

load

原子地加载并返回原子对象的当前值。安装order的值影响内存。

1
2
3
4
5
6
T load(std::memory_order order = std::memory_order_seq_cst) const noexcept;
T load(std::memory_order order = std::memory_order_seq_cst) const volatile noexcept;
参数:
order - 强制的内存顺数
返回值:
原子变量的当前值
1
2
3
4
5
6
7
8
#include<atomic>
int main()
{
atomic<int> iat;
atomic<int> iat2;
iat = 10;
int val = iat.load();
}

存在的问题:

语义是获取原子对象的值没错,但是获取的此值只作为当时的瞬时值。

在多线程中,可能获得的是脏数据。

compare_exchange

有两个函数。compare_exchange_weakcompare_exchange_strong

原子地比较原子对象与的非原子参数的值,若相等进行交换,若不相等则进行加载。

1
2
3
4
5
6
7
8
9
bool compare_exchange_weak(T & expected, T desired, std::memory_order order = std::memory_order_seq_cst) noexcept;
参数:
expected - 到期待在原子对象中找到的值的引用。若比较失败则被存储*this的实际值。
desired - 若符合期待则存储于原子对象的值
success - 若比较成功,则读修改写操作所用的内存同步顺序。容许所有值。
failure - 若比较失败,则加载操作所用的内存同步顺序。不能为std::memory_order_release或std::memory_order_acq_rel,且不能指定强于success 的顺序(C++17前)
order - 两个操作所用的内存同步顺序
返回值:
若成功更改底层原子值则为true,否则为false

原子地比较*this原子对象与expected对象表示(C++20前) | 值表示(C++20起),而若它们逐位相等则以desired替换前者(进行读修改写操作)。否则,将*this中的实际值加载进expected(进行加载操作)。

读修改写和加载操作的内存模型分别为successfailure。在上面这个函数中,order用于读修改写操作和加载操作,除了若order == std::memory_order_acq_relorder== std::memory_order_release,则加载操作分别使用std::memory_order_acquirestd::memory_order_relaxed

改写链栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include<mutex>
#include<thread>
template<class T>
class Stack
{
private:
struct StackNode
{
T data;
StackNode* next;
};
StackNode* Buynode()
{
StackNode* s = (StackNode*)malloc(sizeof(StackNode));
if (NULL == s)exit(1);
memset(s, 0, sizeof(StackNode));
return s;
}
void Freenode(StackNode* pnode)
{
free(pnode);
}
private:
mutable std::atomic<StackNode*> pHead;
Stack(const Stack&) = delete;
Stack& operator=(const Stack&) = delete;
public:
Stack()
:pHead(nullptr)
{

}
~Stack()
{

}
void push(const T& val)
{
StackNode * newnode = Buynode();
new(&(newnode->data)) T(val);
newnode->next = pHead;
pHead = newnode;
}
};
int main()
{
Stack<int> ist;

}

newnode->next = pHead; pHead = newnode;进行原子化。
即转为:while(!pHead.compare_exchange_weak(newnode->next, newnode));

1
2
3
4
5
6
7
8
void push(const T& val)
{
StackNode * newnode = Buynode();
new(&(newnode->data)) T(val);

newnode->next = pHead.load();
while(!pHead.compare_exchange_weak(newnode->next, newnode));
}

判断pHead是否等于newnode->next,如果相等,则把newnode赋值给pHead