当前位置:网站首页>GRPC整体学习

GRPC整体学习

2022-08-09 11:41:00 猫哥灬01

What

是一款高性能,跨语言,跨平台的rpc框架

Why

优点

  1. protobuf二进制消息,性能好/效率高(空间和时间效率都很不错)

  1. GRPC可以通过protobuf来定义接口,从而可以有更加严格的接口约束条件

缺点

  1. GRPC尚未提供连接池,需要自行实现

  1. 尚未提供“服务发现”、“负载均衡”机制

  1. Protobuf二进制可读性差

How

  1. 添加依赖

    1. https://github.com/grpc/grpc-java/blob/master/README.md

  1. 定义pb文件

  2.  

  3. 通过插件生成java文件

  1.  

源码分析

总体架构

 

服务注册(Registry Pack)负责对服务描述、接口描述等信息进行注册,以供方法调用模块查询;

方法调用(Call Pack)为真正的方法调用逻辑,最终它会调用到我们实现的服务接口对应的方法;

网络流(Stream Pack)是对方法调用的网络会话的封装,即一次方法调用为一个流;

传输逻辑(Transport Pack)负责构建真正的底层IO数据传输和相应的事件监听器;

服务管理(Server Pack)负责逻辑服务ServerImpl和监听服务NettyServer的构建与启动。它是整个服务端逻辑的功能工厂;

服务调用原理

服务调用方式

  • 普通 RPC 调用方式,即请求 - 响应模式。

  • 基于 HTTP/2.0 的 streaming 调用方式。

普通调用-同步阻塞调用

代码

blockingStub = GreeterGrpc.newBlockingStub(channel);

...

HelloRequest request = HelloRequest.newBuilder().setName(name).build();

HelloReply response;

try {

response = blockingStub.sayHello(request);

...

原理

同步服务调用是由 gRPC 框架的 ClientCalls 在框架层做了封装,异步发起服务调用之后,同步阻塞调用方线程,直到收到响应再唤醒被阻塞的业务线程

 

基于 Future 的异步 RPC 调用

代码

HelloRequest request = HelloRequest.newBuilder().setName(name).build();

try {

com.google.common.util.concurrent.ListenableFuture<io.grpc.examples.helloworld.HelloReply>

listenableFuture = futureStub.sayHello(request);

Futures.addCallback(listenableFuture, new FutureCallback<HelloReply>() {

@Override

public void onSuccess(@Nullable HelloReply result) {

logger.info("Greeting: " + result.getMessage());

}

原理

将 ListenableFuture 加入到 gRPC 的 Future 列表中,创建一个新的 FutureCallback 对象,当 ListenableFuture 获取到响应之后,gRPC 的 DirectExecutor 线程池会调用新创建的 FutureCallback,执行 onSuccess 或者 onFailure,实现异步回调通知。

 

Reactive 风格异步 RPC 调用

代码

HelloRequest request = HelloRequest.newBuilder().setName(name).build();

io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply> responseObserver =

new io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply>()

{

public void onNext(HelloReply value)

{

logger.info("Greeting: " + value.getMessage());

}

public void onError(Throwable t){

logger.warning(t.getMessage());

}

public void onCompleted(){}

};

stub.sayHello(request,responseObserver);

原理

 

将响应 StreamObserver 作为入参传递到异步服务调用中,该方法返回空,程序继续向下执行,不阻塞当前业务线程

Streaming 模式服务调用

  • 服务端 streaming

  • 客户端 streaming

  • 服务端和客户端双向 streaming

服务端streaming

代码

rpc ListFeatures(Rectangle) returns (stream Feature) {}

public void listFeatures(io.grpc.examples.routeguide.Rectangle request,

io.grpc.stub.StreamObserver<io.grpc.examples.routeguide.Feature> responseObserver) {

asyncUnimplementedUnaryCall(METHOD_LIST_FEATURES, responseObserver);

}

原理

 

构造 io.grpc.stub.StreamObserver responseObserver,实现它的三个回调接口,注意由于是服务端 streaming 模式,所以它的 onNext(Feature value) 将会被回调多次,每次都代表一个响应,如果所有的响应都返回,则会调用 onCompleted() 方法。

客户端 streaming

代码

rpc RecordRoute(stream Point) returns (RouteSummary) {}

StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);

try {

// Send numPoints points randomly selected from the features list.

for (int i = 0; i < numPoints; ++i) {

int index = random.nextInt(features.size());

Point point = features.get(index).getLocation();

info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),

RouteGuideUtil.getLongitude(point));

requestObserver.onNext(point);

原理

 

异步服务调用获取请求 StreamObserver 对象,循环调用 requestObserver.onNext(point),异步发送请求消息到服务端,发送完成之后,调用 requestObserver.onCompleted(),通知服务端所有请求已经发送完成,可以接收服务端的响应了。

双向 streaming

代码

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

StreamObserver<RouteNote> requestObserver =

asyncStub.routeChat(new StreamObserver<RouteNote>() {

@Override

public void onNext(RouteNote note) {

info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()

.getLatitude(), note.getLocation().getLongitude());

if (testHelper != null) {

testHelper.onMessage(note);

}

}

//发送多个请求

for (RouteNote request : requests) {

info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()

.getLatitude(), request.getLocation().getLongitude());

requestObserver.onNext(request);

}

} catch (RuntimeException e) {

// Cancel RPC

requestObserver.onError(e);

throw e;

}

// Mark the end of requests

requestObserver.onCompleted();

原理

 

构造 Streaming 响应对象 StreamObserver并实现 onNext 等接口,由于服务端也是 Streaming 模式,因此响应是多个的,也就是说 onNext 会被调用多次。

通过在循环中调用 requestObserver 的 onNext 方法,发送请求消息

原网站

版权声明
本文为[猫哥灬01]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_29857681/article/details/126229434