mergeLatest

Merge multiple sources.

Fan-in operators

Signature

Flow.mergeLatest

Description

MergeLatest joins elements from N input streams into a stream of lists of size N. The i-th element in each list is the latest element emitted by the i-th input stream. MergeLatest emits a list for each element emitted by any input stream, but only after every input stream has emitted at least one element. If eagerComplete is set to true, it completes as soon as the first upstream completes; otherwise it completes when all upstreams complete.
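The emission rule described above can be modelled without Akka. The following is a minimal plain-Java sketch, not Akka's implementation: the class `MergeLatestModel`, the `Arrival` type, and the arrival order used in `main` are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the mergeLatest emission rule (hypothetical helper, not Akka code).
public class MergeLatestModel {

    // One arrival: the index of the input that emitted, and the emitted value.
    public static final class Arrival {
        final int input;
        final int value;

        public Arrival(int input, int value) {
            this.input = input;
            this.value = value;
        }
    }

    // Replays the arrivals in order and returns the lists that would be emitted.
    public static List<List<Integer>> simulate(int inputCount, List<Arrival> arrivals) {
        Integer[] latest = new Integer[inputCount]; // null = this input has not emitted yet
        int seen = 0;                               // number of inputs that have emitted at least once
        List<List<Integer>> out = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (latest[a.input] == null) {
                seen++;
            }
            latest[a.input] = a.value;              // overwrite with the latest element
            if (seen == inputCount) {               // emit only once every input has emitted
                out.add(new ArrayList<>(Arrays.asList(latest)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Arrival> arrivals = Arrays.asList(
            new Arrival(0, 100), // price; nothing is emitted, quantity is still missing
            new Arrival(0, 101), // price again; still nothing emitted
            new Arrival(1, 1),   // first quantity; emission starts here
            new Arrival(1, 3),
            new Arrival(0, 99));
        System.out.println(simulate(2, arrivals));
        // prints [[101, 1], [101, 3], [99, 3]]
    }
}
```

Note that in this made-up ordering the price 100 never appears in the output, because it was overwritten before the second input emitted its first element.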

Example

This example takes a stream of prices and a stream of quantities and emits the total (price times quantity) each time the price or the quantity changes:

Scala
val prices = Source(List(100, 101, 99, 103))
val quantity = Source(List(1, 3, 4, 2))

prices
  .mergeLatest(quantity)
  .map {
    case price :: quantity :: Nil => price * quantity
  }
  .runForeach(println)

// prints something like:
// 100
// 101
// 303
// 297
// 396
// 412
// 206
Java
Source<Integer, NotUsed> prices = Source.from(Arrays.asList(100, 101, 99, 103));
Source<Integer, NotUsed> quantities = Source.from(Arrays.asList(1, 3, 4, 2));

prices
    .mergeLatest(quantities, true)
    .map(priceAndQuantity -> priceAndQuantity.get(0) * priceAndQuantity.get(1))
    .runForeach(System.out::println, system);

// prints something like:
// 100
// 101
// 303
// 297
// 396
// 412
// 206

Reactive Streams semantics

emits when an element is available from some input and every input has emitted at least one element since the stream started

completes when all upstreams complete (eagerComplete=false) or when one upstream completes (eagerComplete=true)
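The completion rule can be written as a small predicate. This is a plain-Java sketch of the rule as stated here, not Akka code; the class `MergeLatestCompletion` and the method `isComplete` are hypothetical names.

```java
// Plain-Java model of the mergeLatest completion rule (hypothetical helper, not Akka code).
public class MergeLatestCompletion {

    // upstreamCompleted[i] is true once the i-th upstream has completed.
    public static boolean isComplete(boolean[] upstreamCompleted, boolean eagerComplete) {
        int done = 0;
        for (boolean completed : upstreamCompleted) {
            if (completed) {
                done++;
            }
        }
        // Eager: one completed upstream is enough. Otherwise: all must have completed.
        return eagerComplete ? done >= 1 : done == upstreamCompleted.length;
    }

    public static void main(String[] args) {
        boolean[] oneDone = {true, false};
        System.out.println(isComplete(oneDone, false)); // prints false
        System.out.println(isComplete(oneDone, true));  // prints true
    }
}
```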
