集群服务器

内容

  1. 项目需求及目标
  2. 开发环境
  3. Json介绍
  4. muduo网络库编程
  5. 服务器集群
  6. 基于发布-订阅Redis——服务器中间件
  7. 数据库设计

本项目要用到的技术栈:

  1. Json序列化和反序列化;
  2. muduo网络库开发;
  3. nginx源码编译安装和环境部署;
  4. nginxtcp负载均衡器配置;
  5. redis缓存服务器编程实践;
  6. 基于发布-订阅的服务器中间件redis消息队列编程实践;
  7. MySQL数据库编程;
  8. CMake构建编译环境;
  9. Github托管项目

本项目的内容包含了:通常开发的服务器,网络、业务、数据模块(数据库、数据的操作),
项目中要把三大模块区分开,项目初期时以登录模块为主线,分三大块推进。

项目需求及目标

  • 项目需求
    1. 客户端新用户注册
    2. 客户端用户登录
    3. 添加好友和添加群组
    4. 好友聊天和群组聊天
    5. nginx配置tcp负载均衡
    6. 集群聊天系统支持客户端跨服务器通信
  • 项目目标
    1. 掌握服务器的网络I/O模块,业务模块,数据模块分层的设计思想
    2. 掌握C++ muduo网络库的编程以及实现原理
    3. 掌握Json的编程应用
    4. 掌握nginx配置部署tcp负载均衡器的原理及应用
    5. 掌握服务器中间件的应用场景和基于发布-订阅redis编程实践以及应用原理
    6. 掌握CMake构建自动化编程环境

开发环境

muduo库基于boost库,需要先安装boost。

1
2
3
4
5
6
tar -zxvf boost_1_69_0.tar.gz
cd boost_1_69_0/
./bootstrap.sh #运行bootstrap.sh工程编译构建程序
./b2 #源码根目录下生成了b2程序,运行b2程序
#编译完成后,会有如下打印: The Boost C++ Libraries were successfully built!
sudo ./b2 install #把上面的boost库头文件和lib库文件安装在默认的Linux系统头文件和库文件的搜索路径下

工程目录

1
2
3
4
5
6
7
8
9
10
11
12
include目录是头文件放的位置。

server和client的代码在同一工程中,最后生成时可以把C/S分开生成到bin目录下。
可按server和client分类,
比如生成代码所需用到的头文件可以分别放在/include/server和/include/client,
而server和client共需的头文件直接放到/include下。比如消息的id。

src放源码。

thirdparty是第三方库文件夹,比如放json.hpp。

本项目没有生成lib库(.a/.so),所以没有lib文件夹。

CMakeLists.txt编写

根目录

1
2
3
4
5
6
7
8
9
10
11
cmake_minimum_required(VERSION 3.0.0)
project(chat)
# 配置编译选项
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g)
# 配置最终的可执行文件输出的路径
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
# 配置头文件的搜索路径
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(${PROJECT_SOURCE_DIR}/include/server)
# 加载子目录
add_subdirectory(src)

/src

1
add_subdirectory(server)

/src/server

1
2
3
4
5
6
# 定义了一个SRC_LIST变量,包含了该目录下所有的源文件
aux_source_directory(. SRC_LIST)
# 指定生成可执行文件
add_executable(ChatServer ${SRC_LIST})
# 指定可执行文件链接时 需要依赖的库文件
target_link_libraries(CharServer muduo_net muduo_base pthread)

JSON介绍

JSON,全拼:JavaScript Object Notation

JSON是一种轻量级的数据交换格式(也叫数据序列化方式)。

JSON采用完全独立于编程语言的文本格式来存储和表示数据。

简洁和清晰的层次结构使得JSON成为理想的数据交换语言。

易于人阅读和编写,同时也易于机器解析和生成,并有效地提升网络传输效率。

JSON的用处就是下图:
image-20220413164124873

JSON第三方库

本项目选用的是JSON for Modern C++,由德国人nlohmann编写的在C++下使用的JSON库。特点如下:

  1. 整个代码由一个头文件json.hpp包含,没有依赖关系,使用方便;
  2. 使用C++11标准编写;
  3. 使得JSON像STL容器一样,而且STL和JSON容器之间可以相互转换;
  4. 所有类都经过严格的单元测试,覆盖100%的代码,包括所有特殊的行为。此外,还检查了Valgrind是否有内存泄漏。为了保持高质量,该项目遵循“核心基础设施”倡议的最佳实践。

测试JSON

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include"json.hpp"
using json = nlohmann::json;
#include<iostream>
#include<vector>
#include<map>
using namespace std;
/* json序列化 示例1 */
void func1()
{
json js;
js["msg_type"] = 2;
js["from"] = "zhang san";
js["to"] = "li si";
js["msg"] = "hello, i'm zhang san";
cout << js << endl;
}
int main()
{
func1();
return 0;
}

输出结果

1
{"from":"zhang san","msg":"hello, i'm zhang san","msg_type":2,"to":"li si"}

能直接添加二维key - value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* JSON 序列化 实例2 */
void func2()
{
json js;
js["id"] = {1, 2, 3, 4, 5};
js["name"] = "zhang san";
js["msg"]["zhang san"] = "i'm zhang san";
js["msg"]["li si"] = "i'm li si";
/* 上面两句等同于下面这句一次性添加数组对象 */
/* js["msg"] = {{"zhang san", "i'm zhang san"}, {"li si", "i'm li si"}}; */
cout << js << endl;
}
int main()
{
func2();
return 0;
}


输出结果

1
{"id":[1,2,3,4,5],"msg":{"li si":"i'm li si","zhang san":"i'm zhang san"},"name":"zhang san"}

STL容器内容导入JSON

这个JSON库强大到直接把C++ STL中的容器内容可以直接序列化成Json字符串,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* json序列化 实例3 */
void func3()
{
json js;
vector<int> vec;
vec.push_back(1);
vec.push_back(2);
vec.push_back(5);
// 直接序列化一个vector容器
js["list"] = vec;
map<int, string> m;
m.insert({1, "黄山"});
m.insert({2, "华山"});
m.insert({3, "泰山"});
// 直接序列化一个map容器
js["path"] = m;
cout << js << endl;
}

输出结果

1
{"list":[1,2,5],"path":[[1,"黄山"],[2,"华山"],[3,"泰山"]]}

API

dump(序列化)

生成的字符串内容和<< json的一样。返回一个string对象。

cout <<能输出 JSON 对象是因为重载了<<运算符;而要实际生成string可用dump(),生成的字符串内容和<< json的一样。传输数据时,不要传string对象,而是要传string实际指向的字符串首指针,c_str()

1
2
3
4
5
6
7
8
9
10
void func1()
{
json js;
js["msg_type"] = 2;
js["from"] = "zhang san";
js["to"] = "li si";
js["msg"] = "hello, i'm zhang san";
string sendBuf = js.dump();
cout << sendBuf.c_str() << endl;
}

parse(反序列化为JSON对象,可以赋值给STL容器)

解析string类型的字符串,生成一个JSON对象。可以转换为容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void func1()
{
json js;
js["msg_type"] = 2;
js["from"] = "zhang san";
js["to"] = "li si";
js["msg"] = "hello, i'm zhang san";
cout << js << endl;
}
int main()
{
string recvBuf = func1();
json json_obj = json::parse(recvBuf);
cout << json_obj["msg_type"] << endl;
cout << json_obj["from"] << endl;
cout << json_obj["to"] << endl;
cout << json_obj["msg"] << endl;
}

输出结果:

1
2
3
4
2
"zhang san"
"li si"
"hello, what are you doing now?"
1
2
3
4
5
6
7
8
9
10
11
12
13
void func2()
{
json js;
js["id"] = {1, 2, 3, 4, 5};
}
int main()
{
string recvBuf = func2();
json json_obj = json::parse(recvBuf);
auto arr = json_obj["id"];
cout << json_obj["id"] << endl;
cout << arr[2] << endl;
}

输出结果

1
2
[1,2,3,4,5]
3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void func3()
{
json js;
vector<int> vec;
vec.push_back(1);
vec.push_back(2);
vec.push_back(5);
// 直接序列化一个vector容器
js["list"] = vec;
map<int, string> m;
m.insert({1, "黄山"});
m.insert({2, "华山"});
m.insert({3, "泰山"});
// 直接序列化一个map容器
js["path"] = m;
cout << js << endl;
}
int main()
{
vector<int> vec = json_obj["list"];
for (int & v : vec)
{
cout << v << " ";
}
cout << endl;

map<int, string> mymap = json_obj["path"];
for (auto & p : mymap)
{
cout << p.first << " " << p.second << endl;
}
cout << endl;
}

输出结果

1
2
3
4
1 2 5 
1 黄山
2 华山
3 泰山

网络IO模块

main入口(⭐需要处理服务器程序异常退出,如SIGINT信号)

诸如被kill、或者Ctrl + C终止服务器主程序时,需要处理信号,绑定回调void resetHandler(int)
在ChatService中,调用reset函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include"chatserver.hpp"
#include<iostream>
using namespace std;

#include<signal.h>
#include"chatservice.hpp"
#include<muduo/base/Logging.h> // LOG_ERROR <<
using namespace muduo;

/* 处理服务器Ctrl+C结束后,重置user状态信息 */
void resetHandler(int)
{
ChatService::instance()->reset();
LOG_INFO << "服务器因SIGINT信号退出,用户状态已重置。";
exit(0);
}
int main()
{
signal(SIGINT, resetHandler);
EventLoop loop;
InetAddress addr("127.0.0.1", 6000);
ChatServer server(&loop, addr, "ChatServer");
ChatService::instance()->reset();
server.start();
loop.loop(); //开启事件循环
return 0;
}

ChatService中的reset()函数,就是调用_userModelresetAllState()方法。

1
2
3
4
5
6
/* 业务重置方法,通常在服务器异常退出时调用 */
void ChatService::reset()
{
/* 把所有online用户的状态置为offline */
_userModel.resetAllState();
}

即重置所有用户的状态信息,把所有 online 的改成 offline 。

1
2
3
4
5
6
7
8
9
10
/* 重置所有用户的状态信息 */
void UserModel::resetAllState()
{
char sql[1024] = "update user set state = 'offline' where state = 'online'";
MySQL mysql;
if(mysql.connect())
{
mysql.update(sql);
}
}

ChatServer

成员属性

  1. TcpServer m_server - 基于事件驱动的、IO复用+epoll+线程池的服务器类,完全基于Reactor模型
  2. EventLoop *m_loop - mainLoop的指针, 保存事件循环. 有了事件循环的指针,可以在合适的时候调用quit退出事件循环;
1
2
3
4
5
private:
/* 组合的muduo库,实现服务器功能的类对象 */
TcpServer _server;
/* 指向事件循环对象的指针 */
EventLoop *_loop;

成员函数

构造

参数为loop指针, listenAddr, name, 用于初始化TcpServer

1
2
3
4
5
public:
/* 初始化聊天服务器对象 */
ChatServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg);

start()

1
2
public:
void start();

onConnection/onMessage

连接创建/断开的回调函数

读写事件发生的回调函数

1
2
3
4
5
private:
/* 上报链接相关信息的回调函数(连接创建,连接断开)*/
void onConnection(const TcpConnectionPtr&);
/* 上报读写事件相关信息的回调函数 */
void onMessage(const TcpConnectionPtr&, Buffer*, Timestamp);

