conflate

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.

Backpressure aware operators

Signature

def conflate[O2 >: Out](aggregate: (O2, O2) => O2): Repr[O2]

Description

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or average of incoming numbers, if aggregation should lead to a different type conflateWithSeed can be used:

Example

Scala
sourceimport scala.concurrent.duration._

Source
  .cycle(() => List(1, 10, 100, 1000).iterator)
  .throttle(10, per = 1.second) // faster upstream
  .conflate((acc, el) => acc + el) // acc: Int, el: Int
  .throttle(1, per = 1.second) // slow downstream
Java
sourceSource.cycle(() -> Arrays.asList(1, 10, 100).iterator())
    .throttle(10, Duration.ofSeconds(1)) // fast upstream
    .conflate((Integer acc, Integer el) -> acc + el)
    .throttle(1, Duration.ofSeconds(1)); // slow downstream

If downstream is slower the elements is conflated by summing them. This means that upstream can continue producing elements while downstream is applying backpressure. For example: downstream is backpressuring while 1, 10 and 100 arrives from upstream, then backpressure stops and the conflated 111 is emitted downstream.

Reactive Streams semantics

emits when downstream stops backpressuring and there is a conflated element available

backpressures when the aggregate function cannot keep up with incoming elements

completes when upstream completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.