当前位置:网站首页>Spark 算子之交集、并集、差集

Spark 算子之交集、并集、差集

2022-04-23 15:45:00 逆风飞翔的小叔

前言

在日常开发中,经常涉及到对不同集合数据进行交集,并集和差集的操作,在Spark 中,也提供了类似的算子帮助我们处理这样的业务,即Value 类型 数据处理;

intersection

函数签名

def intersection(other: RDD[T]): RDD[T]

函数说明

对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

案例一,求两个集合的交集

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DoubleValueTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - 双Value类型

    // 交集,并集和差集要求两个数据源数据类型保持一致
    // 拉链操作两个数据源的类型可以不一致

    val rdd1 = sc.makeRDD(List(1,2,3,4))
    val rdd2 = sc.makeRDD(List(3,4,5,6))
    val rdd7 = sc.makeRDD(List("3","4","5","6"))

    // 交集 : 【3,4】
    val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    //val rdd8 = rdd1.intersection(rdd7)
    println(rdd3.collect().mkString(","))


    sc.stop()
  }

}

运行上面的代码,观察控制台输出效果

 

union

函数签名

def union(other: RDD[T]): RDD[T]

函数说明

对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DoubleValueTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - 双Value类型

    // 交集,并集和差集要求两个数据源数据类型保持一致
    // 拉链操作两个数据源的类型可以不一致

    val rdd1 = sc.makeRDD(List(1,2,3,4))
    val rdd2 = sc.makeRDD(List(3,4,5,6))
    val rdd7 = sc.makeRDD(List("3","4","5","6"))

    // 交集 : 【3,4】
    /*val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    //val rdd8 = rdd1.intersection(rdd7)
    println(rdd3.collect().mkString(","))*/

  // 并集 : 【1,2,3,4,3,4,5,6】
    val rdd4: RDD[Int] = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))

    

    sc.stop()
  }

}

subtract

函数签名

def subtract(other: RDD[T]): RDD[T]

函数说明

以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DoubleValueTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - 双Value类型

    // 交集,并集和差集要求两个数据源数据类型保持一致
    // 拉链操作两个数据源的类型可以不一致

    val rdd1 = sc.makeRDD(List(1,2,3,4))
    val rdd2 = sc.makeRDD(List(3,4,5,6))
    val rdd7 = sc.makeRDD(List("3","4","5","6"))

    // 交集 : 【3,4】
    /*val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    //val rdd8 = rdd1.intersection(rdd7)
    println(rdd3.collect().mkString(","))*/

  // 并集 : 【1,2,3,4,3,4,5,6】
    /*val rdd4: RDD[Int] = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))*/

      // 差集 : 【1,2】
    val rdd5: RDD[Int] = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))


    sc.stop()
  }

}

zip

zip又俗称拉链算子,函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

函数说明

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 RDD
中的元素, Value 为第 2 RDD 中的相同位置的元素

import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}
  
  object DoubleValueTest {
  
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
  
      // TODO 算子 - 双Value类型
  
      // 交集,并集和差集要求两个数据源数据类型保持一致
      // 拉链操作两个数据源的类型可以不一致
  
      val rdd1 = sc.makeRDD(List(1,2,3,4))
      val rdd2 = sc.makeRDD(List(3,4,5,6))
      val rdd7 = sc.makeRDD(List("3","4","5","6"))
  
      // 交集 : 【3,4】
      /*val rdd3: RDD[Int] = rdd1.intersection(rdd2)
      //val rdd8 = rdd1.intersection(rdd7)
      println(rdd3.collect().mkString(","))*/
  
    // 并集 : 【1,2,3,4,3,4,5,6】
      /*val rdd4: RDD[Int] = rdd1.union(rdd2)
      println(rdd4.collect().mkString(","))*/
  
        // 差集 : 【1,2】
      /*val rdd5: RDD[Int] = rdd1.subtract(rdd2)
      println(rdd5.collect().mkString(","))*/
  
      // 拉链 : 【1-3,2-4,3-5,4-6】
      val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
      val rdd8 = rdd1.zip(rdd7)
      println(rdd6.collect().mkString(","))
  
      sc.stop()
    }
  
  }

版权声明
本文为[逆风飞翔的小叔]所创,转载请带上原文链接,感谢
https://blog.csdn.net/congge_study/article/details/124360329