Spark operators: coalesce and repartition
2022-04-23 15:48:00 [Uncle flying against the wind]
Preface
We know that Spark executes tasks in parallel, distributing data across partitions for processing. In practice, however, the partition count often needs to change during a job: for example, a job may start with 4 partitions to handle a large input, but once the data has been whittled down you may want fewer partitions to reduce resource overhead. Dynamically adjusting partitions like this is exactly what Spark's coalesce and repartition operators are for.
coalesce
Function signature
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
Function description
Reduces the number of partitions based on the data volume. It is typically used after filtering a large dataset, to improve execution efficiency on the now much smaller data. When a Spark job produces too many small tasks, coalesce can shrink and merge partitions, cutting the partition count and with it the task-scheduling overhead.
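Before writing any files, the effect is easy to verify by checking the partition count directly. A minimal sketch (variable names are illustrative; it assumes a SparkContext sc created as in the examples below):
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 4)
println(rdd.getNumPartitions)     // 4
val shrunk = rdd.coalesce(2)      // shuffle defaults to false
println(shrunk.getNumPartitions)  // 2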
Case presentation
Create a collection and save its data to files using 3 partitions:
import org.apache.spark.{SparkConf, SparkContext}
object Coalesce_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// start with 3 partitions
val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)
rdd.saveAsTextFile("E:\\output")
sc.stop()
}
}
Run the code above and you will see 3 files generated in the local output directory.
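The directory holds one part file per partition; with makeRDD splitting the six elements evenly, the layout should look roughly like this (the exact element-to-file mapping is Spark's choice):
E:\output
├── _SUCCESS
├── part-00000   (1 and 2, one element per line)
├── part-00001   (3 and 4)
└── part-00002   (5 and 6)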
Now suppose we want to reduce the partition count; with fewer partitions, fewer output files are produced:
import org.apache.spark.{SparkConf, SparkContext}
object Coalesce_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// start with 3 partitions
val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)
// coalesce returns a new RDD: assign the result and save that one;
// calling rdd.coalesce(2) without using the return value has no effect
val newRDD = rdd.coalesce(2) // pass shuffle = true to also rebalance the data
// note: delete E:\output left over from the previous run first,
// or saveAsTextFile will fail because the directory already exists
newRDD.saveAsTextFile("E:\\output")
}
}
Run the program again, and this time only 2 files are generated. The goal of reducing the number of partitions is achieved.
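To see how coalesce merges partitions when no shuffle is requested, the glom operator is handy: it turns each partition into an array so its contents can be printed. A sketch (the exact grouping is decided by Spark's internal PartitionCoalescer, so the output may differ):
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
rdd.glom().collect().foreach(p => println(p.mkString("[", ", ", "]")))
// [1, 2]  [3, 4]  [5, 6]
rdd.coalesce(2).glom().collect().foreach(p => println(p.mkString("[", ", ", "]")))
// typically [1, 2] and [3, 4, 5, 6]: whole partitions are merged rather than re-split,
// which is cheap but can leave partitions unbalanced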
repartition
In contrast to coalesce as used above, repartition can also be used to increase the number of partitions.
Function signature
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Function description
This operation actually performs coalesce internally, with the shuffle parameter set to true. Whether you are converting an RDD with many partitions into one with fewer partitions or the other way around, repartition handles both cases, because it always goes through a shuffle.
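This is easy to confirm in Spark's source: repartition on RDD is essentially a one-line wrapper around coalesce (paraphrased here; minor details may differ between Spark versions):
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}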
Case presentation
Create a collection and save it to local files using 2 partitions:
import org.apache.spark.{SparkConf, SparkContext}
object Repartition_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// start with 2 partitions
val rdd = sc.makeRDD(List(1,2,3,4,5,6),2)
rdd.saveAsTextFile("E:\\output")
sc.stop()
}
}
Run the code above and you will see 2 files generated locally.
Now we want to increase the partition count to improve the job's parallelism; for this we use repartition:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Repartition_Test {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)
// coalesce can be asked to grow the partition count, but without a shuffle
// it has no effect: the data stays where it is and the count does not change.
// To actually grow partitions, a shuffle is required.
// Spark provides simplified operators for the two directions:
//   shrink partitions: coalesce (pass shuffle = true if the data should be rebalanced)
//   grow partitions:   repartition, whose underlying code calls coalesce with shuffle = true
//val newRDD: RDD[Int] = rdd.coalesce(3, true)
val newRDD: RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("E:\\output")
sc.stop()
}
}
Run the code again, and this time 3 files are generated in the output directory.
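As a final check, a quick comparison (a sketch; it assumes the same sc as above) shows why coalesce alone cannot grow the partition count:
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
println(rdd.coalesce(3).getNumPartitions)                  // still 2: without a shuffle it cannot grow
println(rdd.coalesce(3, shuffle = true).getNumPartitions)  // 3
println(rdd.repartition(3).getNumPartitions)               // 3, equivalent to the line above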
Copyright notice
This article was created by [Uncle flying against the wind]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204231544587369.html