头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#ifndef CHATSERVER_H
#define CHATSERVER_H
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
using namespace muduo;
using namespace muduo::net;
/**
* 聊天服务器的主类;
*
* 要注册两个方法:
* 给TcpServer注册新用户的连接、连接断开的、已连接用户的可读写事件;
*/
class ChatServer
{
public:
/* 初始化聊天服务器对象 */
ChatServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg);
public:
/* 启动服务 */
void start();

private:
/* 上报链接相关信息的回调函数(连接创建,连接断开)*/
void onConnection(const TcpConnectionPtr&);
/* 上报读写事件相关信息的回调函数 */
void onMessage(const TcpConnectionPtr&, Buffer*, Timestamp);


private:
/* 组合的muduo库,实现服务器功能的类对象 */
TcpServer _server;
/* 指向事件循环对象的指针 */
EventLoop *_loop;
};
#endif

Reactor模型

本项目基于muduo库,模型是基于事件驱动的、IO复用+epoll+线程池的网络,完全基于Reactor模型,线程暂时设置为 4 个,有一个主Reactor是 IO 线程,主要负责新用户的连接,3个 sub-Reactor 是工作线程,主要负责已连接用户的读写事件的处理。

(⭐)客户端异常退出的业务代码解决

需要知道,如果客户端异常断网的话,是不会发送TCP报文的,更不会发送JSON信息。
但是好在muduo网络库为我们封装了处理客户端异常断网的逻辑,我们只需要在onConnection回调里处理客户端断开的逻辑就好了。
既然客户端断开了,需要在Service层面记录用户退出。

