flatMapConcat

Transform each input element into a Source whose elements are then flattened into the output stream through concatenation.

Nesting and flattening operators

Signature

Flow.flatMapConcat

Description

Transform each input element into a Source whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.
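The ordering guarantee can be modelled without the streams library at all. The sketch below is only an analogy, assuming plain `java.util.stream` in place of `Source`: a sequential `Stream.flatMap` also drains each inner stream completely before moving to the next one. The class name `FlatMapConcatOrdering` and the method `concatenatedEvents` are hypothetical names chosen for illustration, and the sketch does not model asynchrony or backpressure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapConcatOrdering {

  // Hypothetical stand-in for a lookup that returns a Source;
  // here it returns a plain java.util.stream.Stream instead.
  static Stream<String> lookupCustomerEvents(String customerId) {
    return Stream.of(customerId + "-event-1", customerId + "-event-2");
  }

  // Flattens the per-customer streams by concatenation: every event of
  // one customer is emitted before the first event of the next customer.
  static List<String> concatenatedEvents(List<String> customerIds) {
    return customerIds.stream()
        .flatMap(FlatMapConcatOrdering::lookupCustomerEvents)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    concatenatedEvents(Arrays.asList("customer-1", "customer-2"))
        .forEach(System.out::println);
  }
}
```

Running `main` lists both events for customer-1 before any event for customer-2, which is the same ordering flatMapConcat guarantees for its substreams.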

See also: flatMapMerge, mapConcat

Example

In the following example, flatMapConcat creates a Source for each incoming customerId by passing it to lookupCustomerEvents, which could be, for example, a calculation or a query to a database. All the events for one customer are delivered before moving on to the next customer.

Scala
val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2"))

// e.g. could be a query to a database
def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = {
  Source(List(s"$customerId-event-1", s"$customerId-event-2"))
}

source.flatMapConcat(customerId => lookupCustomerEvents(customerId)).runForeach(println)

// prints - events from each customer consecutively
// customer-1-event-1
// customer-1-event-2
// customer-2-event-1
// customer-2-event-2
Java
// e.g. could be a query to a database
private Source<String, NotUsed> lookupCustomerEvents(String customerId) {
  return Source.from(Arrays.asList(customerId + "-event-1", customerId + "-event-2"));
}
  Source.from(Arrays.asList("customer-1", "customer-2"))
      .flatMapConcat(this::lookupCustomerEvents)
      .runForeach(System.out::println, system);
  // prints - events from each customer consecutively
  // customer-1-event-1
  // customer-1-event-2
  // customer-2-event-1
  // customer-2-event-2

Reactive Streams semantics

emits when the current consumed substream has an element available

backpressures when downstream backpressures

completes when upstream completes and all consumed substreams complete
