马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
挂起函数可以异步地返回一个值,而对于返回多个值,可以利用流,利用 emit(x) 发射多个值,
collect { } 来收集值。
默认 流是冷的,只有 收集(collect) 时才会实行。
1. 流的创建
- flow {} 生成流,emit(x) 来发射值;
- xxx.asFlow() 集合转成Flow;
- flowOf(1, 2, 3) 生成固定值的流。
1.1 flow {}
flow {} 里的 发射(emissions)默认是可取消的,对应 SafeFlow,继承自 AbstractFlow:
- public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
- public final override suspend fun collect(collector: FlowCollector<T>) {
- val safeCollector = SafeCollector(collector, coroutineContext)
- try {
- collectSafely(safeCollector)
- } finally {
- safeCollector.releaseIntercepted()
- }
- }
- }
复制代码 对应 emit() 方法,就是 SafeCollector.emit() 内部 调用了 currentContext.ensureActive() 做 取消检查。
而 其他两种,默认是 不可取消,利用 cancellable() 操作符可取消。
2. 流的操作符
2.1 中间操作符
2.2 结束操作符
- collect {}
- toList() 或 toSet() 收集到集合
- first() 或 single()
- reduce() 或 fold() 合并值
3. 流的上下文
默认 流 实行在 和 收集者(Collector) 雷同的 上下文。
更改流的发射的上下文,必须利用 flowOn,而不是在 flow {} 中利用 withContext()。
- flow {
- ...
- }.flowOn(Dispatchers.Default)
复制代码 4. 缓冲
当 Flow emit 生产者速度 大于 collector 消费者速度时。
- buffer() 并发地实行 发射 和 收集,而不是 顺序实行(发射 收集 再 发射 收集);
- conflate() 丢弃中间值,取最新发射的值;
- collectLatest { } 收集最新的值,但 如果 发射新值,会 取消 慢的收集。
- simple()
- .collectLatest { value -> // cancel & restart on the latest value
- println("Collecting $value")
- delay(300) // pretend we are processing it for 300 ms
- println("Done $value")
- }
复制代码 说明:会实行全部 Collecting,但是 因为慢处置惩罚,会被取消。
结果:
- Collecting 1
- Collecting 2
- Collecting 3
- Done 3
复制代码 https://kotlinlang.org/docs/flow.html#processing-the-latest-value
5. 流的组合
- zip 1对1的 组合
- combine 每次上游更新,就会重新盘算。两个流 生产速度不一样时,就会 差别的对应组合。更新值即组合
举例:
zip 组合值: 1->one, 2->two 3->three,
而 combine 组合,则可能 1->one 2->one 如许只有一方流 发射值,就会调用盘算。
6. flatten 展平
对于 Flow 又对应 Flow<T> 任务,这时间对于 Flow<Flow<T>> 需要展开为Flow<T>。场景就是 一序列 对应 请求任务。
- fun requestFlow(i: Int): Flow<String> = flow {
- emit("$i: First")
- delay(500) // wait 500 ms
- emit("$i: Second")
- }
- (1..3).asFlow().map { requestFlow(it) }
复制代码
- flatMapConcat 按顺序,流完成后,才接着下一个流
- flatMapMerge 支持并发地处置惩罚,流 则 出现并发交错地收集值,concurrency 设置并发数
- flatMapLatest 处置惩罚最新的流,当新的流发射值时,取消之前的流
7. 异常
- try { flow.collect {} } catch (e: Exception) { } 处置惩罚异常,包罗收集器里代码异常;
- flow {}.catch { }.collect { } 处置惩罚 上游异常,但不会处置惩罚 卑鄙 异常;
- flow {}.onEach { }.collect() 处置惩罚上卑鄙异常。
7.1. try/catch 全部捕捉
- try {
- simple()
- .collect {
- println("value: $it")
- }
- } catch (e: Exception) {
- // 捕获了 flow发射代码块、中间操作符 和 结束操作符 的所有异常
- }
复制代码 7.2. catch 操作符
- simple()
- .catch { e ->
- // catch 捕获上面的异常,但 不处理 下游 和 结束操作符 的 异常
- println("exception: $e")
- }
- .collect {
- // 如果 这里异常,则不会被捕捉
- println("value: $it")
- }
复制代码 7.3. 声明式捕捉
如果想 捕捉 结束操作符的异常,需要 声明式地捕捉。把 collect 的代码部分 上移到 onEach 中,利用无参的 collect() 收集:
- simple()
- .onEach {
- check(it < 2)
- println("value: $it")
- }
- .catch { e ->
- println("exception: $e")
- }
- .collect()
复制代码 8. 完成
- try/finnaly 在结束后处置惩罚
- flow {}.onCompletion { cause -> } 处置惩罚
- flow {
- }.onCompletion { cause ->
- // 完成回调,cause 是空 表示 正常完成
- if (cause == null) {
- println("success")
- }
- }
复制代码 9. 取消
- onEach 时检测
- cancel() 在收集时,调用取消
- flow {}.cancellable() 设置flow可取消
- // 不加 cancellable() 不会 做取消检查,导致完成收集后 才 报异常
- // cancellable() 则会 及时取消
- // flowOf(1, 2, 3)
- listOf(1, 2, 3).asFlow()
- // .cancellable()
- .collect {
- if (it > 1) {
- cancel()
- }
- println("value: $it")
- }
复制代码 cancellable()的实现:
- public fun <T> Flow<T>.cancellable(): Flow<T> =
- when (this) {
- is CancellableFlow<*> -> this // Fast-path, already cancellable
- else -> CancellableFlowImpl(this)
- }
- private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
- override suspend fun collect(collector: FlowCollector<T>) {
- flow.collect {
- currentCoroutineContext().ensureActive()
- collector.emit(it)
- }
- }
- }
复制代码 文档
- Flow
- 异步流
- Kotlin Flow:把握根本,征服应用,避开开辟陷阱!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |