Kotlin-Flow


Kotlin Flow 是什么

Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库,也叫做异步的数据流,会按顺序发出值并完成,它是Kotlin协程的响应式API,是与响应式编程相结合的产物。

flow的创建

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

除此之外 kotlin 也提供了一系列的流的生成方式

// 注意以下的flow 其实是 unsafeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
//本质上调用的是下面这个方法
/**
 * An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
 * Used in our own operators where we trust the context of invocations.
 */
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

/**
 * Creates a _cold_ flow that produces a single value from the given functional type.
 */
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

/**
 * Creates a _cold_ flow that produces a single value from the given functional type.
 *
 * Example of usage:
 *
 * ```
 * suspend fun remoteCall(): R = ...
 * fun remoteCallFlow(): Flow<R> = ::remoteCall.asFlow()
 * ```
 */
@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

/**
 * Creates a _cold_ flow that produces values from the given iterable.
 */
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

/**
 * Creates a _cold_ flow that produces values from the given iterator.
 */
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

/**
 * Creates a _cold_ flow that produces values from the given sequence.
 */
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

/**
 * Creates a flow that produces values from the specified `vararg`-arguments.
 *
 * Example of usage:
 *
 * ```
 * flowOf(1, 2, 3)
 * ```
 */
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

/**
 * Creates a flow that produces the given [value].
 */
public fun <T> flowOf(value: T): Flow<T> = flow {
    /*
     * Implementation note: this is just an "optimized" overload of flowOf(vararg)
     * which significantly reduces the footprint of widespread single-value flows.
     */
    emit(value)
}

/**
 * Returns an empty flow.
 */
public fun <T> emptyFlow(): Flow<T> = EmptyFlow

private object EmptyFlow : Flow<Nothing> {
    override suspend fun collect(collector: FlowCollector<Nothing>) = Unit
}

/**
 * Creates a _cold_ flow that produces values from the given array.
 * The flow being _cold_ means that the array components are read every time a terminal operator is applied
 * to the resulting flow.
 */
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

/**
 * Creates a _cold_ flow that produces values from the array.
 * The flow being _cold_ means that the array components are read every time a terminal operator is applied
 * to the resulting flow.
 */
public fun IntArray.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

/**
 * Creates a _cold_ flow that produces values from the given array.
 * The flow being _cold_ means that the array components are read every time a terminal operator is applied
 * to the resulting flow.
 */
public fun LongArray.asFlow(): Flow<Long> = flow {
    forEach { value ->
        emit(value)
    }
}

/**
 * Creates a flow that produces values from the range.
 */
public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

/**
 * Creates a flow that produces values from the range.
 */
public fun LongRange.asFlow(): Flow<Long> = flow {
    forEach { value ->
        emit(value)
    }
}

需要注意的是

  • Flow创建的流是冷流 必须要在调用末端操作符后才会执行数据流生产操作。
  • flow代码块的emit函数是线程不安全的,不应该在flow内部使用withContext等函数进行上下文切换后使用emit函数,否则会抛出 IllegalStateException
  • 如果想要修改flow的协程调度应该使用flowOn函数

数据的消费 collect

上述我们说到 冷流必须在调用末端操作符之后才会执行数据流生产操作 那么collect就是末端操作符之一。
下面举一个简单的例子

//repository
object Repository {
    fun fetchMessage() = flow<Result> {
        delay(1000)
        var i = 1
        while (true){
            i += 1
            delay(1000)
            if(i==20){
                i=0
            }
            emit(Result.Success(100/i))
        }
    }.catch { error->
        emit(Result.Error(error))
    }

    sealed interface  Result {
        object Loading : Result
        data class Success(val data:Int) : Result
        data class Error(val err: Throwable) : Result
    }
}

// viewModel
class MyViewModel() : ViewModel() {
    private val _messageFlow = Repository.fetchMessage()
        .onStart {
            emit(Repository.Result.Loading)
        }.flowOn(Dispatchers.IO)

    val messageFlow:Flow<Repository.Result>
        get() = _messageFlow
}

// activity
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        mBinding =  DataBindingUtil.setContentView(this,R.layout.activity_main)
        lifecycleScope.launchWhenStarted {
                mModel.messageFlow.collect{
                    when(it){
                        Repository.Result.Loading->{
                            // 展示进度条
                        }
                        is Repository.Result.Success->{
                            // 将数据展示到页面上
                        }
                        is Repository.Result.Error -> {
                            // 弹出提示框
                        }
                    }
                }
            }

    }

