冷流热流

冷流

  • 只有订阅者订阅时,才开始执行发射数据流的代码。并且冷流和订阅者只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对冷流而言,有多个订阅者的时候,他们各自的事件是独立的。
  • 冷数据流,不消费则不生产数据,这一点与LiveData不同:LiveData的发送端并不依赖于接收端。

热流

  • 无论有没有订阅者订阅,事件始终都会发生。当 热流有多个订阅者时,热流与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。(都收到)
  • 冷流和订阅者只能是一对一的关系,当我们要实现一个流,多个订阅者的需求时(这在开发中是很常见的),就需要热流了
  • SharedFlow即共享的Flow,可以实现一对多关系,SharedFlow是一种热流。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//ViewModel
val sharedFlow=MutableSharedFlow<String>()

viewModelScope.launch{
sharedFlow.emit("Hello")
sharedFlow.emit("SharedFlow")
}

//Activity
lifecycleScope.launch{
viewMode.sharedFlow.collect {
print(it)
}
}

SharedFlow

哪些场景下才使用SharedFlow
其实SharedFlow相比StateFlow更加灵活,由于它可以处理粘性和背压的问题,一个很常用的场景就是是用于事件的传递。就是类似EventBus那种消息总线。可以很方便很简单的实现一个基于SharedFlow的FlowBus。

另一种情况下就是用于管理页面的事件处理,比如根据值的状态弹出弹窗,吐司。由于SharedFlow可以自由的控制是否需要粘性,如果默认没有粘性的情况下,我们可以在页面销毁重建,或旋转屏幕的时候,保证不会触发到页面状态,如弹窗,吐司的触发。因为没有粘性内部没有值的保存,所以不会触发到页面状态。

这两种场景我已经在SharedFlow的单独介绍文章中有代码与演示,有兴趣的可以看看这里

StateFlow

StateFlow 与 LiveData 是最接近的,因为:

有初始值: StateFlow 初始化时必须传入初始值;
容量为 1: StateFlow 只会保存一个值;
重放为 1: StateFlow 会向新订阅者重放最新的值;
不支持 resetReplayCache() 重置重放缓存:

  1. 它始终是有值的。
  2. 它的值是唯一的。
  3. 它允许被多个观察者共用 (因此是共享的数据流)。
  4. 它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的。

构造函数

1
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

1.StateFlow构造函数较为简单,只需要传入一个默认值
2.StateFlow本质上是一个replay为 1,并且没有缓冲区的SharedFlow, 因此第一次订阅时会先获得默认值
3.StateFlow仅在值已更新,并且值发生了变化时才会返回,即如果更新后的值没有变化,也没会回调Collect方法,这点与LiveData不同

stateIn将普通流转化成StateFlow

1
2
3
4
5
6
val result: StateFlow<Result<UiState>> = someFlow
.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)

需要传入一个默认值,同时之所以WhileSubscribed中传入了5000,是为了实现等待5秒后仍然没有订阅者存在就终止协程的功能,这个方法有以下功能

  • 防抖
  • 用户将您的应用转至后台运行,5 秒钟后所有来自其他层的数据更新会停止,这样可以节省电量。
  • 最新的数据仍然会被缓存,所以当用户切换回应用时,视图立即就可以得到数据进行渲染。
  • 订阅将被重启,新数据会填充进来,当数据可用时更新视图。
  • 在屏幕旋转时,因为重新订阅的时间在 5s 内,因此上游流不会中止

flowWithLifecycle
我们都知道LiveData的监听是和Activity的生命周期绑定的,比如页面再后台就不会触发,而页面到前台才会触发。
StateFlow可不会这么和生命周期绑定,不过我们可以通过添加一个函数让StateFlow可以和生命周期绑定实现和LiveData一样的效果。

1
2
3
4
5
6
7
8
9
10
11
override fun startObserve() {
lifecycleScope.launch {
mViewModel.searchFlow
.flowWithLifecycle(this@Demo4Activity.lifecycle)
//相当于自动帮我们设置 lifecycle.repeatOnLifecycle
.collect {
YYLogUtils.w("search-state-value $it")
}
}
}

如果我就想当前Actiity不可见的情况下也能触发监听,那要怎么搞,也能做,我们StateFlow默认就不绑定生命周期的,我们把绑定生命周期的代码去掉即可

StateFlow的数据防抖是双刃剑,所以我们需要看使用的场景是否需要此功能。

我们点击登录按钮,调用接口得到错误信息“密码错误”。通过StateFlow发送给Activity。Activity就会弹窗展示错误信息。那么第二次再点击登录按钮,StateFlow就会判断你的值还是“密码错误”,它就不给你发送了。这样就导致用户点击按钮没反应?还以为App挂了呢,所以此时我们就需要使用LiveData。

如果使用DataBinding的情况下,推荐使用LiveData

除了这两种情况,其他情况下使用StateFlow平替LiveData是没什么问题的。

最佳实践

使用带超时参数的 WhileSubscribed 策略暴露 Flow(可以防抖)
望等待 5 秒后没有订阅者则停止数据流,可以使用 whileSubscribed(5000)。

1
2
3
4
5
6
7
8
9
class MyViewModel(...) : ViewModel() {
val result = userId.mapLatest { newUserId ->
repository.observeItem(newUserId)
}.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = Result.Loading
)
}

接收
使用 repeatOnLifecycle 来收集数据更新

1
2
3
4
5
6
7
8

onCreateView(...) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
myViewModel.myUiState.collect { ... }
}
}
}

如果你的数据流比较复杂,同时不需要获取myFlow.value,需要配置新用户订阅重播无素的个数,或者需要发送重复的值,可以考虑使用SharedFlow

SharedFlow配置更为灵活,支持配置replay, 缓冲区大小等,StateFlow是SharedFlow的特化版本,replay固定为 1,缓冲区大小默认为 0

replay 表示当新的订阅者Collect时,发送几个已经发送过的数据给它,默认为 0,即默认新订阅者不会获取以前的数据

Channel

Channel 是 Kotlin 中实现跨协程数据传输的数据结构,类似于 Java 中的 BlockQueue 阻塞队列。不同之处在于 BlockQueue 会阻塞线程,而 Channel 是挂起线程

Google 的建议 是优先使用 Flow 而不是 Channel,主要原因是 Flow 会更自动地关闭数据流,而一旦 Channel 没有正常关闭,则容易造成资源泄漏。此外,Flow 相较于 Channel 提供了更明确的约束和操作符,更灵活。


冷流热流
http://peiniwan.github.io/2024/04/c43a248da78d.html
作者
六月的雨
发布于
2024年4月6日
许可协议