04 Spark: Partitioning Strategy When Reading External Data (A Source-Code Analysis)
2022-08-08 23:31:00 【YaPengLi.】
Spark's partitioning strategy when reading external data
Let's start with a piece of code that reads external data with textFile.
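import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf: SparkConf = new SparkConf().setAppName("SparkWordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
// The second argument is minPartitions, a lower bound rather than the final partition count.
val lines: RDD[String] = sc.textFile("/Users/liyapeng/Spark/data", 2)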
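textFile uses files as the data source for processing. When the user does not specify a value, the default minimum number of partitions for a Hadoop RDD is min(defaultParallelism, 2), which is 2 whenever defaultParallelism is at least 2. Note that the minimum number of partitions is not necessarily the final number of partitions.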
/**
* Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
* The reasons for this are discussed in https://github.com/mesos/spark/pull/718
*/
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
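To make the effect of math.min concrete, here is a minimal stand-alone Java sketch. It is not Spark code: the class name and the sample parallelism values are assumptions chosen for illustration, and the method only mirrors the one-line definition quoted above.

```java
// Hypothetical stand-alone sketch; it only mirrors the math.min expression quoted above.
final class DefaultMinPartitionsSketch {

    // Same arithmetic as the quoted defaultMinPartitions: the result is never greater than 2.
    static int defaultMinPartitions(int defaultParallelism) {
        return Math.min(defaultParallelism, 2);
    }

    public static void main(String[] args) {
        // Assumed parallelism values, e.g. what local[1], local[2] and local[8] would give.
        for (int parallelism : new int[] {1, 2, 8}) {
            System.out.println(parallelism + " -> " + defaultMinPartitions(parallelism));
        }
    }
}
```

With a single core the default minimum drops to 1; from two cores upward it stays at 2, no matter how many cores local[*] detects.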
When Spark reads a file, it actually relies on Hadoop's file-reading mechanism under the hood.
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
package org.apache.hadoop.mapred;
public class TextInputFormat extends FileInputFormat<LongWritable, Text>
First, the total number of bytes to read is computed:
/** Splits files returned by {@link #listStatus(JobConf)} when
 * they're too big.*/
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  StopWatch sw = new StopWatch().start();
  FileStatus[] stats = listStatus(job);
  // Save the number of input files for metrics/loadgen
  job.setLong(NUM_INPUT_FILES, stats.length);
  long totalSize = 0; // compute total size
  boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
      && job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
  List<FileStatus> files = new ArrayList<>(stats.length);
  for (FileStatus file: stats) { // check we have valid files
    if (file.isDirectory()) {
      if (!ignoreDirs) {
        throw new IOException("Not a file: "+ file.getPath());
      }
    } else {
      files.add(file);
      totalSize += file.getLen();
    }
  }
Next, the number of bytes each partition should hold is computed:
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
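The goalSize arithmetic can be tried out in isolation. The sketch below is a hedged illustration, not Hadoop code: the class name is hypothetical, and the clamping step reflects my understanding of how FileInputFormat.computeSplitSize turns goalSize into the actual split size. The minimum size of 1 byte and the 128 MB block size are assumed values.

```java
// Hypothetical stand-alone sketch of the goal-size arithmetic; not taken from Hadoop.
final class GoalSizeSketch {

    // Mirrors the quoted line: integer division, with numSplits == 0 treated as 1.
    static long goalSize(long totalSize, int numSplits) {
        return totalSize / (numSplits == 0 ? 1 : numSplits);
    }

    // Assumed to match computeSplitSize: clamp goalSize between minSize and blockSize.
    static long splitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    public static void main(String[] args) {
        long minSize = 1L;                    // assumed minimum split size
        long blockSize = 128L * 1024 * 1024;  // assumed block size (128 MB)
        long goal = goalSize(7, 2);           // 7 bytes requested as 2 splits
        System.out.println("goalSize = " + goal);
        System.out.println("splitSize = " + splitSize(goal, minSize, blockSize));
    }
}
```

Because the division truncates, 7 bytes over 2 requested splits gives a goal of 3 bytes, which leaves 1 byte unaccounted for. That leftover is why the final count can exceed the requested minimum.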
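Final result, for example for a single 7-byte file read with minPartitions = 2:
Note: Hadoop's split logic uses a factor of 1.1 (SPLIT_SLOP), i.e. a 10% tolerance. Leftover bytes get a split of their own only when they exceed 10% of the split size; otherwise they are merged into the last split.
totalSize = 7 (bytes)
goalSize = 7 / 2 = 3 (bytes, integer division)
partitions = 7 / 3 = 2 remainder 1; 1 / 3 ≈ 33% > 10%, so 2 + 1 = 3 partitions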
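The worked example can be replayed with a small simulation. This is a sketch under stated assumptions rather than Hadoop's implementation: the class and method names are hypothetical, only a single non-empty file is modelled, and the loop follows my reading of how getSplits applies the 1.1 factor to the remaining bytes of each file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the per-file split loop; not the actual Hadoop code.
final class SplitSlopSketch {

    // The 1.1 factor mentioned in the note above.
    static final double SPLIT_SLOP = 1.1;

    // Returns {offset, length} pairs for one file; assumes splitSize > 0.
    static List<long[]> splitFile(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        // Cut full-size splits while more than 1.1 split sizes remain.
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        // Whatever is left (at most 1.1 split sizes) becomes the last split.
        if (bytesRemaining != 0) {
            splits.add(new long[] {fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long totalSize = 7;  // one 7-byte file, as in the example
        int numSplits = 2;   // the minPartitions passed to textFile
        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
        for (long[] split : splitFile(totalSize, goalSize)) {
            System.out.println("offset=" + split[0] + ", length=" + split[1]);
        }
    }
}
```

For the 7-byte file this yields three splits (lengths 3, 3 and 1), matching the count above. A 105-byte file with a split size of 100 would stay in one split, because the 5 leftover bytes are within the 10% tolerance.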