当前位置:网站首页>Spark入门基本操作

Spark入门基本操作

2022-04-23 13:54:00 李奈 - Leemon

Spark基本操作

一,Spark的安裝

以后补上

二,Spark介紹

2.1 RDD

2.1.1 RDD及其特點

RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集

2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)

3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。

4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDDpartition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

2.1.2 創建RDD

进行Spark核心编程的第一步就是创建一个初始的RDD。该RDD,通常就代表和包含了Spark应用程序的输入源数据。然后通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。

Spark Core提供了三种创建RDD的方式:

  1. 使用程序中的集合创建RDD(主要用于测试)

    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    JavaRDD<Integer> numbersRDD = javaSparkContext.parallelize(numbers);
    
  2. 使用本地文件创建RDD(主要用于临时性处理有大量数据的文件)

    SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
    sparkConf.setMaster("local[*]");
    
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    String tmpPath = "E:\\data\\spark\\input.txt";
    JavaRDD<String> lines = javaSparkContext.textFile(tmpPath);
    
  3. 使用HDFS文件创建RDD(生产环境的常用方式)

SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
sparkConf.setMaster("local[*]");

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

String hdfsBasePath = "hdfs://10.197.29.203:9000";
//文本文件的hdfs路径
String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
//导入文件(創建RDD)
JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

2.1.3 操作RDD

Spark支持两种RDD操作:transformation和action。

transformation

transformation 操作会针对已有的RDD创建一个新的RDD。transformation具有lazy特性,即transformation不会触发spark程序的执 行,它们只是记录了对RDD所做的操作,不会自发的执行。只有执行了一个action,之前的所有transformation才会执行。

常用的transformation介绍:

  • map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。
  • flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。
  • mapToPair:於map類似,輸出類型為 <k, v>格式。
  • filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。
  • groupByKey:根据key进行分组,每个key对应一个Iterable<value>
  • reduceByKey:对每个key对应的value进行reduce操作。
  • sortByKey:对每个key对应的value进行排序操作。
  • join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。
  • cogroup:同join,但是每个key对应的Iterable都会传入自定义函数进行处理。

action

action 操作主要对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序。action操作执行,会触发一个 spark job的运行,从而触发这个action之前所有的transformation的执行,这是action的特性。

常用的action介绍:

  • reduce:将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。
  • collect:将RDD中所有元素获取到本地客户端(一般不建议使用)。
  • count:获取RDD元素总数。
  • take(n):获取RDD中前n个元素。
  • saveAsTextFile:将RDD元素保存到文件中,对每个元素调用toString方法。
  • countByKey:对每个key对应的值进行count计数。
  • foreach:遍历RDD中的每个元素。

2.2 排序和關聯

2.2.1 SortByKey

这个方法只是对Key进行排序,value不排序。

默認排序

		JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        List<Tuple2<String, Integer>> source = Arrays.asList(
            new Tuple2<>("lmc", 89), 
            new Tuple2<>("hh", 89), 
            new Tuple2<>("tom", 79),
            new Tuple2<>("marry", 89), 
            new Tuple2<>("sb", 89), 
            new Tuple2<>("lemoon", 79));
        //導入數據
        JavaPairRDD<String, Integer> textFile = javaSparkContext.parallelizePairs(source);
        System.out.println(textFile.sortByKey().collect());//降序
        System.out.println(textFile.sortByKey(false).collect());//升序

打印結果:
[(hh,89), (lemoon,79), (lmc,89), (marry,89), (sb,89), (tom,79)]
[(tom,79), (sb,89), (marry,89), (lmc,89), (lemoon,79), (hh,89)]

自定義排序

		List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
        //導入數據
        JavaRDD<String> textFile = javaSparkContext.parallelize(source);
        JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
        JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
        JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
        List<Tuple2<String, String>> list = ss.take(10);
        for (Tuple2<String, String> t: list) {
    
            System.err.println(t._1() + " : " + t._2());
        }

打印結果:
1  :   1
1  :   5
1  :   7
1  :   12
2  :   4
2  :   7
3  :   6
5  :   3

自定義比較類MySort:

class MySort implements Serializable, Comparator<String> {
    

    @Override
    public int compare(String o1, String o2) {
    
        String[] o1s = o1.split(" ");
        String[] o2s = o2.split(" ");
        int o1s_1 = Integer.valueOf(o1s[0]);
        int o1s_2 = Integer.valueOf(o1s[1]);
        int o2s_1 = Integer.valueOf(o2s[0]);
        int o2s_2 = Integer.valueOf(o2s[1]);
        if (o1s_1 > o2s_1) {
    
            return 1;
        }else if (o1s_1 == o2s_1){
    
            return o1s_2 - o2s_2;
        }
        return -1;
    }
}

2.2.2 join和cogroup

join就是把两个集合根据key,进行内容聚合,而cogroup在聚合时会先对RDD中相同的key进行合并。

