0. 简介
gRPC 一开始由 google 开发,高性能、开源、支持多语言的 RPC 框架。
在gRPC框架中,运行在不同机器上的客户端应用可以直接调用服务器端上提供的方法,使得我们可以更容易的创建一个分布式系统。
gRPC基于定义服务(Service)的思想,指定可以使用其参数和返回类型远程调用的方法;在服务器端,服务器实现这个接口并运行一个gRPC服务器来处理客户端调用;在客户端,客户端有一个存根(stub),它提供与服务器相同的方法。
相较于ROS需要在同一网段且广播分发的形式来说,gRPC的点到点分发是ROS无法实现的,所以如何完成ROS与gRPC的结合,是本文讲述的重点。
1. gRPC与C++
gRPC官方文档:英文原文
gRPC 官方文档中文版 v1.0: 上面文档的中文翻译版本,开源中国提供。
首先一上来我们给出两个学习gRPC的常用网站,其中中文网站翻译的质量不是太高,但是对于初学者来说已经足够。
1.1 定义服务
如译文所说,gRPC默认使用protocol buffers,是Google开源的一种简洁、高效的结构化数据存储格式。使用protocol的第一步是在proto文件(.proto)中定义 gRPC service 和方法 request 以及 response 的类型。你可以在examples/protos/route_guide.proto看到完整的 .proto 文件,作者的这篇文章详细介绍了protobuf的内容。
message Person {
string name = 1;
int32 id = 2;
bool has_ponycopter = 3;
}
然后,我们可以使用protoc编译器来生成指定语言的数据结构(比如cpp中上述定于会生成Person类),protocol将会对每个成员提供简单的访问方法,比如 name() 和 set_name(),这些方法将会完成原始数据和序列化数据的转换。
1.2 安装gRPC和Protocol buffers
设置
export MY_INSTALL_DIR=$HOME/develop/gRPC
mkdir -p $MY_INSTALL_DIR
添加到环境变量
export PATH="$MY_INSTALL_DIR/bin:$PATH"
安装cmake(> 3.13)
wget -q -O cmake-linux.sh https://github.com/Kitware/CMake/releases/download/v3.19.6/cmake-3.19.6-Linux-x86_64.sh
sh cmake-linux.sh -- --skip-license --prefix=$MY_INSTALL_DIR
rm cmake-linux.sh
安装其它工具
sudo apt-get install -y build-essential autoconf libtool pkg-config
拉取gRPC源码
git clone --recurse-submodules -b v1.41.0 https://github.com/grpc/grpc
编译并安装protocol buffers
cd grpc
mkdir -p cmake/build
pushd cmake/build
cmake -DgRPC_INSTALL=ON \
-DgRPC_BUILD_TESTS=OFF \
-DCMAKE_INSTALL_PREFIX=$MY_INSTALL_DIR \
../..
make -j8
make install
popd
1.3 RPC类型
一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
一个 双向流式 RPC ,是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响应类型-比如,下面的Point消息类型:
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
option objc_class_prefix = "RTG";
package routeguide;
// Interface exported by the server.
service RouteGuide {
// A simple RPC.
//
// Obtains the feature at a given position.
//
// A feature with an empty name is returned if there's no feature at the given
// position.
rpc GetFeature(Point) returns (Feature) {}
// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
// The name of the feature.
string name = 1;
// The point where the feature is detected.
Point location = 2;
}
// A RouteNote is a message sent while at a given point.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
2. gPRC编译与修改
2.1 生成客户端和服务器端代码
接下来我们需要从 .proto 的服务定义中生成 gRPC 客户端和服务器端的接口。我们通过 protocol buffer 的编译器 protoc 以及一个特殊的 gRPC C++ 插件来完成。
简单起见,我们提供一个 makefile 帮您用合适的插件,输入,输出去运行 protoc(如果你想自己去运行,确保你已经安装了 protoc,并且请遵循下面的 gRPC 代码安装指南)来操作:
$ make route_guide.grpc.pb.cc route_guide.pb.cc
实际上运行的是:
$ protoc -I ../../protos --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ../../protos/route_guide.proto
$ protoc -I ../../protos --cpp_out=. ../../protos/route_guide.proto
运行这个命令可以在当前目录中生成下面的文件:
route_guide.pb.h
, 声明生成的消息类的头文件route_guide.pb.cc
, 包含消息类的实现route_guide.grpc.pb.h
, 声明你生成的服务类的头文件route_guide.grpc.pb.cc
, 包含服务类的实现
这些包括:
- 所有的填充,序列化和获取我们请求和响应消息类型的 protocol buffer 代码
- 名为
RouteGuide
的类,包含- 为了客户端去调用定义在
RouteGuide
服务的远程接口类型(或者 存根 ) - 让服务器去实现的两个抽象接口,同时包括定义在
RouteGuide
中的方法。
- 为了客户端去调用定义在
2.2 创建服务器
实现RouteGuide
首先来看看我们如何创建一个 RouteGuide
服务器。让 RouteGuide
服务工作有两个部分:
- 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
- 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。
你可以从examples/cpp/route_guide/route_guide_server.cc看到我们的 RouteGuide
服务器的实现代码。现在让我们近距离研究它是如何工作的。
我们可以看出,服务器有一个实现了生成的 RouteGuide::Service 接口的 RouteGuideImpl 类:
class RouteGuideImpl final : public RouteGuide::Service {
...
}
在这个场景下,我们正在实现 同步 版本的RouteGuide
,它提供了 gRPC 服务器缺省的行为。同时,也有可能去实现一个异步的接口 RouteGuide::AsyncService
,它允许你进一步定制服务器线程的行为,虽然在本教程中我们并不关注这点。
RouteGuideImpl
实现了所有的服务方法。让我们先来看看最简单的类型 GetFeature
,它从客户端拿到一个 Point 然后将对应的特性返回给数据库中的 Feature。
Status GetFeature(ServerContext* context, const Point* point,
Feature* feature) override {
feature->set_name(GetFeatureName(*point, feature_list_));
feature->mutable_location()——>CopyFrom(*point);
return Status::OK;
}
这个方法为 RPC 传递了一个上下文对象,包含了客户端的 Point protocol buffer
请求以及一个填充响应信息的Feature protocol buffer
。在这个方法中,我们用适当的信息填充 Feature
,然后返回OK的状态,告诉 gRPC
我们已经处理完 RPC,并且 Feature 可以返回给客户端。
启动服务器
一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,这样客户端才可以使用服务。下面这段代码展示了在我们RouteGuide服务中实现的过程:
void RunServer(const std::string& db_path) {
std::string server_address("0.0.0.0:50051");
RouteGuideImpl service(db_path);
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
如你所见,我们通过使用ServerBuilder去构建和启动服务器。为了做到这点,我们需要:
- 创建我们的服务实现类 RouteGuideImpl 的一个实例。
- 创建工厂类 ServerBuilder 的一个实例。
- 在生成器的 AddListeningPort() 方法中指定客户端请求时监听的地址和端口。
- 用生成器注册我们的服务实现。
- 调用生成器的 BuildAndStart() 方法为我们的服务创建和启动一个RPC服务器。
- 调用服务器的 Wait() 方法实现阻塞等待,直到进程被杀死或者 Shutdown() 被调用。
2.3 创建客户端
在这部分,我们将尝试为RouteGuide服务创建一个C++的客户端。你可以从examples/cpp/route_guide/route_guide_client.cc看到我们完整的客户端例子代码.
创建一个存根
为了能调用服务的方法,我们得先创建一个 存根。
首先需要为我们的存根创建一个gRPC channel
,指定我们想连接的服务器地址和端口,以及 channel 相关的参数——在本例中我们使用了缺省的 ChannelArguments
并且没有使用SSL:
grpc::CreateChannel("localhost:50051", grpc::InsecureCredentials(), ChannelArguments());
现在我们可以利用channel,使用从.proto中生成的RouteGuide类提供的NewStub方法去创建存根。
public:
RouteGuideClient(std::shared_ptr<ChannelInterface> channel,
const std::string& db)
: stub_(RouteGuide::NewStub(channel)) {
...
}
调用服务的方法
现在我们来看看如何调用服务的方法。注意,在本教程中调用的方法,都是 阻塞/同步 的版本:这意味着 RPC 调用会等待服务器响应,要么返回响应,要么引起一个异常。
调用简单 RPC GetFeature 几乎是和调用一个本地方法一样直观。
Point point;
Feature feature;
point = MakePoint(409146138, -746188906);
GetOneFeature(point, &feature);
...
bool GetOneFeature(const Point& point, Feature* feature) {
ClientContext context;
bool result;
std::mutex mu;
std::condition_variable cv;
bool done = false;
stub_->async()->GetFeature(
&context, &point, feature,
[&result, &mu, &cv, &done, feature, this](Status status) {
bool ret;
if (!status.ok()) {
std::cout << "GetFeature rpc failed." << std::endl;
ret = false;
} else if (!feature->has_location()) {
std::cout << "Server returns incomplete feature." << std::endl;
ret = false;
} else if (feature->name().empty()) {
std::cout << "Found no feature at "
<< feature->location().latitude() / kCoordFactor_ << ", "
<< feature->location().longitude() / kCoordFactor_
<< std::endl;
ret = true;
} else {
std::cout << "Found feature called " << feature->name() << " at "
<< feature->location().latitude() / kCoordFactor_ << ", "
<< feature->location().longitude() / kCoordFactor_
<< std::endl;
ret = true;
}
std::lock_guard<std::mutex> lock(mu);
result = ret;
done = true;
cv.notify_one();
});
std::unique_lock<std::mutex> lock(mu);
cv.wait(lock, [&done] { return done; });
return result;
}
如你所见,我们创建并且填充了一个请求的 protocol buffer 对象(例子中为 Point),同时为了服务器填写创建了一个响应 protocol buffer 对象。为了调用我们还创建了一个 ClientContext 对象——你可以随意的设置该对象上的配置的值,比如期限,虽然现在我们会使用缺省的设置。注意,你不能在不同的调用间重复使用这个对象。最后,我们在存根上调用这个方法,将其传给上下文,请求以及响应。如果方法的返回是OK,那么我们就可以从服务器从我们的响应对象中读取响应信息。
这里也提供一个更易读的方法
std::string SayHelloAgain(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// The actual RPC.
Status status = stub_->SayHelloAgain(&context, request, &reply);
// Act upon its status.
if (status.ok()) {
return reply.message();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return "RPC failed";
}
}
到此我们讲完了Cpp中gRPC的基础操作。
3. ROS与gPRC
这部分是本文的重点。我们可以看一下一个开源项目ros-grpc-bridge-generator,其就完成了gPRC指令与ROS的交互。ROS也为gPRC提供了简单的安装方法,而不需要自行构建
sudo apt install ros-noetic-grpc
而我们更期望的是想知道如何自己写一个gRPC与ROS的交互程序。首先我们需要在CMakeLists.txt中包含gRPC。同时这里的generate_proto
完成了proto和gRPC头文件的生成。
cmake_minimum_required(VERSION 2.8.3)
project(greeter)
find_package(catkin REQUIRED COMPONENTS grpc)
#---------------------------------------^^^^
# Besure to "find_package" "grpc"
catkin_package(
CATKIN_DEPENDS grpc
)
# Generate protobuf header/library only.
generate_proto(hello_msg_proto proto/subdir/hello_msgs.proto)
# -------------^^^^^^^^^^^^^^^ -- This is a target (library) name.
# Generate protobuf header/library along with grpc header/library.
generate_proto(hello_proto GRPC proto/hello.proto)
# -------------------------^^^^ -- Also generates gRPC header/source.
# hello_proto depends on hello_msg_proto.
target_link_libraries(hello_proto hello_msg_proto)
add_executable(greeter_cc_server src/greeter_cc_server.cc)
# Links hello_proto to greeter_cc_server.
target_link_libraries(greeter_cc_server ${catkin_LIBRARIES} hello_proto)
# ----------------------------------------------------------^^^^^^^^^^^
add_executable(greeter_cc_client src/greeter_cc_client.cc)
# Links hello_proto to greeter_cc_client.
target_link_libraries(greeter_cc_client ${catkin_LIBRARIES} hello_proto)
# ----------------------------------------------------------^^^^^^^^^^^
add_executable(greeter_cc_self src/greeter_cc_self.cc)
# Links hello_msg_proto to greeter_cc_self. This does not contain gRPC.
target_link_libraries(greeter_cc_self ${catkin_LIBRARIES} hello_msg_proto)
# --------------------------------------------------------^^^^^^^^^^^^^^^
catkin_python_setup()
然后在package包含这个包
<?xml version="1.0"?>
<package>
<name>greeter</name>
<version>0.0.0</version>
<description>An example of using catkinized gRPC package.</description>
<maintainer email="example@exp.com"></maintainer>
<license>BSD</license>
<buildtool_depend>catkin</buildtool_depend>
<!-- ADD THE NEXT LINES TO USE GRPC -->
<build_depend>grpc</build_depend>
<run_depend>grpc</run_depend>
</package>
主要我们来看一下服务端,内部包含了最简单的gRPC消息
#include <iostream>
#include <memory>
#include <string>
#include <grpc++/grpc++.h>
#include "greeter/proto/hello.pb.h"
#include "greeter/proto/hello.grpc.pb.h"
#include "greeter/proto/subdir/hello_msgs.pb.h"
// -------^^^^^^^ -- Include package name if SRC_BASE is not in generate_proto
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using catkin_grpc::greeter::HelloRequest;
using catkin_grpc::greeter::HelloReply;
using catkin_grpc::greeter::Greeter;
// Logic and data behind the server's behavior.
class GreeterServiceImpl final : public Greeter::Service {
Status SayHello(ServerContext* context, const HelloRequest* request,
HelloReply* reply) override {
std::string prefix("Hello ");
reply->set_message(prefix + request->name());
return Status::OK;
}
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
GreeterServiceImpl service;
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
int main(int argc, char** argv) {
grpc_init();
RunServer();
return 0;
}
所以对应的客户端也非常简单
#include <iostream>
#include <memory>
#include <string>
#include <grpc++/grpc++.h>
#include "greeter/proto/hello.pb.h"
#include "greeter/proto/hello.grpc.pb.h"
#include "greeter/proto/subdir/hello_msgs.pb.h"
// -------^^^^^^^ -- Include package name if SRC_BASE is not in generate_proto
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using catkin_grpc::greeter::HelloRequest;
using catkin_grpc::greeter::HelloReply;
using catkin_grpc::greeter::Greeter;
class GreeterClient {
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
// Assambles the client's payload, sends it and presents the response back
// from the server.
std::string SayHello(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// The actual RPC.
Status status = stub_->SayHello(&context, request, &reply);
// Act upon its status.
if (status.ok()) {
return reply.message();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return "RPC failed";
}
}
private:
std::unique_ptr<Greeter::Stub> stub_;
};
int main(int argc, char** argv) {
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint (in this case,
// localhost at port 50051). We indicate that the channel isn't authenticated
// (use of InsecureChannelCredentials()).
grpc_init();
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
std::string user("world");
std::string reply = greeter.SayHello(user);
std::cout << "Greeter received: " << reply << std::endl;
return 0;
}
4. 参考链接
http://doc.oschina.net/grpc?t=57966
https://blog.csdn.net/Mculover666/article/details/120875659
https://blog.csdn.net/heroacool/article/details/119191385
https://www.jianshu.com/p/43fdfeb105ff?from=timeline&isappinstalled=0
https://www.163.com/dy/article/G8JO3BI60511FQO9.html
评论(0)
您还未登录,请登录后发表或查看评论