Converting between RDD and DataFrame in PySpark: using an RDD to process a DataFrame (piecewise segmentation and other functions)
2022-04-21 14:43:00 【I_belong_to_jesus】
RDD and DataFrame are the two data structures Spark users work with most often. Comparing them briefly: for structured data a DataFrame is generally faster than an RDD, and the code tends to be more concise, because a DataFrame itself corresponds to a table-like structure.

The RDD is Spark's primary user-facing low-level API. At its core, an RDD is a distributed collection of data elements partitioned across the nodes of a cluster, and it exposes low-level APIs for parallel transformations and actions.
Generally speaking, an RDD is more convenient when:
1. the dataset needs low-level transformation, manipulation, and control;
2. the data is unstructured, for example media or text streams. (A minimal comparison of the two styles is sketched below.)
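To make the contrast concrete, here is a minimal sketch (not taken from the original article; the data and column names are made up for illustration) of the same filter written first with the DataFrame API and then with the lower-level RDD API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').getOrCreate()
df = spark.createDataFrame([('A', 3), ('B', 12)], ['name', 'score'])

# DataFrame style: declarative, column-based, optimized by Spark.
df.filter(df.score > 5).show()

# RDD style: the same logic as an arbitrary Python function over rows.
print(df.rdd.filter(lambda row: row.score > 5).collect())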
Here is an example that converts a DataFrame to an RDD, performs a transformation at the RDD level, and then converts the result back into a DataFrame:
from pyspark.sql import SparkSession
import sys
from pyspark.sql.functions import greatest
from pyspark.sql.types import *


def picewise_func(x):
    # Map a numeric value into one of six buckets.
    if x < 0:
        return 0
    elif x >= 0 and x <= 5:
        return 1
    elif x >= 6 and x <= 10:
        return 2
    elif x >= 11 and x <= 15:
        return 3
    elif x >= 16 and x <= 20:
        return 4
    else:
        return 5


if __name__ == "__main__":
    spark = SparkSession.builder.master('local').enableHiveSupport().getOrCreate()
    valuesB = [('A', 1), ('B', -8), ('C', 7), ('D', 13), ('E', 18), ('F', 23)]
    TableB = spark.createDataFrame(valuesB, ['name', 'id'])
    TableB.show()

    # DataFrame -> RDD: the .rdd attribute exposes the underlying RDD of Rows.
    rdd2 = TableB.rdd
    rdd1 = rdd2.map(lambda x: (x[0] + "_rdd", picewise_func(x[1])))
    for element in rdd1.collect():
        print(element)

    # RDD -> DataFrame: supply an explicit schema when building the new DataFrame.
    schema1 = StructType([StructField('col1', StringType(), True),
                          StructField('col2', IntegerType(), True)])
    TableC = spark.createDataFrame(rdd1, schema=schema1)
    TableC.show()
As the code shows, the DataFrame's rdd attribute gives direct access to the underlying RDD; the transformation (here, a piecewise mapping function) is applied through the RDD's map with a lambda, and finally createDataFrame produces a new DataFrame.
Output results :
+----+---+
|name| id|
+----+---+
| A| 1|
| B| -8|
| C| 7|
| D| 13|
| E| 18|
| F| 23|
+----+---+
('A_rdd', 1)
('B_rdd', 0)
('C_rdd', 2)
('D_rdd', 3)
('E_rdd', 4)
('F_rdd', 5)
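As a side note (this is an addition, not part of the original post), once a SparkSession exists PySpark also makes a toDF method available on RDDs, so the last step above could equivalently be written as follows, reusing the rdd1 and schema1 defined in the example:

# Equivalent to spark.createDataFrame(rdd1, schema=schema1);
# toDF becomes available on RDDs after a SparkSession has been created.
TableC = rdd1.toDF(schema=schema1)
TableC.show()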
As you can see, the RDD makes it easy to implement all kinds of transformations. Here is another example, this time summing two columns:
from pyspark.sql import SparkSession
import sys
from pyspark.sql.functions import greatest
from pyspark.sql.types import *


def picewise_func(x):
    # Same bucketing function as in the previous example.
    if x < 0:
        return 0
    elif x >= 0 and x <= 5:
        return 1
    elif x >= 6 and x <= 10:
        return 2
    elif x >= 11 and x <= 15:
        return 3
    elif x >= 16 and x <= 20:
        return 4
    else:
        return 5


if __name__ == "__main__":
    spark = SparkSession.builder.master('local').enableHiveSupport().getOrCreate()
    valuesB = [('A', 1, 1), ('B', -8, 2), ('C', 7, 3), ('D', 13, 4), ('E', 18, 5), ('F', 23, 6)]
    # The source data now has three columns, so three column names are supplied.
    TableB = spark.createDataFrame(valuesB, ['name', 'id1', 'id2'])
    print('********TableB********')
    TableB.show()

    rdd2 = TableB.rdd
    rdd1 = rdd2.map(lambda x: (x[0] + "_rdd", picewise_func(x[1])))
    print('*********rdd1**********')
    for element in rdd1.collect():
        print(element)

    schema1 = StructType([StructField('col1', StringType(), True),
                          StructField('col2', IntegerType(), True)])
    TableC = spark.createDataFrame(rdd1, schema=schema1)
    print('*********TableC********')
    TableC.show()

    # Sum two columns at the RDD level, then convert back to a DataFrame.
    rdd3 = rdd2.map(lambda x: (x[0] + "_rdd1", x[1] + x[2]))
    print('*********rdd3**********')
    for element in rdd3.collect():
        print(element)
    TableD = spark.createDataFrame(rdd3, schema=schema1)
    TableD.show()
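For comparison (an addition to the original example, not part of it), the same two-column sum can also be written directly against the DataFrame API, assuming the three-column TableB defined above; the TableD_df name is just for illustration:

from pyspark.sql.functions import col, concat, lit

# Same result as rdd3/TableD, expressed with DataFrame column expressions.
TableD_df = TableB.select(
    concat(col('name'), lit('_rdd1')).alias('col1'),
    (col('id1') + col('id2')).alias('col2'))
TableD_df.show()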
Finally, here is a more complex example that applies the piecewise mapping to several columns in one pass:
from pyspark.sql import SparkSession
import sys
from pyspark.sql.functions import greatest
from pyspark.sql.types import *


def picewise_func(xx):
    # Apply the piecewise mapping to every numeric field of a row,
    # keeping the first (name) field with an "_rdd" suffix.
    tmp_list = []
    tmp_list.append(xx[0] + "_rdd")
    for x in xx[1:]:
        if x < 0:
            tmp_list.append(0)
        elif x >= 0 and x <= 5:
            tmp_list.append(1)
        elif x >= 6 and x <= 10:
            tmp_list.append(2)
        elif x >= 11 and x <= 15:
            tmp_list.append(3)
        elif x >= 16 and x <= 20:
            tmp_list.append(4)
        else:
            tmp_list.append(5)
    return tmp_list


if __name__ == "__main__":
    spark = SparkSession.builder.master('local').enableHiveSupport().getOrCreate()
    valuesB = [('A', 1, 2), ('B', -8, -7), ('C', 7, 8), ('D', 13, 14), ('E', 18, 19), ('F', 23, 24)]
    TableB = spark.createDataFrame(valuesB, ['name', 'id1', 'id2'])
    print('********TableB********')
    TableB.show()

    rdd2 = TableB.rdd
    # The whole Row is passed to picewise_func, which returns one list per record.
    rdd1 = rdd2.map(lambda x: picewise_func(x))
    print('*********rdd1**********')
    for element in rdd1.collect():
        print(element)

    schema1 = StructType([StructField('col1', StringType(), True),
                          StructField('col2', IntegerType(), True),
                          StructField('col3', IntegerType(), True)])
    TableC = spark.createDataFrame(rdd1, schema=schema1)
    print('*********TableC********')
    TableC.show()
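As a closing note (again an addition, not part of the original post), the same piecewise bucketing can be done without dropping to the RDD at all, for example with a Python UDF. A minimal sketch, assuming the scalar picewise_func from the first example and the three-column TableB above; bucket_udf is just an illustrative name:

from pyspark.sql.functions import udf, col, concat, lit
from pyspark.sql.types import IntegerType

# Wrap the scalar piecewise function as a UDF and apply it column by column.
bucket_udf = udf(picewise_func, IntegerType())
TableC_df = TableB.select(
    concat(col('name'), lit('_rdd')).alias('col1'),
    bucket_udf(col('id1')).alias('col2'),
    bucket_udf(col('id2')).alias('col3'))
TableC_df.show()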
Copyright notice: this article was written by [I_belong_to_jesus]. Please include a link to the original when reposting:
https://yzsam.com/2022/04/202204211437440934.html