当前位置:网站首页>spark学习笔记(九)——sparkSQL核心编程-DataFrame/DataSet/DF、DS、RDD三者之间的转换关系
spark学习笔记(九)——sparkSQL核心编程-DataFrame/DataSet/DF、DS、RDD三者之间的转换关系
2022-08-10 19:01:00 【一个人的牛牛】
目录
RDD<=>DataSet<=>DataFrame转换编码实现
前言
Spark SQL可以理解为Spark Core的一种封装,在模型上和上下文环境对象上进行了封装;
SQLContext查询起始点:用于Spark自己提供的SQL查询;
HiveContext查询起始点:用于连接Hive的查询。
SparkSession:是Spark最新的SQL查询起始点,是SQLContext和HiveContext的组合,在 SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。
注:Spark Core首先构建上下文环境对象SparkContext才可以执行应用程序,sparkSQL和spark core类似。使用spark-shell的时候, spark框架会自动创建一个名称叫做spark的SparkSession对象, 就像我们以前可以自动获取到一个sc来表示SparkContext对象一样。
DataFrame
创建DataFrame
在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:
(1) 从spark数据源创建
spark支持创建的数据源格式:csv format jdbc json load option options orc parquet schema
table text textFile
在bin目录下创建input目录,在input目录下创建user.json,内容为:
{"username":"zj","age":20}
{"username":"xx","age":21}
{"username":"yy","age":22}
读取bin/input/user.json文件
(2)从RDD转换
(3)从hive table查询返回
SQL语法
SQL语法是指查询数据的时候使用SQL语句查询,这种查询必须要有临时视图或全局视图来辅助。
(1)读取json文件
(2)创建临时表
(3)实现查询
(4)创建全局表
df.createOrReplaceGlobalTempView("user2")
注:普通临时表是Session范围内的;如果想扩大有效应用范围,可以使用全局临时表。使用全局临时表时需要全路径访问,global_temp.user2
(5)实现查询
#查询
spark.sql("select * from global_temp.user2").show()
#使用新的session查询
spark.newSession().sql("select * from global_temp.user2").show()
spark.newSession().sql("select age from global_temp.user2").show()
DSL语法
DataFrame提供一个特定领域语言DSL(domain-specific language)去管理结构化的数据。可以在Scala, Java, Python、R中使用DSL,使用DSL语法不用去创建临时视图。
(1)创建DataFrame
(2)查看DataFrame的schema信息
(3)查看数据
1)查看age数据
2)查看username和age+1数据
3)filter查看大于21数据
4)groupBy按age分组查看数据条数
RDD转换为DataFrame
(1)sc.textFile创建RDD,转换为DataFrame
val wordRDD = sc.textFile("input/word.txt")
wordRDD.toDF("word").show()
(2)makeRDDR创建RDD并直接转换为DataFrame
case class user(name:String,age:Int)
sc.makeRDD(List(("zj",20),("zx",21),("xc",23))).map(t => user(t._1,t._2)).toDF.show()
注:在IDEA中开发程序时,如果需要RDD与DF或者DS之间互相操作,那么需要引入import spark.implicits._
import spark.implicits._:必须先创建SparkSession对象再导入,这里的spark是创建的sparkSession对象的变量名称。Scala只支持val修饰的对象的引入,切记这里的spark对象不能使用var声明。
spark-shell自动完成此操作。
DataFrame转换为RDD
DataFrame其实就是对RDD的封装,可以直接获取内部的RDD。
(1)创建RDD并转换为DataFrame,DataFrame转换为RDD
val df = sc.makeRDD(List(("zj",20),("zx",21),("xc",23))).map(t => user(t._1,t._2)).toDF
val rdd = df.rdd
(2)RDD的collect操作
DataSet
创建DataSet
(1)使用样例类序列创建DataSet
case class person(name:String,age:Long)
val caseClassDS = Seq(person("zj",2)).toDS()
caseClassDS.show
(2)使用基本类型的序列创建DataSet
val ds = Seq(1,2,3,4).toDS
ds.show
RDD转换为DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。case类可以包含诸如Seq、Array等复杂的结构。
注:实际中很少把序列转换成DataSet,更多的是通过RDD来得到DataSet。
case class user(name:String,age:Int)
sc.makeRDD(List(("zj",21),("zz",22),("xx",24))).map(t => user(t._1,t._2)).toDS
DataSet转换为RDD
DataSet也是对RDD的封装,可以直接获取内部的RDD。
#创建RDD并转换为DataSet
case class user(name:String,age:Int)
val ds = sc.makeRDD(List(("zj",21),("zz",22),("xx",24))).map(t => user(t._1,t._2)).toDS
#DataSet转换为RDD
val rdd = ds.rdd
#RDD的collect操作
rdd.collect
DataSet和DataFrame的转换
DataFrame是DataSet的特例,它们之间可以互相转换。
(1)DataFrame转换为DataSet
val df = sc.makeRDD(List(("zj",21),("zz",22),("xx",24))).map(t => user(t._1,t._2)).toDF("name","age")
val ds = df.as[user]
(2)DataSet转化为DataFrame
val df = ds.toDF
RDD、DataFrame、DataSet之间的关系
相同点
(1)三者都有partition的概念;
(2)三者有许多共同的函数,如map、filter等;
(3)RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
(4)三者都有惰性机制,在进行创建转换时不会立即执行,只有在遇到Action时三者才会开始运算;
(5)在对DataFrame和Dataset进行操作许多操作都需要这个包import spark.implicits._;
(6)三者都会根据Spark的内存情况自动缓存运算,即使数据量很大,也不用担心内存溢出;
(7)DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型。
区别点
(1)RDD不支持sparksql操作;
(2)RDD一般和spark mllib同时使用;
(3)DataFrame和DataSet一般不与spark mllib同时使用;
(4)DataFrame与DataSet支持一些特别方便的保存方式,比如:csv,csv可以带上表头;
(5)DataFrame与RDD和Dataset不同,每一行的类型固定为Row,每一列的值没法直 接访问,只有通过解析才能获取各个字段的值;
(6)DataFrame与DataSet均支持SparkSQL的操作,还能注册临时表/视窗进行sql语句操作;
(7)DataFrame和Dataset拥有完全相同的成员函数,区别只是每一行的数据类型不同,DataFrame就是DataSet的一个特例:type DataFrame = Dataset[Row]
(8)DataFrame每一行的类型是Row,不解析各个字段是什么类型无从得知,只能用模式匹配拿出特定字段;而Dataset中每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息。
相互转换
sparkSQL-IDEA编程
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
全部依赖展示
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--该插件用于把Scala代码编译成为class文件-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!--声明绑定到maven的compile阶段-->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
RDD<=>DataSet<=>DataFrame转换编码实现
下面是代码部分
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object sparkSQL_Basic {
def main(args: Array[String]): Unit = {
//TODO 创建sparkSQL运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//TODO 执行逻辑操作
//DataFrame
println("DataFrame")
val df = spark.read.json("datas/user.json")
println("输出源数据")
df.show()
//DataFrame => SQL 要创建视图
df.createOrReplaceTempView("user")
println("输出所有")
spark.sql("select * from user").show()
println("输出age")
spark.sql("select age from user").show()
//DataFrame => DSL 不用创建视图
println("输出所有")
df.select("age","username").show()
println("输出age+1")
df.select($"age" + 1).show()
df.select('age + 1).show()
println(">>>>>>>>>>>>>>>>>>>>>>")
//DataSet
println("DataSet")
val seq = Seq(1,2,3,4)
val ds = seq.toDS()
ds.show()
println(">>>>>>>>>>>>>>>>>>>>>>>>")
//RDD <=> DataFrame
println("RDD <=> DataFrame DF->RDD")
val rdd = spark.sparkContext.makeRDD(List((1,"zj",10),(2,"as",20),(3,"sd",30)))
val frame = rdd.toDF("ID", "NAME", "AGE")
val rowRDD : RDD[Row] = frame.rdd
frame.show()
println(rowRDD)
//RDD <=> DataSet
println("RDD <=> DataSet DS->RDD")
val ds2 = rdd.map {
case (id, name, age) => {
User(id, name, age)
}
}.toDS()
val userRDD = ds2.rdd
ds2.show()
println(userRDD)
//DataFrame <=> DataSet
println("DataFrame <=> DataSet DS->DF")
val ds1:Dataset[User] = frame.as[User]
val df1:DataFrame = ds1.toDF()
ds1.show()
df1.show()
//TODO 关闭环境
spark.stop()
}
case class User(id:Int,name:String,age:BigInt)
}
运行结果展示!
本文为学习笔记的记录!!
边栏推荐
- 【知识分享】在音视频开发领域中SEI到底是个啥?
- QoS服务质量七交换机拥塞管理
- MySQL安装步骤
- 【greenDao】Cannot access ‘org.greenrobot.greendao.AbstractDaoSession‘ which is a supertype of
- dumpsys meminfo 详解
- The Biotin-PEG3-Br/acid/NHS ester/alcohol/amine collection that everyone wants to share
- 力扣18-四数之和——双指针法
- Win11连接投影仪没反应怎么解决?
- What is the upstream bandwidth and downstream bandwidth of the server?
- [教你做小游戏] 只用几行原生JS,写一个函数,播放音效、播放BGM、切换BGM
猜你喜欢
Redis 持久化机制
漫谈测试成长之探索——测试文档
云渲染的应用正在扩大,越来越多的行业需要可视化服务
MySQL 查询出重复出现两次以上的数据 - having
whois信息收集&企业备案信息
[Natural Language Processing] [Vector Representation] PairSupCon: Pairwise Supervised Contrastive Learning for Sentence Representation
redis.exceptions.DataError: Invalid input of type: ‘dict‘. Convert to a byte, string or number first
基于TCP的聊天系统
消息队列初见:一起聊聊引入系统mq 之后的问题
网站架构探测&chrome插件用于信息收集
随机推荐
QoS服务质量七交换机拥塞管理
Major upgrade of MSE Governance Center - Traffic Governance, Database Governance, Same AZ Priority
【C#】WCF和TCP消息通信练习,实现群聊功能
7-2 乒乓人训练大师(双指针)
Keras deep learning combat (17) - image segmentation using U-Net architecture
什么是企业知识库?有什么作用?如何搭建?
人生苦短,开始用go
redis.exceptions.DataError: Invalid input of type: ‘dict‘. Convert to a byte, string or number first
FPGA工程师面试试题集锦91~100
弘玑Cyclone与风变科技达成战略合作:优势互补聚焦数字化人才培养
含有PEG 间隔基和一个末端伯胺基团(CAS:1006592-62-6)化学试剂
子域名收集&Google搜索引擎语法
799. 最长连续不重复(双指针)
dumpsys meminfo 详解
3D游戏建模学习路线
【毕业设计】基于Stm32的智能疫情防控门禁系统 - 单片机 嵌入式 物联网
flask生成路由的2种方式和反向生成url
LeetCode·26.删除有序数组中的重复项·双指针
Site Architecture Detection & Chrome Plugin for Information Gathering
常量