需要注意的是

  • 关于flowOn,只会作用于上游数据流。flow默认继承外层协程作用域的上下文 。例如上面的代码中 flowOn上游的操作是在IO线程执行的而collect之间的操作是在主线程执行的。
  • 上述的lifecycleScope.launchWhenStarted 是会将Flow数据流消费端所在的协程,函数执行限定在Lifecycle.State.STARTED状态之后。当生命周期处于后台的时候会将消费端协程挂起。但是生产端的数据仍然在执行。为了解决这个问题 有了repeatOnLifecycle
  • repeatOnLifecycle的作用 切换CoroutineContext到主线程,在进入允许的生命周期状态时,启动协程,订阅数据流。在超出设定的生命周期状态后,关闭协程,取消订阅。

上述代码可以改成

lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED){
        mModel.messageFlow.collect{
            when(it){
                Repository.Result.Loading->{
                    // 展示进度条
                }
                is Repository.Result.Success->{
                    // 将数据展示到页面上
                }
                is Repository.Result.Error -> {
                    // 弹出提示框
                }
            }
        }
    }
}

当然 也可以在视图层仍然使用livedata 对于已经构建好的flow

flow 转化成 livedata

@JvmOverloads
public fun <T> Flow<T>.asLiveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT
): LiveData<T> = liveData(context, timeoutInMs) {
    collect {
        emit(it)
    }
}

热流 StateFlow 和 SharedFlow

StateFlow实际上是SharedFlow的子类,同样也拥有只读与可读可写的两种类型,StateFlow与MutableStateFlow。我们可以先看一下StateFlow和Livedata的对比

相同点:

  • 都允许多个消费者
  • 都有只读与可变类型
  • 永远只保存一个状态值
  • 同样支持DataBinding

StateFlow的不同之处:

  • CAS
  • 强制要求初始默认值
  • 默认支持防抖过滤 基本类型比较值是否相同 对象的记得重写equal
  • value的空安全校验
  • 有着Flow丰富的异步数据流操作
  • 默认没有Lifecycle支持,flow的collect是挂起函数,会一直等待数据流传递数据 可以使用repeatOnLifecycle来弥补
  • 线程安全,LiveData的postValue虽然也可在异步使用,但会导致数据丢失。

再来看SharedFlow 和 StateFlow的对比

相同点

  • 与 SateFlow 一样,SharedFlow 也有两个版本:SharedFlow 与 MutableSharedFlow。

不同点

  • MutableSharedFlow 没有起始值
  • SharedFlow 可以保留历史数据 StateFlow只保存最新的状态值
  • MutableSharedFlow 发射值需要调用 emit()/tryEmit() 方法,没有 setValue() 方法
  • 一般情况下 StateFlow用于UI 而Shared用于事件

使用SharedFlow 避免粘性事件

// viewModel
class MyViewModel() : ViewModel() {

    private val _sharedFlow = MutableSharedFlow<String>()

    val sharedFlow:SharedFlow<String>
    get() = _sharedFlow
    
    //模拟登录事件
    fun login(str:String) = viewModelScope.launch {
        _sharedFlow.emit(str)
    }

}


// activity

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        mBinding =  DataBindingUtil.setContentView(this,R.layout.activity_main)
            lifecycleScope.launch {
                repeatOnLifecycle(Lifecycle.State.STARTED){
                    mModel.sharedFlow.collect{
                        it.showToast(this@MainActivity)
                    }
                }
            }
        mBinding.mMessage.setOnClickListener {
            mModel.login("12345")
        }
    }

注意点

  • 如果在这里使用StateFlow 或者 LiveData的话 当点击登录之后 会弹出一次Toast 如果此时屏幕发生翻转 activity重建的话 会取消订阅然后重新订阅该事件 由于StateFlow 和 LiveData 会保存一个最新值 此时订阅之后将会又收到该值导致又弹出一次Toast 然而SharedFlow 默认情况下是不会保存任何值的 所以重新订阅之后无法接收到之前已经发送过的值。也就解决的粘性事件。
  • 所以 我觉得 liveData 和 StateFlow 更适用于状态的保存 而SharedFlow 适用于事件。

文章作者: Lao Wu
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Lao Wu !
评论
  目录