An example of writing data to Doris with stream load
2022-04-22 06:01:00 【Z-hhhhh】
The code depends on the following:
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
The Java code is as follows:
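import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class DorisStreamLoader {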
// FE IP Address
private final static String HOST = "xxx.xx.xx.xxx";
// FE port
private final static int PORT = 8030;
// db name
private final static String DATABASE = "test_db";
// table name
private final static String TABLE = "...";
//user name
private final static String USER = "xxx";
//user password
private final static String PASSWD = "xxxx";
//The path of the local file to be imported
private final static String LOAD_FILE_NAME = "......csv";
//http path of stream load task submission
private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
HOST, PORT, DATABASE, TABLE);
// Build the HTTP client
private final static HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
// If the connection target is an FE, the 307 redirect must be followed.
return true;
}
});
/** * Import data from a local file * @param file * @throws Exception */
public void load(File file) throws Exception {
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
// You can set stream load related properties in the Header, here we set label and column_separator.
put.setHeader("label", UUID.randomUUID().toString());
put.setHeader("column_separator", "#");
// Set up the import file. Here you can also use StringEntity to transfer arbitrary data.
FileEntity entity = new FileEntity(file);
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
}
System.out.println("Get load result: " + loadResult);
}
}
}
/** * Import data in JSON format * @param jsonData * @throws Exception */
public void loadJson(String jsonData) throws Exception {
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
// You can set stream load related properties in the Header, here we set label, column_separator and format.
put.setHeader("label", UUID.randomUUID().toString());
put.setHeader("column_separator", ",");
put.setHeader("format", "json");
// Send the JSON string as the request body, with the charset set explicitly to UTF-8.
StringEntity entity = new StringEntity(jsonData,"UTF-8");
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResult = "";
if (response.getEntity() != null) {
loadResult = EntityUtils.toString(response.getEntity());
}
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
}
System.out.println("Get load result: " + loadResult);
}
}
}
/** * Build the Basic authentication header * @param username * @param password * @return */
private String basicAuthHeader(String username, String password) {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded, StandardCharsets.UTF_8);
}
public static void main(String[] args) throws Exception {
DorisStreamLoader loader = new DorisStreamLoader();
//file load
/*File file = new File(LOAD_FILE_NAME); loader.load(file);*/
//json load
String jsonData = "";
loader.loadJson(jsonData);
}
}
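The load and loadJson methods above only check the HTTP status code. As far as I know, Doris can answer with HTTP 200 even when the load itself failed, and reports the real outcome in the "Status" field of the JSON response body ("Success", "Publish Timeout", "Label Already Exists", "Fail"). The sketch below shows one way to inspect that field. The class and method names are hypothetical, and it uses a regular expression only to stay free of dependencies; with the fastjson dependency listed above, parsing the body into a JSON object would work as well.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Doris or of the original code.
final class StreamLoadResultCheck {
    // Matches the "Status" field only; the leading quote keeps it from
    // matching other fields whose names merely end in "Status".
    private static final Pattern STATUS =
            Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the value of the "Status" field, or null if it is absent.
    static String extractStatus(String loadResult) {
        Matcher m = STATUS.matcher(loadResult == null ? "" : loadResult);
        return m.find() ? m.group(1) : null;
    }

    // Assumption: "Success" and "Publish Timeout" both mean the data was loaded.
    static boolean isSuccess(String loadResult) {
        String status = extractStatus(loadResult);
        return "Success".equals(status) || "Publish Timeout".equals(status);
    }

    public static void main(String[] args) {
        // Made-up response bodies, for illustration only.
        String ok = "{\"Label\": \"abc\", \"Status\": \"Success\", \"Message\": \"OK\"}";
        String failed = "{\"Label\": \"abc\", \"Status\": \"Fail\", \"Message\": \"error\"}";
        System.out.println(isSuccess(ok));      // true
        System.out.println(isSuccess(failed));  // false
    }
}
```

In the loader, a check like isSuccess(loadResult) could sit next to the existing statusCode != 200 test and throw the same IOException when it returns false.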
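Note: the JSON payload must be encoded explicitly (the code above passes "UTF-8" to StringEntity); otherwise the data shows up garbled after it is written to Doris.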
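Why the charset matters can be shown with the standard library alone. The sketch below encodes a JSON string containing Chinese characters with two charsets and decodes it again. ISO-8859-1 is used for the comparison because, to my knowledge, HttpClient 4.x falls back to it when StringEntity is given no charset; treat that as an assumption. The class name and sample data are made up.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, for illustration only.
final class EncodingDemo {
    // Encodes the text to bytes with the given charset and decodes it back.
    static String roundTrip(String text, Charset charset) {
        return new String(text.getBytes(charset), charset);
    }

    public static void main(String[] args) {
        String json = "{\"name\": \"张三\"}";
        // UTF-8 can represent every character, so the text survives intact.
        System.out.println(roundTrip(json, StandardCharsets.UTF_8).equals(json)); // true
        // ISO-8859-1 cannot represent Chinese characters; each one becomes '?'.
        System.out.println(roundTrip(json, StandardCharsets.ISO_8859_1)); // {"name": "??"}
    }
}
```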
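The official documentation advises against calling stream load too frequently and recommends a streaming approach instead. Stream load is best suited to micro-batch processing. I plan to test later what happens when data is written once every five seconds.
Reference article: https://www.jianshu.com/p/01e47ae333d8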
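One way to keep the number of stream load calls down is to buffer rows and send them as one batch. The following is a minimal sketch of such a micro-batcher, not a tested production design: the class name, the thresholds, and the idea of handing each batch to a callback are all assumptions of this example. Time is passed in explicitly so the behavior is easy to verify, and the age check only runs when a row is added, so a real implementation would also need a timer to flush an idle buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical micro-batcher, not part of Doris or of the original code.
final class MicroBatcher {
    private final int maxRows;
    private final long maxDelayMillis;
    private final Consumer<String> sink; // receives one payload per batch
    private final List<String> buffer = new ArrayList<>();
    private long firstRowAtMillis;

    MicroBatcher(int maxRows, long maxDelayMillis, Consumer<String> sink) {
        this.maxRows = maxRows;
        this.maxDelayMillis = maxDelayMillis;
        this.sink = sink;
    }

    // Buffers one row and flushes when the batch is full or old enough.
    synchronized void add(String row, long nowMillis) {
        if (buffer.isEmpty()) {
            firstRowAtMillis = nowMillis;
        }
        buffer.add(row);
        boolean full = buffer.size() >= maxRows;
        boolean old = nowMillis - firstRowAtMillis >= maxDelayMillis;
        if (full || old) {
            flush();
        }
    }

    // Sends all buffered rows as one newline-separated payload.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(String.join("\n", buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        // Here the sink just prints; in practice it could send the payload
        // in a single stream load request.
        MicroBatcher batcher = new MicroBatcher(3, 5000L, System.out::println);
        batcher.add("1#a", 0L);
        batcher.add("2#b", 100L);
        batcher.add("3#c", 200L); // third row fills the batch and triggers a flush
    }
}
```

With a five-second maxDelayMillis, this corresponds to the once-every-five-seconds write pattern mentioned above; the rows use "#" as the separator to match the column_separator header set in load.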
Copyright notice
This article was written by [Z-hhhhh]. Please include a link to the original when reposting. Thank you.
https://blog.csdn.net/weixin_45399602/article/details/123198416