当前位置:网站首页>Flink快速上手 完整使用 (第二章)
Flink快速上手 完整使用 (第二章)
2022-08-10 08:49:00 【小坏讲微服务】
Flink快速上手 完整使用
本教程使用到的工具
NC链接
一、环境准备
系统环境为Windows 10。
需提前安装Java 8。
集成开发环境(IDE)使用IntelliJ IDEA,具体的安装流程参见IntelliJ官网。
安装IntelliJ IDEA之后,还需要安装一些插件——Maven和Git。Maven用来管理项目依赖;通过Git可以轻松获取我们的示例代码,并进行本地代码的版本控制。
二、创建项目
1、创建项目
2、添加项目依赖
在项目的pom文件中,增加标签设置属性,然后增加标签引入需要的依赖。我们需要添加的依赖最重要的就是Flink的相关组件,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。另外,为了方便查看运行日志,我们引入slf4j和log4j进行日志管理。
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入Flink相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${
flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${
scala.binary.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${
scala.binary.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${
slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${
slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
在属性中,我们定义了<scala.binary.version>,这指代的是所依赖的Scala版本。这有一点奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。我们本书中用到的Scala版本为2.12。
3、配置日志管理
在目录src/main/resources下添加文件:log4j.properties,内容配置如下:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
4、编写代码
搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的Hello World。
我们的源码位于src/main/java目录下。首先新建一个包,命名为com.atguigu.wc,在这个包下我们将编写Flink入门的WordCount程序。
我们已经知道,尽管Flink自身的定位是流式处理引擎,但它同样拥有批处理的能力。所以接下来,我们会针对不同的处理模式、不同的输入数据形式,分别讲述WordCount代码的实现。
5) 批处理
对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一个文本文档,然后读取这个文件处理数据就可以了。
(1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
(2)在words.txt中输入一些文字,例如:
hello world
hello flink
hello java
(3)在com.atguigu.chapter02包下新建Java类BatchWordCount,在静态main方法中编写测试代码。
我们进行单词频次统计的基本思路是:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
package com.example.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/** * 批处理 * WOrd Count */
public class BachWordCount {
public static void main(String[] args) throws Exception {
//1、创建执行环境
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> textFile = executionEnvironment.readTextFile("input/words.txt");
//将每行数据进行分词,转换成二元组类型
FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = textFile.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
//将一行文本进行分词
String[] words = line.split(" ");
//将每个单词转换成二元组输出
for (String word : words) {
out.collect(Tuple2.of(word, 1l));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
//按照word分组
UnsortedGrouping<Tuple2<String, Long>> groupBy = wordAndOneTuple.groupBy(0);
//5、分组内进行聚合统计
AggregateOperator<Tuple2<String, Long>> operator = groupBy.sum(1);
operator.print();
}
}
(4) 代码说明
① Flink在执行应用程序前应该获取执行环境对象,也就是运行时上下文环境。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
② Flink同时提供了Java和Scala两种语言的API,有些类在两套API中名称是一样的。所以在引入包时,如果有Java和Scala两种选择,要注意选用Java的包。
③ 直接调用执行环境的readTextFile方法,可以从文件中读取数据。
④我们的目标是将每个单词对应的个数统计出来,所以调用flatmap方法可以对一行文字进行分词转换。将文件中每一行文字拆分成单词后,要转换成(word,count)形式的二元组,初始count都为1。returns方法指定的返回数据类型Tuple2,就是Flink自带的二元组数据类型。
⑤ 在分组时调用了groupBy方法,它不能使用分组选择器,只能采用位置索引或属性名称进行分组。
// 使用索引定位
dataStream.groupBy(0)
// 使用类属性名称
dataStream.groupBy("id")
⑤ 在分组之后调用sum方法进行聚合,同样只能指定聚合字段的位置索引或属性名称。
(4) 运行程序,控制台会打印出结果:
(java,1)
(flink,1)
(world,1)
(hello,3)
可以看到,我们将文档中的所有单词的频次,全部统计出来,以二元组的形式在控制台打印输出了。
需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
这样,DataSet API就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只要维护一套DataStream API就可以了。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。
6、流处理
我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用DataStream API来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。
DataStream API作为“数据流”的处理接口,又怎样处理批数据呢?
回忆一下上一章中我们讲到的Flink世界观。在Flink的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流。所以批处理,其实就可以看作有界流的处理。
对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。
下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
1、读取文件
我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。这是一个“有界流”的处理,整体思路与之前的批处理非常类似,代码模式也基本一致。
package com.example.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/** * 流式处理 * wordCount * 并行度就是当前任务分成多少分、做多线程并行处理的他的程度个数 * * 没有设置多个 * 就是电脑CPU核心数量 */
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
//1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件
DataStreamSource<String> textFile = env.readTextFile("input/words.txt");
SingleOutputStreamOperator<Tuple2<String, Long>> returns = textFile.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
//包装二元组
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组
//returns.keyBy(0) 这样提示放弃了
KeyedStream<Tuple2<String, Long>, String> keyBy = returns.keyBy(data -> data.f0);
//求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyBy.sum(1);
sum.print();
env.execute();
}
}
package com.example.wc;
//package com.example.wc.StreamWordCount;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/** * 无界数据流 */
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取文本
DataStreamSource<String> textFile = executionEnvironment.socketTextStream("hadoop102", 7777);
SingleOutputStreamOperator<Tuple2<String, Long>> returns = textFile.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
//包装二元组
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组
//returns.keyBy(0) 这样提示放弃了
KeyedStream<Tuple2<String, Long>, String> keyBy = returns.keyBy(data -> data.f0);
//求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyBy.sum(1);
sum.print();
executionEnvironment.execute();
}
}
代码说明和注意事项:
socket文本流的读取需要配置两个参数:发送端主机名和端口号。这里代码中指定了主机
在实际项目应用中,主机名和端口号这类信息往往可以通过配置文件,或者传入程序运行参数的方式来指定。
socket文本流数据的发送,可以通过Linux系统自带的netcat工具进行模拟。
(2)在windows环境的安装NC、发送数据进行测试:
nc lp -7777
hello flink
hello world
hello java
可以看到控制台输出结果如下
4> (flink,1)
2> (hello,1)
3> (world,1)
2> (hello,2)
2> (hello,3)
1> (java,1)
二、本章总结
本章主要实现一个Flink开发的入门程序——词频统计WordCount。通过批处理和流处理两种不同模式的实现,可以对Flink的API风格和编程方式有所熟悉,并且更加深刻地理解批处理和流处理的不同。另外,通过读取有界数据(文件)和无界数据(socket文本流)进行流处理的比较,我们也可以更加直观地体会到Flink流处理的方式和特点。
这是我们Flink长征路上的第一步,是后续学习的基础。有了这番初体验,想必大家会发现Flink提供了非常易用的API,基于它进行开发并不是难事。之后我们会逐步深入展开,为大家打开Flink神奇世界的大门。
边栏推荐
- [OAuth2] Nineteen, OpenID Connect dynamic client registration
- How to use [jmeter regular expression extractor] to solve the problem of returning the value as a parameter
- 【REST架构】OData、JsonAPI、GraphQL 有什么区别?
- Delphi实现的一个文件在线查询显示下载功能
- dayjs-----time format
- 【OAuth2】十九、OpenID Connect 动态客户端注册
- iwemeta元宇宙:阿里首任COO:如何打造销售铁军
- ARM Architecture 2: Processor Core and Assembly Instruction Set
- CTFSHOW七夕杯web
- ARM Architecture 3: Addressing and Exception Handling of ARM Instructions
猜你喜欢

【OAuth2】十九、OpenID Connect 动态客户端注册

Delphi实现的一个文件在线查询显示下载功能

How AliExpress sellers seize product search weight

The implementation of the seemingly useless component (text gradient) in NaiveUI is so simple

速卖通卖家如何抓住产品搜索权重

J9数字科普:Web 3.0 是关于数据所有权还是去中心化?

短视频同城流量宣传小魔推有何优势?如何给实体商家带来销量?

NaiveUI中看起来没啥用的组件(文字渐变)实现原来这么简单

数据库注入提权总结(一)

2022-08-01 网工进阶(二十三) VLAN高级技术-VLAN聚合、MUX VLAN
随机推荐
js--------对象数组转换成二维数组(excel表格导出)
详解构建mock服务最方便的神器——Moco
明明加了唯一索引,为什么还是产生重复数据?
Solve the problem that the win10win7win8 system cannot find the specified module and cannot register the desert plug-in
不同的命令行风格
js函数聚合的三种实现方式
Docker搭建Mysql一主一从
js reduce
dayjs-----时间格式化
Rust learning: 6.5_Array of composite types
问下cdc mysql to doris.不显示具体行数,怎么办?
【Unity入门计划】2D游戏实现敌人来回移动控制脚本
2022-08-01 网工进阶(二十四) STP进阶知识
钻石价格预测的ML全流程!从模型构建调优道部署应用!
It is obvious that a unique index is added, why does it still generate duplicate data?
11111
1499. The maximum pile.then/deque
[OAuth2] 20. OAuth2 Extended Protocol PKCE
Rust学习:6.4_复合类型之枚举
2022-08-09:以下go语言代码输出什么?A:否,会 panic;B:是,能正确运行;C:不清楚,看投票结果。