Case of using stream load to write data to Doris
2022-04-23 04:41:00 【Z-hhhhh】
This example uses Stream Load to write data to Doris.
The code requires the following dependencies:
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
The Java code is as follows:
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class DorisStreamLoader {
    // FE IP address
    private static final String HOST = "xxx.xx.xx.xxx";
    // FE HTTP port
    private static final int PORT = 8030;
    // Database name
    private static final String DATABASE = "test_db";
    // Table name
    private static final String TABLE = "...";
    // User name
    private static final String USER = "xxx";
    // User password
    private static final String PASSWD = "xxxx";
    // Path of the local file to be imported
    private static final String LOAD_FILE_NAME = "......csv";
    // HTTP URL to which the stream load task is submitted
    private static final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);

    // Build the HTTP client
    private static final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // If the connection target is FE, the 307 redirect must be followed.
                    return true;
                }
            });

    /**
     * Imports data from a file.
     *
     * @param file the local file to import
     * @throws Exception if the request fails or returns a non-200 status
     */
    public void load(File file) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

            // Stream load properties are passed as headers; here we set label and column_separator.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("column_separator", "#");

            // Use the file as the request body. StringEntity can be used instead to send arbitrary data.
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                // HTTP 200 alone does not mean the load succeeded;
                // check the "Status" field of the returned JSON as well.
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Imports data in JSON format.
     *
     * @param jsonData the JSON string to import
     * @throws Exception if the request fails or returns a non-200 status
     */
    public void loadJson(String jsonData) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

            // Stream load properties are passed as headers; here we set label, column_separator and format.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("column_separator", ",");
            put.setHeader("format", "json");

            // Send the JSON string as the request body, explicitly encoded as UTF-8.
            StringEntity entity = new StringEntity(jsonData, StandardCharsets.UTF_8);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                // HTTP 200 alone does not mean the load succeeded;
                // check the "Status" field of the returned JSON as well.
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Builds the HTTP Basic authentication header value.
     *
     * @param username the Doris user name
     * @param password the Doris password
     * @return the value for the Authorization header
     */
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        DorisStreamLoader loader = new DorisStreamLoader();

        // File load
        /*
        File file = new File(LOAD_FILE_NAME);
        loader.load(file);
        */

        // JSON load (jsonData is left empty here as a placeholder)
        String jsonData = "";
        loader.loadJson(jsonData);
    }
}
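The code above only checks the HTTP status code, but Stream Load reports the actual outcome in the "Status" field of the JSON it returns, so a 200 response can still describe a failed load. The following is a minimal sketch of such a check using only the JDK. The class name `LoadResultCheck`, the regex-based extraction, and the sample response strings are assumptions made for illustration; in the real project a JSON library such as the fastjson dependency listed above would be the more robust choice.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the original article's code.
final class LoadResultCheck {
    // Matches a string field such as "Status": "Success" in the returned JSON.
    private static final Pattern STATUS =
            Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the value of the "Status" field, or null if the field is absent. */
    static String extractStatus(String loadResult) {
        Matcher m = STATUS.matcher(loadResult);
        return m.find() ? m.group(1) : null;
    }

    /** Treats only "Success" as a successful load (a deliberately strict assumption). */
    static boolean isSuccess(String loadResult) {
        return "Success".equals(extractStatus(loadResult));
    }

    public static void main(String[] args) {
        // Sample responses are made up for this sketch.
        String ok = "{\"Label\": \"demo\", \"Status\": \"Success\"}";
        String failed = "{\"Label\": \"demo\", \"Status\": \"Fail\"}";
        System.out.println(isSuccess(ok));     // prints "true"
        System.out.println(isSuccess(failed)); // prints "false"
    }
}
```

In `load` and `loadJson`, a check like `isSuccess(loadResult)` could follow the status-code test and throw an `IOException` when it returns false. Other status values should be handled according to the Doris documentation.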
Note that the JSON data must be explicitly encoded as UTF-8 (as the StringEntity in loadJson does); otherwise non-ASCII characters will be garbled after the data is written to Doris.
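To see why the charset matters, here is a small JDK-only sketch. It assumes the receiving side decodes the request body as UTF-8, and it compares sending the same string as UTF-8 with sending it as ISO-8859-1, which is the default that HttpClient 4.x's single-argument `StringEntity(String)` constructor falls back to. The class name `EncodingDemo` and the sample JSON are made up for illustration.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original article's code.
final class EncodingDemo {
    /** Encodes text with sendCharset, then decodes the bytes as UTF-8, as a UTF-8 reader would. */
    static String roundTrip(String text, Charset sendCharset) {
        byte[] wire = text.getBytes(sendCharset);
        return new String(wire, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u4e2d\u6587" is the two-character Chinese word for "Chinese".
        String json = "{\"name\":\"\u4e2d\u6587\"}";

        // UTF-8 on both sides: the text survives unchanged.
        System.out.println(roundTrip(json, StandardCharsets.UTF_8).equals(json)); // prints "true"

        // ISO-8859-1 cannot represent these characters, so each becomes '?'.
        System.out.println(roundTrip(json, StandardCharsets.ISO_8859_1)); // prints {"name":"??"}
    }
}
```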
The official documentation advises against calling Stream Load too frequently, so it is not a good fit for writing record by record. Stream Load is better suited to micro-batch processing, for example writing data once every five seconds.
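One way to get micro-batches is to buffer rows in memory and hand them to the loader as a single payload once a row count or age threshold is reached. The sketch below uses only the JDK. The class `MicroBatcher`, its thresholds, and the injected clock and sink are all hypothetical names introduced for illustration. In practice the sink could wrap the payload in a StringEntity and submit it the same way `load` does. Note that this simple version only checks the age when a new row arrives; a scheduled timer would be needed to flush an idle buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the original article's code.
final class MicroBatcher {
    private final int maxRows;
    private final long maxAgeMillis;
    private final LongSupplier clock;    // injected so the age check can be tested
    private final Consumer<String> sink; // receives one newline-joined payload per batch
    private final List<String> buffer = new ArrayList<>();
    private long firstRowAt;

    MicroBatcher(int maxRows, long maxAgeMillis, LongSupplier clock, Consumer<String> sink) {
        this.maxRows = maxRows;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
        this.sink = sink;
    }

    /** Buffers a row and flushes when the batch is full or old enough. */
    synchronized void add(String row) {
        if (buffer.isEmpty()) {
            firstRowAt = clock.getAsLong();
        }
        buffer.add(row);
        boolean full = buffer.size() >= maxRows;
        boolean old = clock.getAsLong() - firstRowAt >= maxAgeMillis;
        if (full || old) {
            flush();
        }
    }

    /** Sends all buffered rows as one payload; does nothing if the buffer is empty. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        String payload = String.join("\n", buffer);
        buffer.clear();
        sink.accept(payload);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // Flush after 3 rows or 5 seconds, whichever comes first.
        MicroBatcher batcher = new MicroBatcher(3, 5000L, System::currentTimeMillis, sent::add);
        batcher.add("1#a");
        batcher.add("2#b");
        batcher.add("3#c"); // third row fills the batch and triggers a flush
        batcher.add("4#d");
        batcher.flush();    // flush the remainder explicitly, e.g. on shutdown
        System.out.println(sent.size()); // prints "2"
    }
}
```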
Reference article: https://www.jianshu.com/p/01e47ae333d8
Copyright notice
This article was written by [Z-hhhhh]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204220559122138.html