flatMapConcat
Transform each input element into a Source
whose elements are then flattened into the output stream through concatenation.
Nesting and flattening operators
Signature
Flow.flatMapConcat
Flow.flatMapConcat
See also: flatMapMerge
Description
Transform each input element into a Source
whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.
Example
In the following example flatMapConcat
is used to create a Source
for each incoming customerId. This could be, for example, a calculation or a query to a database. Each customer is then passed to lookupCustomerEvents
which returns a Source
. All the events for a customer are delivered before moving to the next customer.
- Scala
-
source
val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2")) // e.g. could b a query to a database def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = { Source(List(s"$customerId-event-1", s"$customerId-event-2")) } source.flatMapConcat(customerId => lookupCustomerEvents(customerId)).runForeach(println) // prints - events from each customer consecutively // customer-1-event-1 // customer-1-event-2 // customer-2-event-1 // customer-2-event-2
- Java
-
source
// e.g. could be a query to a database private Source<String, NotUsed> lookupCustomerEvents(String customerId) { return Source.from(Arrays.asList(customerId + "-event-1", customerId + "-event-2")); } Source.from(Arrays.asList("customer-1", "customer-2")) .flatMapConcat(this::lookupCustomerEvents) .runForeach(System.out::println, system); // prints - events from each customer consecutively // customer-1-event-1 // customer-1-event-2 // customer-2-event-1 // customer-2-event-2
Reactive Streams semantics
emits when the current consumed substream has an element available
backpressures when downstream backpressures
completes when upstream completes and all consumed substreams complete