10-Streaming Query
2022-04-22 19:16:00 【wangyanglongcc】
readStream
schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"
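# eventsPath points to a directory of Parquet event files (defined earlier, not shown)
df = (spark.readStream
      .schema(schema)                   # streaming file sources need an explicit schema
      .option("maxFilesPerTrigger", 1)  # pick up at most one new file per micro-batch
      .parquet(eventsPath)
     )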
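To make the effect of maxFilesPerTrigger concrete, here is a toy pure-Python model (not Spark's implementation) of how a backlog of new files is split into micro-batches of at most N files each. The function name plan_micro_batches is hypothetical, and it assumes the file list is already in arrival order; Spark's real file source also records which files it has already processed in its own log.

```python
from typing import Iterator, List


def plan_micro_batches(new_files: List[str], max_files_per_trigger: int) -> Iterator[List[str]]:
    """Toy model: yield the files each trigger would read, at most N per batch."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    # Walk the backlog in order, slicing off one micro-batch per trigger
    for start in range(0, len(new_files), max_files_per_trigger):
        yield new_files[start:start + max_files_per_trigger]


print(list(plan_micro_batches(["f1", "f2", "f3"], 2)))  # [['f1', 'f2'], ['f3']]
```

With maxFilesPerTrigger set to 1, as above, a backlog of three files takes three triggers to drain.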
Build streaming DataFrames
df.isStreaming  # check whether the DataFrame is a streaming DataFrame; returns True here
from pyspark.sql.functions import col
# Keep only email traffic and flag events coming from mobile devices
emailTrafficDF = (df.filter(col("traffic_source") == "email")
                  .withColumn("mobile", col("device").isin(["iOS", "Android"]))
                  .select("user_id", "event_timestamp", "mobile")
                 )
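The same filter and derived column can be written row by row in plain Python, which may help when reasoning about what each micro-batch produces. This is a hypothetical helper over dicts, not Spark code. One difference worth knowing: in Spark, isin on a null device yields null, whereas this sketch yields False.

```python
from typing import Any, Dict, List


def email_traffic(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Toy row-level version of the emailTrafficDF transformation."""
    out = []
    for row in rows:
        # Equivalent of filter(col("traffic_source") == "email")
        if row.get("traffic_source") != "email":
            continue
        # Equivalent of withColumn("mobile", ...) followed by select(...)
        out.append({
            "user_id": row["user_id"],
            "event_timestamp": row["event_timestamp"],
            "mobile": row.get("device") in ("iOS", "Android"),
        })
    return out
```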
Write streaming query results
checkpointPath = f"{userhome}/email_traffic/checkpoint"
outputPath = f"{userhome}/email_traffic/output"
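devicesQuery = (emailTrafficDF.writeStream
                .outputMode("append")                          # write only rows added since the last trigger
                .format("parquet")
                .queryName("email_traffic_p")
                .trigger(processingTime="1 second")            # start a micro-batch every second
                .option("checkpointLocation", checkpointPath)  # progress is stored here for recovery
                .start(outputPath)
               )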
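The checkpoint location is what lets a restarted query continue where it left off instead of reprocessing everything. Below is a toy model of that idea, assuming a single JSON file that records the last committed batch id; the file name and function are hypothetical, and Spark's real checkpoint keeps separate offset and commit logs.

```python
import json
import os
import tempfile
from typing import List


def run_append_query(batches: List[List[str]], checkpoint_dir: str, sink: List[str]) -> int:
    """Toy model: process batches not yet committed, appending their rows to sink."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    commit_file = os.path.join(checkpoint_dir, "last_committed_batch.json")  # hypothetical
    last = -1
    if os.path.exists(commit_file):
        with open(commit_file) as f:
            last = json.load(f)["batch_id"]
    processed = 0
    for batch_id, rows in enumerate(batches):
        if batch_id <= last:
            continue  # already committed before the restart
        sink.extend(rows)  # append mode: existing output is never rewritten
        with open(commit_file, "w") as f:
            json.dump({"batch_id": batch_id}, f)  # commit after the write succeeds
        processed += 1
    return processed


with tempfile.TemporaryDirectory() as ckpt:
    out: List[str] = []
    run_append_query([["a"], ["b"]], ckpt, out)         # first run: 2 batches
    run_append_query([["a"], ["b"], ["c"]], ckpt, out)  # restart: only the new batch
    print(out)  # ['a', 'b', 'c']
```

Committing only after the sink write means a crash between the two steps replays that batch rather than losing it.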

Monitor streaming query
Get ID of streaming query
devicesQuery.id
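devicesQuery.id stays the same when the query is restarted from the same checkpoint, while devicesQuery.runId changes on every start. A hypothetical sketch of that behavior, storing the id in a file inside the checkpoint directory (Spark keeps it in the checkpoint's metadata; the file name here is made up):

```python
import os
import tempfile
import uuid
from typing import Tuple


def start_query(checkpoint_dir: str) -> Tuple[str, str]:
    """Toy model: return (query_id, run_id) for one start of a query."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    id_file = os.path.join(checkpoint_dir, "query_id")  # hypothetical file name
    if os.path.exists(id_file):
        # Restart from an existing checkpoint: reuse the stored id
        with open(id_file) as f:
            query_id = f.read().strip()
    else:
        # First start: create and persist a new id
        query_id = str(uuid.uuid4())
        with open(id_file, "w") as f:
            f.write(query_id)
    # A fresh run id is generated on every start
    run_id = str(uuid.uuid4())
    return query_id, run_id


with tempfile.TemporaryDirectory() as d:
    first = start_query(d)
    second = start_query(d)
    print(first[0] == second[0], first[1] == second[1])  # True False
```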

Get status of streaming query
devicesQuery.status
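devicesQuery.status returns a dict with the keys message, isDataAvailable and isTriggerActive. A small helper (hypothetical, for illustration) that turns it into one readable line:

```python
from typing import Any, Dict


def describe_status(status: Dict[str, Any]) -> str:
    """Summarize a StreamingQuery.status dict in one line."""
    state = "processing a trigger" if status.get("isTriggerActive") else "idle between triggers"
    data = "new data available" if status.get("isDataAvailable") else "no new data"
    return f"{status.get('message', '')} ({state}, {data})"


print(describe_status({"message": "Waiting for data to arrive",
                       "isDataAvailable": False, "isTriggerActive": False}))
# Waiting for data to arrive (idle between triggers, no new data)
```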

devicesQuery.awaitTermination(5)  # wait up to 5 seconds; returns True only if the query terminated in that time

Stop the streaming query
devicesQuery.stop()
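awaitTermination(timeout) blocks for at most timeout seconds and reports whether the query has terminated, and stop() ends it. The toy class below mimics that pairing with a background thread standing in for the trigger loop; it is a sketch of the semantics, not PySpark's StreamingQuery, and ToyQuery and await_termination are hypothetical names.

```python
import threading


class ToyQuery:
    """Toy stand-in for a running streaming query (not PySpark's StreamingQuery)."""

    def __init__(self, trigger_interval: float = 0.01) -> None:
        self._stopped = threading.Event()
        self._interval = trigger_interval
        self.batches = 0
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Each loop iteration stands in for one processing-time trigger
        while not self._stopped.is_set():
            self.batches += 1
            self._stopped.wait(self._interval)

    def await_termination(self, timeout: float) -> bool:
        # Wait at most `timeout` seconds; True only if the loop has ended
        self._thread.join(timeout)
        return not self._thread.is_alive()

    def stop(self) -> None:
        # Signal the loop to exit and wait for the thread to finish
        self._stopped.set()
        self._thread.join()
```

As with the real query, a timed wait on a running query comes back False, and the same call after stop() comes back True.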
Copyright notice
This article was written by [wangyanglongcc]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204221909134055.html