Spark operators: coalesce and repartition
2022-04-23 15:48:00 【Uncle flying against the wind】
Preface
We know that Spark executes tasks in parallel, distributing data across partitions for processing. In practice, however, you often need to adjust the partitioning dynamically. For example, a job may start with a large data set spread over 4 partitions, but by the end of processing you may want fewer partitions to reduce resource overhead. This is where the two operators Spark provides, coalesce and repartition, come in.
coalesce
Function signature
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
Function description
Reduces the number of partitions according to the data volume. It is typically used after filtering a large data set, to improve execution efficiency on the resulting small one. When a Spark program produces too many small tasks, coalesce can shrink and merge partitions, reducing the partition count and with it the cost of task scheduling.
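Before looking at output files, note that you can check an RDD's partition count directly with getNumPartitions. A minimal sketch (the object and value names here are illustrative, not from the case below):

import org.apache.spark.{SparkConf, SparkContext}

object Coalesce_QuickCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("QuickCheck"))
    // Start with 4 partitions, then merge down to 2 (no shuffle by default)
    val rdd = sc.makeRDD(1 to 6, 4)
    val shrunk = rdd.coalesce(2)
    println(rdd.getNumPartitions)    // 4
    println(shrunk.getNumPartitions) // 2
    sc.stop()
  }
}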
Case presentation
Create a collection and save it to local files across 3 partitions
import org.apache.spark.{SparkConf, SparkContext}

object Coalesce_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Start with 3 partitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
    rdd.saveAsTextFile("E:\\output")
    sc.stop()
  }
}
Run the above code and you will see 3 part files generated in the local output directory.
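If you would rather see which elements landed in which partition without opening the part files, mapPartitionsWithIndex can tag each element with its partition index. A debugging sketch that can be dropped into the example above (collect() is safe here only because the data set is tiny):

rdd.mapPartitionsWithIndex { (idx, iter) =>
  // Tag every element with the index of the partition holding it
  iter.map(v => s"partition $idx -> $v")
}.collect().foreach(println)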
Now suppose we want to reduce the number of partitions; fewer partitions means correspondingly fewer output files:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Coalesce_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Start with 3 partitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
    // coalesce is a transformation that returns a new RDD;
    // keep the result and save it, not the original 3-partition RDD
    val newRDD: RDD[Int] = rdd.coalesce(2, true)
    newRDD.saveAsTextFile("E:\\output")
    sc.stop()
  }
}
Run this version and only 2 part files are generated, which achieves the goal of reducing partitions.
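One caveat worth knowing: without a shuffle, coalesce only glues whole parent partitions together, so the resulting partitions can be skewed; passing shuffle = true redistributes the data evenly at the cost of a shuffle. A small sketch of the two variants, reusing the rdd from the example above:

// Fast, but merges whole parent partitions, so the result may be unbalanced
val merged = rdd.coalesce(2)
// Pays for a shuffle, but redistributes the data evenly across the 2 partitions
val rebalanced = rdd.coalesce(2, shuffle = true)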
repartition
The opposite of coalesce: repartition serves to expand the number of partitions.
Function signature
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Function description
Under the hood this operation just performs coalesce, with the shuffle parameter set to true. Whether you are converting an RDD with many partitions into one with fewer partitions, or an RDD with few partitions into one with more, repartition handles both cases, because it always goes through a shuffle.
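For reference, in the Spark source (RDD.scala in recent versions) repartition is essentially a one-line delegation to coalesce with the shuffle flag forced on:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}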
Case presentation
Create a collection and save it to local files using 2 partitions
import org.apache.spark.{SparkConf, SparkContext}

object Repartition_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Start with 2 partitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    rdd.saveAsTextFile("E:\\output")
    sc.stop()
  }
}
Run the above code and you will see 2 part files generated locally.
Now we want to expand the partitioning to improve the job's parallelism, using repartition:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Repartition_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    // coalesce can be asked to expand partitions, but without a shuffle the
    // request has no effect, so expanding partitions requires a shuffle.
    // Spark provides a simplified operation for each direction:
    //   shrink partitions: coalesce (pass shuffle = true if the data should also be rebalanced)
    //   expand partitions: repartition, whose underlying code calls coalesce with shuffle always on
    //val newRDD: RDD[Int] = rdd.coalesce(3, true)
    val newRDD: RDD[Int] = rdd.repartition(3)
    newRDD.saveAsTextFile("E:\\output")
    sc.stop()
  }
}
Run this code again and you will now see 3 part files in the output directory.
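To tie the two operators together, here is a minimal verification sketch (drop it into a main method with a SparkContext sc, as above); it checks partition counts with getNumPartitions instead of counting output files, and shows in particular that coalesce without a shuffle cannot expand partitions:

val rdd = sc.makeRDD(1 to 6, 2)
println(rdd.getNumPartitions)                // 2
println(rdd.repartition(3).getNumPartitions) // 3: expanded, always shuffles
println(rdd.coalesce(1).getNumPartitions)    // 1: shrunk, no shuffle needed
println(rdd.coalesce(3).getNumPartitions)    // still 2: no shuffle, so no expansion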
Copyright notice
This article was written by [Uncle flying against the wind]. When reposting, please keep the original link:
https://yzsam.com/2022/04/202204231544587369.html