当前位置:网站首页>Spark 算子之交集、并集、差集
Spark 算子之交集、并集、差集
2022-04-23 15:45:00 【逆风飞翔的小叔】
前言
在日常开发中,经常涉及到对不同集合数据进行交集,并集和差集的操作,在Spark 中,也提供了类似的算子帮助我们处理这样的业务,即双 Value 类型 数据处理;
intersection
函数签名
def intersection(other: RDD[T]): RDD[T]
函数说明
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
案例一,求两个集合的交集
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DoubleValueTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
sc.stop()
}
}
运行上面的代码,观察控制台输出效果
union
函数签名
def union(other: RDD[T]): RDD[T]
函数说明
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DoubleValueTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
/*val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))*/
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
sc.stop()
}
}
subtract
函数签名
def subtract(other: RDD[T]): RDD[T]
函数说明
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DoubleValueTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
/*val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))*/
// 并集 : 【1,2,3,4,3,4,5,6】
/*val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))*/
// 差集 : 【1,2】
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
sc.stop()
}
}
zip
zip又俗称拉链算子,函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
函数说明
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素, Value 为第 2 个 RDD 中的相同位置的元素
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DoubleValueTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
/*val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))*/
// 并集 : 【1,2,3,4,3,4,5,6】
/*val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))*/
// 差集 : 【1,2】
/*val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))*/
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
val rdd8 = rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
版权声明
本文为[逆风飞翔的小叔]所创,转载请带上原文链接,感谢
https://blog.csdn.net/congge_study/article/details/124360329
边栏推荐
- The El tree implementation only displays a certain level of check boxes and selects radio
- PHP PDO ODBC将一个文件夹的文件装载到MySQL数据库BLOB列,并将BLOB列下载到另一个文件夹
- cadence SPB17. 4 - Active Class and Subclass
- WPS品牌再升级专注国内,另两款国产软件低调出国门,却遭禁令
- 使用 Bitnami PostgreSQL Docker 镜像快速设置流复制集群
- 现在做自媒体能赚钱吗?看完这篇文章你就明白了
- Explanation 2 of redis database (redis high availability, persistence and performance management)
- 建设星际计算网络的愿景
- IronPDF for .NET 2022.4.5455
- C language --- advanced pointer
猜你喜欢
ICE -- 源码分析
IronPDF for .NET 2022.4.5455
C language --- advanced pointer
大厂技术实现 | 行业解决方案系列教程
携号转网最大赢家是中国电信,为何人们嫌弃中国移动和中国联通?
Load Balancer
Special analysis of China's digital technology in 2022
mysql乐观锁解决并发冲突
Use bitnami PostgreSQL docker image to quickly set up stream replication clusters
Today's sleep quality record 76 points
随机推荐
What are the mobile app software testing tools? Sharing of third-party software evaluation
shell脚本中的DATE日期计算
utils. Deprecated in35 may be cancelled due to upgrade. What should I do
PHP function
What if the server is poisoned? How does the server prevent virus intrusion?
现在做自媒体能赚钱吗?看完这篇文章你就明白了
Introduction to dynamic programming of leetcode learning plan day3 (198213740)
utils.DeprecatedIn35 因升级可能取消,该如何办
控制结构(一)
IronPDF for . NET 2022.4.5455
Upgrade MySQL 5.1 to 5.69
Connect PHP to MSSQL via PDO ODBC
Go concurrency and channel
The length of the last word of the string
Rsync + inotify remote synchronization
Explanation of redis database (IV) master-slave replication, sentinel and cluster
时序模型:长短期记忆网络(LSTM)
Explanation 2 of redis database (redis high availability, persistence and performance management)
Neodynamic Barcode Professional for WPF V11.0
软件性能测试报告起着什么作用?第三方测试报告如何收费?