mergeLatest
Merge multiple sources.
Signature
Flow.mergeLatest
Description
MergeLatest joins elements from N input streams into a stream of lists of size N. The i-th element of each list is the latest element emitted by the i-th input stream. MergeLatest emits a list each time any input stream emits an element, but only after every input stream has emitted at least one element. If eagerComplete is set to true, the stage completes as soon as the first upstream completes; otherwise it completes when all upstreams have completed.
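The emission rule described above can be modelled without any streaming library. The following is a minimal plain-Java sketch, not the library's implementation: the names `MergeLatestSketch`, `LatestSlots`, `onElement` and `run` are hypothetical, and the arrival order of elements is fixed by hand, whereas in a real stream it depends on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Plain-Java model of the emission rule only; this is NOT the library's implementation.
final class MergeLatestSketch {

    /** Hypothetical helper: remembers the latest element seen on each input. */
    static final class LatestSlots<T> {
        private final List<T> latest = new ArrayList<>(); // latest.get(i): newest element from input i
        private final boolean[] seen;                     // seen[i]: input i has emitted at least once
        private int missing;                              // inputs that have not emitted yet

        LatestSlots(int inputs) {
            for (int i = 0; i < inputs; i++) {
                latest.add(null);
            }
            seen = new boolean[inputs];
            missing = inputs;
        }

        /** Records an element from input i; returns a snapshot once every input has emitted. */
        Optional<List<T>> onElement(int i, T element) {
            if (!seen[i]) {
                seen[i] = true;
                missing--;
            }
            latest.set(i, element);
            return missing == 0
                ? Optional.of(new ArrayList<>(latest))
                : Optional.empty();
        }
    }

    /** Feeds {input index, element} pairs in the given order and collects the emitted lists. */
    static List<List<String>> run(int inputs, String[][] arrivals) {
        LatestSlots<String> slots = new LatestSlots<>(inputs);
        List<List<String>> out = new ArrayList<>();
        for (String[] arrival : arrivals) {
            slots.onElement(Integer.parseInt(arrival[0]), arrival[1]).ifPresent(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed arrival order: input 0 emits twice before input 1 emits at all.
        String[][] arrivals = {
            {"0", "a1"}, {"0", "a2"}, {"1", "b1"}, {"0", "a3"}, {"1", "b2"}
        };
        // prints [[a2, b1], [a3, b1], [a3, b2]]
        System.out.println(run(2, arrivals));
    }
}
```

In this trace `a1` never appears in the output: it was overwritten by `a2` before the second input had emitted anything, so no list could be produced for it.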
Example
This example takes a stream of prices and a stream of quantities and emits the total (price times quantity) each time the price or the quantity changes:
- Scala
val prices = Source(List(100, 101, 99, 103))
val quantity = Source(List(1, 3, 4, 2))

prices
  .mergeLatest(quantity)
  .map {
    case price :: quantity :: Nil => price * quantity
  }
  .runForeach(println)

// prints something like:
// 100
// 101
// 303
// 297
// 396
// 412
// 206
- Java
Source<Integer, NotUsed> prices = Source.from(Arrays.asList(100, 101, 99, 103));
Source<Integer, NotUsed> quantities = Source.from(Arrays.asList(1, 3, 4, 2));

prices
    .mergeLatest(quantities, true)
    .map(priceAndQuantity -> priceAndQuantity.get(0) * priceAndQuantity.get(1))
    .runForeach(System.out::println, system);

// prints something like:
// 100
// 101
// 303
// 297
// 396
// 412
// 206
Reactive Streams semantics
emits when an element is available from some input and every input has emitted at least one element since the stream started
completes when all upstreams complete (eagerComplete=false) or when one upstream completes (eagerComplete=true)
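The completion rule can be read as a simple predicate over the upstreams' state. The sketch below is a plain-Java model under that reading, not library code; `CompletionSketch` and `shouldComplete` are hypothetical names, and the flag is called `eagerComplete` as in the description.

```java
// Plain-Java model of the completion rule only; this is NOT the library's implementation.
final class CompletionSketch {

    /**
     * Hypothetical helper: decides whether the merged stream completes,
     * given the eager flag and which upstreams have already completed.
     */
    static boolean shouldComplete(boolean eagerComplete, boolean[] upstreamCompleted) {
        int completed = 0;
        for (boolean done : upstreamCompleted) {
            if (done) {
                completed++;
            }
        }
        return eagerComplete
            ? completed >= 1                           // one finished upstream is enough
            : completed == upstreamCompleted.length;   // every upstream must have finished
    }

    public static void main(String[] args) {
        boolean[] oneOfTwoDone = {true, false};
        System.out.println(shouldComplete(true, oneOfTwoDone));  // prints true
        System.out.println(shouldComplete(false, oneOfTwoDone)); // prints false
    }
}
```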