当前位置:网站首页>GRPC整体学习
GRPC整体学习
2022-08-09 11:41:00 【猫哥灬01】
What
是一款高性能,跨语言,跨平台的rpc框架
Why
优点
protobuf二进制消息,性能好/效率高(空间和时间效率都很不错)
GRPC可以通过protobuf来定义接口,从而可以有更加严格的接口约束条件
缺点
GRPC尚未提供连接池,需要自行实现
尚未提供“服务发现”、“负载均衡”机制
Protobuf二进制可读性差
How
添加依赖
https://github.com/grpc/grpc-java/blob/master/README.md
定义pb文件

通过插件生成java文件
源码分析
总体架构

服务注册(Registry Pack)负责对服务描述、接口描述等信息进行注册,以供方法调用模块查询;
方法调用(Call Pack)为真正的方法调用逻辑,最终它会调用到我们实现的服务接口对应的方法;
网络流(Stream Pack)是对方法调用的网络会话的封装,即一次方法调用为一个流;
传输逻辑(Transport Pack)负责构建真正的底层IO数据传输和相应的事件监听器;
服务管理(Server Pack)负责逻辑服务ServerImpl和监听服务NettyServer的构建与启动。它是整个服务端逻辑的功能工厂;
服务调用原理
服务调用方式
普通 RPC 调用方式,即请求 - 响应模式。
基于 HTTP/2.0 的 streaming 调用方式。
普通调用-同步阻塞调用
代码
blockingStub = GreeterGrpc.newBlockingStub(channel);
...
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
...
原理
同步服务调用是由 gRPC 框架的 ClientCalls 在框架层做了封装,异步发起服务调用之后,同步阻塞调用方线程,直到收到响应再唤醒被阻塞的业务线程

基于 Future 的异步 RPC 调用
代码
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
try {
com.google.common.util.concurrent.ListenableFuture<io.grpc.examples.helloworld.HelloReply>
listenableFuture = futureStub.sayHello(request);
Futures.addCallback(listenableFuture, new FutureCallback<HelloReply>() {
@Override
public void onSuccess(@Nullable HelloReply result) {
logger.info("Greeting: " + result.getMessage());
}
原理
将 ListenableFuture 加入到 gRPC 的 Future 列表中,创建一个新的 FutureCallback 对象,当 ListenableFuture 获取到响应之后,gRPC 的 DirectExecutor 线程池会调用新创建的 FutureCallback,执行 onSuccess 或者 onFailure,实现异步回调通知。

Reactive 风格异步 RPC 调用
代码
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply> responseObserver =
new io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply>()
{
public void onNext(HelloReply value)
{
logger.info("Greeting: " + value.getMessage());
}
public void onError(Throwable t){
logger.warning(t.getMessage());
}
public void onCompleted(){}
};
stub.sayHello(request,responseObserver);
原理

将响应 StreamObserver 作为入参传递到异步服务调用中,该方法返回空,程序继续向下执行,不阻塞当前业务线程
Streaming 模式服务调用
服务端 streaming
客户端 streaming
服务端和客户端双向 streaming
服务端streaming
代码
rpc ListFeatures(Rectangle) returns (stream Feature) {}
public void listFeatures(io.grpc.examples.routeguide.Rectangle request,
io.grpc.stub.StreamObserver<io.grpc.examples.routeguide.Feature> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_LIST_FEATURES, responseObserver);
}
原理

构造 io.grpc.stub.StreamObserver responseObserver,实现它的三个回调接口,注意由于是服务端 streaming 模式,所以它的 onNext(Feature value) 将会被回调多次,每次都代表一个响应,如果所有的响应都返回,则会调用 onCompleted() 方法。
客户端 streaming
代码
rpc RecordRoute(stream Point) returns (RouteSummary) {}
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
for (int i = 0; i < numPoints; ++i) {
int index = random.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
原理

异步服务调用获取请求 StreamObserver 对象,循环调用 requestObserver.onNext(point),异步发送请求消息到服务端,发送完成之后,调用 requestObserver.onCompleted(),通知服务端所有请求已经发送完成,可以接收服务端的响应了。
双向 streaming
代码
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
if (testHelper != null) {
testHelper.onMessage(note);
}
}
//发送多个请求
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
原理

构造 Streaming 响应对象 StreamObserver并实现 onNext 等接口,由于服务端也是 Streaming 模式,因此响应是多个的,也就是说 onNext 会被调用多次。
通过在循环中调用 requestObserver 的 onNext 方法,发送请求消息
边栏推荐
猜你喜欢

proto3-2语法

win10 outlook邮件设置

C# async 和 await 理解

x86异常处理与中断机制(1)概述中断的来源和处理方式

VS Code有趣插件

Redis的下载安装

二重指针-char **、int **的作用

Number theory knowledge

字符串 | 反转字符串 | 双指针法 | leecode刷题笔记

Semaphore SIGCHLD use, how to make the parent that the child performs over, how to make the distinction between multiple child processes. The end
随机推荐
专业人士使用的 11 种渗透测试工具
electron 应用开发优秀实践
[Essence] Analysis of the special case of C language structure: structure pointer / basic data type pointer, pointing to other structures
LeetCode_单调栈_中等_456.132 模式
ICML 2022 | Out-of-Distribution检测与深最近的邻居
[工程数学]1_特征值与特征向量
网页控制台控制编辑框
PAT1007
OpenSSF的开源软件风险评估工具:Scorecards
es6递归函数
fork creates multiple child processes
【概率论】一元概率分布的平均化
结构体知识点整合(前篇)
WPF implements a MessageBox message prompt box with a mask
Installation of gdb 10.2
链表基本操作(详解)
x86 Exception Handling and Interrupt Mechanism (1) Overview of the source and handling of interrupts
Notepad++安装插件
VS Code有趣插件
【C language】typedef的使用:结构体、基本数据类型、数组
