flatMapMerge

Transform each input element into a Source whose elements are then flattened into the output stream through merging.

Nesting and flattening operators

Signature

Flow.flatMapMergeFlow.flatMapMerge

Description

Transform each input element into a Source whose elements are then flattened into the output stream through merging. The maximum number of merged sources has to be specified. When this is met flatMapMerge does not request any more elements meaning that it back pressures until one of the existing Sources completes. Order of the elements for each Source is preserved but there is no deterministic order between elements from different active Sources.

See also: flatMapConcat, mapConcat

Example

In the following example flatMapMerge is used to create a Source for each incoming customerId. This could, for example, be a calculation or a query to a database. There can be breadth active sources at any given time so events for different customers could interleave in any order but events for the same customer will be in the order emitted by the underlying Source;

Scala
sourceval source: Source[String, NotUsed] = Source(List("customer-1", "customer-2"))

// e.g. could b a query to a database
def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = {
  Source(List(s"$customerId-evt-1", s"$customerId-evt2"))
}

source.flatMapMerge(10, customerId => lookupCustomerEvents(customerId)).runForeach(println)

// prints - events from different customers could interleave
// customer-1-evt-1
// customer-2-evt-1
// customer-1-evt-2
// customer-2-evt-2
Java
source// e.g. could be a query to a database
private Source<String, NotUsed> lookupCustomerEvents(String customerId) {
  return Source.from(Arrays.asList(customerId + "-evt-1", customerId + "-evt-2"));
}
  Source.from(Arrays.asList("customer-1", "customer-2"))
      .flatMapMerge(10, this::lookupCustomerEvents)
      .runForeach(System.out::println, system);
  // prints - events from different customers could interleave
  // customer-1-evt-1
  // customer-2-evt-1
  // customer-1-evt-2
  // customer-2-evt-2

Reactive Streams semantics

emits when one of the currently consumed substreams has an element available

backpressures when downstream backpressures or the max number of substreams is reached

completes when upstream completes and all consumed substreams complete

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.