Introduction to Spark Basic Operations
2022-04-23 13:56:00 【Lionel leemon】
Spark Basic Operations
1. Spark Installation
To be filled in later.
2. Spark Introduction
2.1 RDD
2.1.1 RDDs and their characteristics
1. The RDD is Spark's core data model. It is an abstract class whose full name is Resilient Distributed Dataset.
2. Conceptually, an RDD is a collection of elements containing data. It is partitioned: the data is split into several partitions, each located on a different node of the cluster, so that the data in an RDD can be operated on in parallel (distributed dataset).
3. An RDD is usually created from a Hadoop file, that is, an HDFS file or a Hive table; it can also be created from a collection inside the application.
4. The most important characteristic of an RDD is fault tolerance: it can recover automatically from node failures. If the data of an RDD partition is lost because a node fails, the RDD automatically recomputes that partition. This is completely transparent to the user.
5. RDD data is stored in memory by default, but when memory is insufficient, Spark automatically writes RDD data to disk (this is the "resilient" part). A minimal sketch illustrating partitioning and persistence follows below.
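Below is a minimal, illustrative sketch (not from the original article; the class name and sample values are assumptions) showing the partitioning and memory-with-disk-spill behaviour described above, using the same Java API as the rest of this post:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import java.util.Arrays;

public class RddBasicsSketch {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("RDD basics sketch").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // create an RDD from a collection, split into 4 partitions
        JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
        System.out.println("partitions: " + numbers.getNumPartitions()); // 4
        // keep the data in memory, spilling to disk when memory is short ("resilient")
        numbers.persist(StorageLevel.MEMORY_AND_DISK());
        System.out.println("sum: " + numbers.reduce((a, b) -> a + b)); // 36
        javaSparkContext.close();
    }
}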
2.1.2 Creating an RDD
The first step in Spark Core programming is to create an initial RDD. That RDD usually represents the input data of the Spark application. Through the transformation operators provided by Spark Core, this RDD is then transformed to obtain other RDDs.
Spark Core provides three ways to create an RDD:
- Create an RDD from a collection in the program (mainly used for testing):

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD<Integer> numbersRDD = javaSparkContext.parallelize(numbers);
- Create an RDD from a local file (mainly used for ad-hoc processing of large files):

SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String tmpPath = "E:\\data\\spark\\input.txt";
JavaRDD<String> lines = javaSparkContext.textFile(tmpPath);
- Create an RDD from an HDFS file (the common way in production environments):
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
// load the file (create the RDD)
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
2.1.3 Operating on RDDs
Spark supports two kinds of RDD operations: transformations and actions.
transformation
A transformation creates a new RDD from an existing RDD. Transformations are lazy: they do not trigger the execution of the Spark program by themselves, they only record the operations to be performed on the RDD. Only when an action is executed are all the preceding transformations actually run.
Commonly used transformations (a short sketch follows this list):
- map: pass each element of the RDD through a custom function to get a new element; the new elements make up a new RDD.
- flatMap: similar to map, but each input element may produce zero or more output elements.
- mapToPair: similar to map, but the output is in <key, value> form.
- filter: apply a predicate to each element of the RDD; elements for which it returns true are kept, those for which it returns false are removed.
- groupByKey: group by key; each key maps to an Iterable<value>.
- reduceByKey: perform a reduce operation over the values of each key.
- sortByKey: return an RDD sorted by key.
- join: join two RDDs of <key, value> pairs; each joined pair for a key is passed to a custom function for processing.
- cogroup: similar to join, but the Iterable of values for each key (one per RDD) is passed to the custom function.
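A minimal sketch (illustrative class name and values, not from the original article) chaining a few transformations and showing the lazy-evaluation behaviour described above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;

public class TransformationSketch {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("transformation sketch").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
        // transformations: nothing is executed yet, Spark only records them
        JavaRDD<Integer> doubled = numbers.map(n -> n * 2);
        JavaRDD<Integer> big = doubled.filter(n -> n > 6);
        // collect() is an action, so only here does Spark actually run the job
        System.out.println(big.collect()); // [8, 10, 12]
        javaSparkContext.close();
    }
}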
action
An action performs a final operation on an RDD, for example traversing it, reducing it, or saving it to a file, and can return a result to the driver program. Executing an action triggers a Spark job, which in turn triggers the execution of all the transformations before that action; this is the defining characteristic of actions.
Commonly used actions (a short sketch follows this list):
- reduce: aggregate all elements of the RDD: the first element is combined with the second, the result with the third, that result with the fourth, and so on.
- collect: bring all elements of the RDD back to the local client (generally not recommended).
- count: get the total number of elements in the RDD.
- take(n): get the first n elements of the RDD.
- saveAsTextFile: save the RDD's elements to a file, calling toString on each element.
- countByKey: count the number of values for each key.
- foreach: iterate over every element of the RDD.
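A minimal sketch (illustrative values only, not from the original article) exercising several of the actions listed above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;

public class ActionSketch {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("action sketch").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(5, 3, 1, 4, 2));
        System.out.println(numbers.count());                 // 5
        System.out.println(numbers.reduce((a, b) -> a + b));  // 15
        System.out.println(numbers.take(3));                  // [5, 3, 1]
        numbers.foreach(n -> System.out.println("element: " + n));
        JavaPairRDD<String, Integer> pairs = javaSparkContext.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));
        System.out.println(pairs.countByKey());               // e.g. {a=2, b=1}
        javaSparkContext.close();
    }
}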
2.2 Sorting and joining
2.2.1 SortByKey
This method sorts only by key; the values are not sorted.
Default sort
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> source = Arrays.asList(
new Tuple2<>("lmc", 89),
new Tuple2<>("hh", 89),
new Tuple2<>("tom", 79),
new Tuple2<>("marry", 89),
new Tuple2<>("sb", 89),
new Tuple2<>("lemoon", 79));
// Import data
JavaPairRDD<String, Integer> textFile = javaSparkContext.parallelizePairs(source);
System.out.println(textFile.sortByKey().collect());// ascending (default)
System.out.println(textFile.sortByKey(false).collect());// descending
Output:
[(hh,89), (lemoon,79), (lmc,89), (marry,89), (sb,89), (tom,79)]
[(tom,79), (sb,89), (marry,89), (lmc,89), (lemoon,79), (hh,89)]
Custom sort
List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
// Import data
JavaRDD<String> textFile = javaSparkContext.parallelize(source);
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
List<Tuple2<String, String>> list = ss.take(10);
for (Tuple2<String, String> t: list) {
System.err.println(t._1() + " : " + t._2());
}
Output:
1 : 1
1 : 5
1 : 7
1 : 12
2 : 4
2 : 7
3 : 6
5 : 3
Custom comparison class MySort:
class MySort implements Serializable, Comparator<String> {
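// compare two "first second" strings numerically: by the first column (ascending),
// then by the second column (ascending) when the first columns are equal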
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_1 = Integer.valueOf(o1s[0]);
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_1 = Integer.valueOf(o2s[0]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_1 > o2s_1) {
return 1;
}else if (o1s_1 == o2s_1){
return o1s_2 - o2s_2;
}
return -1;
}
}
2.2.2 join and cogroup
join combines two datasets by key, pairing up their values, while cogroup first gathers the values for the same key within each RDD and then combines the resulting collections.
package com.lmc.spark.test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @ClassName: JoinTest
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/17 13:54
 * @version: 1.0
 */
public class JoinTest {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> source1 = Arrays.asList(
new Tuple2<>("lmc", 89), new Tuple2<>("hh", 89), new Tuple2<>("tom", 79), new Tuple2<>("lmc", 41));
List<Tuple2<String, Integer>> source2 = Arrays.asList(
new Tuple2<>("lmc", 46), new Tuple2<>("hh", 74), new Tuple2<>("tom", 92),
new Tuple2<>("lmc", 68), new Tuple2<>("hh", 83), new Tuple2<>("tom", 58));
// Import data
JavaPairRDD<String, Integer> textFile1 = javaSparkContext.parallelizePairs(source1);
JavaPairRDD<String, Integer> textFile2 = javaSparkContext.parallelizePairs(source2);
// join / cogroup the two RDDs
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroup = textFile1.cogroup(textFile2);
JavaPairRDD<String, Tuple2<Integer, Integer>> join = textFile1.join(textFile2);
List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> target1 = cogroup.collect();
List<Tuple2<String, Tuple2<Integer, Integer>>> target2 = join.collect();
// Output
target1.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ": " + t._2()._2()));
System.out.println("===================================================");
target2.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ", " + t._2()._2()));
}
}
// Output results
hh -> ([89]: [74, 83]
lmc -> ([89, 41]: [46, 68]
tom -> ([79]: [92, 58]
===================================================
hh -> (89, 74
hh -> (89, 83
lmc -> (89, 46
lmc -> (89, 68
lmc -> (41, 46
lmc -> (41, 68
tom -> (79, 92
tom -> (79, 58
3. Basic Spark Example (WordCount)
3.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lmc-user</artifactId>
        <groupId>com.lmc</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>spark-test</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.3</version>
        </dependency>
    </dependencies>
</project>
3.2 WordCount.java
Requirement: count the number of occurrences of each word in the file.
Source file :
One morning a fox sees a cock.He think,"This is my breakfast.''
He comes up to the cock and says,"I know you can sing very well.Can you sing for me?''The cock is glad.He closes his eyes and begins to sing.
he fox sees that and caches him in his mouth and carries him away. The people in the field see the fox.
They cry,"Look,look!The fox is carrying the cock away.''The cock says to the fox,"Mr Fox,do you understand?The people say you are carrying their cock away.Tell them it is yours.Not theirs.''
Code:
package com.lmc.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @ClassName: WordCount
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/1/28 15:10
 * @version: 1.0
 */
public class WordCount {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
// HDFS path of the output directory
String outputPath = hdfsBasePath + "/wordCount/output2/";
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
// load the input file
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
JavaPairRDD<String, Integer> counts = textFile
// split each line into words and flatten them into one collection
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
// key is the word, value is 1
.mapToPair(word -> new Tuple2<>(word, 1))
// reduce by key, summing the values
.reduceByKey((a, b) -> a + b);
// keep the (word, count) pairs; sorting by key is left commented out below
JavaPairRDD<String, Integer> sorts = counts
// identity mapToPair: produces a new pair RDD with the same (word, count) pairs
.mapToPair(tuple2 -> new Tuple2<>(tuple2._1(), tuple2._2()));
// to sort by key in descending order, apply sortByKey(false) before collecting:
//.sortByKey(false);
// take the first 10 elements
List<Tuple2<String, Integer>> collect = sorts.take(10);
// Print out
for(Tuple2<String, Integer> tuple2 : collect){
System.out.println(tuple2._1() + "\t" + tuple2._2());
}
// merge into a single partition, then save as a text file in HDFS
javaSparkContext.parallelize(collect).coalesce(1).saveAsTextFile(outputPath);
// close context
javaSparkContext.close();
}
}
Result:
(him,2)
(cry,"Look,look!The,1)
(fox,"Mr,1)
(are,1)
(Fox,do,1)
(his,3)
(is,5)
(well.Can,1)
(away,1)
(can,1)
3.3 FileSort.java
Requirement:
- Sort by the first column in the file.
- If the first column is the same, sort by the second column.
/**
 * @ClassName: FileSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/8 16:33
 * @version: 1.0
 */
public class FileSort {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
// Import data
JavaRDD<String> textFile = javaSparkContext.parallelize(source);
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
List<Tuple2<String, String>> list = ss.take(10);
for (Tuple2<String, String> t: list) {
System.err.println(t._1() + " : " + t._2());
}
}
static class MySort implements Serializable, Comparator<String> {
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_1 = Integer.valueOf(o1s[0]);
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_1 = Integer.valueOf(o2s[0]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_1 > o2s_1) {
return 1;
}else if (o1s_1 == o2s_1){
return o1s_2 - o2s_2;
}
return -1;
}
}
}
//1 : 1
//1 : 5
//1 : 7
//1 : 12
//2 : 4
//2 : 7
//3 : 6
//5 : 3
3.4 ClassSort.java
Requirement: for each class, take the top 3 scores.
Data source :
class1 56
class2 64
class1 79
class3 88
class1 92
class3 67
class2 62
class3 77
class1 88
class2 78
class2 88
class3 91
Code:
/**
 * @ClassName: ClassSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/9 17:15
 * @version: 1.0
 */
public class ClassSort {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/spark/classSort/input.txt";
// HDFS path of the output directory
String outputPath = hdfsBasePath + "/spark/classSort/";
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
// load the input file
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
// map each line to a (line, null) pair so it can be sorted by key
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2(l, null));
// sort by score (descending) using the custom comparator
JavaPairRDD<String, Integer> sort = lines.sortByKey(new MySort1());
// split each line into (class, score)
JavaPairRDD<String, Integer> splits =
sort.mapToPair(g -> new Tuple2<>(g._1().split(" ")[0], Integer.valueOf(g._1().split(" ")[1])));
// group scores by class
JavaPairRDD<String, Iterable<Integer>> groups = splits.groupByKey();
// keep only the top three scores per class
JavaPairRDD<String, List<Integer>> g = groups.mapToPair(new MyList());
// take up to 10 keys and print their contents
List<Tuple2<String, List<Integer>>> tuple2s = g.take(10);
for (Tuple2<String, List<Integer>> t: tuple2s) {
for (Integer i : t._2() ) {
System.err.println(t._1() + " " + i);
}
System.err.println("--------------------");
}
}
static class MyList implements Serializable, PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>> {
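// keep only the first three scores for each class (the lines were sorted by score beforehand)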
@Override
public Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
Iterator<Integer> its = stringIterableTuple2._2().iterator();
List<Integer> ints = new ArrayList<>();
int count = 0;
while (its.hasNext()) {
++count;
ints.add(its.next());
if (count >= 3) break;
}
return new Tuple2<>(stringIterableTuple2._1(), ints);
}
}
static class MySort1 implements Serializable, Comparator<String> {
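// compare two "class score" lines by the score (second column), in descending order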
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_2 >= o2s_2) return -1;
return 1;
}
}
}
Result:
class2 88
class2 78
class2 64
--------------------
class3 91
class3 88
class3 77
--------------------
class1 92
class1 88
class1 79
--------------------
Copyright notice
This article was written by [Lionel leemon]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204231353535502.html