1
2
3
4
5
6
7
8
void ChatServer::onConnection(const TcpConnectionPtr& conn)
{
if (!conn->connected())
{
ChatService::instance()->cloentCloseException(conn);
conn->shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void ChatService::clientCloseException(const TcpConnectionPtr &conn)
{
User user;
{
lock_guard<mutex> lock(_connMutex);
// 从 map 表中找到该 conn 的用户,删除连接信息
for (auto it = _userConnMap.begin(); it != _userConnMap.end(); ++it)
{
if (it->second == conn)
{
user.setId(it->first);
_userConnMap.erase(it);
break;
}
}
}
// 更新用户的状态信息
if (user.getId() != -1)
{
user.setState("offline");
_userModel.updateState(user);
}
}

数据库设计

User表

字段名称 字段类型 字段说明 约束
id INT 用户id PRIMARY KEY、AUTO_INCREMENT
name VARCHAR(50) 用户名 NOT NULL
password VARCHAR(50) 用户密码 NOT NULL
state ENUM(‘online’, ‘offline’) 当前登录状态 DEFAULT ‘offline’
Friend表
字段名称 字段类型 字段说明 约束
userid INT 用户id NOT NULL、联合主键
friendid INT 好友id NOT NULL、联合主键
AllGroup表
字段名称 字段类型 字段说明 约束
id INT 群组id PRIMARY KEY、AUTO_INCREMENT
groupname VARCHAR(50) 群组名称 NOT NULL
groupdesc VARCHAR(200) 群组描述 DEFAULT ‘’

GroupUser表

字段名称 字段类型 字段说明 约束
groupid INT 群组id PRIMARY KEY
userid INT 群组成员id NOT NULL
grouprole ENUM(‘creator’, ‘normal’) 群组身份 DEFAULT ‘normal’
OfflineMessage表
字段名称 字段类型 字段说明 约束
userid INT 用户id PRIMARY KEY
message VARCHAR(500) 离线消息(JSON字符串) NOT NULL

业务模块

(⭐)业务模块与网络模块解耦 - 回调

考虑问题:
网络模块收到的消息如何派发到业务模块?

让网络模块的代码和业务模块的代码解耦。

假设有一个用户在做登录业务,登录业务包含messageIDnamepassword,要验证用户名密码是否正确。

解耦的方案有两种:

  1. 使用基于面向接口的编程。(抽象基类)
  2. 基于回调函数的操作。

业务类型

  1. 登录 - LOGIN_MSG/ACK
  2. 注册 - REG_MSG/ACK
  3. 加好友 - ADD_FRIEND_MSG
  4. 一对一聊天 - ONE_CHAT_MSG
  5. 创建群组 - CREATE_GROUP_MSG
  6. 加入群组 - ADD_GROUP_MSG
  7. 群聊 - GROUP_CHAT_MSG
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#ifndef PUBLIC_H
#define PUBLIC_H
/* server和client的公共文件 */

enum EnMsgType //En表示Enum枚举
{
LOGIN_MSG = 1, //登录 消息id, 与chatservice中的login方法绑定
LOGIN_MSG_ACK, //登录响应

REG_MSG, //注册 消息id, 与chatservice中的reg方法绑定
REG_MSG_ACK, //注册响应

ADD_FRIEND_MSG, //添加好友
ONE_CHAT_MSG, //一对一聊天

CREATE_GROUP_MSG, //创建群组
ADD_GROUP_MSG, //加入群组
GROUP_CHAT_MSG //群聊
};

enum EnLoginErrType
{
LOGIN_SUCCEESS = 0,
LOGIN_REPEAT = 1,
LOGIN_NOTFOUND = 2,
LOGIN_WRONGPWD = 3
};
#endif

业务消息对应的JSON格式

业务类型 JSON示例
登录请求消息 "msgid":1,"id":22,"password":"123"
登录响应消息 "msgid":2,"errno":0,"id":22,"name":"xcg""msgid":2,"errno":1,"errmsg":"this account is online!""msgid":2,"errno":2,"errmsg":"id not found!""msgid":2,"errno":3,"errmsg":"password wrong!"
注册请求消息 "msgid":3,"name":"xcg","password":"123"
注册响应消息 "msgid":4,"errno":0,"id":22"msgid":4,"errno":1
加好友请求消息 "msgid":5,"friendid":12
一对一聊天消息 "msgid":6,"to":12,"msg":"hello!"
创建群组请求消息 "msgid":7,"groupname":"group1","groupdesc":"this is a group."
加入群组请求消息 "msgid":8,"id":22,"groupid":10
群聊请求消息 "msgid":9,"id":22,"groupid":10,"msg":"hello, everyone!"
注销请求消息 "msgid":10,"id":22

ChatService

前置处理

1
2
#include"json.hpp"
using json = nlohmann::json;
1
2
/* 表示处理消息的事件回调方法类型 */
using MsgHandler = std::function<void(const TcpConnectionPtr&, json&, Timestamp)>;

成员属性

m_msgHandlerMap

是一个unordered_map<int, MsgHandler>

映射消息类型id 和 事件回调函数

在启动业务服务之前,里面的key value都提前添加好了,不用处理线程安全问题。

1
2
3
private:
/* 存储消息id和其对应的业务处理方法 */
unordered_map<int, MsgHandler> m_msgHandlerMap;

m_userConnectionMap

是一个unordered_map<int, TcpConnectionPtr>

存储在线用户的通信连接状态。这是聊天服务器实现长连接的基础

需要处理线程安全问题。

1
2
3
private:
/* 存储在线用户的通信连接 */
unordered_map<int, TcpConnectionPtr> m_userConnectionMap;

mutex m_connMutex

定义互斥锁,保证m_userConnectionMap的线程安全

1
2
3
private:
/* 定义互斥锁,保证m_userConnectionMap的线程安全 */
mutex _connMutex;

数据操作类对象

  1. UserModel - m_userModel
  2. OfflineMsgModel - m_offlineMsgModel
  3. FriendModel - friendModel
  4. GroupModel - groupModel
1
2
3
4
5
private:
UserModel _userModel; /* 数据操作类对象 */
OfflineMsgModel _offlineMsgModel; /* 数据操作类对象 */
FriendModel _friendModel; /* 数据操作类对象 */
GroupModel _groupModel; /* 数据操作类对象 */

成员函数

构造函数(⭐单例)

私有化, 单例处理

1
2
3
4
public:
static ChatService * instance();
private:
ChatService();

业务接口

  1. login - 登陆业务
  2. reg - 注册业务
  3. addFriend - 添加好友业务
  4. oneChat - 一对一聊天业务
  5. createGroup - 创建群组业务
  6. addGroup - 加入群组业务
  7. groupChat - 群组聊天业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public:
/* 处理登录业务 */
void login(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 处理注册业务 */
void reg(const TcpConnectionPtr &conn, json &js, Timestamp time);

/* 添加好友业务 */
void addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 一对一聊天业务 */
void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);

/* 创建群组业务 */
void createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 加入群组业务 */
void addGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 群组聊天业务 */
void groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time);

getHandler

获取消息对应的处理器

1
2
3
public:
/* 获取消息对应的处理器 */
MsgHandler getHandler(int msgid);

clientCloseException

处理客户端异常退出

1
2
3
public:
/* 处理客户端异常退出 */
void clientCloseException(const TcpConnectionPtr & conn);

reset

业务重置方法,通常在服务器异常退出时调用

1
2
3
public:
/* 业务重置方法,通常在服务器异常退出时调用 */
void reset();

头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#ifndef CHATSERVICE_H
#define CHATSERVICE_H
#include<muduo/net/TcpConnection.h>
#include<unordered_map>
#include<functional>

#include"usermodel.hpp"
#include"offlinemsgmodel.hpp"
#include"friendmodel.hpp"
#include"groupmodel.hpp"

using namespace std;
using namespace muduo;
using namespace muduo::net;
#include"json.hpp"
using json = nlohmann::json;

#include<mutex>

/* 表示处理消息的事件回调方法类型 */
using MsgHandler = std::function<void(const TcpConnectionPtr&, json&, Timestamp)>;
/**
* 聊天服务器业务类.
* 用映射关系来存储消息id和具体处理函数.
* 此类有一个实例就够了,所以采用单例模式.
*/
class ChatService
{
public:
/* 获取单例对象的接口函数 */
static ChatService* instance();
public:
/* 处理登录业务 */
void login(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 处理注册业务 */
void reg(const TcpConnectionPtr &conn, json &js, Timestamp time);

/* 添加好友业务 */
void addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 一对一聊天业务 */
void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);

/* 创建群组业务 */
void createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 加入群组业务 */
void addGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
/* 群组聊天业务 */
void groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
public:
/* 获取消息对应的处理器 */
MsgHandler getHandler(int msgid);
public:
/* 处理客户端异常退出 */
void clientCloseException(const TcpConnectionPtr & conn);
public:
/* 业务重置方法,通常在服务器异常退出时调用 */
void reset();

private:
ChatService();
private:
UserModel _userModel; /* 数据操作类对象 */
OfflineMsgModel _offlineMsgModel; /* 数据操作类对象 */
FriendModel _friendModel; /* 数据操作类对象 */
GroupModel _groupModel; /* 数据操作类对象 */
private:
/* 定义互斥锁,保证m_userConnectionMap的线程安全 */
mutex _connMutex;
private:
/* 存储消息id和其对应的业务处理方法 */
unordered_map<int, MsgHandler> _msgHandlerMap;
/* 存储在线用户的通信连接 */
unordered_map<int, TcpConnectionPtr> _userConnectionMap;
};
#endif

(⭐)登录业务需要处理的长连接问题、线程安全问题

需要ChatService类中有一个记录用户的连接状态才行。
因为聊天服务器需要保持一个给用户推送消息的状态。

1
2
3
private:
unordered_map<int, TcpConnectionPtr> _userConnMap;
mutex _connMutex;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/* 处理登录业务 */
void ChatService::login(const TcpConnectionPtr &conn,
json &js, Timestamp time)
{
/* 从json参数获取账号、密码信息 */
int id = js["id"].get<int>();
string password = js["password"];
User user = _userModel.query(id);

json response;
response["msgid"] = LOGIN_MSG_ACK;
if(user.getId() == id && user.getPassword() == password)
{
if(user.getState() == "online")
{
/* 该用户已经登录在线,不允许重复登陆 */
response["errno"] = LOGIN_REPEAT;
response["errmsg"] = "该用户已经登录";
}
else if(user.getState() == "offline")
{
/* 登陆成功 */
{
/* 记录用户连接信息 */
lock_guard<mutex> lock(_connMutex);
_userConnectionMap.insert({id, conn});
}
#ifdef __CLUSTER__
/**
* 集群环境下, 向redis订阅此id
*/
m_redis.subscribe(id);
#endif
/* 更新用户状态信息 */
user.setState("online");
_userModel.updateState(user);

response["errno"] = LOGIN_SUCCEESS;
response["id"] = user.getId();
response["name"] = user.getName();
/* 查询该用户是否在离线时未收到的消息 */
vector<string> offlineMsgVec = _offlineMsgModel.query(id);
if(!offlineMsgVec.empty())
{
response["offlinemsg"] = offlineMsgVec;
/* 把该用户的所有离线消息从从数据中删除掉 */
_offlineMsgModel.remove(id);
}
/* 查询该用户的好友信息,并返回 */
vector<User> userVec = _friendModel.query(id);
if(!userVec.empty())
{
vector<string> friendJsonInfoVec;
for(User &user : userVec)
{
json js;
js["id"] = user.getId();
js["name"] = user.getName();
js["state"] = user.getState();
friendJsonInfoVec.push_back(js.dump());
}
response["friends"] = friendJsonInfoVec;
}
}
}
else if(user.getId() != id)
{
/* 登录失败,用户不存在 */
response["errno"] = LOGIN_NOTFOUND;
response["errmsg"] = "用户不存在";
}
else if(user.getPassword() != password)
{
/* 登录失败,密码不匹配 */
response["errno"] = LOGIN_WRONGPWD;
response["errmsg"] = "密码验证失败";
}
conn->send(response.dump());
}

(⭐)离线消息的处理(包括集群环境下)

用OfflineMessage数据表存储离线消息。
当用户登陆时,ChatService去数据表查询是否有该用户id的离线消息。如果有,则推送。

当处于集群环境时,某一台服务器中的_userConnectionMap只记录了在该服务器在线的用户,
所以用户可能在本服务器offline,而在其他服务器上online,
所以真实的在线状态应该去数据库的User表中查询(_userModel.query(id); user.getState() == "online";
如果真的是offline,则才存储离线消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* 一对一聊天业务 */
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int to = js["to"].get<int>();
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnectionMap.find(to);
if(it != _userConnectionMap.end())
{
/* 接收方在线,转发消息 */
/* 服务器主动推送消息给接收方 */
it->second->send(js.dump()); // it->second 表示
return;
}
}
#ifdef __CLUSTER__
/**
* 集群环境下, 需要查询对方(to)是否在线;
* 不可通过服务器connMap查询, 是通过数据库信息;
*/
User user = _userModel.query(to);
if(user.getState() == "online")
{
m_redis.publish(to, js.dump());
return;
}
#endif
/* 接收方离线,存储离线消息 */
_offlineMsgModel.insert(to, js.dump());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/* 群组聊天业务 */
void ChatService::groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
int groupid = js["groupid"].get<int>();
vector<int> useridVec = _groupModel.queryGroupUsers(userid, groupid);

bool offline = true;
bool reallyOffline = true;
for(int id : useridVec)
{
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnectionMap.find(id);
if(it != _userConnectionMap.end())//在本台服务器上线
{
offline = false;
reallyOffline = false;
it->second->send(js.dump());
}
}
#ifdef __CLUSTER__
if(offline)
{
/**
* 集群环境下, 需要判断其是否在其他服务器上在线;
*/
User user = _userModel.query(id);
if(user.getState() == "online")
{
reallyOffline = false;
m_redis.publish(id, js.dump());
}
}
#endif
if(reallyOffline)
{
_offlineMsgModel.insert(id, js.dump());
}
reallyOffline = true;
offline = true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/* 处理登录业务 */
void ChatService::login(const TcpConnectionPtr &conn,
json &js, Timestamp time)
{
/* 从json参数获取账号、密码信息 */
int id = js["id"].get<int>();
string password = js["password"];
User user = _userModel.query(id);

json response;
response["msgid"] = LOGIN_MSG_ACK;
if(user.getId() == id && user.getPassword() == password)
{
if(user.getState() == "online")
{
/* 该用户已经登录在线,不允许重复登陆 */
response["errno"] = LOGIN_REPEAT;
response["errmsg"] = "该用户已经登录";
}
else if(user.getState() == "offline")
{
/* 登陆成功 */
{
/* 记录用户连接信息 */
lock_guard<mutex> lock(_connMutex);
_userConnectionMap.insert({id, conn});
}
#ifdef __CLUSTER__
/**
* 集群环境下, 向redis订阅此id
*/
m_redis.subscribe(id);
#endif
/* 更新用户状态信息 */
user.setState("online");
_userModel.updateState(user);

response["errno"] = LOGIN_SUCCEESS;
response["id"] = user.getId();
response["name"] = user.getName();
/* 查询该用户是否在离线时未收到的消息 */
vector<string> offlineMsgVec = _offlineMsgModel.query(id);
if(!offlineMsgVec.empty())
{
response["offlinemsg"] = offlineMsgVec;
/* 把该用户的所有离线消息从从数据中删除掉 */
_offlineMsgModel.remove(id);
}
/* 查询该用户的好友信息,并返回 */
vector<User> userVec = _friendModel.query(id);
if(!userVec.empty())
{
vector<string> friendJsonInfoVec;
for(User &user : userVec)
{
json js;
js["id"] = user.getId();
js["name"] = user.getName();
js["state"] = user.getState();
friendJsonInfoVec.push_back(js.dump());
}
response["friends"] = friendJsonInfoVec;
}
}
}
else if(user.getId() != id)
{
/* 登录失败,用户不存在 */
response["errno"] = LOGIN_NOTFOUND;
response["errmsg"] = "用户不存在";
}
else if(user.getPassword() != password)
{
/* 登录失败,密码不匹配 */
response["errno"] = LOGIN_WRONGPWD;
response["errmsg"] = "密码验证失败";
}
conn->send(response.dump());
}

数据模块

(⭐)业务模块与数据模块解耦 - ORM框架

Object Relation Map - 对象关系映射。

在这个框架中,业务层操作的都是对象,看不到具体的SQL操作。

在DAO层(数据层),才有具体的数据库操作。

解决了痛点:业务模块、数据模块之间的解耦。

搭建MySQL数据库环境

(以下命令基于Ubuntu环境)

  1. 安装mysql-server和mysql开发包, 包括mysql头文件和动态库文件
1
2
sudo apt-get install mysql-server		#安装最新版MySQL服务器
sudo apt-get install libmysqlclient-dev #安装开发包
  1. 初始的用户名和密码是自动生成的,按下面步骤修改mysql的root用户密码为123456
1
2
3
4
5
6
7
~$ sudo cat /etc/mysql/debian.cnf

[client]
host = localhost
user = debian-sys-maint #初始的用户名
password = Kk3TbShbFNvjvhpM #初始的密码
socket = /var/run/mysqld/mysqld.sock
1
2
3
# 用上面初始的用户名和密码,登录mysql server,修改root用户的密码,命令如下:
~$ mysql -u debian-sys-maint -pKk3TbShbFNvjvhpM
#-u后面是上面查看的用户名; -p后面紧跟上面查看的密码

更改密码:参考 https://blog.csdn.net/mrcan666/article/details/124163537?spm=1001.2014.3001.5502

  1. 设置MySQL字符编码utf-8,以支持中文操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
mysql> show variables like "char%"; # 先查看MySQL默认的字符编码
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |#不支持中文!!!
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
8 rows in set (0.06 sec)

mysql> set character_set_server=utf8;
Query OK, 0 rows affected (0.00 sec)
  1. 修改表的字符编码:alter table user default character set utf8;
  2. 修改属性的字符编码:alter table user modify column name varchar(50) character set utf8;

MySQL类 - 封装MySQL操作

需要引入mysql/mysql.h头文件

1
#include<mysql/mysql.h>

成员变量

MYSQL *m_conn

记录MYSQL类型的指针, 以获取这个mysql连接

1
2
private:
MYSQL *m_conn;

成员函数

构造/析构

初始化/释放数据库连接

1
2
3
4
5
public:
/* 初始化数据库连接 */
MySQL();
/* 释放数据库连接资源 */
~MySQL();

getConnection

获取连接, 即获取成员m_conn

1
2
3
public:
/* 获取连接 */
MYSQL * getConnection();

connect

连接数据库, 返回值为bool, 说明连接成功与否

1
2
3
public:
/* 连接数据库 */
bool connect();

query

查询操作, 参数是string类型的sql语句, 返回值为MYSQL_RES, 即MySQL结果集类型

1
2
3
public:
/* 查询操作 */
MYSQL_RES * query(string sql);

update

更新操作, 参数是string类型的sql语句, 返回值为bool, 说明更新成功与否

1
2
3
public:
/* 更新操作 */
bool update(string sql);

代码实现

前置全局声明

1
2
3
4
5
6
#include <muduo/base/Logging.h>	//日志工具
/* 数据库配置信息 */
static string server = "127.0.0.1";
static string user = "root";
static string password = "123";
static string dbname = "chat";

构造

调用mysql_init, 实际上只是对mysql连接进行空间资源的开辟, 返回一个指针赋给m_conn成员, 没有真正连接, 因此传入nullptr

1
2
3
4
5
/* 初始化数据库连接 */
MySQL::MySQL()
{
m_conn = mysql_init(nullptr);
}

析构

调用mysql_close(m_conn), 对MySQL连接资源进行释放

1
2
3
4
5
6
7
8
/* 释放数据库连接资源 */
MySQL::~MySQL()
{
if(m_conn != nullptr)
{
mysql_close(m_conn);
}
}

getConnection

获取连接, 即返回m_conn成员

1
2
3
4
5
/* 获取连接 */
MYSQL * MySQL::getConnection()
{
return m_conn;
}

connect

连接数据库, 内部调用mysql_real_connect, 传入m_conn, 以及server地址, user号, 密码, 要连接的数据库name, 服务器端口;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* 连接数据库 */
bool MySQL::connect()
{
MYSQL *p = mysql_real_connect(m_conn, server.c_str(), user.c_str(),
password.c_str(), dbname.c_str(), 3306, nullptr, 0);
if(p != nullptr)
{
/* C/C++代码默认的编码字符是ASCII,如果不设置,则从MySQL上拉下来的中文无法正常显示 */
mysql_query(m_conn, "set name gbk");
LOG_INFO << "connect mysql success!";
}
else
{
LOG_INFO << "connect mysql failed!";
}
return p;
}

query

查询操作

  1. 内部调用mysql_query, 传入m_conn, sql-string的C风格字符串首址;
    1. mysql_query的返回值:
      1. 如果查询成功,返回0;
      2. 如果出现错误,返回非0值。
      3. 返回值需要调用mysql_use_result(m_conn)获取结果集, 再return
1
2
3
4
5
6
7
8
9
10
/* 查询操作 */
MYSQL_RES * MySQL::query(string sql)
{
if (mysql_query(m_conn, sql.c_str()))
{
LOG_INFO << __FILE__ << ":" << __LINE__ << ":" << sql << "查询失败!";
return nullptr;
}
return mysql_use_result(m_conn);
}
  1. update - 更新操作
    1. 内部调用mysql_query, 传入m_conn, sql-string的C风格字符串首址;
    2. 判断mysql_query的返回值, 若为非 0 则更新失败; 若为 0 则更新成功;
1
2
3
4
5
6
7
8
9
10
/* 更新操作 */
bool MySQL::update(string sql)
{
if (mysql_query(m_conn, sql.c_str()))
{
LOG_INFO << __FILE__ << ":" << __LINE__ << ":" << sql << "更新失败!";
return false;
}
return true;
}

Model层 - 对业务层封装底层数据库的操作

以User类的操作为例

User类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#ifndef USER_H
#define USER_H
#include<string>
using namespace std;
/**
* 属于映射类;
* 匹配User表的ORM类;
*/
class User
{
public:
User(int id = -1, string name="", string password="", string state="offline")
: id_(id), name_(name), password_(password), state_(state)
{
}
public:
void setId(int id){id_ = id;}
void setName(string name){name_ = name;}
void setPassword(string password){password_ = password;}
void setState(string state){state_ = state;}
public:
int getId() const{return id_;}
string getName() const{return name_;}
string getPassword() const{return password_;}
string getState() const{return state_;}
private:
int id_;
string name_;
string password_;
string state_;
};
#endif

UserModel类

  1. insert - 参数为User的引用, 返回值为bool
  2. query - 参数为id, 返回值为User
  3. updateState - 更新用户的状态信息, 参数为User的一个临时副本, 返回bool
  4. resetAllState - 重置所有用户的状态信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#ifndef USERMODEL_H
#define USERMODEL_H
#include"user.hpp"
/* User表的数据操作类 */
class UserModel
{
public:
/* User表的增加方法 */
bool insert(User &user);
public:
/* 根据用户号码查询用户信息 */
User query(int id);
public:
/* 更新用户的状态信息 */
bool updateState(User user);
/* 重置所有用户的状态信息 */
void resetAllState();
};
#endif

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include"usermodel.hpp"
#include"db.h"
#include<iostream>
using namespace std;
/* User表的增加方法 */
bool UserModel::insert(User &user)
{
/* 1.组装SQL语句 */
char sql[1024] = {0};
sprintf(sql, "insert into user(name, password, state) values('%s', '%s', '%s')",
user.getName().c_str(), user.getPassword().c_str(), user.getState().c_str());
MySQL mysql;
if(mysql.connect())
{
if(mysql.update(sql))
{
/* 获取插入成功的用户数据生成的主键id */
/* 以下的mysql_insert_id是生成id的方法之一 */
user.setId(mysql_insert_id(mysql.getConnection()));
return true;
}
}
/* 注册失败 */
return false;
}
/* 根据用户号码查询用户信息 */
User UserModel::query(int id)
{
/* 1.组装SQL语句 */
char sql[1024] = {0};
sprintf(sql, "select * from user where id = %d", id);
MySQL mysql;
if(mysql.connect())
{
/* mysql.query内部申请了资源,处理完成User的构造后,需要free */
MYSQL_RES *res = mysql.query(sql); // 此query为MySQL的query,和update同级。
if(res != nullptr)
{
MYSQL_ROW row = mysql_fetch_row(res);
if(row != nullptr)
{
User user;
user.setId(atoi(row[0]));
user.setName(row[1]);
user.setPassword(row[2]);
user.setState(row[3]);
mysql_free_result(res);
return user;
}
}
}
/* 如果没有有效的查询结果,返回一个默认User,id为-1,表示出错 */
return User();
}
/* 更新用户的状态信息 */
bool UserModel::updateState(User user)
{
/* 1.组装SQL语句 */
char sql[1024] = {0};
sprintf(sql, "update user set state = '%s' where id = %d", user.getState().c_str(), user.getId());
MySQL mysql;
if(mysql.connect())
{
if(mysql.update(sql))
{
return true;
}
}
return false;
}
/* 重置所有用户的状态信息 */
void UserModel::resetAllState()
{
char sql[1024] = "update user set state = 'offline' where state = 'online'";
MySQL mysql;
if(mysql.connect())
{
mysql.update(sql);
}
}

测试

点对点聊天

1
./ChatServer

点对点聊天的json格式:{"msgid":5,"from":"from_name","to":to_id,"msg":"......"}

1
2
3
# 注册
telnet 127.0.0.1 6000
{"msgid":3,"name":"test0511","password":"123"}
1
2
3
4
5
6
7
#登录后一对一聊天
{"msgid":1,"id":22,"password":"123"}
{"msgid":6,"from":"test0511","to":13,"msg":"hello zhang san, i am test0511!"}

{"msgid":1,"id":13,"password":"123456"}
{"msgid":6,"from":"zhang san","to":22,"msg":"hello test0511, i'm zhang san!"}

1
2
3
4
5
6
7
8
9
# xcg
telnet 127.0.0.1 6000
{"msgid":1,"id":22,"password":"123"} #登录
{"msgid":5,"from":"xcg","to":13,"msg":"hello zhang san, i'm xcg!"} #发送消息
# 发送离线消息
{"msgid":5,"from":"xcg","to":13,"msg":"hello - 1"}
{"msgid":5,"from":"xcg","to":13,"msg":"hello - 2"}
{"msgid":5,"from":"xcg","to":13,"msg":"hello - 3"}

1
2
3
4
5
# zhang san
telnet 127.0.0.1 6000
{"msgid":1,"id":13,"password":"123456"} #登录
{"msgid":6,"from":"zhang san","to":22,"msg":"hello xcg, i'm zhang san!"}#发送消息

好友业务

  1. 显示有哪些已添加的好友,id
  2. 添加好友

但是业务并不严格,只要知道其id即可聊天。

总体业务流程:向服务器发起添加好友的请求,服务器就把关系添加到friend表中,初版本不用征询对方的同意。

friend表就两个字段:useridfriendid是联合主键。

测试

添加好友、登陆成功后显示好友列表

1
./ChatServer

添加好友的JSON格式:{"msgid":6,"id":22,"friendid":13}。此语句意为:id为22的用户主动添加id为13的用户为好友,建立双向关系。

添加后,查看friend表中是否有信息,应有一个id:22 - friendid:13

1
select * from friend;
1
2
3
4
5
6
7
8
9
10
# xcg
telnet 127.0.0.1 6000
{"msgid":1,"id":22,"password":"123"} # 登录
{"msgid":6,"id":22,"friendid":13} # 添加id:13为好友

# Ctrl + ] -> quit 退出
# 重新登陆
telnet 127.0.0.1 6000
{"msgid":1,"id":22,"password":"123"} # 登录
# 看看是否返回好友列表

群组业务

  1. 管理员创建群
  2. 用户加入群
  3. 群聊

与群组业务相关的有两张表:一个是AllGroup表,一个是GroupUser表。

AllGroup表有三个字段:id、groupname、groupdesc(群组描述)

GroupUser表,因为群和成员之间是多对多的关系,所以需要此中间表来描述这个关系。有三个字段:groupiduseridgrouprole(成员在群中的权限)。

groupiduserid是联合主键。

这两张表都是处理群组业务的,所以对应的model只创建了一个。

model

groupmodel.hpp

负载均衡

负载均衡器, 亦叫做反向代理服务器, 在集群服务器架构中, 作为统一接收客户端请求的端口。

其根据配置所界定的负载算法,把客户端的请求分发到业务服务器上。

要做的三件事情:

  1. 把 client 的请求按照负载均衡算法分发到具体的业务服务器 ChatServer 上面;
    1. 相应地, 服务器的响应也要经过负载均衡器, 准确地返回给这个 client 。
    2. 服务器的响应消息,也可以通过服务器和客户端建立一个 IP 隧道实现,达到直接连接, 这样的效率更好;
  2. 能够和 ChatServer 保持心跳机制,监测 ChatServer 故障;
  3. 能够发现新添加的 ChatServer 设备,方便扩展服务器数量,最好是能够平滑地完成这个过程,而不是需要重启负载均衡服务器导致服务停止。

nginx负载均衡模块

本项目选择 Nginx 的 TCP 负载均衡模块,要解决的问题

  1. 如何进行 Nginx 源码编译,包含 TCP 负载均衡模块
  2. nginx.conf 配置文件中如何配置负载均衡
  3. Nginx 的平滑加载配置文件启动

nginx在1.9版本之前,只支持http协议web服务器的负载均衡,
从1.9版本开始以后,Nginx开始支持 TCP 的长连接负载均衡
但是 Nginx 默认不编译 TCP 负载均衡模块,
编写它时,需要加入--with-stream参数来激活这个模块。

编译安装流程(记得带--with-stream

nginx-1.12.2.tar.gz为例;

nginx编译安装需要先安装pcre、openssl、zlib等库。

1
sudo apt-get install libpcre3 libpcre3-dev

对开源产品发行源代码的编译安装, 一般都是:

  1. 先执行./configure, 生产相应的makefile文件;
  2. make, 进行编译
  3. make install, 进行安装

解压nginx-1.12.2.tar.gz后,进入nginx-1.12.2目录,
先运行./configure --with-stream生成Makefile后,
运行make,最后make install
make install命令会向系统路径拷贝文件,所以需要在root用户下执行。

编译完成后,默认安装在了/usr/local/nginx目录。

nginx目录下,可执行文件在sbin目录里,配置文件在conf目录里。

配置文件

如何配置负载均衡?

/usr/local/nginx/conf/nginx.conf中, 可以看到http字段, 这是基于http的负载均衡配置;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#user  nobody;
worker_processes 1;

events {
worker_connections 1024;
}

http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
server {
listen 80;
server_name localhost;
location / {
root /var/www/hexo;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}

而本项目是 TCP 服务器, 需要写到stream下, 表示基于TCP的负载均衡配置;

  1. server字段
    1. listen - nginx负载均衡器将要监听的端口号
    2. proxy_pass - 所有在listen字段端口号上的请求都将分发到这个标记字段所填充到信息中
  2. upstream字段 - 可用于负载均衡的服务器信息
    1. server IP:Port weight=权重 max_fail=最多失败次数 fail_timeout=最长等待响应时间
1
2
3
4
5
6
7
8
9
10
11
12
# nginx tcp loadbalance config
stream {
upstream MyServer {
server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;
server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;
}
server {
# proxy connect timeout 1s;
listen 8000;
proxy_pass MyServer;
}
}

常用操作

1
2
nginx -s reload		# 重新加载配置文件,平滑启动
nginx -s stop # 停止nginx服务

Nginx的网络模型设计(accpet锁解决惊群现象)(reactors in process - one loop per process)

我们上面的网络IO模块的 muduo 的网络设计是:reactors in threads - one loop per thread.

而 Nginx 服务器的网络模块设计是基于进程的,采用多个 Reactor 充当 I/O 进程和工作进程,通过一把 accept 锁完美解决多个 Reactors 的惊群现象。
因此Nginx服务器的模型是:reactors in process - one loop per process

更加高并发?把负载均衡器也集群!(LVS负载均衡器)

负载均衡也分为很多种,

  1. 可分为业务层负载均衡器,通过业务分发;
  2. 也可分为传输层的负载均衡器,通过UDP/TCP分发;
  3. 网络层的负载均衡器,通过IP分发;
  4. 数据链路层的负载均衡器,通过数据帧分发;

如何更进一步提高并发量?

可以把负载均衡器也进行集群处理, 前端使用一个偏底层的 LVS 负载均衡器, 即, 一台 LVS 加多台 Nginx 服务器的模型。

LVS的并发量很容易扩展到十几万;

Redis优化跨服务器通信

本项目中, 用户间的通信模型无非有两种:一对一聊天、群聊。

集群环境中,即用户可能分布在不同服务器主机上。

如果按照之前的代码逻辑,每台服务器上的一个 Server 都只有一个m_userConnectionMap(因为这个是ChatService中的成员,而ChatService是单例模式)。

所以用户给对方发消息后,如果接收方用户不在同一台服务器上,那么该消息就会被当作离线消息,这显然是不对的。

那么怎么解决跨服务器通信呢?

多个服务器广播通信(虽能解决,但是太差)

最直观的想法是让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。

这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的 socket 资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,因此不是一个好的设计。

Redis 消息队列 解耦设计

集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,
解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源。

所以答案是引入服务器中间件如消息队列。

如此一来, 服务器仅需做的工作是:向中间件发布订阅消息,之后等待中间件通知去取消息

在集群分布式环境中,经常使用的中间件消息队列有 ActiveMQ 、RabbitMQ 、Kafka 等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。

限于本项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此本项目中间件消息队列选型的是:基于发布-订阅模式的 Redis。

Redis环境安装和配置

  1. Ubuntu安装redis服务命令
1
sudo apt-get install redis-server
  1. 安装完成后会自动启动redis服务,通过ps命令确认; redis默认端口为6379
1
ps -ef | grep redis

redis-cli测试redis-server

启动redis-cli客户端,连接redis server体验一下数据缓存功能

1
redis-cli
1
2
3
4
127.0.0.1:6379> set "abc" "hello world" #设置key-value
OK
127.0.0.1:6379> get "abc"
"hello world"

Redis 订阅 / 发布

Redis 的发布-订阅机制:发布-订阅模式包含了两种角色,分别是消息的发布者和消息的订阅者。

订阅者可以订阅一个或者多个频道 channel ,发布者可以向指定的频道 channel 发送消息,所有订阅此频道的订阅者都会收到此消息。

订阅频道的命令是 subscribe,可以同时订阅多个频道,用法是subscribe channel1 [channel2] ...;

1
2
3
4
5
127.0.0.1:6379> subscribe 13
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "13"
3) (integer) 1
1
127.0.0.1:6379> publish 13 "hello, 13" #另一端推送消息给13

订阅了13频道 的用户收到的消息:

1
2
3
1) "message"
2) "13"
3) "hello, 13"