package com.lmc.spark.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/** * @ClassName: JoinTest * @author: Leemon * @Description: TODO * @date: 2021/3/17 13:54 * @version: 1.0 */
public class JoinTest {
    

    public static void main(String[] args) {
    
        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        List<Tuple2<String, Integer>> source1 = Arrays.asList(
                new Tuple2<>("lmc", 89), new Tuple2<>("hh", 89), new Tuple2<>("tom", 79), new Tuple2<>("lmc", 41));
        List<Tuple2<String, Integer>> source2 = Arrays.asList(
                new Tuple2<>("lmc", 46), new Tuple2<>("hh", 74), new Tuple2<>("tom", 92),
                new Tuple2<>("lmc", 68), new Tuple2<>("hh", 83), new Tuple2<>("tom", 58));
        //導入數據
        JavaPairRDD<String, Integer> textFile1 = javaSparkContext.parallelizePairs(source1);
        JavaPairRDD<String, Integer> textFile2 = javaSparkContext.parallelizePairs(source2);

        //關聯數據
        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroup = textFile1.cogroup(textFile2);
        JavaPairRDD<String, Tuple2<Integer, Integer>> join = textFile1.join(textFile2);

        List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> target1 = cogroup.collect();
        List<Tuple2<String, Tuple2<Integer, Integer>>> target2 = join.collect();

        //輸出
        target1.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ": " + t._2()._2()));
        System.out.println("===================================================");
        target2.forEach(t -> System.out.println(t._1() + " -> (" + t._2()._1() + ", " + t._2()._2()));

    }

}

