当前位置:网站首页>Spark基础【RDD转换算子】

Spark基础【RDD转换算子】

2022-08-10 22:07:00 hike76

一 RDD单Value类型转换算子

1 filter

函数签名
def filter(f: T => Boolean): RDD[T]

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

val rdd: RDD[Int] = sc.makeRDD(
  List(1, 2, 3, 4, 5, 6), 2
)

val rdd1: RDD[Int] = rdd.filter(
  num => num % 2 == 1
)

rdd1.collect().foreach(println)

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

小功能:从agent.log文件中获取第二列为2的所有uid(第一列),部分数据展示如下

1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
1516609143869 1 8 95 5
1516609143869 8 1 90 29
val lines: RDD[String] = sc.textFile("data/agent.log")

val filterLines: RDD[String] = lines.filter(
  line => {
    
    val datas = line.split(" ")
    datas(1).contains("2")
  }
)

val value: RDD[String] = filterLines.map(
  line => {
    
    val datas = line.split(" ")
    datas(0)
  }
)

value.collect().foreach(println)

2 sample

函数签名
def sample(
  withReplacement: Boolean,
  fraction: Double,
  seed: Long = Utils.random.nextLong): RDD[T]

根据指定的规则从数据集中抽取数据

如何解决数据倾斜:最简单的方式就是扩容,如HashMap

在数据量非常大的情况下,有时为了分析造成数据倾斜的原因,需要从原数据中抽取出一部分进行分析,或者当需要将10000个存储在数据库中的对象存储到内存中,不确定内存是否足够,也可以抽取一部分对象,查看占用内存大小,从而推算出10000个对象所占的内存

第一个参数表示抽取数据的方式

  • 抽取放回,true,伯努利算法

    又叫0、1分布。例如扔硬币,要么正面,要么反面

  • 抽取不放回,false,泊松算法

第二个参数和第一个参数有关系

  • 如果是抽取不放回的场景,此参数表示每条数据被抽取的几率,并不是指被抽取的个数

    val rdd: RDD[Int] = sc.makeRDD(1 to 10)
    val rdd1: RDD[Int] = rdd.sample(false,0.5)
    
  • 如果是抽取放回的场景,此参数表示每条数据希望被重复抽取的次数

    val rdd1: RDD[Int] = rdd.sample(true,2)
    

第三个参数表示随机数种子

随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数,使用这个随机数种子可以产生其他的数据,如另随机算法为XXX

3 = XXX(1),通过随机数种子1产生数据3

2 = XXX(3),通过随机数种子3,产生数据2

val rdd1: RDD[Int] = rdd.sample(false,0.5,2)

源码如下

if (withReplacement) {
    
  new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
    
  new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}

3 coalesce

函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T]

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本

先假设有三个分区,每个分区内部各有1000条数据,经过过滤操作后,分区内剩余数据分为两种情况

  • 分区一10条,分区二100条,分区三998条
  • 分区一100条,分区二300条,分区三900条

针对第二种情况,三个分区需要申请三个资源去处理,前两个资源执行的快,因为分区三没执行完,所以前两个资源也不可以释放,造成资源浪费,为了解决这种情况,可以进行缩减分区,将一二两个分区的数据一起处理,申请一个资源

针对第一种情况,即使将一二两个分区的数据一起处理,整体的效率也不高,所以可以使用shuffle来打乱分区

这里需要注意,缩减并不是在原始数据上进行缩减,而是在分区后的数据进行缩减,也可以称其为合并

默认情况下分区不会shuffle

val rdd: RDD[Int] = sc.makeRDD(
  List(1, 2, 3, 4, 5, 6), 3
)
val rdd1: RDD[Int] = rdd.coalesce(2)

rdd.saveAsTextFile("output")	//分区一1,2 分区二3,4 分区三5,6
rdd1.saveAsTextFile("output1")	//分区一1,2 分区二3,4,5,6

shuffle表示一个分区的数据被打乱后和其他分区的数据重新组合在一起

在合并的过程中,程序并不一定会将两个数据少的分区缩减在一起,根据首选位置(地理位置)进行缩减,尽量减少网络传输,增加效率,所以在某些情况下,以上操作无法解决数据倾斜问题

所以还可以在缩减分区的同时,进行数据的shuffle操作,使得分区内的数据更加均匀,shuffle操作,前期慢,后期计算快

val rdd1: RDD[Int] = rdd.coalesce(2,true)

扩大分区:数据量多,分区数少,分区内的数据很多,给计算资源造成很大压力,增加分区,减少计算压力,简单来说就是降低负载

val rdd: RDD[Int] = sc.makeRDD(
  List(1, 2, 3, 4, 5, 6), 2
)

val rdd1: RDD[Int] = rdd.coalesce(3,true)

rdd.saveAsTextFile("output")
rdd1.saveAsTextFile("output1")

如果不进行shuffle,coalesce扩大分区没有意义

4 repartition

函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程

val rdd1: RDD[Int] = rdd.repartition(3)

repartition源码

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    
  coalesce(numPartitions, shuffle = true)
}

5 distinct

函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

将数据集中重复的数据去重

与scala集合(单点)的去重不同,scala集合直接将数据放到HashSet中,自动去重,rdd(分布式)的海量数据去重不能直接在内存中进行计算

scala去重源码