对应于本项目

由于服务器是集群化的, 所以登录到本系统的用户可能不在同一聊天服务器上, 需要观察Redis中间件来获取消息;

即, 用户是观察者, 消息队列是被观察者;

某一用户登陆到聊天系统后,

  1. 服务器需要向 Redis 订阅某一频道的消息, 这个频道的id号即为该用户的id号;
  2. 当该用户给另一用户发送消息时, 发现其不在本服务器中, 需要向该频道发布消息;

Redis发布-订阅的客户端编程 - 封装为Redis类

redis支持多种不同的客户端编程语言,例如 Java 对应 Jedis ,PHP对应 phpredis ,C++对应的是 hiredis

hiredis 安装步骤

  1. git clone https://github.com/redis/hiredis
  2. make && make install
1
2
3
4
5
6
cd hiredis
make
...
sudo make install #拷贝生成的动态库到/usr/local/lib目录下
# 如果提示没有找到hiredis动态库,则执行下面
# sudo ldconfig /usr/local/lib

成员变量

  1. hiredis同步上下文对象
    1. 一个专门负责publish消息
    2. 一个专门负责subscribe消息
1
2
3
4
5
private:
/* hiredis同步上下文对象, 负责publish消息 */
redisContext * m_publish_context;
/* hiredis同步上下文对象, 负责subscribe消息 */
redisContext * m_subscribe_context;
  1. 回调操作, 收到订阅的消息, 给service层上报
