当前位置:网站首页>Thrift -- 跨语言RPC 框架
Thrift -- 跨语言RPC 框架
2022-08-10 09:49:00 【C0oOder】
Thrift – 跨语言RPC 框架
1.简介
The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi and other languages.
用于跨语言的服务软件开发,结合代码生成引擎,构建 C++, Java, Python, PHP, Ruby… 生成多语言的RPC 调用代码;
借助IDL 语言(可以理解这个框架的模板语言,后面在详细说) 生成不同语言的调用代码
2.下载安装
我电脑是windows10 就直接下载 win 版本 下载地址是 : https://thrift.apache.org/download
安装环境变量 ,
我这里复制了一个副本把版本信息去除了
C:\Users\sff>thrift -version
Thrift version 0.16.0
安装完成
3.Thrift 架构
交互架构 CS架构,客户端请求服务端;这两边的代码都可以通过Thrift 来生成
技术架构就是 (区别于ISO 七层或者五层网络哈)
- 传输层 定义具体的网络连接传输协议 使用TCP /IP UDP
- 协议层 RPC 交互过程中数据的序列化和反序列化格式 有JSON XML 二进制数据等
- 处理层 Thrift gen 生成的各个语言的接口,然后我们服务端是实现这个接口来提供客户端的调用服务
- 服务层 接口上面 然后使用不同的IO 模型来处理数据
4.Thrift 开发优势
多语言 跨语言,不过要学习IDL(接口定义语言) . 这个还是相对比较简单。。
只要维护IDL 文件,代码生成引擎 会生产各个语言的服务端代码和客户端代码,省去定义各语言接口编写,网络传输,服务器IO模型类型(少量代码)的时间,可以专注于实现自己的业务代码;
注: 学习地址
IDL语法:https://thrift.apache.org/docs/idl
类型定义:https://thrift.apache.org/docs/types
示例:
//各个语言对应的包
namespace java com.sff.study
namespace js com.sff.study
namespace go com.sff.study
struct Demo{
1: i32 id
2: string desc
}
service DemoService {
Demo getDemo(1:i32 id)
bool setDemo(1:Demo demo)
}
## 生成 java go js cpp
D:\Resource\FrameMiddleware\Thrift\template>thrift -o D:\Resource\FrameMiddleware\Thrift\ --gen java demo.thrift
D:\Resource\FrameMiddleware\Thrift\template>thrift -o D:\Resource\FrameMiddleware\Thrift\ --gen go demo.thrift
D:\Resource\FrameMiddleware\Thrift\template>thrift -o D:\Resource\FrameMiddleware\Thrift\ --gen js demo.thrift
D:\Resource\FrameMiddleware\Thrift\template>thrift -o D:\Resource\FrameMiddleware\Thrift\ --gen cpp demo.thrift
5.协议层协议
父类 TProtocol
- TCompacProtocol :压缩二进制编码传输
- TbinaryProtocol:二进制编码传输
- TJSONProtocol JSON 协议传输 带类型信息
- TSimpleJSONProtocol JSON 只只有数据 需要自己有反序列化工具
6.传输层协议
- TServerSocket : Socket 常见的BIO
- TNonblockingServerTransport: 非阻塞IO Nio ;
- TFramedTransport
7.快速开始
基于前面代码生成的 java 版本的 代码
7.1 创建两个项目 导入生成的代码
7.2 添加依赖
<dependencies>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.16.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
7.3 服务端实现接口
public class DemoServiceImpl implements DemoService.Iface {
@Override
public Demo getDemo(int id) throws TException {
Demo demo = new Demo();
int nextInt = new Random().nextInt(100);
demo.setId(nextInt);
demo.setDesc("我是demo" + nextInt);
return null;
}
@Override
public boolean setDemo(Demo demo) throws TException {
System.out.println("保存了~~~~~:" + demo);
return false;
}
}
主启动类 简单模式
package com.sff.study;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
public class ServerMain {
public static void main(String[] args) {
try {
TServerTransport serverTransport = new TServerSocket(9090);
TServer.Args args1 = new TServer.Args(serverTransport);
// 业务处理器
DemoService.Processor processor = new DemoService.Processor(new DemoServiceImpl());
args1.processor(processor);
// 指定序列化工厂
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
args1.protocolFactory(factory);
// 绑定参数
TSimpleServer server = new TSimpleServer(args1);
// Use this for a multithreaded server
// TServer server = new TThreadPoolServer(new
// TServer server = new TThreadPoolServer(new
// TThreadPoolServer.Args(serverTransport).processor(processor));
System.out.println("Starting the simple server...");
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.4 客户端
测试连接调用
package com.sff.study;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
public class ClientMain {
public static void main(String[] args) throws Exception {
// 传输层IO
TSocket socket = new TSocket("127.0.0.1", 9090);
// 协议层 序列化
TProtocol protocol = new TBinaryProtocol(socket);
DemoService.Client client = new DemoService.Client(protocol);
// 打开 scoket
socket.open();
Demo demo = client.getDemo(11);
System.out.println(demo);
// int nextInt = new Random().nextInt(100);
}
}
7.5 调试
第一次调试
先开启服务端,后开启客户端 ,客户端报错
Connected to the target VM, address: '127.0.0.1:54713', transport: 'socket'
Exception in thread "main" org.apache.thrift.TApplicationException: getDemo failed: unknown result
at com.sff.study.DemoService$Client.recv_getDemo(DemoService.java:71)
at com.sff.study.DemoService$Client.getDemo(DemoService.java:56)
at com.sff.study.ClientMain.main(ClientMain.java:20)
Disconnected from the target VM, address: '127.0.0.1:54713', transport: 'socket'
发现是 DemoServiceImpl##getDemo() 返回的是null 导致 修改后可以调试通过
Connected to the target VM, address: '127.0.0.1:54809', transport: 'socket'
Demo(id:11, desc:我是demo11)
Disconnected from the target VM, address: '127.0.0.1:54809', transport: 'socket'
8.服务端服务类型
TServer 是所有服务的父类
// 主要方法 还是子类去实现的 开启服务 serve
public abstract void serve();
// 主要方法 还是子类去实现的 关闭服务 stop
public void stop() {
}
8.1 TSimpleServer
public class TSimpleServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TSimpleServer.class.getName());
public TSimpleServer(AbstractServerArgs args) {
super(args);
}
//核心方法 开启服务
public void serve() {
try {
//开启监听端口
this.serverTransport_.listen();
} catch (TTransportException var9) {
LOGGER.error("Error occurred during listening.", var9);
return;
}
if (this.eventHandler_ != null) {
this.eventHandler_.preServe();
}
this.setServing(true);
// 关闭服务置位 true 停止循环
while(!this.stopped_) {
.....
try {
// 阻塞方法 一直监听请求
client = this.serverTransport_.accept();
if (client != null) {
........
while(true) {
if (this.eventHandler_ != null) {
this.eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
//获取我们自己的处理器 处理业务逻辑
processor.process(inputProtocol, outputProtocol);
}
}
}
catch (TTransportException var10) {
......
}
}
this.setServing(false);
}
//核心方法 关闭服务
public void stop() {
this.stopped_ = true;
this.serverTransport_.interrupt();
}
}
执行流程如下
缺点: 单线程 阻塞;多客户端连接会阻塞,直到前面连接的释放才可以处理下一个
8.2 TThreadPoolServer
public class TThreadPoolServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class);
private ExecutorService executorService_;
private final TimeUnit stopTimeoutUnit;
private final long stopTimeoutVal;
public TThreadPoolServer(TThreadPoolServer.Args args) {
super(args);
this.stopTimeoutUnit = args.stopTimeoutUnit;
this.stopTimeoutVal = (long)args.stopTimeoutVal;
// 初始化的时候使用 线程池
this.executorService_ = args.executorService != null ? args.executorService : createDefaultExecutorService(args);
}
// 线程池参数
private static ExecutorService createDefaultExecutorService(TThreadPoolServer.Args args) {
// args.maxWorkerThreads 参数为最大线程数
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
final AtomicLong count = new AtomicLong();
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName(String.format("TThreadPoolServer WorkerProcess-%d", this.count.getAndIncrement()));
return thread;
}
});
}
//开启监听
protected boolean preServe() {
......
this.serverTransport_.listen();
......
return true;
}
public void serve() {
//开启监听
if (this.preServe()) {
// 真正执行的方法
this.execute();
this.executorService_.shutdownNow();
if (!this.waitForShutdown()) {
LOGGER.error("Shutdown is not done after " + this.stopTimeoutVal + this.stopTimeoutUnit);
}
this.setServing(false);
}
}
protected void execute() {
while(!this.stopped_) {
try {
//和8.1 的那个一样 会一致处于阻塞 直到新的socket 到来
TTransport client = this.serverTransport_.accept();
try {
//来一个连接就交给线程池处理
this.executorService_.execute(new TThreadPoolServer.WorkerProcess(client));
} catch (RejectedExecutionException var3) {
if (!this.stopped_) {
LOGGER.warn("ThreadPool is saturated with incoming requests. Closing latest connection.");
}
client.close();
}
} catch (TTransportException var4) {
if (!this.stopped_) {
LOGGER.warn("Transport error occurred during acceptance of message", var4);
}
}
}
}
public static class Args extends AbstractServerArgs<TThreadPoolServer.Args> {
public int minWorkerThreads = 5;
public int maxWorkerThreads = 2147483647;
public ExecutorService executorService;
public int stopTimeoutVal = 60;
public TimeUnit stopTimeoutUnit;
.....
}
}
执行流程图
优点:每个Socket 都会去线程池拿一个线程去处理,适合连接数量不多的这种场景
缺点:很多客户端同时连接的时候,当客户端比较多的时候,就需要服务端有对应数量的线程来处理,而且代码中设置的上限还比较大;线程资源比较重要,比较消耗服务器性能;可能会内存溢出;
8.3 AbstractNonblockingServer
AbstractNonblockingServer 父类 主要是约定主流程 ,实现类只能使用TFrameTransport
public void serve() {
if (this.startThreads()) {
if (this.startListening()) {
this.setServing(true);
this.waitForShutdown();
this.setServing(false);
this.stopListening();
}
}
}
8.4 TNonblockingServer React 模型
TNonblockingServer extends AbstractNonblockingServer
看看执行流程
server 父类
==> this.startThreads()
protected boolean startThreads() {
....
// TNonblockingServer.SelectAcceptThread
this.selectAcceptThread_ = new TNonblockingServer.SelectAcceptThread((TNonblockingServerTransport)this.serverTransport_);
this.selectAcceptThread_.start();
...
}
======>TNonblockingServer.SelectAcceptThread
protected class SelectAcceptThread extends AbstractSelectThread {
......
public void run() {
try {
....
while(!TNonblockingServer.this.stopped_) {
// 调用
this.select();
this.processInterestChanges();
}
.....
}
// 调用
private void select() {
....
this.handleRead(key);
....
}
}
=====》 父类 this.handleRead(key);
protected void handleRead(SelectionKey key) {
///this.requestInvoke(buffer) 调用
if (buffer.isFrameFullyRead() && !AbstractNonblockingServer.this.requestInvoke(buffer)) {
}
//this.requestInvoke(buffer) 调用
protected boolean requestInvoke(FrameBuffer frameBuffer) {
//调用
frameBuffer.invoke();
return true;
}
//frameBuffer.invoke(); 调用
public void invoke() {
....
AbstractNonblockingServer.this.processorFactory_.getProcessor(this.inTrans_).process(this.inProt_, this.outProt_);
}
流程图
缺点:单线程处理,多客户单耗时操作会使得客户端阻塞;
8.5 TThreadedSelectorServer 单React
TThreadedSelectorServer extends AbstractNonblockingServer
TThreadedSelectorServer.AcceptThread 负责连接
TNonblockingServer 的多线程 (选择器线程,业务逻辑处理线程 )版本,一个Accpet 代码就不展示了
流程图如下
这个优点 缺点 :其实不是很明显了 处理连接socket 状态和业务逻辑都给线程池去处理了
8.6 THsHaServer 单React
THsHaServer extends TNonblockingServer
TNonblockingServer 的多线程 (业务逻辑处理线程 )版本 代码就不展示了
边栏推荐
猜你喜欢
【API Management】What is API Management and why is it important?
Excel draws statistical graphs
第三章 搜索与图论(三)
09 【Attributes继承 provide与inject】
【物联网架构】最适合物联网的开源数据库
91.(cesium之家)cesium火箭发射模拟
[Data Architecture] Distributed Data Grid as a Solution for Centralized Data Monolith
UE4 Sequence添加基础动画效果 (04-在序列中使用粒子效果)
The first offline workshop in 2022!Data application experience day for application developers is coming | TiDB Workshop Day
【物联网架构】什么是物联网平台?
随机推荐
DataStream API(基础篇) 完整使用 (第五章)
LiveNVR操作日志页面快速筛选上级国标平台的调用记录直播观看录像回看等操作
中国驻越南使馆提醒在越北部、中部地区中国公民做好台风“木兰”安全防范
Shell脚本数组
LCD模块如何建立联系分析
腾讯发布四足机器人 Max 二代版本,梅花桩上完成跳跃、空翻
「应用架构」TOGAF建模:企业可管理性图
Flink运行时架构 完整使用 (第四章)
IDEA中xml文件头报错:URI is not registered (Settings | Languages & Frameworks | Schemas and DTDs)
因子分析(SPSS)
【数据库架构】OLTP 和 OLAP:实际比较
消息队列概述
用高质量图像标注数据加速AI商业化落地
[System Design] S3 Object Storage
线程池的基本概念、结构、类
「应用架构」六边型架构:三个原则和一个实现示例
CAD to WPF: Tips on converting CAD drawing files to WPF vector code files (xaml files)
Flink快速上手 完整使用 (第二章)
地平线:面向规模化量产的智能驾驶系统和软件开发
shell之函数和数组