def distinct: Repr = {
    
  val isImmutable = this.isInstanceOf[immutable.Seq[_]]
  if (isImmutable && lengthCompare(1) <= 0) repr
  else {
    
    val b = newBuilder
    val seen = new mutable.HashSet[A]()
    var it = this.iterator
    var different = false
    while (it.hasNext) {
    
      val next = it.next
      if (seen.add(next)) b += next else different = true
    }
    if (different || !isImmutable) b.result() else repr
  }
}

rdd去重部分源码

map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
执行流程:
元数据【1,1,1】
【(1null),(1null),(1null)】
【nullnullnull】
【nullnull】
【(1null)】
【1

当发现分区有问题时,可以在distinct中传递分区数量,所以在distinct底层存在shuffle

val rdd1: RDD[Int] = rdd.distinct(3)

6 sortBy

函数签名
def sortBy[K](
  f: (T) => K,
  ascending: Boolean = true,
  numPartitions: Int = this.partitions.length)
  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程

val rdd: RDD[Int] = sc.makeRDD(
  List(4,7,2,1,4,8), 3
)

val rdd1: RDD[Int] = rdd.sortBy(num => num)

println(rdd1.collect().mkString(","))

降序排序

val rdd1: RDD[Int] = rdd.sortBy(num => num,false)

二 RDD双Value类型转换算子

1 intersection

对源RDD和参数RDD求交集后返回一个新的RDD,取两集合的交集

函数签名
def intersection(other: RDD[T]): RDD[T]
val rdd: RDD[Int] = sc.makeRDD(
  List(1,2,3,4), 3
)

val rdd1: RDD[Int] = sc.makeRDD(
  List(3,4,5,6), 3
)

println(rdd.intersection(rdd1).collect().mkString(","))

2 union

对源RDD和参数RDD求交集后返回一个新的RDD,取两集合的并集

函数签名
def union(other: RDD[T]): RDD[T]
println(rdd.union(rdd1).collect().mkString(","))

3 subtract

对源RDD和参数RDD求交集后返回一个新的RDD,取两集合的差集

函数签名
def subtract(other: RDD[T]): RDD[T]
println(rdd.subtract(rdd1).collect().mkString(","))

1 2 3 如果两个RDD数据类型不一致,不能进行操作

4 zip

函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素

println(rdd.zip(rdd1).collect().mkString(","))

如果两个RDD分区数据数量不一致

val rdd: RDD[Int] = sc.makeRDD(
  List(1,2,3,4), 2
)

val rdd1: RDD[Int] = sc.makeRDD(
  List(3,4,5,6,7,8), 2
)

Can only zip RDDs with same number of elements in each partition

使两个RDD分区内数据相同,两个RDD数据分区不一致

val rdd: RDD[Int] = sc.makeRDD(
  List(1,2,3,4), 2
)

val rdd1: RDD[Int] = sc.makeRDD(
  List(3,4,5,6,7,8), 3
)

Can’t zip RDDs with unequal numbers of partitions: List(2, 3)

所以需要保证两个RDD分区数量要相同,每个分区内的元素数量还要相等

两个RDD数据类型不一致,可以进行操作

val rdd2: RDD[String] = sc.makeRDD(
  List("3","4","5","6"),2
)
 println(rdd.zip(rdd2).collect().mkString(","))

三 RDD Key -Value类型转换算子

1 partitionBy

函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

将数据按照指定Partitioner对每一条数据重新进行分区。Spark默认的分区器是HashPartitioner

与repartition不相同,repartition强调分区数量的变化,不关心数据怎么变

partitionBy算子更关心数据的分区规则

val rdd: RDD[Int] = sc.makeRDD(
  List(1,2,3,4), 2
)

val rdd1: RDD[(Int, Int)] = rdd.map((_,1))

rdd1.partitionBy(null)

以上调用RDD对象的partitionBy方法会报错,因为经过二次编译,隐式转换使得RDD变为了PairRDDFunctions,源码如下

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    
  new PairRDDFunctions(rdd)
}

Spark的分区器:

  • RangePartitioner:按照一定的范围进行分区

    sortBy使用的就是RangePartitioner分区器

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)] = self.withScope
    {
          
      val part = new RangePartitioner(numPartitions, self, ascending)
      new ShuffledRDD[K, V, V](self, part)
        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    }
    
  • HashPartitioner:默认shuffle分区器

    val rdd: RDD[Int] = sc.makeRDD(
      List(1,2,3,4), 2
    )
    
    val rdd1: RDD[(Int, Int)] = rdd.map((_,1))
    
    rdd1.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")
    

    HashPartitioner源码

    def getPartition(key: Any): Int = key match {
          
      case null => 0
      case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
    }
    def nonNegativeMod(x: Int, mod: Int): Int = {
          
        val rawMod = x % mod
        rawMod + (if (rawMod < 0) mod else 0)
    }
    

2 reduceByKey

函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

可以将数据按照相同的Key对Value分在一个组中,然后进行reduce操作

val rdd: RDD[(String,Int)] = sc.makeRDD(
  List(
    ("a", 1),
    ("a", 1),
    ("a", 1),
    ("b", 1)
  )
)
val wordCount: RDD[(String, Int)] = rdd.reduceByKey(_ + _)

reduceByKey可以实现WordCount(2/10)

原网站

版权声明
本文为[hike76]所创,转载请带上原文链接,感谢
https://blog.csdn.net/weixin_43923463/article/details/126272166