1
2
3
private:
/* 回调操作, 收到订阅的消息, 给service层上报 */
MessageCallback m_notify_handler;
  1. 对于"hiredis上下文对象"的理解:
  • 相当于一个redis-cli, 存储了连接相关的信息;
  1. 为什么要写两个上下文对象?
  • 如果上下文对象正在subscribe那么其将会阻塞, 所以 subscribe 和 publish 需要分开操作;

成员函数

构造/析构

1
2
3
public:
Redis();
~Redis();

connect

连接Redis服务器

1
2
3
public:
/* 连接Redis服务器 */
bool connect();

发布/订阅消息

1
2
3
4
5
6
7
public:
/* 向指定的redis频道发布消息 */
bool publish(int channel, string message);
/* 向指定的redis频道订阅消息 */
bool subscribe(int channel);
/* 向指定的redis频道取消订阅消息 */
bool unsubscribe(int channel);

observer_channel_message

在独立线程中接收订阅频道中的消息

1
2
3
public:
/* 在独立线程中接收订阅频道中的消息 */
void observer_channel_message();

init_notify_handler

初始化向业务层上报频道消息 的回调函数, 需要用到一个int(频道号), 一个消息内容字符串

1
2
3
4
public:
using MessageCallback = function<void(int, string)>;
/* 初始化向业务层上报频道消息 的回调函数, 需要用到int(频道号), 消息内容字符串 */
void init_notify_handler(MessageCallback cb);

代码实现

构造

只是对两个上下文对象指针赋nullptr, 没有实际构造

1
2
3
4
Redis::Redis()
: m_publish_context(nullptr), m_subscribe_context(nullptr)
{
}

析构

调用redisFree释放上下文对象资源

1
2
3
4
5
6
7
8
9
10
11
Redis::~Redis()
{
if(m_publish_context != nullptr)
{
redisFree(m_publish_context);
}
if(m_subscribe_context != nullptr)
{
redisFree(m_subscribe_context);
}
}

connect

  1. 对context进行实际的申请资源/构造, 返回指针赋给成员, 底层调用redisConnect
  2. 创建线程, 执行observer_channel_message, 即循环等待Redis频道的reply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bool Redis::connect()
{
m_publish_context = redisConnect("127.0.0.1", 6379);
if(nullptr == m_publish_context)
{
cerr << "connect redis failed!" << endl;
return false;
}

m_subscribe_context = redisConnect("127.0.0.1", 6379);
if(nullptr == m_subscribe_context)
{
cerr << "connect redis failed!" << endl;
return false;
}

/**
* 由于subscribe操作是阻塞的,
* 在实际的使用环境下, 不可能因为一个订阅操作去阻塞一个服务器,
* 所以要用一个单独的线程来完成监听频道上的事件,
* 有消息则给业务层进行上报;
*/
thread t(
[&]() { observer_channel_message();} );
t.detach();

cout << "connect redis-server success!" << endl;
return true;
}

publish

相当于向redis-server发送命令, reply接收命令执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
bool Redis::publish(int channel, string message)
{
/* 相当于向redis-server发送命令, reply接收命令执行结果 */
redisReply * reply = (redisReply*)redisCommand(
m_publish_context, "PUBLISH %d %s", channel, message.c_str());
if(nullptr == reply)
{
cerr << "publish command failed!" << endl;
return false;
}
freeReplyObject(reply);
return true;
}

subscribe

相当于把redisCommand细化了, 只操作了发命令, 接收结果交给单独的线程做了, 详见observer_channel_message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool Redis::subscribe(int channel)
{
if(REDIS_ERR == redisAppendCommand(
m_subscribe_context, "SUBSCRIBE %d", channel))
{
cerr << "subscribe command failed!" << endl;
return false;
}
/* 循环发送缓冲区内容, 直到发送完毕 */
int done = 0;
while(!done)
{
if(REDIS_ERR == redisBufferWrite(m_subscribe_context, &done))
{
cerr << "subscribe command failed!" << endl;
return false;
}
}
/**
* 这里不做redisReply的操作,
* 这是个阻塞的操作, 放在observer_channel_message中做;
*/
return true;
}

unsubscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool Redis::unsubscribe(int channel)
{
if(REDIS_ERR == redisAppendCommand(
m_subscribe_context, "UNSUBSCRIBE %d", channel))
{
cerr << "unsubscribe command failed!" << endl;
return false;
}
/* 循环发送缓冲区内容, 直到发送完毕 */
int done = 0;
while(!done)
{
if(REDIS_ERR == redisBufferWrite(m_subscribe_context, &done))
{
cerr << "unsubscribe command failed!" << endl;
return false;
}
}
/**
* 这里不做redisReply的操作,
* 这是个阻塞的操作, 放在observer_channel_message中做;
*/
return true;
}

observer_channel_message

  • Redis频道如果有消息, 则有三个字段,
    1. 对应的是redisGetReply返回的reply->element[0],[1],[2];
    2. 本项目的element[1]对应的是频道号;
    3. 本项目的element[2]对应的是消息体;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void Redis::observer_channel_message()
{
redisReply *reply = nullptr;
int res = REDIS_ERR;
while(REDIS_OK==(res = redisGetReply(
m_subscribe_context, (void**)&reply)))
{
if(reply != nullptr &&
reply->element[2] != nullptr &&
reply->element[2]->str != nullptr)
{
/* 给业务层上报频道上发生的消息, 即频道号+消息体 */
m_notify_handler(atoi(reply->element[1]->str),
reply->element[2]->str );
}
freeReplyObject(reply);
}
if(res == REDIS_ERR)
{
cerr << "redisGetReply err" << endl;
return;
}
cerr << "observer_channel_message quit" << endl;
}

init_notify_handler

设置m_notify_handler回调

1
2
3
4
void Redis::init_notify_handler(MessageCallback cb)
{
m_notify_handler = cb;
}

redisCommand和redisAppendCommand的区别:

  1. redisAppendCommand只是把命令先写到本地缓存中;
  2. 写到缓存之后还需要调用redisBufferWrite把缓存中的命令发送到Redis服务器;
  3. 最后, 如果要获得reply, 还需要调用redisGetReply获取结果, 这个操作对于subscribe是阻塞的;
  4. 由于publish操作一般不会阻塞, 所以直接调用redisCommand;
  5. 由于subscribe操作最后的redisGetReply将会阻塞, 所以我们把这几个步骤单独写出来, 粒度减小, 追求效率;

ChatService加入Redis组件

首先, 需要在chatservice.hpp中, 引入头文件"redis.hpp"; 然后在ChatService的类成员变量中声明一个Redis m_redisredis操作对象;

1
2
private:
Redis m_redis;

