conflate
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.
Description
Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or average of incoming numbers. If aggregation should lead to a different type, conflateWithSeed can be used instead.
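To make the type restriction concrete: a true running average cannot be carried in a single `Integer`, because the element count is lost, so the summary needs a type of its own. The sketch below shows, in plain Java without the stream library, the two functions such a seeded aggregation needs. `Avg`, `seed`, `aggregate`, and `mean` are hypothetical names chosen for this illustration, not part of the library.

```java
// Hypothetical summary type: a running sum and count, from which the mean is derived.
final class Avg {
    final long sum;
    final int count;

    Avg(long sum, int count) {
        this.sum = sum;
        this.count = count;
    }

    // Seed: turns the first element of a conflation round into a summary.
    static Avg seed(int first) {
        return new Avg(first, 1);
    }

    // Aggregate: folds one more incoming element into the existing summary.
    Avg aggregate(int next) {
        return new Avg(sum + next, count + 1);
    }

    // The value downstream would ultimately be interested in.
    double mean() {
        return (double) sum / count;
    }
}
```

For instance, `Avg.seed(1).aggregate(10).aggregate(100).mean()` evaluates to `37.0`. The `seed` and `aggregate` functions correspond to the two arguments conflateWithSeed takes: one to create the summary from the first element, one to fold later elements into it.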
Example
- Scala
    import scala.concurrent.duration._

    Source
      .cycle(() => List(1, 10, 100, 1000).iterator)
      .throttle(10, per = 1.second) // faster upstream
      .conflate((acc, el) => acc + el) // acc: Int, el: Int
      .throttle(1, per = 1.second) // slow downstream
- Java
    import java.time.Duration;
    import java.util.Arrays;

    Source.cycle(() -> Arrays.asList(1, 10, 100).iterator())
        .throttle(10, Duration.ofSeconds(1)) // fast upstream
        .conflate((Integer acc, Integer el) -> acc + el)
        .throttle(1, Duration.ofSeconds(1)); // slow downstream
If downstream is slower, the elements are conflated by summing them. This means that upstream can continue producing elements while downstream is applying backpressure. For example: downstream is backpressuring while 1, 10 and 100 arrive from upstream; then backpressure stops and the conflated 111 is emitted downstream.
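The 1, 10, 100 timeline can be modelled without a stream library. The sketch below is a plain-Java approximation of the conflation step only, not the operator's actual implementation. The `ConflateModel` class and its `onUpstream` and `onDownstreamDemand` methods are hypothetical names invented for this illustration.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical model of how conflate holds a single summary; not the library's code.
final class ConflateModel<T> {
    private final BinaryOperator<T> aggregate;
    private T summary; // null means no conflated element is pending

    ConflateModel(BinaryOperator<T> aggregate) {
        this.aggregate = aggregate;
    }

    // Upstream delivers an element while downstream is still backpressuring:
    // the first element becomes the summary, later ones are folded into it.
    void onUpstream(T element) {
        summary = (summary == null) ? element : aggregate.apply(summary, element);
    }

    // Downstream stops backpressuring: hand over the pending summary, if any, and reset.
    Optional<T> onDownstreamDemand() {
        Optional<T> out = Optional.ofNullable(summary);
        summary = null;
        return out;
    }
}
```

With `new ConflateModel<Integer>(Integer::sum)`, calling `onUpstream` with 1, 10 and 100 and then `onDownstreamDemand()` yields `Optional[111]`. A second demand with nothing pending yields an empty `Optional`, which in this model stands in for the operator waiting until the next element arrives.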
Reactive Streams semantics
emits when downstream stops backpressuring and there is a conflated element available
backpressures when the aggregate function cannot keep up with incoming elements
completes when upstream completes