当前位置:网站首页>Spark基础【RDD单Value类型转换算子】

Spark基础【RDD单Value类型转换算子】

2022-08-09 23:25:00 hike76

RDD转换算子

RDD的方法有很多,但一般分为两大类,第一类是逻辑的封装,将旧的逻辑转换为新的逻辑,称之为转换算子;第二类是执行逻辑,将封装好的逻辑进行执行,让整个作业运行起来,称之为行动算子

算子

问题(初始)–> operator(算子,操作,方法) –> 问题(解决,完成)

RDD根据数据处理方式的不同将算子整体上分为单Value类型、双Value类型和Key-Value类型

将RDD的方法称为算子的原因是与Scala集合的方法进行区分

如以下代码中的两个foreach方法,第一个为scala集合(单点)中的方法,第二个为RDD(分布式)的方法

val wordCount: RDD[(String, Int)] = sc.textFile("data/word.txt").map((_,1)).reduceByKey(_ + _)
val array: Array[(String, Int)] = wordCount.collect()
array.foreach(println)
wordCount.foreach(println)

单Value类型

1 map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

def map[U: ClassTag](f: T => U): RDD[U]

map算子表示将数据源中的每一条数据进行处理

map算子的参数是函数类型:Int => U,输入Int类型的数据,输出类型不确定

“转换”概念的体现过程:从rdd算子转换成了newRdd算子

def main(args: Array[String]): Unit = {
    
  val conf = new SparkConf().setMaster("local").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val rdd = sc.makeRDD(List(1,2,3,4))
  
  def mapFunction ( num : Int) : Int = {
    
    num * 2
  }
  val newRdd: RDD[Int] = rdd.map(mapFunction)
  newRdd.collect().foreach(println)

  sc.stop()
}

每次写mapFunction有些麻烦,可以使用函数至简原则

val newRdd: RDD[Int] = rdd.map(_ * 2)

转换之后如何分区,数据执行的顺序如何

查看newRdd分区数量(2个)

newRdd.saveAsTextFile("output")

在RDD进行转换时,新的RDD和旧的RDD的分区数量保持一致,源码如下,返回所有依赖的RDD中的第一个RDD的分区数量

/** Returns the first parent RDD */
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
    
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}

数据在处理过程中,默认情况下,分区不变,原来数据在哪个分区,转换完成之后还是在哪里

数据在处理过程中,要遵循执行顺序:分区内有序,分区间无序

使用如下代码查看是否满足分区内有序,分区间无序的规则

val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

val newRdd: RDD[Int] = rdd.map(
      num => {
    
        println("num = " + num)
        num * 2
      }
    )

RDD其实就是封装的逻辑,如果有多个RDD,那么第一条数据应该所有的逻辑执行完毕,再执行下一条数据,RDD没有等待的功能

val newRdd1: RDD[Int] = rdd.map(
  num => {
    
    println("############### num = " + num)
    num * 2
  }
)

val newRdd2: RDD[Int] = newRdd1.map(
  num => {
    
    println("*************** num = " + num)
    num * 2
  }
)

(1)案例:从服务器日志数据agent.log中获取第四列数据

部分数据如下

1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
def main(args: Array[String]): Unit = {
    
  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val lineRDD: RDD[String] = sc.textFile("data/agent.log")
  val forthRDD: RDD[String] = lineRDD.map(
    line => {
    
      val datas = line.split(" ")
      datas(3)
    }
  )
  forthRDD.collect().foreach(println)

  sc.stop()
}

2 mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,Iterator[T] => Iterator[U],数据量可以增加

def mapPartitions[U: ClassTag](
 f: Iterator[T] => Iterator[U],
 preservesPartitioning: Boolean = false): RDD[U]
val rdd = sc.makeRDD(List(1,2,3,4),2)

val rdd1 = rdd.mapPartitions(
  list => {
    
    println("*************")
    list.map(_ * 2)
  }
)

将分区内的数据先放在Executor中(缓存),增加效率,以上程序中mapPartitions执行两次,而map执行四次,虽然提升了效率,但也存在缺点

  • 占用内存多
  • 处理完的数据不会释放

(1)map和mapPartitions的区别

  • 数据处理角度

    Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。

  • 功能的角度

    Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

  • 性能的角度

    Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用,一般使用map操作

综上,有时完成比完美更重要

(2)java克隆浅复制

何时考虑不使用接口而是使用实现类:当需要使用实现类中的特有方法时,会定义为实现类类型,第一种声明方式,ArrayList中的特有方式无法使用,比如clone方法

public class Test {
    
    public static void main(String[] args) {
    
        User user = new User();
        user.name = "zhangsan";

        List<User> userList = new ArrayList<User>();
        //userList.clone();
        ArrayList<User> userList1 = new ArrayList<User>();
        userList1.clone();
    }
}
class User{
    
    public String name;
}

克隆之后,两块内存空间不同

userList1.add(user);
ArrayList<User> userList2 = (ArrayList<User>)userList1.clone();
System.out.println(userList1 == userList2);	//false

以下代码,两块内存地址引用了相同的对象

final User user1 = userList2.get(0);
user1.name = "lisi";
System.out.println(userList1 == userList2);	//false
System.out.println(userList1);	//[[email protected]]
System.out.println(userList2);	//[[email protected]]

