IT评测·应用市场-qidao123.com

标题: Kotlin 流 Flow [打印本页]

作者: 花瓣小跑    时间: 2024-9-4 09:22
标题: Kotlin 流 Flow
挂起函数可以异步地返回一个值,而对于返回多个值,可以利用流,利用 emit(x) 发射多个值,
collect { } 来收集值。
默认 流是冷的,只有 收集(collect) 时才会实行。
1. 流的创建

1.1 flow {}

flow {} 里的 发射(emissions)默认是可取消的,对应 SafeFlow,继承自 AbstractFlow:
  1. public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
  2.     public final override suspend fun collect(collector: FlowCollector<T>) {
  3.         val safeCollector = SafeCollector(collector, coroutineContext)
  4.         try {
  5.             collectSafely(safeCollector)
  6.         } finally {
  7.             safeCollector.releaseIntercepted()
  8.         }
  9.     }
  10. }
复制代码
对应 emit() 方法,就是 SafeCollector.emit() 内部 调用了 currentContext.ensureActive() 做 取消检查。
而 其他两种,默认是 不可取消,利用 cancellable() 操作符可取消。
2. 流的操作符

2.1 中间操作符

2.2 结束操作符

3. 流的上下文

默认 流 实行在 和 收集者(Collector) 雷同的 上下文。
更改流的发射的上下文,必须利用 flowOn,而不是在 flow {} 中利用 withContext()。
  1. flow {
  2.     ...
  3. }.flowOn(Dispatchers.Default)
复制代码
4. 缓冲

当 Flow emit 生产者速度 大于 collector 消费者速度时。
  1. simple()
  2.     .collectLatest { value -> // cancel & restart on the latest value
  3.         println("Collecting $value")
  4.         delay(300) // pretend we are processing it for 300 ms
  5.         println("Done $value")
  6.     }
复制代码
说明:会实行全部 Collecting,但是 因为慢处置惩罚,会被取消。
结果:
  1. Collecting 1
  2. Collecting 2
  3. Collecting 3
  4. Done 3
复制代码
https://kotlinlang.org/docs/flow.html#processing-the-latest-value
5. 流的组合

举例:
zip 组合值: 1->one, 2->two 3->three,
而 combine 组合,则可能 1->one 2->one 如许只有一方流 发射值,就会调用盘算。
6. flatten 展平

对于 Flow 又对应 Flow<T> 任务,这时间对于 Flow<Flow<T>> 需要展开为Flow<T>。场景就是 一序列 对应 请求任务。
  1. fun requestFlow(i: Int): Flow<String> = flow {
  2.     emit("$i: First")
  3.     delay(500) // wait 500 ms
  4.     emit("$i: Second")
  5. }
  6. (1..3).asFlow().map { requestFlow(it) }
复制代码
7. 异常

7.1. try/catch 全部捕捉

  1. try {
  2.     simple()
  3.         .collect {
  4.             println("value: $it")
  5.         }
  6. } catch (e: Exception) {
  7.     // 捕获了 flow发射代码块、中间操作符 和 结束操作符 的所有异常
  8. }
复制代码
7.2. catch 操作符

  1. simple()
  2.     .catch { e ->
  3.         // catch 捕获上面的异常,但 不处理 下游 和 结束操作符 的 异常
  4.         println("exception: $e")
  5.     }
  6.     .collect {
  7.         // 如果 这里异常,则不会被捕捉
  8.         println("value: $it")
  9.     }
复制代码
7.3. 声明式捕捉

如果想 捕捉 结束操作符的异常,需要 声明式地捕捉。把 collect 的代码部分 上移到 onEach 中,利用无参的 collect() 收集:
  1. simple()
  2.     .onEach {
  3.         check(it < 2)
  4.         println("value: $it")
  5.     }
  6.     .catch { e ->
  7.         println("exception: $e")
  8.     }
  9.     .collect()
复制代码
8. 完成

  1. flow {
  2. }.onCompletion { cause ->
  3.     // 完成回调,cause 是空 表示 正常完成
  4.     if (cause == null) {
  5.         println("success")
  6.     }
  7. }
复制代码
9. 取消

  1. // 不加 cancellable() 不会 做取消检查,导致完成收集后 才 报异常
  2. // cancellable() 则会 及时取消
  3. // flowOf(1, 2, 3)
  4. listOf(1, 2, 3).asFlow()
  5. //  .cancellable()
  6.     .collect {
  7.         if (it > 1) {
  8.             cancel()
  9.         }
  10.         println("value: $it")
  11.     }
复制代码
cancellable()的实现:

  1. public fun <T> Flow<T>.cancellable(): Flow<T> =
  2.     when (this) {
  3.         is CancellableFlow<*> -> this // Fast-path, already cancellable
  4.         else -> CancellableFlowImpl(this)
  5.     }
  6. private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
  7.     override suspend fun collect(collector: FlowCollector<T>) {
  8.         flow.collect {
  9.             currentCoroutineContext().ensureActive()
  10.             collector.emit(it)
  11.         }
  12.     }
  13. }
复制代码
文档



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4