Introduction to Spark Basic Operations
2022-04-23 13:56:00 【Lionel leemon】
Spark Basic Operations
1. Spark Installation
To be filled in later.
2. Spark Introduction
2.1 RDD
2.1.1 RDD and Its Characteristics
1. RDD is Spark's core data model and is an abstract class. Its full name is Resilient Distributed Dataset.
2. Conceptually an RDD is a collection of elements containing data. It is partitioned, and the partitions are distributed across different nodes of the cluster, so the data in an RDD can be operated on in parallel (a distributed dataset).
3. An RDD is usually created from a file on Hadoop, i.e. an HDFS file or a Hive table; it can also be created from a collection inside the application.
4. The most important characteristic of an RDD is fault tolerance: it can recover automatically from node failures. If the data of an RDD partition is lost because a node fails, the RDD automatically recomputes that partition, completely transparently to the user.
5. RDD data is stored in memory by default, but when memory is short Spark automatically writes RDD data to disk (the "resilient" part). A small persistence sketch follows this list.
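To make this resilience concrete, here is a minimal sketch (illustrative only, not from the original article; it assumes the usual imports org.apache.spark.SparkConf, org.apache.spark.api.java.*, org.apache.spark.storage.StorageLevel and java.util.Arrays) that explicitly asks Spark to keep an RDD in memory and spill it to disk when memory runs short:
SparkConf sparkConf = new SparkConf().setAppName("Persist Sketch (java)").setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
// Illustrative: keep the RDD in memory and spill to disk when memory is insufficient
numbers.persist(StorageLevel.MEMORY_AND_DISK());
System.out.println(numbers.count());                 // the first action computes and caches the partitions
System.out.println(numbers.reduce((a, b) -> a + b)); // later actions reuse the cached data
javaSparkContext.close();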
2.1.2 Creating an RDD
The first step in Spark Core programming is to create an initial RDD. That RDD usually represents and contains the input data of the Spark application. Other RDDs are then derived from it through the transformation operators provided by Spark Core.
Spark Core provides three ways to create an RDD:
- Create an RDD from a collection in the program (mainly used for testing):
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD<Integer> numbersRDD = javaSparkContext.parallelize(numbers);
- Create an RDD from a local file (mainly used for ad-hoc processing of large files):
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String tmpPath = "E:\\data\\spark\\input.txt";
JavaRDD<String> lines = javaSparkContext.textFile(tmpPath);
- Create an RDD from an HDFS file (the common approach in production environments):
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the text file
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
// Load the file (creates the RDD)
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
2.1.3 Operating on RDDs
Spark supports two kinds of RDD operations: transformations and actions.
Transformations
A transformation creates a new RDD from an existing one. Transformations are lazy: they do not trigger execution of the Spark program, they only record the operations applied to the RDD and are never executed on their own. Only when an action is executed do all the preceding transformations run.
Commonly used transformations (a short combined sketch follows this list):
- map: pass each element of the RDD through a user-defined function to produce a new element, and build a new RDD from the new elements.
- flatMap: similar to map, but each input element may produce zero or more output elements.
- mapToPair: similar to map, but the output is in <key, value> form.
- filter: for each element of the RDD, keep it if the function returns true and drop it if it returns false.
- groupByKey: group by key; each key maps to an Iterable<value>.
- reduceByKey: perform a reduce over the values of each key.
- sortByKey: sort the elements of the RDD by key.
- join: join two RDDs of <key, value> pairs; each joined pair for a key is passed to a user-defined function for processing.
- cogroup: like join, but the whole Iterable of values for each key is passed to the user-defined function.
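Below is a minimal combined sketch of a few of these operators (illustrative only, not from the original article; it assumes the same imports and the local SparkConf/JavaSparkContext setup used elsewhere in this article, plus scala.Tuple2). It also shows the lazy property described above: nothing runs until collect() is called.
JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
// map: square every element
JavaRDD<Integer> squares = numbers.map(n -> n * n);
// filter: keep only the even squares
JavaRDD<Integer> evenSquares = squares.filter(n -> n % 2 == 0);
// mapToPair + reduceByKey: count how many squares are even and how many are odd
JavaPairRDD<String, Integer> parityCounts = squares
        .mapToPair(n -> new Tuple2<>(n % 2 == 0 ? "even" : "odd", 1))
        .reduceByKey((a, b) -> a + b);
// Nothing has executed yet: transformations are lazy.
// collect() is an action and triggers the whole chain.
System.out.println(evenSquares.collect());   // [4, 16, 36]
System.out.println(parityCounts.collect());  // e.g. [(even,3), (odd,3)]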
Actions
An action performs a final operation on an RDD, for example traversing it, reducing it, or saving it to a file, and it can return a result to the driver program. Executing an action triggers a Spark job and causes all the transformations before that action to run; this is the defining characteristic of actions.
Commonly used actions (a short sketch follows this list):
- reduce: aggregate all elements of the RDD; the first and second elements are aggregated, the result is aggregated with the third element, then with the fourth, and so on.
- collect: fetch all elements of the RDD to the local client (generally not recommended).
- count: get the total number of elements in the RDD.
- take(n): get the first n elements of the RDD.
- saveAsTextFile: save the elements of the RDD to a file, calling toString on each element.
- countByKey: count the number of values for each key.
- foreach: iterate over every element of the RDD.
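And a minimal sketch of some of the actions (again illustrative and assuming the same setup; note that countByKey returns a java.util.Map):
JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(3, 1, 4, 1, 5));
// reduce: aggregate all elements pairwise
System.out.println(numbers.reduce((a, b) -> a + b)); // 14
// count and take(n)
System.out.println(numbers.count()); // 5
System.out.println(numbers.take(3)); // [3, 1, 4]
// countByKey: number of values per key
JavaPairRDD<String, Integer> pairs = javaSparkContext.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));
System.out.println(pairs.countByKey()); // e.g. {a=2, b=1}
// foreach: runs on the executors, so the output order is not guaranteed
numbers.foreach(n -> System.out.println(n));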
2.2 Sorting and Joining
2.2.1 SortByKey
This method sorts only by key; the values are not sorted.
Default sorting
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> source = Arrays.asList(
new Tuple2<>("lmc", 89),
new Tuple2<>("hh", 89),
new Tuple2<>("tom", 79),
new Tuple2<>("marry", 89),
new Tuple2<>("sb", 89),
new Tuple2<>("lemoon", 79));
// Import data
JavaPairRDD<String, Integer> textFile = javaSparkContext.parallelizePairs(source);
System.out.println(textFile.sortByKey().collect());// ascending (default)
System.out.println(textFile.sortByKey(false).collect());// descending
Output:
[(hh,89), (lemoon,79), (lmc,89), (marry,89), (sb,89), (tom,79)]
[(tom,79), (sb,89), (marry,89), (lmc,89), (lemoon,79), (hh,89)]
Custom sorting
List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
// Import data
JavaRDD<String> textFile = javaSparkContext.parallelize(source);
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
List<Tuple2<String, String>> list = ss.take(10);
for (Tuple2<String, String> t: list) {
System.err.println(t._1() + " : " + t._2());
}
Output:
1 : 1
1 : 5
1 : 7
1 : 12
2 : 4
2 : 7
3 : 6
5 : 3
The custom comparator class MySort:
// Sorts by the first column; when the first columns are equal, sorts by the second column
class MySort implements Serializable, Comparator<String> {
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_1 = Integer.valueOf(o1s[0]);
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_1 = Integer.valueOf(o2s[0]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_1 > o2s_1) {
return 1;
}else if (o1s_1 == o2s_1){
return o1s_2 - o2s_2;
}
return -1;
}
}
2.2.2 join and cogroup
join aggregates the contents of two datasets by key, while cogroup first merges the values that share the same key within each RDD before combining the two sides.
package com.lmc.spark.test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @ClassName: JoinTest
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/17 13:54
 * @version: 1.0
 */
public class JoinTest {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<Tuple2<String, Integer>> source1 = Arrays.asList(
new Tuple2<>("lmc", 89), new Tuple2<>("hh", 89), new Tuple2<>("tom", 79), new Tuple2<>("lmc", 41));
List<Tuple2<String, Integer>> source2 = Arrays.asList(
new Tuple2<>("lmc", 46), new Tuple2<>("hh", 74), new Tuple2<>("tom", 92),
new Tuple2<>("lmc", 68), new Tuple2<>("hh", 83), new Tuple2<>("tom", 58));
// Import data
JavaPairRDD<String, Integer> textFile1 = javaSparkContext.parallelizePairs(source1);
JavaPairRDD<String, Integer> textFile2 = javaSparkContext.parallelizePairs(source2);
// cogroup and join the two pair RDDs
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroup = textFile1.cogroup(textFile2);
JavaPairRDD<String, Tuple2<Integer, Integer>> join = textFile1.join(textFile2);
List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> target1 = cogroup.collect();
List<Tuple2<String, Tuple2<Integer, Integer>>> target2 = join.collect();
// Output
target1.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ": " + t._2()._2()));
System.out.println("===================================================");
target2.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ", " + t._2()._2()));
}
}
// Output results
hh -> ([89]: [74, 83]
lmc -> ([89, 41]: [46, 68]
tom -> ([79]: [92, 58]
===================================================
hh -> (89, 74
hh -> (89, 83
lmc -> (89, 46
lmc -> (89, 68
lmc -> (41, 46
lmc -> (41, 68
tom -> (79, 92
tom -> (79, 58
3. Spark Basic Examples (WordCount)
3.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lmc-user</artifactId>
<groupId>com.lmc</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-test</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.3</version>
</dependency>
</dependencies>
</project>
3.2 WordCount.java
Requirement: count the number of occurrences of each word in the file.
Source file:
One morning a fox sees a cock.He think,"This is my breakfast.''
He comes up to the cock and says,"I know you can sing very well.Can you sing for me?''The cock is glad.He closes his eyes and begins to sing.
he fox sees that and caches him in his mouth and carries him away. The people in the field see the fox.
They cry,"Look,look!The fox is carrying the cock away.''The cock says to the fox,"Mr Fox,do you understand?The people say you are carrying their cock away.Tell them it is yours.Not theirs.''
Code:
package com.lmc.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @ClassName: WordCount
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/1/28 15:10
 * @version: 1.0
 */
public class WordCount {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
// HDFS path of the output result file
String outputPath = hdfsBasePath + "/wordCount/output2/";
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
// Load the file
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
JavaPairRDD<String, Integer> counts = textFile
// Split each line into words and flatten them into one collection
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
//key is the word, value is 1
.mapToPair(word -> new Tuple2<>(word, 1))
// reduce by key, summing up the values
.reduceByKey((a, b) -> a + b);
// To sort by count you could swap key and value here and then sort by key; the sort below is commented out
JavaPairRDD<String, Integer> sorts = counts
// the (word, count) pair is kept unchanged here
.mapToPair(tuple2 -> new Tuple2<>(tuple2._1(), tuple2._2()));
// sort by key in descending order (disabled)
//.sortByKey(false);
// Take the first 10 elements
List<Tuple2<String, Integer>> collect = sorts.take(10);
// Print out
for(Tuple2<String, Integer> tuple2 : collect){
System.out.println(tuple2._1() + "\t" + tuple2._2());
}
// Coalesce the partitions into one, then save as a text file in HDFS
javaSparkContext.parallelize(collect).coalesce(1).saveAsTextFile(outputPath);
// Close the context
javaSparkContext.close();
}
}
Output:
(him,2)
(cry,"Look,look!The,1)
(fox,"Mr,1)
(are,1)
(Fox,do,1)
(his,3)
(is,5)
(well.Can,1)
(away,1)
(can,1)
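Note that the mapToPair before the commented-out sortByKey(false) above leaves the (word, count) pairs unchanged, so the printed result is not ordered by frequency. A minimal sketch of one way to order by count (not part of the original code; counts refers to the pair RDD built above and the variable names are illustrative) is to swap key and value, sort by the new key in descending order, and swap back:
// Swap to (count, word) so that sortByKey orders by the count
JavaPairRDD<Integer, String> swapped = counts.mapToPair(t -> new Tuple2<>(t._2(), t._1()));
// Sort in descending order of count
JavaPairRDD<Integer, String> byCountDesc = swapped.sortByKey(false);
// Swap back to (word, count)
JavaPairRDD<String, Integer> sortedByCount = byCountDesc.mapToPair(t -> new Tuple2<>(t._2(), t._1()));
for (Tuple2<String, Integer> tuple2 : sortedByCount.take(10)) {
    System.out.println(tuple2._1() + "\t" + tuple2._2());
}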
3.3 FileSort.java
Requirements:
- Sort by the first column in the file.
- If the first columns are equal, sort by the second column.
/**
 * @ClassName: FileSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/8 16:33
 * @version: 1.0
 */
public class FileSort {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
// Import data
JavaRDD<String> textFile = javaSparkContext.parallelize(source);
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
List<Tuple2<String, String>> list = ss.take(10);
for (Tuple2<String, String> t: list) {
System.err.println(t._1() + " : " + t._2());
}
}
// Sorts by the first column; when the first columns are equal, sorts by the second column
static class MySort implements Serializable, Comparator<String> {
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_1 = Integer.valueOf(o1s[0]);
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_1 = Integer.valueOf(o2s[0]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_1 > o2s_1) {
return 1;
}else if (o1s_1 == o2s_1){
return o1s_2 - o2s_2;
}
return -1;
}
}
}
//1 : 1
//1 : 5
//1 : 7
//1 : 12
//2 : 4
//2 : 7
//3 : 6
//5 : 3
3.4 ClassSort.java
Requirement: for each class, take the top 3 scores.
Data source:
class1 56
class2 64
class1 79
class3 88
class1 92
class3 67
class2 62
class3 77
class1 88
class2 78
class2 88
class3 91
Code:
/**
 * @ClassName: ClassSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/9 17:15
 * @version: 1.0
 */
public class ClassSort {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the input text file
String inputPath = hdfsBasePath + "/spark/classSort/input.txt";
// HDFS path of the output result file (not actually written in this example)
String outputPath = hdfsBasePath + "/spark/classSort/";
System.out.println("input path : " + inputPath);
System.out.println("output path : " + outputPath);
// Load the file
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
// Map each line to a (line, null) pair so it can be sorted by key
JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2(l, null));
// Sort the lines by score in descending order
JavaPairRDD<String, Integer> sort = lines.sortByKey(new MySort1());
// Split each line into (class, score)
JavaPairRDD<String, Integer> splits =
sort.mapToPair(g -> new Tuple2<>(g._1().split(" ")[0], Integer.valueOf(g._1().split(" ")[1])));
// Group the scores by class
JavaPairRDD<String, Iterable<Integer>> groups = splits.groupByKey();
// Keep only the top three values for each key
JavaPairRDD<String, List<Integer>> g = groups.mapToPair(new MyList());
// Take the contents of up to 10 keys
List<Tuple2<String, List<Integer>>> tuple2s = g.take(10);
for (Tuple2<String, List<Integer>> t: tuple2s) {
for (Integer i : t._2() ) {
System.err.println(t._1() + " " + i);
}
System.err.println("--------------------");
}
}
// Keeps at most the first three values of each key's Iterable
static class MyList implements Serializable, PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>> {
@Override
public Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
Iterator<Integer> its = stringIterableTuple2._2().iterator();
List<Integer> ints = new ArrayList<>();
int count = 0;
while (its.hasNext()) {
++count;
ints.add(its.next());
if (count >= 3) break;
}
return new Tuple2<>(stringIterableTuple2._1(), ints);
}
}
// Compares lines by the second column (the score) in descending order
static class MySort1 implements Serializable, Comparator<String> {
@Override
public int compare(String o1, String o2) {
String[] o1s = o1.split(" ");
String[] o2s = o2.split(" ");
int o1s_2 = Integer.valueOf(o1s[1]);
int o2s_2 = Integer.valueOf(o2s[1]);
if (o1s_2 >= o2s_2) return -1;
return 1;
}
}
}
Output:
class2 88
class2 78
class2 64
--------------------
class3 91
class3 88
class3 77
--------------------
class1 92
class1 88
class1 79
--------------------
Copyright notice
This article was created by [Lionel leemon]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/04/202204231353535502.html