当前位置:网站首页>【Koltin Flow(四)】Flow背压
【Koltin Flow(四)】Flow背压
2022-08-07 00:26:00 【MakerGaoGao】
目录
【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow
前言
- 本篇主要介绍背压相关的内容,如背压的产生、处理方式等。
- 本篇将介绍背压相关的操作符,如buffer等。
背压的产生
- 原因:通俗来说其实就是因为产生的速度和处理的速度或者说耗时不一致才导致了背压的产生。
- 处理:主要分三种:挂起、丢弃新的、丢弃原来的,我们也可以辅助设置缓冲池,即暂时把值存下来。
- 理解:通俗理解为水流管粗细的问题,如果上游的水管粗,下游的水管细就会产生堵住的问题,当然也有可能就是撑破了,水流出来了。
- 处理起来就是堵住,上游不要流了;
- 当然也可以在中间建设一个蓄水池,先把水放在蓄水池,下游可以继续了再放下去。
- 蓄水池当然也有两种方式,上游来水了,蓄水池有水,要不把蓄水池水放掉、注入新水,或者直接放掉新水。
编码处理
1.不处理
代码如下:
var time :Long = 0
flow {
repeat(10){
delay(100)
emit(it)
}
}.onStart {
time = System.currentTimeMillis()
}.onCompletion {
Log.d(TAG.TAG, "finish cost time ${
System.currentTimeMillis() - time}")
}.collect {
delay(1000)
Log.d(TAG.TAG, "it is $it,cost time ${
System.currentTimeMillis() - time}")
}
日志如下:
2022-08-02 16:44:10.338 9463-9489/edu.test.demo D/Test-TAG: it is 0,cost time 1118
2022-08-02 16:44:11.442 9463-9489/edu.test.demo D/Test-TAG: it is 1,cost time 2222
2022-08-02 16:44:12.543 9463-9489/edu.test.demo D/Test-TAG: it is 2,cost time 3323
2022-08-02 16:44:13.645 9463-9489/edu.test.demo D/Test-TAG: it is 3,cost time 4425
2022-08-02 16:44:14.749 9463-9489/edu.test.demo D/Test-TAG: it is 4,cost time 5529
2022-08-02 16:44:15.852 9463-9489/edu.test.demo D/Test-TAG: it is 5,cost time 6632
2022-08-02 16:44:16.955 9463-9489/edu.test.demo D/Test-TAG: it is 6,cost time 7735
2022-08-02 16:44:18.059 9463-9489/edu.test.demo D/Test-TAG: it is 7,cost time 8839
2022-08-02 16:44:19.162 9463-9489/edu.test.demo D/Test-TAG: it is 8,cost time 9942
2022-08-02 16:44:20.264 9463-9489/edu.test.demo D/Test-TAG: it is 9,cost time 11044
2022-08-02 16:44:20.264 9463-9489/edu.test.demo D/Test-TAG: finish cost time 11044
分析:
- 可以看出耗时是以最后一次结束的时间计算的,也就是挂起,通俗理解就是下游堵住了,上游等着,所以耗时是下面的耗时综合。
2. 直接调用buffer(),不设特定参数
代码如下:
var time :Long = 0
flow {
repeat(10){
delay(100)
emit(it)
}
}.onStart {
time = System.currentTimeMillis()
}.onCompletion {
Log.d(TAG.TAG, "finish cost time ${
System.currentTimeMillis() - time}")
}.buffer().collect {
delay(1000)
Log.d(TAG.TAG, "it is $it,cost time ${
System.currentTimeMillis() - time}")
}
日志如下:
2022-08-02 16:48:29.114 9531-9558/edu.test.demo D/Test-TAG: finish cost time 1024
2022-08-02 16:48:29.205 9531-9558/edu.test.demo D/Test-TAG: it is 0,cost time 1115
2022-08-02 16:48:30.207 9531-9558/edu.test.demo D/Test-TAG: it is 1,cost time 2117
2022-08-02 16:48:31.209 9531-9558/edu.test.demo D/Test-TAG: it is 2,cost time 3119
2022-08-02 16:48:32.210 9531-9558/edu.test.demo D/Test-TAG: it is 3,cost time 4120
2022-08-02 16:48:33.212 9531-9558/edu.test.demo D/Test-TAG: it is 4,cost time 5122
2022-08-02 16:48:34.213 9531-9558/edu.test.demo D/Test-TAG: it is 5,cost time 6123
2022-08-02 16:48:35.215 9531-9558/edu.test.demo D/Test-TAG: it is 6,cost time 7125
2022-08-02 16:48:36.216 9531-9558/edu.test.demo D/Test-TAG: it is 7,cost time 8126
2022-08-02 16:48:37.217 9531-9558/edu.test.demo D/Test-TAG: it is 8,cost time 9127
2022-08-02 16:48:38.219 9531-9558/edu.test.demo D/Test-TAG: it is 9,cost time 10129
分析:
- 可以看出和不处理非常类似,下游耗时基本是一致的。
- 但是finish完成的耗时只有1024,也就是全部缓存下来了,通俗理解就是水全部放在了蓄水池,一点一点往下放。
3. buffer参数设置,设置buffer为5,也就是缓存5个值,三种策略分别为:
- BufferOverflow.SUSPEND(默认)
代码如下:
var time :Long = 0
flow {
repeat(10){
delay(100)
emit(it)
}
}.onStart {
time = System.currentTimeMillis()
}.onCompletion {
Log.d(TAG.TAG, "finish cost time ${
System.currentTimeMillis() - time}")
}.buffer(5,BufferOverflow.SUSPEND).collect {
delay(1000)
Log.d(TAG.TAG, "it is $it,cost time ${
System.currentTimeMillis() - time}")
}
日志如下:
2022-08-02 16:52:30.596 9668-9695/edu.test.demo D/Test-TAG: it is 0,cost time 1120
2022-08-02 16:52:31.597 9668-9695/edu.test.demo D/Test-TAG: it is 1,cost time 2121
2022-08-02 16:52:32.599 9668-9695/edu.test.demo D/Test-TAG: it is 2,cost time 3123
2022-08-02 16:52:33.601 9668-9694/edu.test.demo D/Test-TAG: it is 3,cost time 4125
2022-08-02 16:52:33.601 9668-9694/edu.test.demo D/Test-TAG: finish cost time 4125
2022-08-02 16:52:34.602 9668-9694/edu.test.demo D/Test-TAG: it is 4,cost time 5126
2022-08-02 16:52:35.605 9668-9694/edu.test.demo D/Test-TAG: it is 5,cost time 6129
2022-08-02 16:52:36.607 9668-9694/edu.test.demo D/Test-TAG: it is 6,cost time 7131
2022-08-02 16:52:37.609 9668-9694/edu.test.demo D/Test-TAG: it is 7,cost time 8133
2022-08-02 16:52:38.610 9668-9694/edu.test.demo D/Test-TAG: it is 8,cost time 9134
2022-08-02 16:52:39.612 9668-9694/edu.test.demo D/Test-TAG: it is 9,cost time 10136
分析:
- 可以看出在第四个值打印出来的时候完成了,原因在于发送第0个,下面在处理,下面5个都放在了缓冲池,剩下四个在挂起等待
- 等待后面处理,处理一个,缓冲池往下放一个,上游往缓冲池放一个。
- 下游四个处理完了的时候,在处理第五个,最后一个就放到了缓冲池,上游就结束了。
- BufferOverflow.DROP_OLDEST
代码如下:
var time :Long = 0
flow {
repeat(10){
delay(100)
emit(it)
}
}.onStart {
time = System.currentTimeMillis()
}.onCompletion {
Log.d(TAG.TAG, "finish cost time ${
System.currentTimeMillis() - time}")
}.buffer(5,BufferOverflow.DROP_OLDEST).collect {
delay(1000)
Log.d(TAG.TAG, "it is $it,cost time ${
System.currentTimeMillis() - time}")
}
日志如下:
2022-08-02 17:00:03.800 9739-9767/edu.test.demo D/Test-TAG: finish cost time 1028
2022-08-02 17:00:03.892 9739-9767/edu.test.demo D/Test-TAG: it is 0,cost time 1120
2022-08-02 17:00:04.894 9739-9767/edu.test.demo D/Test-TAG: it is 5,cost time 2122
2022-08-02 17:00:05.896 9739-9767/edu.test.demo D/Test-TAG: it is 6,cost time 3124
2022-08-02 17:00:06.896 9739-9767/edu.test.demo D/Test-TAG: it is 7,cost time 4124
2022-08-02 17:00:07.899 9739-9767/edu.test.demo D/Test-TAG: it is 8,cost time 5127
2022-08-02 17:00:08.900 9739-9767/edu.test.demo D/Test-TAG: it is 9,cost time 6128
分析:
- 可以看出,finish最先结束了,耗时只是上游的时间,每次100ms,大概就是1000ms。
- 上游发送一个,下游在处理第一个时候,上游继续往缓冲池中放。
- 和BufferOverflow.SUSPEND的区别在于,放入了5个(1,2,3,4,5),放第6个也就是5的时候,不再挂起,而是将1丢弃了,缓冲池变成了(2,3,4,5,6),后续类似,最后就是1,2,3,4全被丢弃了,所以打印结果就是0,5,6,7,8,9。
- BufferOverflow.DROP_LATEST
代码如下:
var time :Long = 0
flow {
repeat(10){
delay(100)
emit(it)
}
}.onStart {
time = System.currentTimeMillis()
}.onCompletion {
Log.d(TAG.TAG, "finish cost time ${
System.currentTimeMillis() - time}")
}.buffer(5,BufferOverflow.DROP_LATEST).collect {
delay(1000)
Log.d(TAG.TAG, "it is $it,cost time ${
System.currentTimeMillis() - time}")
}
日志如下:
2022-08-02 17:05:41.508 9822-9847/edu.test.demo D/Test-TAG: finish cost time 1031
2022-08-02 17:05:41.596 9822-9847/edu.test.demo D/Test-TAG: it is 0,cost time 1119
2022-08-02 17:05:42.597 9822-9847/edu.test.demo D/Test-TAG: it is 1,cost time 2120
2022-08-02 17:05:43.599 9822-9847/edu.test.demo D/Test-TAG: it is 2,cost time 3122
2022-08-02 17:05:44.600 9822-9847/edu.test.demo D/Test-TAG: it is 3,cost time 4123
2022-08-02 17:05:45.605 9822-9847/edu.test.demo D/Test-TAG: it is 4,cost time 5128
2022-08-02 17:05:46.606 9822-9847/edu.test.demo D/Test-TAG: it is 5,cost time 6129
分析:
- 可以看出,finish也最先结束了,耗时只是上游的时间,每次100ms,大概就是1000ms。
- 上游发送一个,下游在处理第一个时候,上游继续往缓冲池中放。
- 和BufferOverflow.DROP_OLDEST的区别在于,放入了5个(1,2,3,4,5),放第6个也就是5的时候,不再挂起,而是将5丢弃了,缓冲池不变,所以5,6,7,8,9全被丢弃了,最后打印出来就是0,1,2,3,4,5。
4. conflate conflate是buffer的简化使用方式,其实相当于buffer设置参数为0和BufferOverflow.DROP_OLDEST。
代码如下:
var time :Long = 0
flow {
repeat(10){
delay(100)
emit(it)
}
}.onStart {
time = System.currentTimeMillis()
}.onCompletion {
Log.d(TAG.TAG, "finish cost time ${
System.currentTimeMillis() - time}")
}.conflate().collect {
delay(1000)
Log.d(TAG.TAG, "it is $it,cost time ${
System.currentTimeMillis() - time}")
}
日志如下:
2022-08-02 17:09:06.504 9886-9913/edu.test.demo D/Test-TAG: finish cost time 1024
2022-08-02 17:09:06.593 9886-9913/edu.test.demo D/Test-TAG: it is 0,cost time 1113
2022-08-02 17:09:07.596 9886-9913/edu.test.demo D/Test-TAG: it is 9,cost time 2116
分析:
- 可以看出,finish也最先结束了,耗时只是上游的时间,每次100ms,大概就是1000ms。
- 机制和DROP_OLDEST一致,可以看上面的分析,因为缓冲池为0,不缓冲任何值,最终1-8全部丢弃了,打印出了0和9.
总结
- 本篇主要介绍了flow背压产生的原因、通俗类似于水流的理解方式。
- 本篇介绍了flow的处理优化方式,主要操作符围绕buffer展开。
- 本篇为自己学习及使用过程中的总结,难免存在错误或思维局限,欢迎大家讨论指正。
边栏推荐
猜你喜欢
随机推荐
Playwright安装加一个简单的实例
Web version MC server construction + localization
【使用JDBC对数据库表进行操作】
倒计时3天|2022 OceanBase 年度发布会亮点抢先看!
又来到熟悉的地方
在中国银河证券开户安全吗
leetcode 20. 有效的括号
jvm summary
申请google drive api并使用rclone挂载团队盘为本地磁盘
禁用防火墙后,aria2的6800端口还是不通
2022.8.4 模拟赛
2022 Hazardous Chemicals Business Unit Safety Management Personnel Exam Questions Mock Exam Question Bank and Mock Exam
常用邮件服务器支持端口及加密方法实测
今日睡眠质量记录70分
用同花顺开户安全么
04 多线程与高并发 - ReentrantReadWriteLock 源码解析
Deploy an LVS-DR cluster
【使用JDBC获取数据库相关的元数据信息】
NAT穿越技术详细介绍
位运算应用:保存多状态标识应用