在ChatService类中, 添加一个处理redis业务的成员函数handleRedisSubscribeMessage

1
2
public:
void handleRedisSubscribeMessage(int channel, string message);

chatservice.cpp中, ChatService的构造函数中, 需要添加连接redis服务器的操作;
如果连接成功, 给redis设置回调函数为handleRedisSubscribeMessage, 参数为chatservice对象指针, channel, message;

1
2
3
4
5
6
7
8
9
ChatService::ChatService()
{
// ...
if(m_redis.connect())
{
m_redis.init_notify_handler(std::bind(
&ChatService::handleRedisSubscribeMessage, this, _1, _2));
}
}

Cpp_原子类型_无锁队列

线程安全问题

1
2
3
int count = 0;
count++;
count--; /* 是线程不安全的 */

后置++有多个动作:

  1. 取出count的值为tmp
  2. 计算count + 1
  3. count + 1的结果赋给count
  4. 返回tmp

对应于多条指令。

传统地解决,可以用加锁的方式

1
2
3
4
{
lock_guard<std::mutex> guard(mtx);
count++;
}

但是,互斥锁是比较耗费资源的,如果临界区的代码比较轻量级,那么传统mutex锁相对而言就比较小题大做了。

现在有新机制解决:指令级并发。

如果想让这么多动作在一条指令内做完的话,需要让处理器支持这样的操作,而不用锁机制,因此这种叫做无锁结构。

原子类型

定义于<atomic>
原子类型,封装了一个值,保证其的访问不会导致数据竞争,并且可用于同步不同线程之间的内存访问。

atomic_flag

初始化可以赋值为ATOMIC_FLAG_INIT,意为无状态,对应false。

  1. test_and_set():置位为有状态,对应true,并返回调用之前的状态。这是一个原子操作,不会被其他线程干扰。
  2. clear()重置flag为无状态

atomic_flag做自旋锁

以下就是一个用atomic_flag做忙等待的例子:

  1. 初始flag为无状态
  2. 调用一次test_and_set()置位flag为有状态
  3. while中一直调用test_and_set(),测试它的状态,直到为无状态时,退出循环,结束程序。
1
2
3
4
5
6
7
8
9
10
11
12
#include <atomic>

int main()
{
std::atomic_flag flag = ATOMIC_FLAG_INIT;
flag.test_and_set();
while (flag.test_and_set())
{
continue;
}
return 0;
}

给定一个atomic_flag,初始为无状态,同时启动10个线程,操作之前,调用test_and_set,如果测试为无状态,说明没有其他线程在操作,就可以进行操作。
操作完毕后,clear重置flag为无状态。下一个线程就可以探测到无状态,开始它的操作。

此时,atomic_flag就相当于自旋锁的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

std::atomic_flag lock_output = ATOMIC_FLAG_INIT;
std::counting_semaphore<10> sema{ 0 };
void worker(int v)
{
// lock
while (lock_output.test_and_set())
{}
std::cout << "thread #" << v << std::endl;
// unlock
lock_output.clear();
}
int main()
{
std::jthread t[10];
for (int i = 0; i < 10; ++i)
{
t[i] = std::jthread(&worker, i + 1);
}
sema.release(10);
using namespace std::chrono_literals;
std::this_thread::sleep_for(5s);
return 0;
}

输出:

1
2
3
4
5
6
7
8
9
10
thread #1
thread #5
thread #4
thread #7
thread #2
thread #10
thread #6
thread #3
thread #9
thread #8

atomic

1
template <class T> struct atomic;

原子对象的主要特征是,从不同线程访问值不会导致数据竞争(即,这样做是明确定义的行为,访问顺序正确)。
通常,对于所有其他对象,如果同时访问同一对象而导致数据争用,则该操作将被视为未定义行为。

原子对象能够通过指定不同的内存顺序同步对其线程中其他非原子对象的访问

Relaxed Ordering的问题

1
2
3
4
5
6
// Thread 1:
r1 = y.load(std::memory_order_relaxed); // A
x.store(r1, std::memory_order_relaxed); // B
// Thread 2:
r2 = x.load(std::memory_order_relaxed); // C
y.store(42, std::memory_order_relaxed); // D

标记为 memory_order_relaxed 的原子操作不是同步操作。它们不会在并发内存访问中强加顺序,它们只保证原子性和修改顺序的一致性。
例如,x 和 y 初始为零。
以上程序就会允许产生 r1 == r2 == 42,在线程 1 内,A 在 B 之前被排序,并且在线程 2 内,C 在 D 之前被排序。
但是没有什么可以阻止 D 在 A 之前 修改了 y,并且 B 在 C 之前 修改 x 。
D 对 y 的副作用对于线程 1 中的 load A 是可见的,B 对 x 的副作用对于线程 2 中的 load C 是可见的。

特别是,如果在线程 2 中 D 在 C 之前完成,这可能会发生,这可能是在运行时发生的,或者由于编译器重新排序导致的。

实际上,A、B是不能调换的,因为编译器会看到,同在线程1中,B中的 r1 的值依赖于上一句的A对 r1 的操作。
而C、D之间就没有依赖了,因为 D 操作的是 42,是个常量。
所以,经过线程并发、内存重排,可能的执行顺序有:
A B C D 此时 x、y、r1、r2的值:0、0、0、42
A B D C 此时 x、y、r1、r2的值:0、42、0、0
A C D B 此时 x、y、r1、r2的值:0、42、0、0
A D C B 此时 x、y、r1、r2的值:0、42、0、0
C A D B 此时 x、y、r1、r2的值:0、42、0、0
D A C B 此时 x、y、r1、r2的值:42、42、42、42 => 这时便出现了r1 == r2 == 42
C D A B 此时 x、y、r1、r2的值:42、42、42、0
D C A B 此时 x、y、r1、r2的值:0、42、42、0

怎么解决呢?
见下文

内存顺序

原子操作只是提供了不同线程读写的同步。
但是没有提供操作顺序的同步

比如:线程1修改原子值a,线程2读取原子值a。

两个线程各自的读写操作,确实是保证了不会有脏值。
但是线程1、线程2的顺序没有做控制,
如果线程2想要读出旧值,但是线程1在线程2读值前进行写操作,还是会导致线程2读出脏值。

因此需要提供内存顺序机制。

下面举一个类似的例子,怎么通过原子变量+内存顺序控制,让consumer确保producer执行完毕后,再 load 出 b 的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void producer(void)
{
a.store(true, std::memory_order::relaxed);
b.store(true, std::memory_order::release);
}
void consumer(void)
{
// memory_order_acquire表示必须排序在x store、y store之后
while (!b.load(std::memory_order::acquire))
{}
if (a.load(std::memory_order::relaxed))
++c;
}
int main()
{
a = false;
b = false;
c = 0;
using namespace std::chrono::literals;
std::jthread th2(consumer);
std::this_thread::sleep_for(2s);

std::jthread th(producer);

th.join();
th2.join();
}
类型 含义
relaxed CPU和编译器可以重新排序变量顺序。
这是一种松散的内存顺序,不保证不同线程中的内存访问对原子操作进行排序。
consume(废弃) 针对某个原子变量的访问指令(store)重排到此指令(load)前。即自己load排到针对某个变量的release操作后。(在C++26中被废弃了,推荐用acquire)
acquire 所有访问指令(store)排到此指令(load)前。即自己load排到所有release操作后
release 所有访问指令(load)排到此指令(store)之后。即自己store排到所有consume、acquire操作之前。
扮演了一个同步点(synchronization point)的角色。
acq_rel The operation loads acquiring and stores releasing。
该操作可能扮演两种角色。
比如std::atomic::exchange操作,两个变量交换:
需要load值,可能需要等待release;
需要store值,即产生release,需要给其他acquire、consume通知。
seq_cst sequentially consistent,意思是该操作以顺序一致的方式排序。一旦所有可能对其他线程产生可见副作用的内存访问已经发生,则所有操作使用此内存顺序。
这是最严格的内存顺序,保证了在非原子内存访问中线程交互之间的意外副作用最小。
对于consume和acquire的load,顺序一致的store操作被认为是release操作。

lock_free测试

测试以确定原子模板包含的类型是否支持无锁。

1
2
3
4
5
int main()
{
std::wcout << std::boolalpha << std::atomic<int>{}.is_lock_free() << std::endl; // true
std::wcout << std::boolalpha << std::atomic<int>{}.is_always_lock_free << std::endl; // true
}

经过测试后发现,不超过8字节(64位)的数据结构,并且没有虚函数表的数据结构,是支持无锁的。

1
2
3
4
5
6
7
8
9
10
struct A {int x, y; A() {}}
struct B {int a[2];}
struct C {int a[3];}

int main()
{
std::wcout << std::boolalpha << std::atomic<A>{}.is_lock_free() << std::endl; // true
std::wcout << std::boolalpha << std::atomic<B>{}.is_lock_free() << std::endl; // true
std::wcout << std::boolalpha << std::atomic<C>{}.is_lock_free() << std::endl; // false
}

无锁编程的原理

CAS(Compare-And-Swap)

CAS在执行事务时把整个数据都拷贝到应用中,在数据更新提交的时候比较数据库中的数据与新数据,如果两个数据一模一样则表示没有冲突可以直接提交,如果有冲突就要交给业务逻辑去解决。(具有ABA问题:用版本号或时间戳解决)

现代CPU提供​​CAS(Compare-And-Swap)​​ 等原子指令,可在单个指令周期内完成类似以下的判断函数:

1
2
3
4
5
6
7
8
9
bool CAS(T* ptr, T old_val, T new_val)
{
if (*ptr == old_val)
{
*ptr = new_val;
return true;
}
return false;
}

以上类似的操作,实际指令执行期间不会被中断,避免了数据竞争。

比如,用这个特性,实现一个自旋锁:

