当前位置:网站首页>GRPC整体学习
GRPC整体学习
2022-08-09 11:41:00 【猫哥灬01】
What
是一款高性能,跨语言,跨平台的rpc框架
Why
优点
protobuf二进制消息,性能好/效率高(空间和时间效率都很不错)
GRPC可以通过protobuf来定义接口,从而可以有更加严格的接口约束条件
缺点
GRPC尚未提供连接池,需要自行实现
尚未提供“服务发现”、“负载均衡”机制
Protobuf二进制可读性差
How
添加依赖
https://github.com/grpc/grpc-java/blob/master/README.md
定义pb文件
通过插件生成java文件
源码分析
总体架构
服务注册(Registry Pack)负责对服务描述、接口描述等信息进行注册,以供方法调用模块查询;
方法调用(Call Pack)为真正的方法调用逻辑,最终它会调用到我们实现的服务接口对应的方法;
网络流(Stream Pack)是对方法调用的网络会话的封装,即一次方法调用为一个流;
传输逻辑(Transport Pack)负责构建真正的底层IO数据传输和相应的事件监听器;
服务管理(Server Pack)负责逻辑服务ServerImpl和监听服务NettyServer的构建与启动。它是整个服务端逻辑的功能工厂;
服务调用原理
服务调用方式
普通 RPC 调用方式,即请求 - 响应模式。
基于 HTTP/2.0 的 streaming 调用方式。
普通调用-同步阻塞调用
代码
blockingStub = GreeterGrpc.newBlockingStub(channel);
...
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
...
原理
同步服务调用是由 gRPC 框架的 ClientCalls 在框架层做了封装,异步发起服务调用之后,同步阻塞调用方线程,直到收到响应再唤醒被阻塞的业务线程
基于 Future 的异步 RPC 调用
代码
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
try {
com.google.common.util.concurrent.ListenableFuture<io.grpc.examples.helloworld.HelloReply>
listenableFuture = futureStub.sayHello(request);
Futures.addCallback(listenableFuture, new FutureCallback<HelloReply>() {
@Override
public void onSuccess(@Nullable HelloReply result) {
logger.info("Greeting: " + result.getMessage());
}
原理
将 ListenableFuture 加入到 gRPC 的 Future 列表中,创建一个新的 FutureCallback 对象,当 ListenableFuture 获取到响应之后,gRPC 的 DirectExecutor 线程池会调用新创建的 FutureCallback,执行 onSuccess 或者 onFailure,实现异步回调通知。
Reactive 风格异步 RPC 调用
代码
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply> responseObserver =
new io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply>()
{
public void onNext(HelloReply value)
{
logger.info("Greeting: " + value.getMessage());
}
public void onError(Throwable t){
logger.warning(t.getMessage());
}
public void onCompleted(){}
};
stub.sayHello(request,responseObserver);
原理
将响应 StreamObserver 作为入参传递到异步服务调用中,该方法返回空,程序继续向下执行,不阻塞当前业务线程
Streaming 模式服务调用
服务端 streaming
客户端 streaming
服务端和客户端双向 streaming
服务端streaming
代码
rpc ListFeatures(Rectangle) returns (stream Feature) {}
public void listFeatures(io.grpc.examples.routeguide.Rectangle request,
io.grpc.stub.StreamObserver<io.grpc.examples.routeguide.Feature> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_LIST_FEATURES, responseObserver);
}
原理
构造 io.grpc.stub.StreamObserver responseObserver,实现它的三个回调接口,注意由于是服务端 streaming 模式,所以它的 onNext(Feature value) 将会被回调多次,每次都代表一个响应,如果所有的响应都返回,则会调用 onCompleted() 方法。
客户端 streaming
代码
rpc RecordRoute(stream Point) returns (RouteSummary) {}
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
for (int i = 0; i < numPoints; ++i) {
int index = random.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
原理
异步服务调用获取请求 StreamObserver 对象,循环调用 requestObserver.onNext(point),异步发送请求消息到服务端,发送完成之后,调用 requestObserver.onCompleted(),通知服务端所有请求已经发送完成,可以接收服务端的响应了。
双向 streaming
代码
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
if (testHelper != null) {
testHelper.onMessage(note);
}
}
//发送多个请求
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
原理
构造 Streaming 响应对象 StreamObserver并实现 onNext 等接口,由于服务端也是 Streaming 模式,因此响应是多个的,也就是说 onNext 会被调用多次。
通过在循环中调用 requestObserver 的 onNext 方法,发送请求消息
边栏推荐
猜你喜欢
F280049库函数API编程、直接寄存器控制编程和混合编程方法
wait system call
ARP协议原理
拍频造成的轻微震荡
Redis高可用部署
二重指针-char **、int **的作用
WPF implements a MessageBox message prompt box with a mask
电解电容漏电流及均压
Visual Studio 2017 ASP.NET Framework MVC 项目 MySQL 配置连接
x86 Exception Handling and Interrupt Mechanism (1) Overview of the source and handling of interrupts
随机推荐
Django cannot link mysql database
PAT1013 并查集 DFS(查找联通分量的个数)
log4net使用指南(winform版,sqlserver记录)
Ways to prevent data fraud
This application has no explicit mapping for /error, so you are seeing this as a fallback
Notepad++安装插件
[C language] creation and use of dynamic arrays
ICML 2022 | Out-of-Distribution Detection with Deep Nearest Neighbors
TI的片上固化好的boot ROM(上电引导加载程序)退出后的去向
修改VOT2018.json文件,去掉图片路径中的color
web course design
redis内存的淘汰机制
proto3-2语法
ACM01 Backpack problem
实验记录:搭建网络过程
【Basic model】Transformer-实现中英翻译
[现代控制理论]5_系统的可控性_controllability
PAT1004
Experiment record: the process of building a network
Redis高可用部署