这种现象,称为java克隆浅复制,只复制集合最外层集合的内存,但是如果集合引用了其他内存,不会复制

java克隆浅复制存在引用问题,mapPartitions处理完的数据不会释放,那么引用也不会被释放,当全部处理完成时,才会释放,意味着数据越多,持续时间越长,占用内存空间越大

(3)案例:获取每个数据分区的最大值

def main(args: Array[String]): Unit = {
    
  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val rdd = sc.makeRDD(List(1,2,3,4),2)

  val rdd1: RDD[Int] = rdd.mapPartitions(
    list => {
    
      val max = list.max
      List(max).iterator
    }
  )
  rdd1.collect().foreach(println)
}

3 mapPartitionsWithIndex

现有三个分区,只获取第二个分区的数据

分区间无序,所以以下代码是错误的

var count = 0
val rdd1 = rdd.mapPartitions(
  list => {
    
    if(count == 1){
    
      count = count + 1
      list
    }else{
    
      count = count + 1
      Nil.iterator
    }
  }
)

使用mapPartitionsWithIndex方法

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
def main(args: Array[String]): Unit = {
    
  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)

  val rdd1 = rdd.mapPartitionsWithIndex(
    (index,list) => {
    
      if(index == 1){
    
        list
      }else{
    
        Nil.iterator
      }
    }
  )

  rdd1.collect().foreach(println)
}

4 flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def main(args: Array[String]): Unit = {
    
  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val rdd: RDD[String] = sc.makeRDD(
    List("hello scala", "hello spark")
  )
  val rdd1: RDD[String] = rdd.flatMap(_.split(" "))
  rdd1.collect().foreach(println)
}

flatMap输入是数据集的整体(一个),返回的是拆分后的个体(多个),使用容器将个体包装起来

val rdd1 = rdd.flatMap(
  str => {
    
    str.split(" ")
  }
)
def main(args: Array[String]): Unit = {
    
  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val rdd: RDD[List[Int]] = sc.makeRDD(
    List(
      List(1, 2), List(3, 4)
    )
  )

  val rdd1 = rdd.flatMap(
    List =>List
  )

  rdd1.collect().foreach(println)
}

第一个List是整体,第二个List代表的是容器

(1)案例:将List(List(1,2),3,List(4,5))进行扁平化操作

模式匹配

val rdd1 = rdd.flatMap {
    
  case list : List[_] => list
  case other => List(other)
}

5 glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

def glom(): RDD[Array[T]]

将个体变成整体

val rdd: RDD[Int] = sc.makeRDD(
  List(1, 2, 3, 4, 5, 6), 2
)
val rdd1: RDD[Array[Int]] = rdd.glom()
rdd1.collect().foreach(a => println(a.mkString(",")))
// 1,2,3
// 4,5,6

(1)案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

val rdd: RDD[Int] = sc.makeRDD(
  List(1, 2, 3, 4, 5, 6), 2
)
val rdd1: RDD[Array[Int]] = rdd.glom()

val rdd2: RDD[Int] = rdd1.map(_.max)

println(rdd2.collect().sum)

6 groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

此算子根据函数计算结果进行分组,执行结果为KV键值对数据类型,K是分组标识,V为同一个组中的数据集合

val rdd: RDD[Int] = sc.makeRDD(
  List(1, 2, 3, 4, 5, 6), 2
)

val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
rdd1.collect().foreach(println)

默认情况下,数据处理后所在的分区不会发生改变

Spark要求,一个组的数据必须在一个分区中

一个分区的数据被打乱,和其他分区的数据组合在一起,这个操作称为shuffle,现在想进行两分区数据相加,但第一个RDD中有很多分区,分区内有很多数据,shuffle如何做计算呢,在内存中是否等待所有数据到来之后再进行运算,即使等待,内存不够怎么办?

所以,shuffle操作不允许在内存中等待,必须落盘,因此shuffle的速度慢

shuffle会将完整的计算过程一分为二,形成两个阶段,一个阶段用于写数据,一个阶段用于读数据

写数据的阶段如果没有完成,读数据的阶段不能执行

conf.set("spark.local.dir","e:/")

通过windows下的spark环境,执行groupBy相关代码,可以在监控页面查看到“Shuffle Read”和“Shuffle Write”两个阶段

shuffle的操作可以更改分区

val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2,2)

(1)案例:将List(“Hello”,“hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。

val rdd: RDD[String] = sc.makeRDD(
  List("Hello","hive", "hbase", "Hadoop")
)

val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1))
rdd1.collect().foreach(println)

不区分首字母大小写

val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1).toUpperCase())

(2)案例:按照agent.log中的第二列数据分组,相同求和

def main(args: Array[String]): Unit = {
    
  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val lines: RDD[String] = sc.textFile("data/agent.log")
  val groupRDD: RDD[(String, Iterable[(String, Int)])] = lines.map(
    lines => {
    
      val datas: Array[String] = lines.split(" ")
      (datas(1), 1)
    }
  ).groupBy(_._1)
  val value: RDD[(String, Int)] = groupRDD.mapValues(_.size)
  value.collect().foreach(println)
}

groupBy算子可以实现WordCount(共有十种算子可以实现,1/10)

原网站

版权声明
本文为[hike76]所创,转载请带上原文链接,感谢
https://blog.csdn.net/weixin_43923463/article/details/126253887