1
2
3
4
5
6
int TestAndSet(int *old_ptr, int new)
{
int old = *old_ptr;
*old_ptr = new;
return old;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typedef struct lock_t
{
int flag;
} lock_t;
void init(lock_t *lock)
{
lock->flag = 0;
}
// 当 flag 为 0 时,函数返回false,退出。加锁成功。flag被修改为 1
void lock(lock_t *lock)
{
while (TestAndSet(&lock->flag, 1) == 1)
{} // do nothing
}
void unlock(lock_t *lock)
{
lock->flag = 0;
}

内存顺序

1
2
3
4
5
6
7
8
9
std::atomic<int> flag(0);
// 线程1
data = 42; // 1. 写数据
flag.store(1, std::memory_order_release); // 2. 释放屏障:保证 1 一定在 2 前完成

// 线程2
while (flag.load(std::memory_order_acquire) != 1)// 3. 获取屏障:保证读到 flag = 1 时,才退出循环
; // 一定能看到data=42
read(data); // 4. 安全读取数据

无锁数据结构的实现模式

”读-修改-写“循环(Read-Modify-Write Loop)

这是实现无锁的关键思维之一:乐观并发控制先执行操作,提交前,验证数据(tail)未被修改,如果未被修改,再提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool CAS(Node* old_ptr, Node* new_ptr)
{
if (this == old_ptr)
{
this = new_ptr;
return true;
}
return false;
}
void push(T value)
{
Node* new_node = new Node(value);
Node* old_tail;
do {
old_tail = tail.load(); // 读
new_node->next = old_tail; // 修改
} while (!tail.CAS(old_tail, new_node)); // 把 tail 改为 new_node(失败则重试)
}

帮助机制(Helping Mechanism)​

当线程A发现线程B的操作未完成时,主动协助推进(如无锁队列中帮其他线程移动tail指针)。

​​分离并发关注点(如头尾指针分离)​

将数据结构拆分为多个可独立更新的部分,减少竞争点。

这是实现无锁的关键思维之二:​​局部性原理(Locality Principle):通过数据分片(如分槽队列)减少缓存行冲突:
对齐64位。

1
2
3
4
struct alignas(64) PaddedAtomic
{
std::atomic<int> count; // 独占缓存行
};

ABA问题(乐观锁问题)

乐观锁会出现这种问题。

乐观锁是对于数据冲突保持一种乐观态度,操作数据时不会对操作的数据进行加锁(这使得多个任务可以并行的对数据进行操作),只有到数据提交的时候才通过一种机制来验证数据是否存在冲突(一般实现方式是通过加版本号然后进行版本号的对比方式实现)

特点:乐观锁是一种并发类型的锁,其本身不对数据进行加锁。而是通过业务实现锁的功能。
不对数据进行加锁就意味着允许多个请求同时访问数据,同时也省掉了对数据加锁和解锁的过程。
这种方式节省了悲观锁加锁的操作,所以可以一定程度的的提高操作的性能。
不过在并发非常高的情况下,会导致大量的请求冲突,冲突导致大部分操作无功而返而浪费资源。
所以在高并发的场景下,乐观锁的性能却反而不如悲观锁。

问题场景

比如说线程一从数据库中取出库存数 3,这时候线程二也从数据库中取出库存数 3。
线程二进行了一些操作变成了 2但是然后线程二在线程一拿到操作权之前,又将库存数变成了 3
这时候:线程一进行 CAS 操作发现数据库中仍然是 3,然后线程一操作成功。
尽管线程一的 CAS 操作成功,但是不代表这个过程就是没有问题的。

解决方案

使用版本戳来对数据进行标记,数据每发生一次修改,版本号就增加1。某条数据在提交的时候,如果数据库中的版本号与自己的一致,就说明数据没有发生修改,否则就认为是脏数据,需要处理。

使用带版本号的指针:

1
2
3
4
5
struct VersionedPtr
{
Node* ptr;
uint64_t version; // 每次修改+1
};

内存回收挑战

方法 原理 适用场景
Hazard Pointers 线程本地记录"危险指针"(有的也叫风险指针),延迟删除其他线程可能访问的内存 Linux内核常用
Epoch-Based Reclaim 将内存标记为待回收,当所有线程都不持有旧epoch时才删除 NVIDIA库常用
RCU(Read-Copy-Update) 读操作无同步,写操作创建副本延迟回收 Linux内核链表

MPMC无锁队列

Node的设计(伪共享预防、ABA问题解决)

  • ​伪共享预防​​:alignas(64)和填充确保 头尾 在不同缓存行
  • ABA问题防护​​:通过version版本号区分指针复用
1
2
3
4
5
6
7
8
9
10
11
12
13
struct Node
{
T* data; // 数据指针(避免拷贝开销)
std::atomic<Node*> next; // 原子指针
// 防止伪共享
char padding[64 - sizeof(T*) - sizeof(std::atomic<Node*>)];
};

struct VersionedPtr
{
Node* ptr;
uint64_t version; // ABA防护
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <atomic>
#include <memory>

template<typename T>
class LockFreeMPMCQueue {
private:
// 节点结构:带缓存行填充防止伪共享
struct Node
{
Node() : data(nullptr), next(nullptr)
{

}
T* data; // 存储实际数据指针(避免对象拷贝开销)
std::atomic<Node*> next; // 下一个节点指针
// 确保每个节点独占缓存行(64字节)
char padding[64 - sizeof(T*) - sizeof(std::atomic<Node*>)];
};

// 带版本号的指针(解决ABA问题)
struct alignas(16) VersionedPtr
{
Node* ptr;
uint64_t version;
};

// 保证头尾指针不在同一缓存行
struct
{
alignas(64) std::atomic<VersionedPtr> head;
char padding[64]; // 填充确保tail在下一个缓存行
};
alignas(64) std::atomic<VersionedPtr> tail;
public:
// ...
};

队列初始化(哨兵dummy头节点)

作用​​:保证非空队列始终存在头节点,避免头尾指针竞态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
// ...
public:
LockFreeMPMCQueue()
{
// 初始化哑元节点(dummy node)
Node* dummy = new Node();

// 头尾指针指向同一个节点
VersionedPtr init = {dummy, 0};
head.store(init, std::memory_order_relaxed);
tail.store(init, std::memory_order_relaxed);
}
// ...
};

入队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// ...
// 入队操作(多线程安全)
bool Enqueue(T value)
{
// 1. 创建新节点并填充数据
Node* new_node = new Node();
try
{
new_node->data = new T(std::move(value));
}
catch (...)
{
delete new_node;
return false;
}

// 2. CAS循环:找到真正的尾节点
VersionedPtr current_tail;
while (true)
{
// 原子获取当前尾指针
// current_tail是std::atomic<VersionedPtr>类型的
// 还要取出其成员 ptr。是 Node* 类型
current_tail = tail.load(std::memory_order_acquire);
Node* real_tail = current_tail.ptr;

// 定义新的 Node * next,先让它指向尾节点的 next
Node* next = (real_tail->next).load(std::memory_order_acquire);

// 检查期间尾指针是否变化
if (current_tail.ptr != tail.load(std::memory_order_acquire).ptr)
continue; // 发生变化则循环重试

if (next == nullptr)
{
// 情况A:尾节点后是空位置(理想情况)
VersionedPtr new_ptr{new_node, current_tail.version + 1};
// 尝试将新节点链接到尾节点后面
// 关键CAS:
// 如果成功,则 原子更新尾节点的 next 指针,即尾节点的 next 指向 new_node
// 如果失败,则 real_tail->next 的值 写入到 next 中(详见函数的作用)
// 总之不会阻塞。
if (real_tail->next.compare_exchange_weak(
next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
{
break; // 链接成功,退出循环
}
}
else
{
// 情况B:发现尾指针滞后,帮助其它线程推进
// 意思就是说,现在的 next 不是 null,说明这是一个其他人入队创建的节点
VersionedPtr other_ptr{next, current_tail.version + 1};

// 尝试更新尾指针到后继节点(避免竞争导致的阻塞)
// 即,把队列的 tail 改为 other_ptr
tail.compare_exchange_strong(
current_tail, other_ptr,
std::memory_order_release,
std::memory_order_relaxed);
}
}

// 3. 尝试更新全局尾指针(允许失败,因为有可能有人帮忙推进了)
// 即,把队列的 tail 改为 new_tail
VersionedPtr new_tail{new_node, current_tail.version + 1};
tail.compare_exchange_strong(
current_tail, new_tail,
std::memory_order_release,
std::memory_order_relaxed);

return true;
}
// ...

compare_exchange_weak和strong

1
2
3
4
5
6
bool compare_exchange_weak(T& expected, T desired,
std::memory_order success,
std::memory_order failure) noexcept;
bool compare_exchange_strong(T& expected, T desired,
std::memory_order success,
std::memory_order failure) noexcept;

两个函数都是:
Atomically compares the value representation (since C++20) of *this with that of expected. If those are bitwise-equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in *this into expected (performs load operation).
比较*thisexpected的值,若相等,则把*this替换为desired(执行read-modify-write operation,即:读-修改-写),返回真。否则,把*this实际的值,存到expected中(执行load operation),返回假。

参数:
compare_exchange_weakcompare_exchange_strong都在后面有两个内存顺序的参数,第一个内存顺序指的是,成功时,做read‑modify‑write operation的内存顺序;第二个内存顺序指的是,失败时,做load operation的内存顺序。

与 compare_exchange_strong 不同的是,这个弱版本允许通过返回 false来虚假地失败,即使*expected 确实与 obj 中包含的值相等。
对于某些循环算法来说,这可能是可接受的行为,并且可能在某些平台上导致显著更好的性能。
对于这些虚假的失败,函数返回false,但不修改预期。

出队操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// ...
// 出队操作(多线程安全)
bool Dequeue(T& result)
{
VersionedPtr current_head;
Node* real_tail;
Node* next;

while (true)
{
// 1. 原子获取头尾指针(关键)
current_head = head.load(std::memory_order_acquire);
Node* real_head = current_head.ptr;

real_tail = tail.load(std::memory_order_acquire).ptr;

next = (real_head->next).load(std::memory_order_acquire);

// 一致性检查:防止读取过程中数据结构变化
if (current_head.ptr != head.load(std::memory_order_acquire).ptr)
continue;

// 2. 队列状态判断
if (real_head == real_tail)
{
if (next == nullptr)
{
// 情况A:队列为空
return false;
}

// 情况B:尾指针滞后,帮助推进
VersionedPtr new_tail{next, tail.load().version + 1};
tail.compare_exchange_strong(
tail.load(), new_tail,
std::memory_order_release,
std::memory_order_relaxed);
}
else
{
// 情况C:正常出队
// 移动数据前预先加载(减少持有锁时间)
// 注意,我们是有哑元节点的,现在获取到的head不是真实的head,而是dummy
// dummy的下一个才是真实有数据的head,因此在这里取next的data
T* data_ptr = next->data;

// 尝试移动头指针
VersionedPtr new_head{next, current_head.version + 1};
if (head.compare_exchange_strong(
current_head, new_head,
std::memory_order_release,
std::memory_order_relaxed))
{
// 出队成功:转移数据
result = std::move(*data_ptr);

// 安全删除旧头节点(工业实现需延迟回收)
delete data_ptr;
delete real_head; // 实际应使用危险指针回收
return true;
}
}
}
}
// ...

析构

1
2
3
4
5
6
7
8
9
10
11
12
// ...
~LockFreeMPMCQueue()
{
// 遍历删除所有节点(实际应用中需处理并发安全)
while (Node* node = head.load().ptr)
{
head.store({node->next, 0}, std::memory_order_relaxed);
delete node->data;
delete node;
}
}
// ...

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#include <atomic>
#include <memory>

template<typename T>
class LockFreeMPMCQueue {
private:
// 节点结构:带缓存行填充防止伪共享
struct Node {
Node() : data(nullptr), next(nullptr) {}

T* data; // 存储实际数据指针(避免对象拷贝开销)
std::atomic<Node*> next; // 下一个节点指针

// 确保每个节点独占缓存行(64字节)
char padding[64 - sizeof(T*) - sizeof(std::atomic<Node*>)];
};

// 带版本号的指针(解决ABA问题)
struct alignas(16) VersionedPtr {
Node* ptr;
uint64_t version;
};

// 保证头尾指针不在同一缓存行
struct {
alignas(64) std::atomic<VersionedPtr> head;
char padding[64]; // 填充确保tail在下一个缓存行
};
alignas(64) std::atomic<VersionedPtr> tail;

public:
LockFreeMPMCQueue() {
// 初始化哑元节点(dummy node)
Node* dummy = new Node();

// 头尾指针指向同一个节点
VersionedPtr init = {dummy, 0};
head.store(init, std::memory_order_relaxed);
tail.store(init, std::memory_order_relaxed);
}

~LockFreeMPMCQueue() {
// 遍历删除所有节点(实际应用中需处理并发安全)
while (Node* node = head.load().ptr) {
head.store({node->next, 0}, std::memory_order_relaxed);
delete node->data;
delete node;
}
}

// 入队操作(多线程安全)
bool Enqueue(T value) {
// 1. 创建新节点并填充数据
Node* new_node = new Node();
try {
new_node->data = new T(std::move(value));
} catch (...) {
delete new_node;
return false;
}

// 2. CAS循环:找到真正的尾节点
VersionedPtr current_tail;
while (true) {
// 原子获取当前尾指针
current_tail = tail.load(std::memory_order_acquire);
Node* real_tail = current_tail.ptr;

// 尝试将新节点链接到尾节点后面
Node* next = real_tail->next.load(std::memory_order_acquire);

// 检查期间尾指针是否变化
if (current_tail.ptr != tail.load(std::memory_order_acquire).ptr)
continue; // 发生变化则重试

if (next == nullptr) {
// 情况A:尾节点后是空位置(理想情况)
VersionedPtr new_ptr{new_node, current_tail.version + 1};

// 关键CAS:原子更新尾节点的next指针
if (real_tail->next.compare_exchange_weak(
next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
{
break; // 链接成功,退出循环
}
} else {
// 情况B:发现尾指针滞后,帮助其它线程推进
VersionedPtr new_ptr{next, current_tail.version + 1};

// 尝试更新尾指针到后继节点(避免竞争导致的阻塞)
tail.compare_exchange_strong(
current_tail, new_ptr,
std::memory_order_release,
std::memory_order_relaxed);
}
}

// 3. 尝试更新全局尾指针(允许失败)
VersionedPtr new_tail{new_node, current_tail.version + 1};
tail.compare_exchange_strong(
current_tail, new_tail,
std::memory_order_release,
std::memory_order_relaxed);

return true;
}

// 出队操作(多线程安全)
bool Dequeue(T& result) {
VersionedPtr current_head;
Node* real_tail;
Node* next;

while (true) {
// 1. 原子获取头尾指针(关键)
current_head = head.load(std::memory_order_acquire);
Node* real_head = current_head.ptr;
real_tail = tail.load(std::memory_order_acquire).ptr;
next = real_head->next.load(std::memory_order_acquire);

// 一致性检查:防止读取过程中数据结构变化
if (current_head.ptr != head.load(std::memory_order_acquire).ptr)
continue;

// 2. 队列状态判断
if (real_head == real_tail) {
if (next == nullptr) {
// 情况A:队列为空
return false;
}

// 情况B:尾指针滞后,帮助推进
VersionedPtr new_tail{next, tail.load().version + 1};
tail.compare_exchange_strong(
tail.load(), new_tail,
std::memory_order_release,
std::memory_order_relaxed);
} else {
// 情况C:正常出队
// 移动数据前预先加载(减少持有锁时间)
T* data_ptr = next->data;

// 尝试移动头指针
VersionedPtr new_head{next, current_head.version + 1};
if (head.compare_exchange_strong(
current_head, new_head,
std::memory_order_release,
std::memory_order_relaxed))
{
// 出队成功:转移数据
result = std::move(*data_ptr);

// 安全删除旧头节点(工业实现需延迟回收)
delete data_ptr;
delete real_head; // 实际应使用危险指针回收
return true;
}
}
}
}
};

内存回收(危险指针)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Hazard Pointer简单实现(线程本地注册)
thread_local std::vector<Node*> hp_records(2); // 通常每个线程2个足够

void RetireNode(Node* old)
{
// 检查其他线程是否引用该节点
if (!IsPointerHazard(old))
{
delete old;
}
else
{
// 加入待删除列表(延迟处理)
retired_list.push_back(old);
}
}

无锁是忙等待吗?

无锁编程并不等同于忙等待(Busy-Waiting),其核心在于「非阻塞」,而非具体等待方式。​​

  1. ​「非阻塞」的定义​
    • Lock-Free:至少一个线程能前进
    • Wait-Free:所有线程都能在有限步完成
  2. ​延迟与吞吐的权衡​
    • ​忙等待​​:牺牲CPU,以降低延迟(高频交易系统)
    • ​阻塞等待​​:加大延迟,以加大吞吐(Web服务器连接池)

无锁编程可​​根据竞争强度动态选择等待策略​​:

  1. 低竞争时短暂自旋(利用CPU流水线)
  2. 中竞争时退避+主动让出CPU
  3. 高竞争时进入操作系统阻塞队列

​核心目标是以最小开销维持「至少一个线程前进」的非阻塞特性​​,而非强制忙等待。

实际无锁设计中可通过多种策略避免忙等:

操作系统级阻塞等待

1
2
3
4
5
6
7
8
// C++20 前(使用 futex)
while (!atomic_var.compare_exchange_weak(...))
{
syscall(SYS_futex, &atomic_var, FUTEX_WAIT, ...); // 主动让出CPU
}

// C++20 后(std::atomic::wait)
atomic_var.wait(old_val, std::memory_order_relaxed); // 线程挂起直到值变化

定时退避策略

1
2
3
4
5
6
7
8
9
10
11
12
13
int retries = 0;
while (!CAS(ptr, old_val, new_val))
{
if (retries++ > MAX_SPIN)
{
std::this_thread::yield(); // 放弃时间片,通知调度器切换
retries = 0;
}
else
{
_mm_pause(); // x86 自旋等待指令(降低CPU功耗)
}
}

队列化竞争机制(排队锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct Waiter { std::atomic<Waiter*> next; };
std::atomic<Waiter*> tail_{nullptr};

void lock()
{
Waiter w;
w.next = nullptr;
Waiter* prev = tail_.exchange(&w, std::memory_order_acq_rel);
if (prev)
{
prev->next = &w; // 排队
while (w.next.load() != &w) {} // 等待前驱节点唤醒
}
}

void unlock()
{
Waiter* w = ...;
if (!w->next)
{ // 无后续等待者
if (tail_.compare_exchange_strong(w, nullptr)) return;
}
while (!w->next.load()) // 等待后继节点就绪
{}
// 唤醒下一个
w->next.load()->next.store(w->next.load()->next, ...);
}

对比

策略​ 实现方式 适用场景 CPU占用 延迟
​忙等待​ while(!CAS) 极低延迟场景(<100ns) 100%核心 极低
​主动退让​ std::this_thread::yield() 用户态竞争适中 < 30% 微秒级
​操作系统阻塞​ futex / atomic::wait 高竞争/长等待 ~0% 毫秒级
​队列化调度​ MCS锁等排队机制 严格公平性要求 随队列转移 亚毫秒级

工业级案例

Linux内核Futex

  • 首次CAS竞争失败后,通过FUTEX_WAIT系统调用挂起线程
  • 解锁时通过FUTEX_WAKE唤醒等待线程

​​Java的AQS(Abstract Queued Synchronizer)​

1
2
3
4
5
6
7
8
9
final boolean acquireQueued(...)
{
for (;;)
{
if (tryAcquire(arg)) return true; // 无锁尝试
if (shouldParkAfterFailedAcquire())
LockSupport.park(this); // 挂起线程(JVM层面)
}
}

C++20原子等待

1
2
3
4
5
6
std::atomic<int> flag(0);
// 消费者
while (!flag.wait(1, std::memory_order_seq_cst)); // 阻塞直至值!=1
// 生产者
flag.store(2);
flag.notify_all(); // 唤醒所有等待者

总线锁和缓存锁

总线锁(Bus Lock)

当CPU执行带LOCK前缀的指令(如CMPXCHG)时,会通过芯片组发出​​硬件信号​​,​​独占整个内存总线(Bus)​​ 。此时其他CPU的所有内存访问请求将被阻塞,直到当前操作完成。

1
2
3
4
5
sequenceDiagram
CPU核心A->>内存总线: LOCK#信号(总线锁定)
内存总线->>所有CPU: 阻塞其他内存请求
CPU核心A->>内存: 执行原子操作(如CAS)
内存总线->>所有CPU: 解锁

特点

  • ​全局性锁定​​:锁定期间所有内存操作均被阻塞(包括非竞争数据)
  • ​性能开销大​​:原子操作串行化,多核性能急剧下降
  • ​兼容性强​​:早期x86处理器的唯一选择(如80486)

缓存锁(Cache Locking)—— MESI优化

核心:缓存一致性协议(Cache Coherence)

现代CPU通过​​MESI协议​​(Modified/Exclusive/Shared/Invalid)维护多核缓存一致性:

1
2
3
4
5
graph LR
Modified -->|写回| Invalid
Exclusive -->|其他核读| Shared
Shared -->|写| Modified
Invalid -->|写| Exclusive

缓存锁的触发条件(以Intel CPU为例)

当原子操作访问的数据满足:

  1. ​对齐在缓存行内​​(通常64字节对齐)
  2. ​目标地址未跨缓存行​​(Non-split Access)
  3. ​CPU支持缓存锁定技术​​(几乎所有现代处理器)

工作流程

1
2
3
4
5
6
7
8
9
10
// 示例:两个线程在核 0 和核 1 上执行原子操作
// 初始状态:变量 X 被核 0 和核 1 缓存(Shared状态)
1.0执行原子操作(如X++):
- 发出RFO(Request For Ownership)消息
- 其他核心将X的缓存行置为Invalid
- 核0将缓存行置为Modified状态
- 执行修改(未触发总线锁)
2.1尝试操作X:
- 发现缓存失效(Invalid)
- 从核0的缓存中读取最新数据(缓存行状态转为Shared)

避免缓存冲突

内存对齐避免总线锁

1
2
3
4
5
6
7
8
9
10
11
12
// 错误:变量可能跨缓存行
struct Unaligned
{
char padding[62]; // 62字节填充
std::atomic<int> x; // 位于62-66字节(跨越两个缓存行)
};

// 正确:强制缓存行对齐
struct alignas(64) Aligned
{
std::atomic<int> x; // 独占一个缓存行
};

检测方法:Linux下perf c2c可检测缓存行冲突(False Sharing)

写竞争下的性能差异​

当两个CPU核心频繁写​​同一缓存行​​时:

  • ​缓存锁场景​​:缓存行在ModifiedInvalid状态间震荡,产生大量RFO消息
  • ​解决方案​​:​​伪共享隔离(False Sharing Elimination)
1
2
3
4
5
6
// 多核计数器优化(每个核独占缓存行)
struct PerCoreCounter
{
alignas(64) std::atomic<int> value;
};
PerCoreCounter counters[CPU_CORES];

特殊场景总线锁不可避免

1
2
3
LOCK XCHG [mem], reg  ; 显式LOCK前缀
CMPXCHG16B m128 ; 128位跨缓存行操作
一个未对齐的LOCK操作 ; 如对跨64字节边界的int操作

对比

​特征​ 总线锁 缓存锁
​锁定范围​ 整个内存总线 单个缓存行
​性能影响​ 全局停顿,性能损失严重 仅影响涉及特定缓存行的操作
​触发条件​ LOCK前缀指令 内存对齐且未跨缓存行的原子操作
​实现技术​ 硬件信号硬阻塞 MESI缓存一致性协议
​现代应用​ 仅作兜底(如跨缓存行操作) 99%原子操作的默认实现
​能耗​ 高(总线开关) 低(仅缓存状态切换)