
Introduction to Spark Basic Operations

2022-04-23 13:56:00 Lionel leemon

Spark Basic Operations

One. Spark Installation

To be filled in later.

Two. Spark Introduction

2.1 RDD

2.1.1 RDD And its characteristics

1. RDD is the core data model of Spark. It is an abstraction; its full name is Resilient Distributed Dataset.

2. Abstractly, an RDD is a collection of elements containing data. It is partitioned, i.e. split into several partitions, and each partition is distributed on a different node in the cluster, so the data in an RDD can be operated on in parallel (distributed dataset).

3. An RDD is usually created from a file on Hadoop, that is, an HDFS file or a Hive table; sometimes it can also be created from a collection in the application.

4. The most important characteristic of an RDD is fault tolerance: it can automatically recover from node failures. If the data of an RDD partition is lost because of a node failure, the RDD automatically recomputes that partition. This is completely transparent to the user.

5. RDD data is stored in memory by default, but when memory is insufficient, Spark automatically writes the RDD data to disk (elasticity); see the sketch below.
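A minimal sketch of these properties (my own illustration, not from the original post; assumes a local master): it creates an RDD from a collection with an explicit number of partitions, and asks Spark to keep the data in memory but spill to disk when memory runs short.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import java.util.Arrays;

// Hypothetical example class, not part of the original post.
public class RddBasics {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("RDD Basics (java)")
                .setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        // Create an RDD from a collection, explicitly asking for 4 partitions.
        JavaRDD<Integer> numbers = javaSparkContext
                .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
        System.out.println("partitions: " + numbers.getNumPartitions());

        // Keep the data in memory, spilling to disk when memory is insufficient.
        numbers.persist(StorageLevel.MEMORY_AND_DISK());

        System.out.println("sum: " + numbers.reduce((a, b) -> a + b));
        javaSparkContext.close();
    }
}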

2.1.2 Create RDD

The first step in Spark Core programming is to create an initial RDD. This RDD usually represents and contains the input source data of the Spark application. Other RDDs are then derived from it with the transformation operators provided by Spark Core.

Spark Core provides three ways to create an RDD:

  1. Create an RDD from a collection in the program (mainly used for testing)

    SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
    sparkConf.setMaster("local[*]");
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    JavaRDD<Integer> numbersRDD = javaSparkContext.parallelize(numbers);
    
  2. Create an RDD from a local file (mainly used for temporary processing of large files)

    SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
    sparkConf.setMaster("local[*]");
    
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    String tmpPath = "E:\\data\\spark\\input.txt";
    JavaRDD<String> lines = javaSparkContext.textFile(tmpPath);
    
  3. Create an RDD from an HDFS file (the common way in production environments)

SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

String hdfsBasePath = "hdfs://10.197.29.203:9000";
// HDFS path of the text file
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
// Import the file (create the RDD)
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

2.1.3 Operating on an RDD

Spark supports two kinds of RDD operations: transformations and actions.

transformation

A transformation creates a new RDD from an existing RDD. Transformations are lazy: they do not trigger the execution of the Spark program by themselves, they only record the operations applied to the RDD. Only when an action is executed are all the preceding transformations actually run.
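A small sketch of this lazy behavior (illustrative only; the class name and the accumulator are my own additions): the function passed to map does not run until count() is called.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

// Hypothetical example class, not part of the original post.
public class LazyDemo {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Lazy Demo (java)").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        // An accumulator lets us observe whether the map function has actually run.
        LongAccumulator calls = javaSparkContext.sc().longAccumulator("map calls");

        JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        JavaRDD<Integer> doubled = numbers.map(n -> {
            calls.add(1); // side effect, only to show when the map really executes
            return n * 2;
        });

        // No job has run yet: the transformation was only recorded.
        System.out.println("map calls before action: " + calls.value()); // 0

        // The action triggers a job, and all preceding transformations execute.
        System.out.println("count: " + doubled.count());
        System.out.println("map calls after action: " + calls.value());  // 5

        javaSparkContext.close();
    }
}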

Commonly used transformations (a sketch follows the list):

  • map: pass each element of the RDD through a custom function to get a new element, and build a new RDD from the resulting elements.
  • flatMap: similar to map, but each input element can produce zero or more output elements.
  • mapToPair: similar to map, but the output is in <key, value> form.
  • filter: for each element of the RDD, keep it if the function returns true, drop it if it returns false.
  • groupByKey: group by key; each key maps to an Iterable<value>.
  • reduceByKey: perform a reduce operation on the values of each key.
  • sortByKey: sort the elements by key.
  • join: join two RDDs of <key, value> pairs; each pair of values joined on the same key is passed to a custom function for processing.
  • cogroup: similar to join, but for each key the whole Iterable of values from each RDD is passed to the custom function.
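The following sketch chains several of these operators into a small word count (a hedged example of my own; the sample data and class name are not from the original post):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Hypothetical example class, not part of the original post.
public class TransformationDemo {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Transformation Demo (java)").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = javaSparkContext.parallelize(
                Arrays.asList("a b a", "b c", "c c a"));

        JavaPairRDD<String, Integer> counts = lines
                // flatMap: one line becomes several words
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                // filter: drop empty tokens
                .filter(word -> !word.isEmpty())
                // mapToPair: word -> (word, 1)
                .mapToPair(word -> new Tuple2<>(word, 1))
                // reduceByKey: add up the 1s for each word
                .reduceByKey((a, b) -> a + b)
                // sortByKey: order the result alphabetically by word
                .sortByKey();

        System.out.println(counts.collect()); // [(a,3), (b,2), (c,3)]
        javaSparkContext.close();
    }
}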

action

An action performs a final operation on an RDD, for example iterating over it, reducing it, or saving it to a file, and it can return the result to the driver program. Executing an action triggers a Spark job, which in turn triggers the execution of all the transformations before that action; this is the defining characteristic of an action.

Commonly used actions (a sketch follows the list):

  • reduce: aggregate all elements of the RDD. The first and second elements are aggregated, the result is aggregated with the third element, then with the fourth, and so on.
  • collect: bring all elements of the RDD back to the local client (generally not recommended).
  • count: get the total number of elements in the RDD.
  • take(n): get the first n elements of the RDD.
  • saveAsTextFile: save the elements of the RDD to a file, calling toString on each element.
  • countByKey: count the number of values for each key.
  • foreach: iterate over every element of the RDD.
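A short sketch exercising several of these actions (again my own illustration; local master, made-up data):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Hypothetical example class, not part of the original post.
public class ActionDemo {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Action Demo (java)").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<Integer> numbers = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // reduce: aggregate all elements pairwise
        System.out.println("sum   = " + numbers.reduce((a, b) -> a + b)); // 15
        // count: total number of elements
        System.out.println("count = " + numbers.count());                 // 5
        // take(n): the first n elements
        System.out.println("take  = " + numbers.take(3));                 // [1, 2, 3]

        // countByKey: number of values per key
        JavaPairRDD<String, Integer> pairs = javaSparkContext.parallelizePairs(
                Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));
        System.out.println("countByKey = " + pairs.countByKey());         // {a=2, b=1}

        javaSparkContext.close();
    }
}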

2.2 Sorting and Joining

2.2.1 SortByKey

This method sorts only by key; values are not sorted.

Default sort

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        List<Tuple2<String, Integer>> source = Arrays.asList(
            new Tuple2<>("lmc", 89),
            new Tuple2<>("hh", 89),
            new Tuple2<>("tom", 79),
            new Tuple2<>("marry", 89),
            new Tuple2<>("sb", 89),
            new Tuple2<>("lemoon", 79));
        // Import data
        JavaPairRDD<String, Integer> textFile = javaSparkContext.parallelizePairs(source);
        System.out.println(textFile.sortByKey().collect());      // ascending (default)
        System.out.println(textFile.sortByKey(false).collect()); // descending

Printed results:
[(hh,89), (lemoon,79), (lmc,89), (marry,89), (sb,89), (tom,79)]
[(tom,79), (sb,89), (marry,89), (lmc,89), (lemoon,79), (hh,89)]

Custom sort

        List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
        // Import data
        JavaRDD<String> textFile = javaSparkContext.parallelize(source);
        JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
        JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
        JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
        List<Tuple2<String, String>> list = ss.take(10);
        for (Tuple2<String, String> t : list) {
            System.err.println(t._1() + " : " + t._2());
        }

Printed results:
1 : 1
1 : 5
1 : 7
1 : 12
2 : 4
2 : 7
3 : 6
5 : 3

Custom comparison class MySort:

class MySort implements Serializable, Comparator<String> {

    @Override
    public int compare(String o1, String o2) {
        String[] o1s = o1.split(" ");
        String[] o2s = o2.split(" ");
        int o1s_1 = Integer.valueOf(o1s[0]);
        int o1s_2 = Integer.valueOf(o1s[1]);
        int o2s_1 = Integer.valueOf(o2s[0]);
        int o2s_2 = Integer.valueOf(o2s[1]);
        if (o1s_1 > o2s_1) {
            return 1;
        } else if (o1s_1 == o2s_1) {
            return o1s_2 - o2s_2;
        }
        return -1;
    }
}

2.2.2 join and cogroup

join aggregates the contents of two datasets by key, while cogroup first merges the values with the same key inside each RDD before combining the two RDDs.

package com.lmc.spark.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @ClassName: JoinTest
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/17 13:54
 * @version: 1.0
 */
public class JoinTest {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        List<Tuple2<String, Integer>> source1 = Arrays.asList(
                new Tuple2<>("lmc", 89), new Tuple2<>("hh", 89), new Tuple2<>("tom", 79), new Tuple2<>("lmc", 41));
        List<Tuple2<String, Integer>> source2 = Arrays.asList(
                new Tuple2<>("lmc", 46), new Tuple2<>("hh", 74), new Tuple2<>("tom", 92),
                new Tuple2<>("lmc", 68), new Tuple2<>("hh", 83), new Tuple2<>("tom", 58));
        // Import data 
        JavaPairRDD<String, Integer> textFile1 = javaSparkContext.parallelizePairs(source1);
        JavaPairRDD<String, Integer> textFile2 = javaSparkContext.parallelizePairs(source2);

        // Join and cogroup the two RDDs
        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroup = textFile1.cogroup(textFile2);
        JavaPairRDD<String, Tuple2<Integer, Integer>> join = textFile1.join(textFile2);

        List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> target1 = cogroup.collect();
        List<Tuple2<String, Tuple2<Integer, Integer>>> target2 = join.collect();

        // Output 
        target1.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ": " + t._2()._2() + ")"));
        System.out.println("===================================================");
        target2.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ", " + t._2()._2() + ")"));

    }

}

// Output results 
hh -> ([89]: [74, 83])
lmc -> ([89, 41]: [46, 68])
tom -> ([79]: [92, 58])
===================================================
hh -> (89, 74)
hh -> (89, 83)
lmc -> (89, 46)
lmc -> (89, 68)
lmc -> (41, 46)
lmc -> (41, 68)
tom -> (79, 92)
tom -> (79, 58)

Three. Spark Basic Examples (WordCount)

3.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lmc-user</artifactId>
        <groupId>com.lmc</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark-test</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.3</version>
        </dependency>

    </dependencies>

</project>

3.2 WordCount.java

Requirement: count the number of occurrences of each word in the file.

Source file:

One morning a fox sees a cock.He think,"This is my breakfast.''

He comes up to the cock and says,"I know you can sing very well.Can you sing for me?''The cock is glad.He closes his eyes and begins to sing.

he fox sees that and caches him in his mouth and carries him away. The people in the field see the fox.

They cry,"Look,look!The fox is carrying the cock away.''The cock says to the fox,"Mr Fox,do you understand?The people say you are carrying their cock away.Tell them it is yours.Not theirs.''

Calculation:

package com.lmc.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @ClassName: WordCount
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/1/28 15:10
 * @version: 1.0
 */
public class WordCount {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        String hdfsBasePath = "hdfs://10.197.29.203:9000";
        // HDFS path of the input text file
        String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
        // HDFS path of the output result file
        String outputPath = hdfsBasePath + "/wordCount/output2/";

        System.out.println("input path : " + inputPath);

        System.out.println("output path : " + outputPath);

        // Import files 
        JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

        JavaPairRDD<String, Integer> counts = textFile
                // split each line into words, returning one big collection
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                // key is the word, value is 1
                .mapToPair(word -> new Tuple2<>(word, 1))
                // reduce by key: add up the values
                .reduceByKey((a, b) -> a + b);

        // The (word, count) pairs could be swapped and then sorted by key;
        // in this version they are kept as-is and the sort is left commented out.
        JavaPairRDD<String, Integer> sorts = counts
                // keep each pair unchanged (swap _1 and _2 here to sort by count)
                .mapToPair(tuple2 -> new Tuple2<>(tuple2._1(), tuple2._2()));
                // sort by key in descending order
                //.sortByKey(false);

        // take the first 10 elements
        List<Tuple2<String, Integer>> collect = sorts.take(10);

        // Print out 
        for (Tuple2<String, Integer> tuple2 : collect) {
            System.out.println(tuple2._1() + "\t" + tuple2._2());
        }

        // coalesce the partitions into one, then save the result as a text file in HDFS
        javaSparkContext.parallelize(collect).coalesce(1).saveAsTextFile(outputPath);

        // close context
        javaSparkContext.close();
    }

}

Results:

(him,2)
(cry,"Look,look!The,1)
(fox,"Mr,1)
(are,1)
(Fox,do,1)
(his,3)
(is,5)
(well.Can,1)
(away,1)
(can,1)

3.3 FileSort.java

Requirement:

  1. Sort by the first column in the file.
  2. If the first column is the same, sort by the second column.

/**
 * @ClassName: FileSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/8 16:33
 * @version: 1.0
 */
public class FileSort {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
        // Import data 
        JavaRDD<String> textFile = javaSparkContext.parallelize(source);
        JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
        JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
        JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
        List<Tuple2<String, String>> list = ss.take(10);
        for (Tuple2<String, String> t : list) {
            System.err.println(t._1() + " : " + t._2());
        }
    }

    static class MySort implements Serializable, Comparator<String> {

        @Override
        public int compare(String o1, String o2) {
            String[] o1s = o1.split(" ");
            String[] o2s = o2.split(" ");
            int o1s_1 = Integer.valueOf(o1s[0]);
            int o1s_2 = Integer.valueOf(o1s[1]);
            int o2s_1 = Integer.valueOf(o2s[0]);
            int o2s_2 = Integer.valueOf(o2s[1]);
            if (o1s_1 > o2s_1) {
                return 1;
            } else if (o1s_1 == o2s_1) {
                return o1s_2 - o2s_2;
            }
            return -1;
        }
    }

}
//1 : 1
//1 : 5
//1 : 7
//1 : 12
//2 : 4
//2 : 7
//3 : 6
//5 : 3

3.4 ClassSort.java

Requirement: for each class, take the top 3 students (by score).

Data source:

class1 56
class2 64
class1 79
class3 88
class1 92
class3 67
class2 62
class3 77
class1 88
class2 78
class2 88
class3 91

Calculation:

/**
 * @ClassName: ClassSort
 * @author: Leemon
 * @Description: TODO
 * @date: 2021/3/9 17:15
 * @version: 1.0
 */
public class ClassSort {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        String hdfsBasePath = "hdfs://10.197.29.203:9000";
        // HDFS path of the input text file
        String inputPath = hdfsBasePath + "/spark/classSort/input.txt";
        // HDFS path of the output result file
        String outputPath = hdfsBasePath + "/spark/classSort/";
        System.out.println("input path : " + inputPath);
        System.out.println("output path : " + outputPath);

        // Import the file
        JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
        // map each line to a (line, null) pair
        JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
        // Sort 
        JavaPairRDD<String, Integer> sort = lines.sortByKey(new MySort1());
        // split each line into key and value
        JavaPairRDD<String, Integer> splits =
                sort.mapToPair(g -> new Tuple2<>(g._1().split(" ")[0], Integer.valueOf(g._1().split(" ")[1])));
        // Grouping 
        JavaPairRDD<String, Iterable<Integer>> groups = splits.groupByKey();
        // keep only the top three values for each key
        JavaPairRDD<String, List<Integer>> g = groups.mapToPair(new MyList());
        // take the contents of the first 10 keys
        List<Tuple2<String, List<Integer>>> tuple2s = g.take(10);
        for (Tuple2<String, List<Integer>> t : tuple2s) {
            for (Integer i : t._2()) {
                System.err.println(t._1() + " " + i);
            }
            System.err.println("--------------------");
        }
    }

    static class MyList implements Serializable, PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>> {

        @Override
        public Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
            Iterator<Integer> its = stringIterableTuple2._2().iterator();
            List<Integer> ints = new ArrayList<>();
            int count = 0;
            while (its.hasNext()) {
                ++count;
                ints.add(its.next());
                if (count >= 3) break;
            }
            return new Tuple2<>(stringIterableTuple2._1(), ints);
        }
    }

    static class MySort1 implements Serializable, Comparator<String> {

        @Override
        public int compare(String o1, String o2) {
            String[] o1s = o1.split(" ");
            String[] o2s = o2.split(" ");
            int o1s_2 = Integer.valueOf(o1s[1]);
            int o2s_2 = Integer.valueOf(o2s[1]);
            if (o1s_2 >= o2s_2) return -1;
            return 1;
        }
    }
}

Results:

class2  88
class2  78
class2  64
--------------------
class3  91
class3  88
class3  77
--------------------
class1  92
class1  88
class1  79
--------------------

Copyright notice
This article was created by [Lionel leemon]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204231353535502.html