当前位置:网站首页>Spark基础【RDD单Value类型转换算子】
Spark基础【RDD单Value类型转换算子】
2022-08-09 23:25:00 【hike76】
RDD转换算子
RDD的方法有很多,但一般分为两大类,第一类是逻辑的封装,将旧的逻辑转换为新的逻辑,称之为转换算子;第二类是执行逻辑,将封装好的逻辑进行执行,让整个作业运行起来,称之为行动算子
算子
问题(初始)–> operator(算子,操作,方法) –> 问题(解决,完成)
RDD根据数据处理方式的不同将算子整体上分为单Value类型、双Value类型和Key-Value类型
将RDD的方法称为算子的原因是与Scala集合的方法进行区分
如以下代码中的两个foreach方法,第一个为scala集合(单点)中的方法,第二个为RDD(分布式)的方法
val wordCount: RDD[(String, Int)] = sc.textFile("data/word.txt").map((_,1)).reduceByKey(_ + _)
val array: Array[(String, Int)] = wordCount.collect()
array.foreach(println)
wordCount.foreach(println)
单Value类型
1 map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
def map[U: ClassTag](f: T => U): RDD[U]
map算子表示将数据源中的每一条数据进行处理
map算子的参数是函数类型:Int => U,输入Int类型的数据,输出类型不确定
“转换”概念的体现过程:从rdd算子转换成了newRdd算子
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1,2,3,4))
def mapFunction ( num : Int) : Int = {
num * 2
}
val newRdd: RDD[Int] = rdd.map(mapFunction)
newRdd.collect().foreach(println)
sc.stop()
}
每次写mapFunction有些麻烦,可以使用函数至简原则
val newRdd: RDD[Int] = rdd.map(_ * 2)
转换之后如何分区,数据执行的顺序如何
查看newRdd分区数量(2个)
newRdd.saveAsTextFile("output")
在RDD进行转换时,新的RDD和旧的RDD的分区数量保持一致,源码如下,返回所有依赖的RDD中的第一个RDD的分区数量
/** Returns the first parent RDD */
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
数据在处理过程中,默认情况下,分区不变,原来数据在哪个分区,转换完成之后还是在哪里
数据在处理过程中,要遵循执行顺序:分区内有序,分区间无序
使用如下代码查看是否满足分区内有序,分区间无序的规则
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val newRdd: RDD[Int] = rdd.map(
num => {
println("num = " + num)
num * 2
}
)
RDD其实就是封装的逻辑,如果有多个RDD,那么第一条数据应该所有的逻辑执行完毕,再执行下一条数据,RDD没有等待的功能
val newRdd1: RDD[Int] = rdd.map(
num => {
println("############### num = " + num)
num * 2
}
)
val newRdd2: RDD[Int] = newRdd1.map(
num => {
println("*************** num = " + num)
num * 2
}
)
(1)案例:从服务器日志数据agent.log中获取第四列数据
部分数据如下
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val lineRDD: RDD[String] = sc.textFile("data/agent.log")
val forthRDD: RDD[String] = lineRDD.map(
line => {
val datas = line.split(" ")
datas(3)
}
)
forthRDD.collect().foreach(println)
sc.stop()
}
2 mapPartitions
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,Iterator[T] => Iterator[U],数据量可以增加
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] )
val rdd = sc.makeRDD(List(1,2,3,4),2)
val rdd1 = rdd.mapPartitions(
list => {
println("*************")
list.map(_ * 2)
}
)
将分区内的数据先放在Executor中(缓存),增加效率,以上程序中mapPartitions执行两次,而map执行四次,虽然提升了效率,但也存在缺点
- 占用内存多
- 处理完的数据不会释放
(1)map和mapPartitions的区别
数据处理角度
Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
功能的角度
Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
性能的角度
Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用,一般使用map操作
综上,有时完成比完美更重要
(2)java克隆浅复制
何时考虑不使用接口而是使用实现类:当需要使用实现类中的特有方法时,会定义为实现类类型,第一种声明方式,ArrayList中的特有方式无法使用,比如clone方法
public class Test {
public static void main(String[] args) {
User user = new User();
user.name = "zhangsan";
List<User> userList = new ArrayList<User>();
//userList.clone();
ArrayList<User> userList1 = new ArrayList<User>();
userList1.clone();
}
}
class User{
public String name;
}
克隆之后,两块内存空间不同
userList1.add(user);
ArrayList<User> userList2 = (ArrayList<User>)userList1.clone();
System.out.println(userList1 == userList2); //false
以下代码,两块内存地址引用了相同的对象
final User user1 = userList2.get(0);
user1.name = "lisi";
System.out.println(userList1 == userList2); //false
System.out.println(userList1); //[[email protected]]
System.out.println(userList2); //[[email protected]]
这种现象,称为java克隆浅复制,只复制集合最外层集合的内存,但是如果集合引用了其他内存,不会复制
java克隆浅复制存在引用问题,mapPartitions处理完的数据不会释放,那么引用也不会被释放,当全部处理完成时,才会释放,意味着数据越多,持续时间越长,占用内存空间越大
(3)案例:获取每个数据分区的最大值
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
val rdd1: RDD[Int] = rdd.mapPartitions(
list => {
val max = list.max
List(max).iterator
}
)
rdd1.collect().foreach(println)
}
3 mapPartitionsWithIndex
现有三个分区,只获取第二个分区的数据
分区间无序,所以以下代码是错误的
var count = 0
val rdd1 = rdd.mapPartitions(
list => {
if(count == 1){
count = count + 1
list
}else{
count = count + 1
Nil.iterator
}
}
)
使用mapPartitionsWithIndex方法
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引
def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)
val rdd1 = rdd.mapPartitionsWithIndex(
(index,list) => {
if(index == 1){
list
}else{
Nil.iterator
}
}
)
rdd1.collect().foreach(println)
}
4 flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(
List("hello scala", "hello spark")
)
val rdd1: RDD[String] = rdd.flatMap(_.split(" "))
rdd1.collect().foreach(println)
}
flatMap输入是数据集的整体(一个),返回的是拆分后的个体(多个),使用容器将个体包装起来
val rdd1 = rdd.flatMap(
str => {
str.split(" ")
}
)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val rdd: RDD[List[Int]] = sc.makeRDD(
List(
List(1, 2), List(3, 4)
)
)
val rdd1 = rdd.flatMap(
List =>List
)
rdd1.collect().foreach(println)
}
第一个List是整体,第二个List代表的是容器
(1)案例:将List(List(1,2),3,List(4,5))进行扁平化操作
模式匹配
val rdd1 = rdd.flatMap {
case list : List[_] => list
case other => List(other)
}
5 glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
def glom(): RDD[Array[T]]
将个体变成整体
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 3, 4, 5, 6), 2
)
val rdd1: RDD[Array[Int]] = rdd.glom()
rdd1.collect().foreach(a => println(a.mkString(",")))
// 1,2,3
// 4,5,6
(1)案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 3, 4, 5, 6), 2
)
val rdd1: RDD[Array[Int]] = rdd.glom()
val rdd2: RDD[Int] = rdd1.map(_.max)
println(rdd2.collect().sum)
6 groupBy
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
此算子根据函数计算结果进行分组,执行结果为KV键值对数据类型,K是分组标识,V为同一个组中的数据集合
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 3, 4, 5, 6), 2
)
val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
rdd1.collect().foreach(println)
默认情况下,数据处理后所在的分区不会发生改变
Spark要求,一个组的数据必须在一个分区中
一个分区的数据被打乱,和其他分区的数据组合在一起,这个操作称为shuffle,现在想进行两分区数据相加,但第一个RDD中有很多分区,分区内有很多数据,shuffle如何做计算呢,在内存中是否等待所有数据到来之后再进行运算,即使等待,内存不够怎么办?
所以,shuffle操作不允许在内存中等待,必须落盘,因此shuffle的速度慢
shuffle会将完整的计算过程一分为二,形成两个阶段,一个阶段用于写数据,一个阶段用于读数据
写数据的阶段如果没有完成,读数据的阶段不能执行
conf.set("spark.local.dir","e:/")
通过windows下的spark环境,执行groupBy相关代码,可以在监控页面查看到“Shuffle Read”和“Shuffle Write”两个阶段
shuffle的操作可以更改分区
val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2,2)
(1)案例:将List(“Hello”,“hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。
val rdd: RDD[String] = sc.makeRDD(
List("Hello","hive", "hbase", "Hadoop")
)
val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1))
rdd1.collect().foreach(println)
不区分首字母大小写
val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(_.substring(0,1).toUpperCase())
(2)案例:按照agent.log中的第二列数据分组,相同求和
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("data/agent.log")
val groupRDD: RDD[(String, Iterable[(String, Int)])] = lines.map(
lines => {
val datas: Array[String] = lines.split(" ")
(datas(1), 1)
}
).groupBy(_._1)
val value: RDD[(String, Int)] = groupRDD.mapValues(_.size)
value.collect().foreach(println)
}
groupBy算子可以实现WordCount(共有十种算子可以实现,1/10)
边栏推荐
猜你喜欢
随机推荐
GoLang 使用 goroutine 停止的几种办法
Golden Warehouse Database KingbaseGIS User Manual (6.4. Geometry Object Access Function)
Digital wallets, red sea ecological rapid introduction of small programs can help capture device entry wisdom
阿雷的血压有些低
【问题解决】训练和验证准确率很高,但测试准确率很低
恭喜获奖得主 | 互动有礼获赠 Navicat Premium
When knowledge and action are one
Distributed database problem (2): data replication
【剑指offer】第一题 第二题
Golden Warehouse Database KingbaseGIS User Manual (6.5. Geometry Object Editing Function)
大龄测试员刚迈过了 35 岁这个“坎儿”,和大家说点儿心里话
Wireshark classic practice and interview 13-point summary
Eureka protects itself
从TRPO到PPO(理论分析与数学证明)
Jpa 查询view or 无主键的table
什么是平面文件数据库? 如何导入多种格式的文件:DSV、JSON、XML?
The technical aspects of the byte have been passed, and the salary has been negotiated for 20K*13, but the result is still being brushed. I asked the HR why...
第十二,十三章 mysql数据类型,视图的课后练习
Eureka自我保护
Dry goods!Towards robust test-time adaptation