Kotlin Flow 是什么
Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库,也叫做异步的数据流,会按顺序发出值并完成,它是Kotlin协程的响应式API,是与响应式编程相结合的产物。
flow的创建
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
除此之外 kotlin 也提供了一系列的流的生成方式
// 注意以下的flow 其实是 unsafeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
//本质上调用的是下面这个方法
/**
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
* Used in our own operators where we trust the context of invocations.
*/
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
/**
* Creates a _cold_ flow that produces a single value from the given functional type.
*/
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
/**
* Creates a _cold_ flow that produces a single value from the given functional type.
*
* Example of usage:
*
* ```
* suspend fun remoteCall(): R = ...
* fun remoteCallFlow(): Flow<R> = ::remoteCall.asFlow()
* ```
*/
@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
/**
* Creates a _cold_ flow that produces values from the given iterable.
*/
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the given iterator.
*/
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the given sequence.
*/
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a flow that produces values from the specified `vararg`-arguments.
*
* Example of usage:
*
* ```
* flowOf(1, 2, 3)
* ```
*/
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
/**
* Creates a flow that produces the given [value].
*/
public fun <T> flowOf(value: T): Flow<T> = flow {
/*
* Implementation note: this is just an "optimized" overload of flowOf(vararg)
* which significantly reduces the footprint of widespread single-value flows.
*/
emit(value)
}
/**
* Returns an empty flow.
*/
public fun <T> emptyFlow(): Flow<T> = EmptyFlow
private object EmptyFlow : Flow<Nothing> {
override suspend fun collect(collector: FlowCollector<Nothing>) = Unit
}
/**
* Creates a _cold_ flow that produces values from the given array.
* The flow being _cold_ means that the array components are read every time a terminal operator is applied
* to the resulting flow.
*/
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the array.
* The flow being _cold_ means that the array components are read every time a terminal operator is applied
* to the resulting flow.
*/
public fun IntArray.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the given array.
* The flow being _cold_ means that the array components are read every time a terminal operator is applied
* to the resulting flow.
*/
public fun LongArray.asFlow(): Flow<Long> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a flow that produces values from the range.
*/
public fun IntRange.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a flow that produces values from the range.
*/
public fun LongRange.asFlow(): Flow<Long> = flow {
forEach { value ->
emit(value)
}
}
需要注意的是
- Flow创建的流是冷流 必须要在调用末端操作符后才会执行数据流生产操作。
- flow代码块的emit函数是线程不安全的,不应该在flow内部使用withContext等函数进行上下文切换后使用emit函数,否则会抛出 IllegalStateException
- 如果想要修改flow的协程调度应该使用flowOn函数
数据的消费 collect
上述我们说到 冷流必须在调用末端操作符之后才会执行数据流生产操作 那么collect就是末端操作符之一。
下面举一个简单的例子
//repository
object Repository {
fun fetchMessage() = flow<Result> {
delay(1000)
var i = 1
while (true){
i += 1
delay(1000)
if(i==20){
i=0
}
emit(Result.Success(100/i))
}
}.catch { error->
emit(Result.Error(error))
}
sealed interface Result {
object Loading : Result
data class Success(val data:Int) : Result
data class Error(val err: Throwable) : Result
}
}
// viewModel
class MyViewModel() : ViewModel() {
private val _messageFlow = Repository.fetchMessage()
.onStart {
emit(Repository.Result.Loading)
}.flowOn(Dispatchers.IO)
val messageFlow:Flow<Repository.Result>
get() = _messageFlow
}
// activity
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
mBinding = DataBindingUtil.setContentView(this,R.layout.activity_main)
lifecycleScope.launchWhenStarted {
mModel.messageFlow.collect{
when(it){
Repository.Result.Loading->{
// 展示进度条
}
is Repository.Result.Success->{
// 将数据展示到页面上
}
is Repository.Result.Error -> {
// 弹出提示框
}
}
}
}
}
需要注意的是
- 关于flowOn,只会作用于上游数据流。flow默认继承外层协程作用域的上下文 。例如上面的代码中 flowOn上游的操作是在IO线程执行的而collect之间的操作是在主线程执行的。
- 上述的lifecycleScope.launchWhenStarted 是会将Flow数据流消费端所在的协程,函数执行限定在Lifecycle.State.STARTED状态之后。当生命周期处于后台的时候会将消费端协程挂起。但是生产端的数据仍然在执行。为了解决这个问题 有了repeatOnLifecycle
- repeatOnLifecycle的作用 切换CoroutineContext到主线程,在进入允许的生命周期状态时,启动协程,订阅数据流。在超出设定的生命周期状态后,关闭协程,取消订阅。
上述代码可以改成
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED){
mModel.messageFlow.collect{
when(it){
Repository.Result.Loading->{
// 展示进度条
}
is Repository.Result.Success->{
// 将数据展示到页面上
}
is Repository.Result.Error -> {
// 弹出提示框
}
}
}
}
}
当然 也可以在视图层仍然使用livedata 对于已经构建好的flow
flow 转化成 livedata
@JvmOverloads
public fun <T> Flow<T>.asLiveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT
): LiveData<T> = liveData(context, timeoutInMs) {
collect {
emit(it)
}
}
热流 StateFlow 和 SharedFlow
StateFlow实际上是SharedFlow的子类,同样也拥有只读与可读可写的两种类型,StateFlow与MutableStateFlow。我们可以先看一下StateFlow和Livedata的对比
相同点:
- 都允许多个消费者
- 都有只读与可变类型
- 永远只保存一个状态值
- 同样支持DataBinding
StateFlow的不同之处:
- CAS
- 强制要求初始默认值
- 默认支持防抖过滤 基本类型比较值是否相同 对象的记得重写equal
- value的空安全校验
- 有着Flow丰富的异步数据流操作
- 默认没有Lifecycle支持,flow的collect是挂起函数,会一直等待数据流传递数据 可以使用repeatOnLifecycle来弥补
- 线程安全,LiveData的postValue虽然也可在异步使用,但会导致数据丢失。
再来看SharedFlow 和 StateFlow的对比
相同点
- 与 SateFlow 一样,SharedFlow 也有两个版本:SharedFlow 与 MutableSharedFlow。
不同点
- MutableSharedFlow 没有起始值
- SharedFlow 可以保留历史数据 StateFlow只保存最新的状态值
- MutableSharedFlow 发射值需要调用 emit()/tryEmit() 方法,没有 setValue() 方法
- 一般情况下 StateFlow用于UI 而Shared用于事件
使用SharedFlow 避免粘性事件
// viewModel
class MyViewModel() : ViewModel() {
private val _sharedFlow = MutableSharedFlow<String>()
val sharedFlow:SharedFlow<String>
get() = _sharedFlow
//模拟登录事件
fun login(str:String) = viewModelScope.launch {
_sharedFlow.emit(str)
}
}
// activity
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
mBinding = DataBindingUtil.setContentView(this,R.layout.activity_main)
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED){
mModel.sharedFlow.collect{
it.showToast(this@MainActivity)
}
}
}
mBinding.mMessage.setOnClickListener {
mModel.login("12345")
}
}
注意点
- 如果在这里使用StateFlow 或者 LiveData的话 当点击登录之后 会弹出一次Toast 如果此时屏幕发生翻转 activity重建的话 会取消订阅然后重新订阅该事件 由于StateFlow 和 LiveData 会保存一个最新值 此时订阅之后将会又收到该值导致又弹出一次Toast 然而SharedFlow 默认情况下是不会保存任何值的 所以重新订阅之后无法接收到之前已经发送过的值。也就解决的粘性事件。
- 所以 我觉得 liveData 和 StateFlow 更适用于状态的保存 而SharedFlow 适用于事件。