//輸出結果
hh -> ([89]: [74, 83]
lmc -> ([89, 41]: [46, 68]
tom -> ([79]: [92, 58]
===================================================
hh -> (89, 74
hh -> (89, 83
lmc -> (89, 46
lmc -> (89, 68
lmc -> (41, 46
lmc -> (41, 68
tom -> (79, 92
tom -> (79, 58

三,Spark基本案例(WordCount)

3.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lmc-user</artifactId>
        <groupId>com.lmc</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark-test</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.3</version>
        </dependency>

    </dependencies>

</project>

3.2 WordCount.java

需求:計算文件中每個單詞出現的次數

源文件:

One morning a fox sees a cock.He think,"This is my breakfast.''

He comes up to the cock and says,"I know you can sing very well.Can you sing for me?''The cock is glad.He closes his eyes and begins to sing.

he fox sees that and caches him in his mouth and carries him away. The people in the field see the fox.

They cry,"Look,look!The fox is carrying the cock away.''The cock says to the fox,"Mr Fox,do you understand?The people say you are carrying their cock away.Tell them it is yours.Not theirs.''

計算:

package com.lmc.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/** * @ClassName: WorkCount * @author: Leemon * @Description: TODO * @date: 2021/1/28 15:10 * @version: 1.0 */
public class WordCount {
    

    public static void main(String[] args) {
    

        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        String hdfsBasePath = "hdfs://10.197.29.203:9000";
        //文本文件的hdfs路径
        String inputPath = hdfsBasePath + "/wordCount/input/input.txt";
        //输出结果文件的hdfs路径
        String outputPath = hdfsBasePath + "/wordCount/output2/";

        System.out.println("input path : " + inputPath);

        System.out.println("output path : " + outputPath);

        //导入文件
        JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

        JavaPairRDD<String, Integer> counts = textFile
                //每一行都分割成单词,返回后组成一个大集合
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                //key是单词,value是1
                .mapToPair(word -> new Tuple2<>(word, 1))
                //基于key进行reduce,逻辑是将value累加
                .reduceByKey((a, b) -> a + b);

        //先将key和value倒过来,再按照key排序
        JavaPairRDD<String, Integer> sorts = counts
                //key和value颠倒,生成新的map
                .mapToPair(tuple2 -> new Tuple2<>(tuple2._1(), tuple2._2()));
                //按照key倒排序
                //.sortByKey(false);

        //取前10个
        List<Tuple2<String, Integer>> collect = sorts.take(10);

        //打印出来
        for(Tuple2<String, Integer> tuple2 : collect){
    
            System.out.println(tuple2._1() + "\t" + tuple2._2());
        }

        //分区合并成一个,再导出为一个txt保存在hdfs
        javaSparkContext.parallelize(collect).coalesce(1).saveAsTextFile(outputPath);

        //关闭context
        javaSparkContext.close();
    }

}

結果:

(him,2)
(cry,"Look,look!The,1)
(fox,"Mr,1)
(are,1)
(Fox,do,1)
(his,3)
(is,5)
(well.Can,1)
(away,1)
(can,1)

3.3 FileSort.java

需求:

  1. 按照文件中的第一列排序。
  2. 如果第一列相同,则按照第二列排序。
/** * @ClassName: FileSort * @author: Leemon * @Description: TODO * @date: 2021/3/8 16:33 * @version: 1.0 */
public class FileSort {
    

    public static void main(String[] args) {
    
        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        List<String> source = Arrays.asList("1 5", "5 3", "2 7", "1 7", "3 6", "2 4", "1 1", "1 12");
        //導入數據
        JavaRDD<String> textFile = javaSparkContext.parallelize(source);
        JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2<>(l, null));
        JavaPairRDD<String, Integer> sorts = lines.sortByKey(new MySort());
        JavaPairRDD<String, String> ss = sorts.mapToPair(s -> new Tuple2<>(s._1().split(" ")[0], s._1().split(" ")[1]));
        List<Tuple2<String, String>> list = ss.take(10);
        for (Tuple2<String, String> t: list) {
    
            System.err.println(t._1() + " : " + t._2());
        }
    }

    static class MySort implements Serializable, Comparator<String> {
    

        @Override
        public int compare(String o1, String o2) {
    
            String[] o1s = o1.split(" ");
            String[] o2s = o2.split(" ");
            int o1s_1 = Integer.valueOf(o1s[0]);
            int o1s_2 = Integer.valueOf(o1s[1]);
            int o2s_1 = Integer.valueOf(o2s[0]);
            int o2s_2 = Integer.valueOf(o2s[1]);
            if (o1s_1 > o2s_1) {
    
                return 1;
            }else if (o1s_1 == o2s_1){
    
                return o1s_2 - o2s_2;
            }
            return -1;
        }

    }

}
//1 : 1
//1 : 5
//1 : 7
//1 : 12
//2 : 4
//2 : 7
//3 : 6
//5 : 3

3.4 ClassSort.java

需求:对每个班级内的学生成绩,取出前3名。

數據源:

class1 56
class2 64
class1 79
class3 88
class1 92
class3 67
class2 62
class3 77
class1 88
class2 78
class2 88
class3 91

計算:

/** * @ClassName: ClassSort * @author: Leemon * @Description: TODO * @date: 2021/3/9 17:15 * @version: 1.0 */
public class ClassSort {
    

    public static void main(String[] args) {
    
        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");
        sparkConf.setMaster("local[*]");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        String hdfsBasePath = "hdfs://10.197.29.203:9000";
        //文本文件的hdfs路径
        String inputPath = hdfsBasePath + "/spark/classSort/input.txt";
        //输出结果文件的hdfs路径
        String outputPath = hdfsBasePath + "/spark/classSort/";
        System.out.println("input path : " + inputPath);
        System.out.println("output path : " + outputPath);

        //导入文件
        JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);
        //分行
        JavaPairRDD<String, Integer> lines = textFile.mapToPair(l -> new Tuple2(l, null));
        //排序
        JavaPairRDD<String, Integer> sort = lines.sortByKey(new MySort1());
        //切割key - value
        JavaPairRDD<String, Integer> splits =
                sort.mapToPair(g -> new Tuple2<>(g._1().split(" ")[0], Integer.valueOf(g._1().split(" ")[1])));
        //分組
        JavaPairRDD<String, Iterable<Integer>> groups = splits.groupByKey();
        //只獲取value前三個
        JavaPairRDD<String, List<Integer>> g = groups.mapToPair(new MyList());
        //獲取10個key的內容
        List<Tuple2<String, List<Integer>>> tuple2s = g.take(10);
        for (Tuple2<String, List<Integer>> t: tuple2s) {
    
            for (Integer i : t._2() ) {
    
                System.err.println(t._1() + " " + i);
            }
            System.err.println("--------------------");
        }
    }

    static class MyList implements Serializable, PairFunction<Tuple2<String, Iterable<Integer>>, String, List<Integer>> {
    

        @Override
        public Tuple2<String, List<Integer>> call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
    
            Iterator<Integer> its = stringIterableTuple2._2().iterator();
            List<Integer> ints = new ArrayList<>();
            int count = 0;
            while (its.hasNext()) {
    
                ++count;
                ints.add(its.next());
                if (count >= 3) break;
            }
            return new Tuple2<>(stringIterableTuple2._1(), ints);
        }
    }

    static class MySort1 implements Serializable, Comparator<String> {
    

        @Override
        public int compare(String o1, String o2) {
    
            String[] o1s = o1.split(" ");
            String[] o2s = o2.split(" ");
            int o1s_2 = Integer.valueOf(o1s[1]);
            int o2s_2 = Integer.valueOf(o2s[1]);
            if (o1s_2 >= o2s_2) return -1;
            return 1;
        }
    }
}

結果:

class2  88
class2  78
class2  64
--------------------
class3  91
class3  88
class3  77
--------------------
class1  92
class1  88
class1  79
--------------------

版权声明
本文为[李奈 - Leemon]所创,转载请带上原文链接,感谢
https://blog.csdn.net/lmchhh/article/details/124350239