conflateWithSeed

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure.

Backpressure aware operators

Signature

def conflateWithSeed[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S]

Description

Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure. When backpressure starts or there is no backpressure element is passed into a seed function to transform it to the summary type.

Example

Scala
import scala.concurrent.duration._

case class Summed(i: Int) {
  def sum(other: Summed) = Summed(this.i + other.i)
}

Source.cycle(() ⇒ List(1, 10, 100, 1000).iterator)
  .throttle(10, per = 1.second) // faster upstream
  .conflateWithSeed(el ⇒ Summed(el))((acc, el) ⇒ acc sum Summed(el)) // (Summed, Int) => Summed
  .throttle(1, per = 1.second) // slow downstream
Java
class Summed {

  private final Integer el;

  public Summed(Integer el) {
    this.el = el;
  }

  public Summed sum(Summed other) {
    return new Summed(this.el + other.el);
  }
}

Source.cycle(() -> Arrays.asList(1, 10, 100).iterator())
    .throttle(10, Duration.ofSeconds(1)) // fast upstream
    .conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el)))
    .throttle(1, Duration.ofSeconds(1)); // slow downstream

If downstream is slower, the “seed” function is called which is able to change the type of the to be conflated elements if needed (it can also be an identity function, in which case this conflateWithSeed is equivalent to a plain conflate). Next, the conflating function is applied while there is back-pressure from the downstream, such that the upstream can produce elements at an rate independent of the downstream.

You may want to use this operation for example to apply an average operation on the upstream elements, while the downstream backpressures. This allows us to keep processing upstream elements, and give an average number to the downstream once it is ready to process the next one.

Reactive Streams semantics

emits when downstream stops backpressuring and there is a conflated element available

backpressures when the aggregate or seed functions cannot keep up with incoming elements

completes when upstream completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.