01-Read&Write
2022-04-22 04:09:00 【wangyanglongcc】
Reader
Read from CSV files
spark.read.csv can also read CSV files, and it is the more commonly used approach.
Read from CSV with DataFrameReader’s csv method and the following options:
Tab separator, use first line as header, infer schema
file_csv = "/mnt/training/ecommerce/users/users-500k.csv"
df = (spark.read
      .option("sep", "\t")          # use tab as the field separator
      .option("header", True)       # treat the first line as the header
      .option("inferSchema", True)  # let Spark infer the schema / field types
      .csv(file_csv)                # file path
)
df.printSchema()                    # inspect the inferred schema

Sometimes the inferred schema is not quite right; in that case we can specify it ourselves. There are two ways to specify a schema: with a StructType, or with a DDL-formatted string.
StructType
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType

userDefinedSchema = StructType([
    StructField("user_id", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("email", StringType(), True)
])
df = (spark.read
      .option("sep", "\t")        # use tab as the field separator
      .option("header", True)     # treat the first line as the header
      .schema(userDefinedSchema)  # use the schema defined above
      .csv(file_csv)              # file path
)
DDLSchema
DDLSchema = "user_id string, user_first_touch_timestamp long, email string"
df = (spark.read
      .option("sep", "\t")
      .option("header", True)
      .schema(DDLSchema)
      .csv(file_csv)
)
The schema information itself can be obtained in two ways. One is to access df.schema directly, which returns a StructType built from StructField objects. The other is to generate a DDL string; this uses the toDDL method, which is available in Scala but has no direct equivalent in PySpark.
spark.read.parquet("/mnt/training/ecommerce/events/events.parquet").schema.toDDL
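In PySpark, a minimal sketch of the same idea is to grab the inferred StructType from df.schema and reuse it for later reads (this assumes the two datasets share the same layout, as the events Parquet and JSON files do in this example):

existing_schema = spark.read.parquet("/mnt/training/ecommerce/events/events.parquet").schema  # a StructType
df = (spark.read
      .schema(existing_schema)  # reuse the schema instead of inferring it again
      .json("/mnt/training/ecommerce/events/events-500k.json")
)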

Read from JSON files
spark.read.json reads JSON files in the same way, and is also commonly used.
file_json = "/mnt/training/ecommerce/events/events-500k.json"
df = (spark.read
      .option("inferSchema", True)
      .json(file_json)
)
df.printSchema()

Here you can see that a JSON document can be hierarchical, so when specifying a schema the corresponding nesting has to be declared as well. In the case above:
- ecommerce
    purchaseRevenue
    total_item_quantity
    unique_items
- geo
    city
    state
- items (an array)
    coupon
    item_id
    item_name
    item_revenue_in_usd
    price_in_usd
    quantity
userDefinedSchema = StructType([
    StructField("device", StringType(), True),
    StructField("ecommerce", StructType([
        StructField("purchaseRevenue", DoubleType(), True),
        StructField("total_item_quantity", LongType(), True),
        StructField("unique_items", LongType(), True)
    ]), True),
    StructField("event_name", StringType(), True),
    StructField("event_previous_timestamp", LongType(), True),
    StructField("event_timestamp", LongType(), True),
    StructField("geo", StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True)
    ]), True),
    StructField("items", ArrayType(
        StructType([
            StructField("coupon", StringType(), True),
            StructField("item_id", StringType(), True),
            StructField("item_name", StringType(), True),
            StructField("item_revenue_in_usd", DoubleType(), True),
            StructField("price_in_usd", DoubleType(), True),
            StructField("quantity", LongType(), True)
        ])
    ), True),
    StructField("traffic_source", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("user_id", StringType(), True)
])
A DDL-formatted string can be used here as well.
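A minimal sketch of the equivalent DDL string for this nested schema (assuming the same field layout as above; Spark's DDL syntax expresses nested types with STRUCT<...> and ARRAY<...>, and long maps to BIGINT):

DDLSchema = ("device STRING, "
             "ecommerce STRUCT<purchaseRevenue: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, "
             "event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, "
             "geo STRUCT<city: STRING, state: STRING>, "
             "items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, "
             "item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, "
             "traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING")

df = (spark.read
      .schema(DDLSchema)
      .json(file_json)
)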
Writer
Write DataFrame to file
Usually we write data out in Parquet format, though CSV is supported as well.
Writing in Parquet format
outputfile_path = f'/mnt/dbwarehouse/files/{filename}.parquet'  # 'filename' is assumed to be defined earlier
(df.write
   .option("compression", "snappy")  # compression codec
   .mode("overwrite")                # write mode
   .parquet(outputfile_path)         # output path
)
There is also a shorter equivalent:
df.write.parquet(outputfile_path, mode='overwrite')
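To sanity-check the output, the written files can be read straight back (a small sketch using the path above):

df_check = spark.read.parquet(outputfile_path)
df_check.printSchema()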
Writing in CSV format
outputfile_path = f'/mnt/dbwarehouse/files/{filename}.csv'
(df.write
   .mode("overwrite")     # write mode
   .csv(outputfile_path)  # output path
)
There is also a shorter equivalent:
df.write.csv(outputfile_path, mode='overwrite')
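Note that by default the CSV writer does not emit a header row, so reading the files back yields generic column names. A small sketch of a round trip that keeps the header (a variant of the write above, using the standard header option):

df.write.csv(outputfile_path, mode='overwrite', header=True)
df_check = (spark.read
            .option("header", True)
            .csv(outputfile_path)
)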
Write DataFrame to table
Use saveAsTable to save a DataFrame as a table.
tb_name = 'users'
df.write.mode("overwrite").saveAsTable(tb_name)
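Once saved, the table can be queried like any other table (a quick usage sketch):

users_df = spark.table(tb_name)  # equivalent to spark.sql(f"select * from {tb_name}")
users_df.show(5)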
Write DataFrame to delta table
By specifying the format as delta and calling the save method, the DataFrame is saved as a Delta table.
delta_tb_path = '/mnt/dbwarehouse/delta/users'
df.write.format('delta').mode('overwrite').save(delta_tb_path)
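The Delta table can then be read back by pointing at the same path (a minimal sketch):

delta_df = spark.read.format('delta').load(delta_tb_path)
delta_df.printSchema()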
This document covers the most basic ways of reading and writing. In real work scenarios, when data is written we usually create the table first, specifying the column types, the storage path (usually set when the database is created), and the storage format such as parquet, textfile, or delta. The DataFrame is then registered as a temporary view with createOrReplaceTempView, and the data is written into the table. For example:
create database if not exists demo location "/mnt/dbwarehouse/demo";

create table if not exists demo.first_table(
    id int,
    value string
) using delta;

df.createOrReplaceTempView('df')
spark.sql('insert overwrite demo.first_table select * from df')
Copyright notice
This article was written by [wangyanglongcc]; please include the original link when reposting.
https://yzsam.com/2022/04/202204220408303817.html