flatMapMerge
Transform each input element into a Source
whose elements are then flattened into the output stream through merging.
Nesting and flattening operators
Signature
Flow.flatMapMerge
Flow.flatMapMerge
Description
Transform each input element into a Source
whose elements are then flattened into the output stream through merging. The maximum number of merged sources has to be specified. When this is met flatMapMerge
does not request any more elements meaning that it back pressures until one of the existing Source
s completes. Order of the elements for each Source
is preserved but there is no deterministic order between elements from different active Source
s.
See also: flatMapConcat, mapConcat
Example
In the following example flatMapMerge
is used to create a Source
for each incoming customerId. This could, for example, be a calculation or a query to a database. There can be breadth
active sources at any given time so events for different customers could interleave in any order but events for the same customer will be in the order emitted by the underlying Source
;
- Scala
-
source
val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2")) // e.g. could b a query to a database def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = { Source(List(s"$customerId-evt-1", s"$customerId-evt2")) } source.flatMapMerge(10, customerId => lookupCustomerEvents(customerId)).runForeach(println) // prints - events from different customers could interleave // customer-1-evt-1 // customer-2-evt-1 // customer-1-evt-2 // customer-2-evt-2
- Java
-
source
// e.g. could be a query to a database private Source<String, NotUsed> lookupCustomerEvents(String customerId) { return Source.from(Arrays.asList(customerId + "-evt-1", customerId + "-evt-2")); } Source.from(Arrays.asList("customer-1", "customer-2")) .flatMapMerge(10, this::lookupCustomerEvents) .runForeach(System.out::println, system); // prints - events from different customers could interleave // customer-1-evt-1 // customer-2-evt-1 // customer-1-evt-2 // customer-2-evt-2
Reactive Streams semantics
emits when one of the currently consumed substreams has an element available
backpressures when downstream backpressures or the max number of substreams is reached
completes when upstream completes and all consumed substreams complete