无RPC的代码,怎么升级使用RPC框架
在/example/callee
中的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #include <iostream> class UserService { public: bool Login(std::string name, std::string pwd) { std::cout << "doing local service: Login" << std::endl; std::cout << "name: " << name << ", pwd: " << pwd << std::endl; } }; int main() { UserService us; us.Login("xcg", "123456"); }
|
如上,这是独立的代码,UserService可以进行Login处理。但是如果不使用RPC框架,则只能在本地被调用。
需要想办法,能让远程调用。
编写user.proto
在/example/
中的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| syntax = "proto3"; package xcg; option cc_generic_services = true; message ResultCode { int32 errcode = 1; bytes errmsg = 2; } message LoginRequest { bytes name = 1; bytes pwd = 2; } message LoginResponse { ResultCode result = 1; bool success = 2; } service UserServiceRpc { rpc Login(LoginRequest) returns(LoginResponse); }
|
使用protoc编译
1
| protoc user.proto --cpp_out=./
|
接下来,就是使用user.pb.h
。
服务提供者继承UserServiceRpc,实现rpc虚方法,业务处理,响应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| #include <iostream> #include "user.pb.h"
class UserService : public xcg::UserServiceRpc { public: bool Login(std::string name, std::string pwd) { std::cout << "doing local service: Login" << std::endl; std::cout << "name: " << name << ", pwd: " << pwd << std::endl; } void Login(::google::protobuf::RpcController* controller, const ::xcg::LoginRequest* request, ::xcg::LoginResponse* response, ::google::protobuf::Closure* done) { std::string name = request->name(); std::string pwd = request->pwd(); bool login_result = Login(name, pwd); xcg::ResultCode * result_code = response->mutable_result(); result_code->set_errcode(0); result_code->set_errmsg("ok!"); response->set_success(login_result); done->Run(); } };
|
这些重写了的虚函数,在哪里被调用呢?见google::protobuf::NewCallback<>
生成回调函数
服务调用方调用UserServiceRpc_Stub里protobuf给我们自动实现好的rpc方法
比如,UserServiceRpc_Stub
的Login
方法,是位于user.pb.h
中的。
而Login的调用最终会落到UserServiceRpc_Stub
的channel_
成员所含的方法CallMethod
。
要实例化UserServiceRpc_Stub
,需要一个MprpcChannel
对象来给它构造,并且这个MprpcChannel
要实现CallMethod
。
见Mprpcchannel类。
定义要调用的某个方法的request(如LoginRequest),填写参数。
定义要调用的某个方法的response(如LoginResponse),不填写参数。
定义UserServiceRpc_Stub
,通过MprpcChannel()
实例化。
之后通过stub发起远程调用Login
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| #include <iostream> #include "mprpcapplication.h" #include "user.pb.h" #include "mprpcchannel.h" int main(int argc, char ** argv) { MprpcApplication::Init(argc, argv);
xcg::LoginRequest request; request.set_name("zhang san"); request.set_pwd("123456"); xcg::LoginResponse response;
xcg::UserServiceRpc_Stub stub(new MprpcChannel());
stub.Login(nullptr, &request, &response, nullptr); if (response.result().errcode() == 0) { std::cout << "rpc login response success: " << response.success() << std::endl; } else { std::cout << "rpc login response error: " << response.result().errmsg() << std::endl; } return 0; }
|
rpc服务框架的启动入口?
rpc服务框架由各个服务节点(服务提供者)启动。比如UserService
,类外实现一个main函数,main函数中调用MprpcApplication::Init(argc, argv)
进行框架相关的启动,比如读配置文件。
之后,各个服务节点(服务提供者)可以NotifyService发布自己节点上的服务名、方法名。
之后,各个服务节点调用Run
,启动服务节点。
模拟rpc框架的使用,以分析框架需要什么东西
- 框架需要一个基础类,进行Init初始化,使用argc、argv参数。因为rpc服务器本身有ip地址、端口号,需要zookeeper的ip地址、端口号。从配置文件读。
- 框架需要提供一个可以发布服务的RpcProvider。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
#include "mprpcapplication.h" int main(int argc, char ** argv) { MprpcApplication::Init(argc, argv); RpcProvider provider; provider.NotifyService(new UserService()); provider.Run(); }
|
MprpcApplication类
在/src/include
中编写头文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #pragma once
class MprpcApplication { public: static void Init(int argc, char ** argv); static MprpcApplication& GetInstance() { static MprpcApplication app; return app; } private: MprpcApplication() {
} MprpcApplication(const MprpcApplication&) = delete; MprpcApplication(MprpcApplication&&) = delete; };
|
RpcProvider类
在/src/include
中编写头文件
1 2 3 4 5 6 7 8 9 10 11
| #pragma once #include "google/protobuf/service.h"
class RpcProvider { public: void NotifyService(::google::protobuf::Service * service); void Run(); };
|
CMakeLists.txt
根目录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| cmake_minimum_required(VERSION 3.0) project(mprpc)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)
include_directories(${PROJECT_SOURCE_DIR}/src/include) include_directories(${PROJECT_SOURCE_DIR}/example) link_directories(${PROJECT_SOURCE_DIR}/lib)
add_subdirectory(src)
add_subdirectory(example)
|
src生成的代码就是框架的部分。需要生成一个对外的so库。
src目录
1 2 3 4
| aux_source_directory(. SRC_LIST)
add_library(mprpc SHARED ${SRC_LIST})
|
example目录
1
| add_subdirectory(callee)
|
example/callee目录
1 2 3 4 5
| set(SRC_LIST userservice.cc ../user.pb.cc)
add_executable(provider ${SRC_LIST})
target_link_libraries(provider mprpc protobuf)
|
流程
- 编写protobuf文件,作为协议;
- 继承rpc服务类。实现rpc方法。
- 获取请求体的参数
- 本地处理业务
- 写入响应体
- 执行回调操作,完成响应数据的序列化,进行发送响应。
- 发布服务,启动服务
- 初始化框架
- 定义发布rpc服务的对象(provider)
- 启动rpc服务发布节点。
- 服务程序进入等待状态,接受远程rpc调用请求。
rpc框架的编写
MprpcApplication类
主要需要实现Init函数。
主要用于使用外部命令行传入的参数,以及读取参数中的配置文件,进行初始化。
- 可以单独封装一个MprpcConfig类。MprpcApplication类使用它进行config的读取。
- Init只需要进行一次,所以将MprpcApplication设计为单例模式。
- 由于Init是静态方法,Init方法使用到了类内对象MprpcConfig,因此MprpcConfig成员需要声明为static。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| #pragma once #include "mprpcconfig.h"
class MprpcApplication { public: static void Init(int argc, char ** argv); static MprpcApplication& GetInstance(); private: static MprpcConfig m_config; MprpcApplication() {
} MprpcApplication(const MprpcApplication&) = delete; MprpcApplication(MprpcApplication&&) = delete; };
|
实现
- ShowArgsHelp函数用于提示用户正确的命令行参数格式
- Init函数主要实现
- 检查参数格式
- 通过getopt拿到参数
- 调用MprpcConfig成员的LoadConfigFile方法解析配置文件的信息,即获取rpc服务器和zookeeper的ip、端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| #include "mprpcapplication.h" #include <iostream> #include <unistd.h> MprpcConfig MprpcApplication::m_config; MprpcApplication& MprpcApplication::GetInstance() { static MprpcApplication app; return app; }
void ShowArgsHelp() { std::cout << "format: commad -i <configfile>" << std::endl; }
void MprpcApplication::Init(int argc, char ** argv) { if (argc < 2) { ShowArgsHelp(); exit(EXIT_FAILURE); } int c = 0; std::string config_file; while((c = getopt(argc, argv, "i:")) != -1) { switch (c) { case 'i': config_file = optarg; break; case '?': ShowArgsHelp(); exit(EXIT_FAILURE); case ':': ShowArgsHelp(); exit(EXIT_FAILURE); default: break; } } m_config.LoadConfigFile(config_file.c_str());
std::cout << "rpcserver_ip: " << m_config.Load("rpcserver_ip") << std::endl; std::cout << "rpcserver_port: " << m_config.Load("rpcserver_port") << std::endl; std::cout << "zookeeper_ip: " << m_config.Load("zookeeper_ip") << std::endl; std::cout << "zookeeper_port: " << m_config.Load("zookeeper_port") << std::endl; }
|
若测试的是注意读取配置文件可能出现的空格、注释、无效信息问题 中的配置文件,正确结果:
1 2 3 4 5
| mrcan@ubuntu:~/mprpc/bin$ ./provider -i test.conf rpcserver_ip: 127.0.0.1 rpcserver_port: 8000 zookeeper_ip: 127.0.0.1 zookeeper_port: 5000
|
MprpcConfig类
- 提供LoadConfigFile方法,负责解析加载配置文件,将key、value插入到自身的map中
- 提供Load方法,可以对外提供查询key读取value。
- 需要集成一个
unordered_map
记录key、value。
- Trim负责在解析配置文件过程中除去字符串前后的空格。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #pragma once #include <unordered_map> #include <string> class MprpcConfig { public: void LoadConfigFile(const char * config_file); std::string Load(const std::string& key); private: std::unordered_map<std::string, std::string> m_configMap; void Trim(std::string &str_buf); };
|
注意读取配置文件可能出现的空格、注释、无效信息问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| # rpc node ip rpcserver_ip = 127.0.0.1
# rpc node port rpcserver_port = 8000
# zookeeper ip zookeeper_ip =127.0.0.1
# zookeeper port zookeeper_port= 5000
|
如上,配置文件可能有:
- 注释
- 纯空行的换行符可以用gets过滤
- 配置项,以=分隔,前面是key,后面是value,可能前后各有空格
- key前后的空格可通过Trim处理,后面肯定没有换行符。
- value只能用Trim处理前面的空格,因为后面可能存在
空格或换行符
:
5000\n
:即没有空格,有一个换行符
5000 \n
:即有空格,有换行符
- 所以需要先去掉末尾的换行符,再进行Trim处理。
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| #include "mprpcconfig.h" #include <iostream> void MprpcConfig::Trim(std::string &str_buf) { int idx = str_buf.find_first_not_of(' '); if (idx > 0) { str_buf = str_buf.substr(idx, str_buf.size() - idx); } idx = str_buf.find_last_not_of(' '); str_buf = str_buf.substr(0, idx + 1); } void MprpcConfig::LoadConfigFile(const char * config_file) { FILE *pf = fopen(config_file, "r"); if (nullptr == pf) { std::cout << config_file << " not exist!" << std::endl; exit(EXIT_FAILURE); } while (!feof(pf)) { char buf[512] = {0}; fgets(buf, 512, pf); std::string str_buf(buf); Trim(str_buf);
if (str_buf[0] == '#' || str_buf.empty()) { continue; } int idx = str_buf.find('='); if (idx == -1 || idx == 0) { continue; } std::string key; std::string value; key = str_buf.substr(0, idx); Trim(key);
int lineBreakIdx = str_buf.find('\n', idx + 1); value = str_buf.substr(idx + 1, lineBreakIdx - (idx + 1)); Trim(value);
m_configMap.insert({key, value}); } } std::string MprpcConfig::Load(const std::string& key) { auto it = m_configMap.find(key); if (it == m_configMap.end()) { return ""; } return it->second; }
|
修改MprpcApplication
向其中添加一个公有static方法GetConfig()
以便RpcProvider获得config对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #pragma once #include "mprpcconfig.h"
class MprpcApplication { public: static void Init(int argc, char ** argv); static MprpcApplication& GetInstance(); static MprpcConfig& GetConfig(); private: static MprpcConfig m_config; MprpcApplication() {
} MprpcApplication(const MprpcApplication&) = delete; MprpcApplication(MprpcApplication&&) = delete; };
|
RpcProvider类
根据[[#模拟rpc框架的使用,以分析框架需要什么东西]],框架需要提供一个可以发布服务的RpcProvider。
主要实现:
- NotifyService,是框架提供给外部使用的,可以发布Service
- Run,启动RPC服务节点
- Stop,停止服务
需要集成一个TcpServer,和搭配一个Eventloop(相当于epoll)。
但是经过考虑,TcpServer没有必要写为RpcProvider的成员变量,因为只在Run方法中使用。而且我们提供的是一个对外的框架,最好不要把TcpServer的配置、复杂的参数暴露给用户,尽量让用户简单易用。
而EventLoop可能不仅Run使用,Stop还需要调用它的quit。所以定义为成员变量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| #pragma once #include "google/protobuf/service.h" #include <muduo/net/TcpServer.h> #include <muduo/net/EventLoop.h> #include <muduo/net/InetAddress.h>
class RpcProvider { public: void NotifyService(::google::protobuf::Service * service); void Run(); private: muduo::net::EventLoop m_eventLoop; void onConnection(const muduo::net::TcpConnectionPtr&); void onMessage(const muduo::net::TcpConnectionPtr&, muduo::net::Buffer*, muduo::Timestamp); };
|
Run实现
Run主要实现TcpServer的配置、启动。
这属于RpcProvider的网络模块。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| #include "rpcprovider.h" #include "mprpcapplication.h" #include <functional> void RpcProvider::Run() { std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserver_ip"); uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserver_port").c_str()); muduo::net::InetAddress address(ip, port); muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider"); server.setConnectionCallback(std::bind(&RpcProvider::onConnection, this, std::placeholders::_1)); server.setMessageCallback(std::bind(&RpcProvider::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
server.setThreadNum(4);
std::cout << "RpcProvider Start Service at IP: " << ip << ", port: " << port << std::endl;
server.start(); m_eventLoop.loop(); }
|
NotifyService的实现
发布Rpc服务。
是/example/callee/
下的如UserService程序调用的。
(集成的muduo网络库实现了:不光能调用本地的RpcProvider,而是可以远程调用网络上的任意一个RpcProvider)
怎么做到“发布”?“发布”的本质是什么?其实是记录在案,方便远端查询,并且定位某个方法。
那么我们就要在RpcProvider类中增加:m_serviceMap
,记录注册的每一个service名字和这个service对应的所有(n个)method。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| #pragma once #include "google/protobuf/service.h" #include <unordered_map>
class RpcProvider { public: private: struct ServiceInfo { google::protobuf::Service * m_service; std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap; }; std::unordered_map<std::string, ServiceInfo> m_serviceMap;
};
|
当远端一个Service服务器(比如UserService)调用RpcProvider的NotifyService时,便在里面记录信息,这就是注册,就是发布。
我们可以:
- 通过远端传入的Service指针,
service->GetDescriptor()
获得服务描述符ServiceDescriptor *
,
- 从而获得服务的名字
name()
和方法的数量method_count()
、每一个方法的描述符MethodDescriptor*
,
- 从而获得每一个方法的名字
name()
。
- 把所有方法的名字、方法的描述符记录在
m_methodMap
中。
- 把服务描述符和
m_methodMap
封装在ServiceInfo
结构体中。
- 之后,把服务的名字和这个
ServiceInfo
作为键值对插入到m_serviceMap
中。
需要引入<google/protobuf/descriptor.h>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| #include "rpcprovider.h" #include "mprpcapplication.h" #include <functional> #include <google/protobuf/descriptor.h> void RpcProvider::NotifyService(::google::protobuf::Service * service) { const google::protobuf::ServiceDescriptor * pserviceDesc = service->GetDescriptor();
ServiceInfo service_info; service_info.m_service = service; int methodCnt = pserviceDesc->method_count(); for(int i = 0; i < methodCnt; ++i) { const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i); std::string method_name = pmethodDesc->name(); std::cout << i << ". method_name: " << method_name << std::endl; service_info.m_methodMap.insert({method_name, pmethodDesc}); } std::string service_name = pserviceDesc->name(); std::cout << "service_name: " << service_name << std::endl; m_serviceMap.insert({service_name, service_info}); }
|
OnConnection
在有新的请求时,会触发OnConnection回调,用于处理连接后的操作。OnConnection的通知调用不是用户调用的,而是muduo库自动调用的。
我们暂时不用处理什么工作。只是判断一下连接状态是否正常,如果断开了则主动close。
1 2 3 4 5 6 7 8
| void RpcProvider::onConnection(const muduo::net::TcpConnectionPtr& conn) { if (!conn->connected()) { conn->shutdown(); } }
|
rpc的连接是短连接,客户端请求、服务端响应之后,服务端就会主动关闭连接。
OnMessage
消息是字节流的。
1 2 3 4 5 6 7 8
| // 框架内部,RpcProvider和RpcConsumer协商好通信用的protobuf数据类型 // 请求时需要提供 service_name method_name args // 需要处理包内的粘包问题,比如:UserService和Login以及args在一起,无法区分 // 因此需要header_size + header_str + args,先把名字部分和参数部分区分开 // 要注意需要区分args和下一个请求包的 包外粘包问题,需要在header_str中记录args_size // 可以通过protobuf定义这个RpcHeader的message类型,根据上述设计,包含了service_name、method_name、args_size // header_size指的是RpcHeader这个message包的字符串长度 // header_size(uint32_t 大小个字节)
|
在src下定义此文件(rpcheader.proto
)
1 2 3 4 5 6 7 8 9 10
| syntax = "proto3";
package mprpc;
message RpcHeader { bytes service_name = 1; bytes method_name = 2; uint32 args_size = 3; }
|
之后在此目录下用protoc命令生成cc和h文件。
1
| protoc rpcheader.proto --cpp_out=./
|
- 解析接收到的字符流
recv_buf
中的内容。
- 取出前4个字节(
sizeof uint32_t
),即header_size
。(header_size
指的是RpcHeader这个message包的字符串长度,包含了service_name
、method_name
、args_size
信息)。响应端通过string的copy
方法读出(相应的写入方法是insert方法,是请求端填写的)。
- 获取了
header_size
之后,在recv_buf
取出4字节之后的header_size
长度的字符串,存入rpc_header_str
。
- 用protobuf生成的RpcHeader的ParseFromString方法即可解析
rpc_header_str
,填入了RpcHeader对象。再调用RpcHeader对象方法service_name()
、method_name()
、args_size()
即可读出解析到的数据。
- 之后,recv_buf的
4字节 + header_size
之后的args_size
长度的字符串就是args的数据。可以同上述方法解析出内容。
需要引入"rpcheader.pb.h"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| void RpcProvider::onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp timestamp) { std::string recv_buf = buffer->retrieveAllAsString(); uint32_t header_size = 0; recv_buf.copy((char*)&header_size, sizeof(uint32_t), 0); std::string rpc_header_str = recv_buf.substr(sizeof(uint32_t), header_size); mprpc::RpcHeader rpcHeader; std::string service_name; std::string method_name; uint32_t args_size; if (rpcHeader.ParseFromString(rpc_header_str)) { service_name = rpcHeader.service_name(); method_name = rpcHeader.method_name(); args_size = rpcHeader.args_size(); } else { std::cout << "rpc_header_str: " << rpc_header_str << "parse error!" << std::endl; return; }
std::string args_str = recv_buf.substr(sizeof(uint32_t) + header_size, args_size);
std::cout << "==============================" << std::endl; std::cout << "header_size: " << header_size << std::endl; std::cout << "rpc_header_str: " << rpc_header_str << std::endl; std::cout << "service_name: " << service_name << std::endl; std::cout << "method_name: " << method_name << std::endl; std::cout << "args_str: " << args_str << std::endl; std::cout << "==============================" << std::endl; }
|
查表获取service对象和method对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
auto service_it = m_serviceMap.find(service_name); if (service_it == m_serviceMap.end()) { std::cout << service_name << " not exist!" << std::endl; return; } auto method_it = service_it->second.m_methodMap.find(method_name); if (method_it == service_it->second.m_methodMap.end()) { std::cout << method_name << " not exist in " << service_name << std::endl; return; }
google::protobuf::Service *service = service_it->second.m_service; const google::protobuf::MethodDescriptor *method = method_it->second;
|
service->GetRequestPrototype(method).New()
生成request和response
1 2 3 4 5 6 7 8 9 10 11
|
google::protobuf::Message *request = service->GetRequestPrototype(method).New(); if (!request->ParseFromString(args_str)) { std::cout << "request parse args_str error! args_str: " << args_str << std::endl; return; } google::protobuf::Message *response = service->GetResponsePrototype(method).New();
|
google::protobuf::NewCallback<>
生成回调函数
1 2 3 4 5 6 7 8 9 10
| google::protobuf::Closure* done = google::protobuf::NewCallback <RpcProvider, const muduo::net::TcpConnectionPtr&, google::protobuf::Message*> (this, &RpcProvider::SendRpcResponse, conn, response);
service->CallMethod(method, nullptr, request, response, done); }
|
编写RpcProvider::SendRpcResponse()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response) { std::string response_str; if (response->SerializeToString(&response_str)) { conn->send(response_str); } else { std::cout << "Serialize response_str error!" << std::endl; } conn->shutdown(); }
|
Mprpcchannel类
1 2 3 4 5 6 7 8 9 10 11 12 13
| #pragma once #include <google/protobuf/service.h> #include <google/protobuf/descriptor.h> #include <google/protobuf/message.h> class MprpcChannel : public google::protobuf::RpcChannel { public: void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller, const google::protobuf::Message *request, google::protobuf::Message *response, google::protobuf::Closure *done); };
|
在这个方法中,处理服务调用方的请求包。
这个请求包只包含rpc调用的函数参数(args),不包含服务名、方法名。服务名、方法名可以用method
参数查到。
把请求包(只包含args)序列化,得到序列化之后的args字符流args_str
长度args_size
。
填写 RpcHeader(service_name
、method_name
、args_size
),序列化后得到 RpcHeader的字符流rpc_header_str
长度header_size
之后按照约定好的格式,把header_size
、rpc_header_str
、args_str
格式化填充、拼接到send_rpc_str
。
之后便是TCP客户端编程,把send_rpc_str
发送到配置文件中的远端ip、端口。
等待远端响应的数据,解析响应数据。填入了response。
CallMethod结束,Channel的使命结束,Stub的使命结束。
调用栈就回到了服务调用方的主程序中,输出response。
见服务调用者主程序
服务调用方是通过Channel得知远端服务提供方的ip、端口的。
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| #include "mprpcchannel.h" #include "rpcheader.pb.h" #include "mprpcapplication.h" #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> #include <error.h>
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller, const google::protobuf::Message *request, google::protobuf::Message *response, google::protobuf::Closure *done) { const google::protobuf::ServiceDescriptor *sd = method->service(); std::string service_name = sd->name(); std::string method_name = method->name(); uint32_t args_size = 0; std::string args_str; if (request->SerializeToString(&args_str)) { args_size = args_str.size(); } else { std::cout << "serialize request error!" << std::endl; return; } mprpc::RpcHeader rpcHeader; rpcHeader.set_service_name(service_name); rpcHeader.set_method_name(method_name); rpcHeader.set_args_size(args_size); uint32_t header_size = 0; std::string rpc_header_str; if (rpcHeader.SerializeToString(&rpc_header_str)) { header_size = rpc_header_str.size(); } else { std::cout << "serialize request error!" << std::endl; return; }
std::string send_rpc_str; send_rpc_str.insert(0, std::string((char*)&header_size, 4)); send_rpc_str += rpc_header_str; send_rpc_str += args_str;
std::cout << "==============================" << std::endl; std::cout << "header_size: " << header_size << std::endl; std::cout << "rpc_header_str: " << rpc_header_str << std::endl; std::cout << "service_name: " << service_name << std::endl; std::cout << "method_name: " << method_name << std::endl; std::cout << "args_str: " << args_str << std::endl; std::cout << "==============================" << std::endl;
int cliendfd = socket(AF_INET, SOCK_STREAM, 0); if (-1 == cliendfd) { std::cout << "Create Socket Error: " << errno << std::endl; exit(EXIT_FAILURE); }
std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserver_ip"); uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserver_port").c_str()); struct sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); if (-1 == inet_pton(AF_INET, ip.c_str(), &(server_addr.sin_addr.s_addr))) { std::cout << "inet_pton Error: " << errno << std::endl; close(cliendfd); exit(EXIT_FAILURE); } if (-1 == connect(cliendfd, (struct sockaddr*)&server_addr, sizeof(server_addr))) { std::cout << "connect Error: " << errno << std::endl; close(cliendfd); exit(EXIT_FAILURE); } if (-1 == send(cliendfd, send_rpc_str.c_str(), send_rpc_str.size(), 0)) { std::cout << "send Error: " << errno << std::endl; close(cliendfd); exit(EXIT_FAILURE); } char recv_buff[1024] = {0}; int recv_size = 0; if (-1 == (recv_size = recv(cliendfd, recv_buff, sizeof(recv_buff), 0))) { std::cout << "recv Error: " << errno << std::endl; close(cliendfd); exit(EXIT_FAILURE); } if (!response->ParseFromArray(recv_buff, recv_size)) { std::cout << "parse response_str error! response_str: " << recv_buff << std::endl; close(cliendfd); return; } close(cliendfd); }
|
MprpcController 的作用
用于记录rpc调用过程中的状态信息。
试想,如果请求过程中,或者响应过程中,某一环节除了问题,那么服务调用方就拿不到response任何内容了,那么它去看了个寂寞?也不清楚是哪个环节出错了。这时候可以在参数中传入mprpccontroller
的指针,让对应的MprpcController对象在内部记录状态信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| #pragma once #include <google/protobuf/service.h> class MprpcController : public google::protobuf::RpcController { public: MprpcController(); void Reset(); bool Failed() const; std::string ErrorText() const; void SetFailed(const std::string& reason);
void StartCancel(); bool IsCanceled() const; void NotifyOnCancel(google::protobuf::Closure *callback); private: bool m_failed; std::string m_errText; };
|
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| #include "mprpccontroller.h" MprpcController::MprpcController() { m_failed = false; m_errText = ""; } void MprpcController::Reset() { m_failed = false; m_errText = ""; } bool MprpcController::Failed() const { return m_failed; } std::string MprpcController::ErrorText() const { return m_errText; } void MprpcController::SetFailed(const std::string& reason) { m_failed = true; m_errText = reason; } void MprpcController::StartCancel(){} bool MprpcController::IsCanceled() const { return false; } void MprpcController::NotifyOnCancel(google::protobuf::Closure *callback){}
|
如何使用
比如在calluserservice.cc
下,可以:
在stub调用某方法前,定义一个MprpcController对象,之后把其指针传到调用方法的函数参数内。
在读取response前,先判断controller的状态信息。如果错误就不读了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| #include "mprpccontroller.h"
MprpcController controller; stub.Login(&controller, &request, &response, nullptr); if (controller.Failed()) { std::cout << controller.ErrorText() << std::endl; } else { if (response.result().errcode() == 0) { std::cout << "rpc login response success: " << response.success() << std::endl; } else { std::cout << "rpc login response error: " << response.result().errmsg() << std::endl; } } return 0; }
|
在mprpcchannel.cc
中,有可能出现异常的地方,均用controller记录。
别忘了在src下的CMakeLists中的set(SRC_LIST ... )
后面添加上mprpccontroller.cc
。
日志类
编译
框架依赖muduo库,因此需要在src目录下的CMakeLists.txt
添加:
如果忘记了库叫什么名字,可以在/usr/lib
或/usr/local/lib
下查找:
1
| sudo find /usr -name "libmuduo*"
|
1 2 3 4
| /usr/local/lib/libmuduo_http.a /usr/local/lib/libmuduo_net.a /usr/local/lib/libmuduo_base.a /usr/local/lib/libmuduo_inspect.a
|
我们需要的muduo的库名:muduo_net、muduo_base
1 2 3 4 5 6
| aux_source_directory(. SRC_LIST)
add_library(mprpc SHARED ${SRC_LIST})
target_link_libraries(mprpc muduo_net muduo_base pthread)
|
由于我们在编译muduo时编译成了静态库,而我们现在CMake指定生成的是动态库(add_library(mprpc SHARED ${SRC_LIST})
)。
由于里面有静态库成分,所以编译链接时会报错,因此我们暂时先生成静态库。
1 2 3 4 5 6
| aux_source_directory(. SRC_LIST)
add_library(mprpc ${SRC_LIST})
target_link_libraries(mprpc muduo_net muduo_base pthread)
|
测试:
1 2 3 4 5 6
| mrcan@ubuntu:~/mprpc/bin$ ./provider -i test.conf rpcserver_ip: 127.0.0.1 rpcserver_port: 8000 zookeeper_ip: 127.0.0.1 zookeeper_port: 5000 RpcProvider Start Service at IP: 127.0.0.1, port: 8000
|
aux_source_directory
存在的问题
~/mprpc/src
下的CMakeLists.txt
原本是如下定义的:
1 2 3 4 5 6
| aux_source_directory(. SRC_LIST)
add_library(mprpc ${SRC_LIST})
target_link_libraries(mprpc muduo_net muduo_base pthread)
|
aux_source_directory(. SRC_LIST)
表示给当前文件夹下的所有内容起了别名SRC_LIST
。
一开始构建、编译没什么问题,但后面发现如果我们给src目录增加.cc
文件后再进行编译则无法找到这些新增的文件。
说明build只记录了当时src
目录下的旧文件,而没有记录新增的文件。
所以我们还是这么写吧:
1 2 3 4 5 6 7
| set(SRC_LIST mprpcapplication.cc mprpcconfig.cc rpcheader.cc rpcprovider.cc)
add_library(mprpc ${SRC_LIST})
target_link_libraries(mprpc muduo_net muduo_base pthread)
|
调试
在顶级cmake写下:set(CMAKE_BUILD_TYPE "Debug")

运行:

break加断点,由于我们是生成了so动态链接库,所以会有如下提示:
1
| break mprpcconfig.cc:第n行
|

加完断点后,run,运行程序。(gdb调试时,如果有参数,正确启动方式是:run arg1 arg2
)
结果:

调试过程中,可以敲入bt查看栈帧信息,l查看源代码信息。
bt

l

敲入n可以单步执行。

p + 变量名,可以打印变量的值。

q退出调试
zookeeper
channel一开始不知道服务节点的ip、port,需要先去查找。