Flink Stream Processing Engine: A Systematic Study (Part 2)
2022-04-22 08:22:00 【肥仔哥哥1930】
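Preface
This post builds the first Flink example. Before starting, here is the method that Mr. Li, the course instructor, recommends for learning any new technology:
1. Go to the official website
2. Find out how to build a project
3. Find the demo code among the examples
4. Copy it into the project you built
Go!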
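I. Building the Flink project
Official websites usually provide either the steps or a script for building a project. Flink provides a script. The steps are:
1. Open the official website: https://flink.apache.org/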

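2. Copy the script that creates the Flink project

Alternatively, you can create the project in IntelliJ IDEA with New Project and select the Flink dependencies step by step.
Note that the script on the website is split across several lines with a trailing `\`, which is a line-continuation character, so it needs to be joined into a single line.
The joined script:
```shell
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-java -DarchetypeVersion=1.14.4 -DgroupId=frauddetection -DartifactId=frauddetection -Dversion=0.1 -Dpackage=spendreport -DinteractiveMode=false
```
Note:
To use a custom project name, simply replace `frauddetection` with your own name.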


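OK, the demo project has been created.
Important:
In the demo project generated by the script, the Flink dependencies have `provided` scope. Change it to `compile`, otherwise the program fails with an error when you run it locally.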
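With `provided` scope, Maven leaves the Flink jars off the runtime classpath, so a program launched locally cannot find the Flink classes. A quick way to confirm whether a class actually made it onto the runtime classpath is a `Class.forName` check. The sketch below is a hypothetical diagnostic helper, not part of the generated project; the Flink class name it probes is just an example.

```java
// Hypothetical diagnostic helper, not part of the Flink archetype.
// It reports whether a class can be loaded from the current runtime classpath.
class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only check that the class can be found,
            // without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Prints false when the Flink jars are missing from the runtime classpath
        String flinkClass = "org.apache.flink.api.java.ExecutionEnvironment";
        System.out.println(flinkClass + " on classpath: " + isOnClasspath(flinkClass));
    }
}
```

Running this from the same run configuration as the demo shows whether switching the scope to `compile` took effect.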
II. The first example
1. Find an example on GitHub
Open the GitHub home page, type flink into the search box and search.

Let's start with a batch-processing count and try WordCount.

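2. Copy it into the demo project
Here I've picked out just two classes. Copy them over, change the package name to your own, and re-import the dependencies.
The WordCount class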
```java
package spendreport.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.util.Collector;
// Use Flink's own Preconditions instead of a shaded third-party copy
import org.apache.flink.util.Preconditions;

/**
 * @author zhengwen
 */
public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        // 1. Parse the command-line arguments (e.g. --input <path> --output <path>)
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            // In the DataSet API, print() triggers execution by itself, so env.execute()
            // is not called here (it would fail because no new sinks were defined)
            counts.print();
        }
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
```
The WordCountData class
```java
package spendreport.batch;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Provides the default data sets used for the WordCount example program. The default data sets are
 * used, if no parameters are given to the program.
 */
public class WordCountData {

    public static final String[] WORDS =
            new String[] {
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,",
                "And by opposing end them?--To die,--to sleep,--",
                "No more; and by a sleep to say we end",
                "The heartache, and the thousand natural shocks",
                "That flesh is heir to,--'tis a consummation",
                "Devoutly to be wish'd. To die,--to sleep;--",
                "To sleep! perchance to dream:--ay, there's the rub;",
                "For in that sleep of death what dreams may come,",
                "When we have shuffled off this mortal coil,",
                "Must give us pause: there's the respect",
                "That makes calamity of so long life;",
                "For who would bear the whips and scorns of time,",
                "The oppressor's wrong, the proud man's contumely,",
                "The pangs of despis'd love, the law's delay,",
                "The insolence of office, and the spurns",
                "That patient merit of the unworthy takes,",
                "When he himself might his quietus make",
                "With a bare bodkin? who would these fardels bear,",
                "To grunt and sweat under a weary life,",
                "But that the dread of something after death,--",
                "The undiscover'd country, from whose bourn",
                "No traveller returns,--puzzles the will,",
                "And makes us rather bear those ills we have",
                "Than fly to others that we know not of?",
                "Thus conscience does make cowards of us all;",
                "And thus the native hue of resolution",
                "Is sicklied o'er with the pale cast of thought;",
                "And enterprises of great pith and moment,",
                "With this regard, their currents turn awry,",
                "And lose the name of action.--Soft you now!",
                "The fair Ophelia!--Nymph, in thy orisons",
                "Be all my sins remember'd."
            };

    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}
```
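To make clear what `flatMap(new Tokenizer())`, `groupBy(0)` and `sum(1)` compute, here is a plain-Java sketch with no Flink involved (the class and method names are hypothetical). It applies the same tokenizing rule as `Tokenizer`, then groups by word and sums the 1s in a single-threaded map. It only mirrors the semantics on one machine; Flink performs the grouping and summing in a distributed way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the WordCount semantics; not Flink code.
class WordCountSketch {

    // Same rule as Tokenizer.flatMap: lowercase, split on runs of non-word characters,
    // and skip empty tokens (a line starting with punctuation yields a leading "").
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    // Equivalent of emitting (word, 1) pairs, grouping by field 0 and summing field 1.
    // TreeMap keeps the words sorted so the printed result is deterministic.
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {"To be, or not to be,--that is the question:--"};
        System.out.println(count(lines));
    }
}
```

For the first line of the default data in `WordCountData`, `main` prints `{be=2, is=1, not=1, or=1, question=1, that=1, the=1, to=2}`. The sketch also shows a side effect of splitting on `\\W+`: apostrophes count as separators, so `wish'd` becomes `wish` and `d`, and `'tis` becomes `tis`.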
Demo project structure

Run result

Summary
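- It really is great to use, and its resource cost is low: lower than processing the data with a hand-written Java method.
- DataSet vs. DataStream (this example uses DataSet; note that since Flink 1.12 the official documentation describes the DataSet API as soft-deprecated, recommending the Table API/SQL or the DataStream API in BATCH execution mode instead)
  - Both represent distributed collections of data in a Flink app
  - The collections are immutable and may contain duplicates
  - A DataSet is bounded, while a DataStream can be unbounded
  - Both can be created from a data source or derived through transformations
- The common pattern for writing Flink code
  - Obtain an execution environment
  - Load/create the initial data
  - Apply transformations to the data (each produces a new data set)
  - Specify where the results of the computation should go
  - Trigger execution of the app
- Lazy evaluation
  - Flink apps are executed lazily
  - They only actually run when execute() is called explicitly (in the DataSet API, print(), collect() and count() also trigger execution, which is why the WordCount example does not call env.execute() after counts.print())
  - Whether the app runs locally or on a cluster depends on the type of execution environment
  - Benefit: users can build complex applications for their business needs, and Flink can optimize the program as a whole and generate an execution plan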
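The lazy-evaluation idea can be seen outside Flink with `java.util.stream`, which is lazy in a similar spirit: intermediate operations such as `map` only describe work, and nothing runs until a terminal operation is invoked. This is only an analogy (the class and method names are hypothetical). In Flink the trigger is `execute()` (or, in the DataSet API, `print()`/`collect()`/`count()`), and Flink additionally optimizes the whole plan before running it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream defers work much like a Flink program defers execution.
class LazyDemo {

    // Counts how many times the transformation body actually ran.
    static final AtomicInteger calls = new AtomicInteger();

    // Steps 2-3 of the pattern: declare the source and a transformation.
    // Building the pipeline does not process any element yet.
    static Stream<String> buildPipeline(List<String> lines) {
        return lines.stream().map(line -> {
            calls.incrementAndGet();
            return line.toLowerCase();
        });
    }

    // Steps 4-5: the terminal operation (collect) decides where results go and
    // triggers the work, playing the role env.execute() plays in Flink.
    static List<String> execute(Stream<String> pipeline) {
        return pipeline.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<String> pipeline = buildPipeline(Arrays.asList("To be", "Or not"));
        System.out.println("calls before execute: " + calls.get());
        List<String> result = execute(pipeline);
        System.out.println("calls after execute: " + calls.get() + ", result: " + result);
    }
}
```

`main` prints `calls before execute: 0`, then `calls after execute: 2, result: [to be, or not]`: the transformation only ran once the terminal operation was called.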
That's all I'll share this time. Together go up!!!
Copyright notice
This article was written by [肥仔哥哥1930]. Please include a link to the original when reposting. Thanks.
https://zwsky.blog.csdn.net/article/details/124321243