Writing data to Doris with Stream Load: an example
2022-04-23 04:41:00 【Z-hhhhh】
The code depends on the following libraries:
```xml
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
```
The Java code is as follows:
```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.commons.codec.binary.Base64; // commons-codec is pulled in transitively by httpclient
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class DorisStreamLoader {
    // FE IP address
    private final static String HOST = "xxx.xx.xx.xxx";
    // FE HTTP port
    private final static int PORT = 8030;
    // Database name
    private final static String DATABASE = "test_db";
    // Table name
    private final static String TABLE = "...";
    // User name
    private final static String USER = "xxx";
    // User password
    private final static String PASSWD = "xxxx";
    // Path of the local file to import
    private final static String LOAD_FILE_NAME = "......csv";
    // HTTP endpoint for submitting stream load tasks
    private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);

    // Build the HTTP client
    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // When connecting to the FE, the request is answered with a 307 redirect,
                    // so redirects must be followed for PUT requests as well.
                    return true;
                }
            });

    /**
     * Import data from a local file.
     *
     * @param file the file to import
     * @throws Exception if the request fails or Doris returns a non-200 HTTP status
     */
    public void load(File file) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
            // Stream load properties are passed as headers; here we set label and column_separator.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("column_separator", "#");
            // Use the file as the request body. A StringEntity can also be used to send arbitrary data.
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                // HTTP 200 only means the request was handled; the "Status" field in loadResult
                // says whether the import itself succeeded.
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Import data in JSON format.
     *
     * @param jsonData the JSON payload
     * @throws Exception if the request fails or Doris returns a non-200 HTTP status
     */
    public void loadJson(String jsonData) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
            // Stream load properties are passed as headers; here we set label and format.
            // If the payload is a JSON array of rows, also set "strip_outer_array" to "true".
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("format", "json");
            // Send the JSON string as the body, encoded as UTF-8 so non-ASCII text is not garbled.
            StringEntity entity = new StringEntity(jsonData, "UTF-8");
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Build the HTTP Basic authentication header.
     *
     * @param username the Doris user name
     * @param password the Doris password
     * @return the Authorization header value
     */
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        DorisStreamLoader loader = new DorisStreamLoader();
        // File load
        // File file = new File(LOAD_FILE_NAME);
        // loader.load(file);
        // JSON load
        String jsonData = "";
        loader.loadJson(jsonData);
    }
}
```
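Both methods above treat any HTTP 200 as success, but Doris also answers 200 when an import fails; the actual outcome is in the "Status" field of the JSON response body. Below is a minimal sketch of such a check. LoadResultChecker and its methods are hypothetical names, and the regular expression is only a stand-in for a real JSON parser: with the fastjson dependency already declared, JSON.parseObject(loadResult).getString("Status") would be the more robust choice. The sketch assumes, following the Doris Stream Load documentation, that both "Success" and "Publish Timeout" mean the data was committed (the latter only delays visibility).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: decides whether a stream load succeeded
 * by reading the "Status" field of the JSON body returned by Doris.
 */
final class LoadResultChecker {
    // Matches the exact key "Status" (not e.g. "ExistingJobStatus") and captures its string value.
    private static final Pattern STATUS = Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    private LoadResultChecker() {
    }

    /** Returns the value of "Status", or null if the field is absent. */
    static String extractStatus(String loadResult) {
        Matcher m = STATUS.matcher(loadResult);
        return m.find() ? m.group(1) : null;
    }

    /** "Success" and "Publish Timeout" both mean the data was committed. */
    static boolean isLoadSucceeded(String loadResult) {
        String status = extractStatus(loadResult);
        return "Success".equals(status) || "Publish Timeout".equals(status);
    }
}
```

In load() or loadJson(), the check would sit right after the status-code test, for example: if (!LoadResultChecker.isLoadSucceeded(loadResult)) throw new IOException("Stream load failed: " + loadResult);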
Note: the JSON string must be encoded explicitly (here as UTF-8, via new StringEntity(jsonData, "UTF-8")); otherwise non-ASCII characters end up garbled after being written to Doris.
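A small sketch of why the charset matters, assuming the server decodes the body as UTF-8. In HttpClient 4.x, new StringEntity(json) without a charset encodes the body as ISO-8859-1, which cannot represent Chinese characters, so they are replaced with '?'. CharsetDemo and roundTrip are hypothetical names used only for illustration; the JDK round trip mimics what happens on the wire.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo: encode a body with one charset, decode it as UTF-8 like the server would. */
final class CharsetDemo {
    private CharsetDemo() {
    }

    static String roundTrip(String json, Charset bodyCharset) {
        // String.getBytes replaces unmappable characters with the charset's replacement byte ('?' for ISO-8859-1).
        byte[] body = json.getBytes(bodyCharset);
        // Assumption: the receiving side interprets the bytes as UTF-8.
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

With UTF-8 the JSON survives unchanged; with ISO-8859-1 a value such as a Chinese name comes back as "??".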
The official documentation advises against issuing stream loads too frequently. Stream load is better suited to micro-batching: accumulate data and write it in batches, for example once every five seconds.
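The micro-batching idea can be sketched as a buffer that flushes when either a row count or a time interval is reached. MicroBatcher is a hypothetical helper, not part of Doris or HttpClient; time is passed in explicitly so the logic stays testable, and each flush produces one JSON array, which assumes the load request sets the "strip_outer_array" header to "true".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical micro-batcher: buffers JSON rows and hands them to a sink as one
 * JSON array when maxRows is reached or intervalMillis has passed since the last flush.
 */
final class MicroBatcher {
    private final int maxRows;
    private final long intervalMillis;
    private final Consumer<String> sink;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis;

    MicroBatcher(int maxRows, long intervalMillis, Consumer<String> sink, long startMillis) {
        this.maxRows = maxRows;
        this.intervalMillis = intervalMillis;
        this.sink = sink;
        this.lastFlushMillis = startMillis;
    }

    /** Buffer one JSON object; flush if the size or time threshold is hit. */
    void add(String jsonRow, long nowMillis) {
        buffer.add(jsonRow);
        if (buffer.size() >= maxRows || nowMillis - lastFlushMillis >= intervalMillis) {
            flush(nowMillis);
        }
    }

    /** Send everything buffered as a single JSON array and reset the timer. */
    void flush(long nowMillis) {
        if (!buffer.isEmpty()) {
            sink.accept("[" + String.join(",", buffer) + "]");
            buffer.clear();
        }
        lastFlushMillis = nowMillis;
    }
}
```

Because loadJson throws a checked exception, wiring it in would need a wrapping lambda, e.g. new MicroBatcher(1000, 5000, batch -> { try { loader.loadJson(batch); } catch (Exception e) { throw new RuntimeException(e); } }, System.currentTimeMillis()). The time threshold here is only checked when a row arrives; a production version would also flush from a timer such as a ScheduledExecutorService.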
Based on this article: https://www.jianshu.com/p/01e47ae333d8
Copyright notice
This article was written by [Z-hhhhh]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204220559122138.html