Introduction to spark basic operation
2022-04-23 13:56:00 【Lionel leemon】
Spark Basic Operations
1. Spark Installation
To be filled in later.
2. Spark Introduction
2.1 RDD
2.1.1 RDDs and Their Characteristics
1. The RDD is Spark's core data model, exposed as an abstract class. Its full name is Resilient Distributed Dataset.
2. Abstractly, an RDD is a collection of elements containing data. It is partitioned, and each partition is distributed across different nodes in the cluster, so the data in an RDD can be operated on in parallel (distributed dataset).
3. RDDs are usually created from Hadoop files, that is, HDFS files or Hive tables; they can also be created from collections in the application.
4. The most important characteristic of an RDD is fault tolerance: it can automatically recover from node failures. If data in an RDD partition is lost because of a node failure, the RDD automatically recomputes that partition, completely transparently to the user.
5. RDD data is stored in memory by default, but when memory resources are low, Spark automatically writes RDD data to disk (elasticity).
2.1.2 Creating RDDs
The first step in Spark Core programming is to create an initial RDD, which usually represents the application's input source data. The transformation operators provided by Spark Core then transform that RDD into other RDDs.
Spark Core provides three ways to create an RDD:
- From a collection in the program (mainly used for testing):

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD<Integer> numbersRDD = javaSparkContext.parallelize(numbers);

- From a local file (mainly used for ad-hoc processing of files with large data volumes):

SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String tmpPath = "E:\\data\\spark\\input.txt";
JavaRDD<String> lines = javaSparkContext.textFile(tmpPath);

- From an HDFS file (the common approach in production environments):
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
// Load the file (creates the RDD)
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
2.1.3 Operating on RDDs
Spark supports two kinds of RDD operations: transformations and actions.
transformation
A transformation creates a new RDD from an existing one. Transformations are lazy: they do not trigger execution of the Spark program, but merely record the operations performed on the RDD. Only when an action is executed do all the preceding transformations actually run.
Commonly used transformations:
- map: pass each element of the RDD through a custom function to get a new element, then build a new RDD from the new elements.
- flatMap: similar to map, but each input element can produce one or more output elements.
- mapToPair: similar to map, but the output is in <key, value> form.
- filter: for each element of the RDD, keep it if the function returns true, drop it if it returns false.
- groupByKey: group by key; each key maps to an Iterable<value>.
- reduceByKey: perform a reduce operation on the values of each key.
- sortByKey: sort the RDD by key.
- join: join two RDDs of <key, value> pairs; each joined pair for a key is passed to a custom function for processing.
- cogroup: like join, but each key's Iterable of values (one per RDD) is passed to the custom function.
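To make these semantics concrete without a cluster, here is a minimal plain-Java sketch of what map, filter, and reduceByKey do to a local collection. The helper names are illustrative only, not Spark APIs:

```java
import java.util.*;
import java.util.stream.*;

public class TransformationSketch {
    // map: apply a function to each element, producing a new collection
    static List<Integer> mapDouble(List<Integer> in) {
        return in.stream().map(n -> n * 2).collect(Collectors.toList());
    }

    // filter: keep only the elements for which the predicate returns true
    static List<Integer> filterEven(List<Integer> in) {
        return in.stream().filter(n -> n % 2 == 0).collect(Collectors.toList());
    }

    // reduceByKey: group pairs by key, then reduce (here: sum) each group's values
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey, Collectors.summingInt(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(mapDouble(numbers));   // [2, 4, 6, 8, 10]
        System.out.println(filterEven(numbers));  // [2, 4]
        System.out.println(reduceByKey(Arrays.asList(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3))));
    }
}
```

Unlike real Spark transformations, these stream operations run eagerly and locally; in Spark, nothing executes until an action is called.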
action
An action performs a final operation on an RDD, such as traversal, reduce, or saving to a file, and can return results to the driver program. Executing an action triggers a Spark job, which in turn runs all the transformations before that action; this is the defining characteristic of actions.
Commonly used actions:
- reduce: aggregate all elements of the RDD. The first element is aggregated with the second, that result with the third, that result with the fourth, and so on.
- collect: bring all elements of the RDD to the local client (generally not recommended).
- count: get the total number of elements in the RDD.
- take(n): get the first n elements of the RDD.
- saveAsTextFile: save the RDD's elements to a file, calling toString on each element.
- countByKey: count the values for each key.
- foreach: iterate over every element of the RDD.
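The chained aggregation that reduce performs (first with second, that result with the third, and so on) is simply a left fold. A small plain-Java sketch of the reduce and countByKey semantics, again with illustrative helper names rather than Spark APIs:

```java
import java.util.*;
import java.util.function.BinaryOperator;

public class ActionSketch {
    // reduce: fold all elements into a single value, left to right
    static int reduce(List<Integer> in, BinaryOperator<Integer> op) {
        int acc = in.get(0);
        for (int i = 1; i < in.size(); i++) {
            acc = op.apply(acc, in.get(i));
        }
        return acc;
    }

    // countByKey: count how many entries share each key
    static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String k : keys) counts.merge(k, 1L, Long::sum);
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(reduce(Arrays.asList(1, 2, 3, 4), Integer::sum)); // 10
        System.out.println(countByKey(Arrays.asList("a", "b", "a")));
    }
}
```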
2.2 Sorting and Joining
2.2.1 SortByKey
This method sorts by key only; the values are not sorted.
Default sort:
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> source = Arrays.asList(
new Tuple2<>("lmc", 89),
new Tuple2<>("hh", 89),
new Tuple2<>("tom", 79),
new Tuple2<>("marry", 89),
new Tuple2<>("sb", 89),
new Tuple2<>("lemoon", 79));
// Import data
JavaPairRDD<String, Integer> textFile = javaSparkContext.parallelizePairs(source);
System.out.println(textFile.sortByKey().collect());// ascending (the default)
System.out.println(textFile.sortByKey(false).collect());// descending
Printed results:
[(hh,89), (lemoon,79), (lmc,89), (marry,89), (sb,89), (tom,79)]
[(tom,79), (sb,89), (marry,89), (lmc,89), (lemoon,79), (hh,89)]
Custom sort
List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
// Import data
JavaRDD<String> textFile = javaSparkContext.parallelize(source);
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
List<Tuple2<String, String>> list = ss.take(10);
for (Tuple2<String, String> t: list) {
System.err.println(t._1() + " : " + t._2());
}
Printed results:
1 : 1
1 : 5
1 : 7
1 : 12
2 : 4
2 : 7
3 : 6
5 : 3
Custom comparison class MySort:
class MySort implements Serializable, Comparator<String> {
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_1 = Integer.valueOf(o1s[0]);
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_1 = Integer.valueOf(o2s[0]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_1 > o2s_1) {
return 1;
}else if (o1s_1 == o2s_1){
return o1s_2 - o2s_2;
}
return -1;
}
}
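Because MySort is an ordinary java.util.Comparator, its ordering can be checked without Spark by sorting the same input with List.sort. The demo class below is a local sketch (the class and method names are illustrative):

```java
import java.io.Serializable;
import java.util.*;

public class MySortDemo {
    // Same logic as MySort above: compare by the first field, then by the second
    static class MySort implements Serializable, Comparator<String> {
        @Override
        public int compare(String o1, String o2) {
            String[] a = o1.split(" ");
            String[] b = o2.split(" ");
            int first = Integer.compare(Integer.parseInt(a[0]), Integer.parseInt(b[0]));
            return first != 0 ? first : Integer.parseInt(a[1]) - Integer.parseInt(b[1]);
        }
    }

    // Sort a copy of the input with the comparator
    static List<String> sorted(List<String> in) {
        List<String> copy = new ArrayList<>(in);
        copy.sort(new MySort());
        return copy;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
        sorted(source).forEach(System.out::println);
    }
}
```

Running this reproduces the order shown above: rows sorted numerically by the first column, ties broken by the second column (so "1 12" comes after "1 7", which a plain string sort would get wrong).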
2.2.2 join and cogroup
join combines two datasets by key: every value for a key in one RDD is paired with every value for that key in the other. cogroup instead first gathers each RDD's values for the same key into an Iterable, then combines the two Iterables per key.
package com.lmc.spark.test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @ClassName: JoinTest
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/17 13:54
 * @version: 1.0
 */
public class JoinTest {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> source1 = Arrays.asList(
new Tuple2<>("lmc", 89), new Tuple2<>("hh", 89), new Tuple2<>("tom", 79), new Tuple2<>("lmc", 41));
List<Tuple2<String, Integer>> source2 = Arrays.asList(
new Tuple2<>("lmc", 46), new Tuple2<>("hh", 74), new Tuple2<>("tom", 92),
new Tuple2<>("lmc", 68), new Tuple2<>("hh", 83), new Tuple2<>("tom", 58));
// Import data
JavaPairRDD<String, Integer> textFile1 = javaSparkContext.parallelizePairs(source1);
JavaPairRDD<String, Integer> textFile2 = javaSparkContext.parallelizePairs(source2);
// Join the data
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroup = textFile1.cogroup(textFile2);
JavaPairRDD<String, Tuple2<Integer, Integer>> join = textFile1.join(textFile2);
List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> target1 = cogroup.collect();
List<Tuple2<String, Tuple2<Integer, Integer>>> target2 = join.collect();
// Output
target1.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ": " + t._2()._2() + ")"));
System.out.println("===================================================");
target2.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ", " + t._2()._2() + ")"));
}
}
// Output results
hh -> ([89]: [74, 83])
lmc -> ([89, 41]: [46, 68])
tom -> ([79]: [92, 58])
===================================================
hh -> (89, 74)
hh -> (89, 83)
lmc -> (89, 46)
lmc -> (89, 68)
lmc -> (41, 46)
lmc -> (41, 68)
tom -> (79, 92)
tom -> (79, 58)
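The difference between the two operations can also be sketched in plain Java (illustrative helpers, not the Spark API): join emits one output pair per matching value combination, while cogroup emits one entry per key holding both sides' value lists.

```java
import java.util.*;

public class JoinSketch {
    // join: for each key present on both sides, emit every (leftValue, rightValue) combination
    static List<String> join(List<Map.Entry<String, Integer>> left,
                             List<Map.Entry<String, Integer>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> l : left)
            for (Map.Entry<String, Integer> r : right)
                if (l.getKey().equals(r.getKey()))
                    out.add(l.getKey() + " -> (" + l.getValue() + ", " + r.getValue() + ")");
        return out;
    }

    // cogroup: one entry per key; index 0 holds all left values, index 1 all right values
    static Map<String, List<List<Integer>>> cogroup(List<Map.Entry<String, Integer>> left,
                                                    List<Map.Entry<String, Integer>> right) {
        Map<String, List<List<Integer>>> out = new TreeMap<>();
        for (Map.Entry<String, Integer> l : left)
            out.computeIfAbsent(l.getKey(), k -> Arrays.asList(new ArrayList<>(), new ArrayList<>()))
               .get(0).add(l.getValue());
        for (Map.Entry<String, Integer> r : right)
            out.computeIfAbsent(r.getKey(), k -> Arrays.asList(new ArrayList<>(), new ArrayList<>()))
               .get(1).add(r.getValue());
        return out;
    }
}
```

Note one behavioral difference this sketch preserves: cogroup keeps keys that appear on only one side (with an empty list for the missing side), while join drops them, just as in the Spark output above where "lmc" with two left values and two right values yields four join rows but a single cogroup row.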
3. Spark Basic Example (WordCount)
3.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lmc-user</artifactId>
<groupId>com.lmc</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-test</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.3</version>
</dependency>
</dependencies>
</project>
3.2 WordCount.java
Requirement: count the number of occurrences of each word in the file.
Source file:
One morning a fox sees a cock.He think,"This is my breakfast.''
He comes up to the cock and says,"I know you can sing very well.Can you sing for me?''The cock is glad.He closes his eyes and begins to sing.
he fox sees that and caches him in his mouth and carries him away. The people in the field see the fox.
They cry,"Look,look!The fox is carrying the cock away.''The cock says to the fox,"Mr Fox,do you understand?The people say you are carrying their cock away.Tell them it is yours.Not theirs.''
Code:
package com.lmc.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @ClassName: WordCount
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/1/28 15:10
 * @version: 1.0
 */
public class WordCount {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
// HDFS path of the output directory
String outputPath = hdfsBasePath + "/wordCount/output2/";
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
// Load the file
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
JavaPairRDD<String, Integer> counts = textFile
// split each line into words and flatten them into one collection
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
// key is the word, value is 1
.mapToPair(word -> new Tuple2<>(word, 1))
// reduce by key, summing the values
.reduceByKey((a, b) -> a + b);
// the original intent was to swap key and value and then sort by count; as written, no swap or sort happens
JavaPairRDD<String, Integer> sorts = counts
// identity mapping (key and value unchanged)
.mapToPair(tuple2 -> new Tuple2<>(tuple2._1(), tuple2._2()));
// sort by key in descending order (currently disabled)
//.sortByKey(false);
// take the first 10 elements
List<Tuple2<String, Integer>> collect = sorts.take(10);
// print them
for(Tuple2<String, Integer> tuple2 : collect){
System.out.println(tuple2._1() + "\t" + tuple2._2());
}
// merge the partitions into one, then save as a text file in HDFS
javaSparkContext.parallelize(collect).coalesce(1).saveAsTextFile(outputPath);
// close the context
javaSparkContext.close();
}
}
Results:
(him,2)
(cry,"Look,look!The,1)
(fox,"Mr,1)
(are,1)
(Fox,do,1)
(his,3)
(is,5)
(well.Can,1)
(away,1)
(can,1)
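For readers without an HDFS cluster at hand, the same flatMap → mapToPair → reduceByKey pipeline can be sketched locally with plain Java streams. This is illustrative only, not the Spark API:

```java
import java.util.*;
import java.util.stream.*;

public class LocalWordCount {
    // Split each line on spaces (as the Spark job above does), then count each word
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))   // flatMap step
                .collect(Collectors.groupingBy(w -> w,             // mapToPair + reduceByKey steps
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(Arrays.asList("a b a", "b a"));
        System.out.println(counts.get("a")); // 3
        System.out.println(counts.get("b")); // 2
    }
}
```

Like the Spark version, splitting only on spaces leaves punctuation attached to words, which is why the results above contain tokens such as (cry,"Look,look!The,1).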
3.3 FileSort.java
Requirement:
- Sort by the first column in the file.
- If the first column is the same, sort by the second column.
/**
 * @ClassName: FileSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/8 16:33
 * @version: 1.0
 */
public class FileSort {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
// Import data
JavaRDD<String> textFile = javaSparkContext.parallelize(source);
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
List<Tuple2<String, String>> list = ss.take(10);
for (Tuple2<String, String> t: list) {
System.err.println(t._1() + " : " + t._2());
}
}
static class MySort implements Serializable, Comparator<String> {
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_1 = Integer.valueOf(o1s[0]);
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_1 = Integer.valueOf(o2s[0]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_1 > o2s_1) {
return 1;
}else if (o1s_1 == o2s_1){
return o1s_2 - o2s_2;
}
return -1;
}
}
}
//1 : 1
//1 : 5
//1 : 7
//1 : 12
//2 : 4
//2 : 7
//3 : 6
//5 : 3
3.4 ClassSort.java
Requirement: for each class, take the top 3 students by score.
Data source:
class1 56
class2 64
class1 79
class3 88
class1 92
class3 67
class2 62
class3 77
class1 88
class2 78
class2 88
class3 91
Code:
/**
 * @ClassName: ClassSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/9 17:15
 * @version: 1.0
 */
public class ClassSort {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/spark/classSort/input.txt";
// HDFS path of the output directory
String outputPath = hdfsBasePath + "/spark/classSort/";
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
// Load the file
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
// map each line to a (line, null) pair
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2(l, null));
// Sort
JavaPairRDD<String, Integer> sort = lines.sortByKey(new MySort1());
// split into key and value
JavaPairRDD<String, Integer> splits =
sort.mapToPair(g -> new Tuple2<>(g._1().split(" ")[0], Integer.valueOf(g._1().split(" ")[1])));
// Grouping
JavaPairRDD<String, Iterable<Integer>> groups = splits.groupByKey();
// keep only the top three values per key
JavaPairRDD<String, List<Integer>> g = groups.mapToPair(new MyList());
// take the contents of up to 10 keys
List<Tuple2<String, List<Integer>>> tuple2s = g.take(10);
for (Tuple2<String, List<Integer>> t: tuple2s) {
for (Integer i : t._2() ) {
System.err.println(t._1() + " " + i);
}
System.err.println("--------------------");
}
}
static class MyList implements Serializable, PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>> {
@Override
public Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
Iterator<Integer> its = stringIterableTuple2._2().iterator();
List<Integer> ints = new ArrayList<>();
int count = 0;
while (its.hasNext()) {
++count;
ints.add(its.next());
if (count >= 3) break;
}
return new Tuple2<>(stringIterableTuple2._1(), ints);
}
}
static class MySort1 implements Serializable, Comparator<String> {
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_2 >= o2s_2) return -1;
return 1;
}
}
}
Results:
class2 88
class2 78
class2 64
--------------------
class3 91
class3 88
class3 77
--------------------
class1 92
class1 88
class1 79
--------------------
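The same top-3-per-class result can be reproduced locally in plain Java (an illustrative sketch, not the Spark API). Unlike the Spark version above, which relies on a global sort before grouping, this sorts each group's values directly, and the key order of the printed groups may differ from the Spark output:

```java
import java.util.*;
import java.util.stream.*;

public class TopNPerGroup {
    // Group "class score" lines by class, sort each class's scores descending, keep the top n
    static Map<String, List<Integer>> topN(List<String> lines, int n) {
        return lines.stream()
                .map(l -> l.split(" "))
                .collect(Collectors.groupingBy(p -> p[0], TreeMap::new,
                        Collectors.mapping(p -> Integer.parseInt(p[1]),
                                Collectors.collectingAndThen(Collectors.toList(), vs -> {
                                    vs.sort(Comparator.reverseOrder());
                                    return vs.subList(0, Math.min(n, vs.size()));
                                }))));
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList(
                "class1 56", "class2 64", "class1 79", "class3 88", "class1 92", "class3 67",
                "class2 62", "class3 77", "class1 88", "class2 78", "class2 88", "class3 91");
        topN(source, 3).forEach((k, v) -> System.out.println(k + " " + v));
    }
}
```

Sorting within each group, rather than globally before grouping, avoids the full shuffle-wide sort and makes the per-class top-3 logic explicit.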
Copyright notice: this article was written by [Lionel leemon]. Please include the original link when reposting: https://yzsam.com/2022/04/202204231353535502.html