mapAsync

Pass incoming elements to a function that return a Future CompletionStage result.

Asynchronous operators

Signature

Source.mapAsyncSource.mapAsync Flow.mapAsyncFlow.mapAsync

Description

Pass incoming elements to a function that return a Future CompletionStage result. When the Future CompletionStage arrives the result is passed downstream. Up to n elements can be processed concurrently, but regardless of their completion time the incoming order will be kept when results complete. For use cases where order does not matter mapAsyncUnordered can be used.

If a Future CompletionStage completes with null, it is ignored and the next element is processed. If a Future CompletionStage fails, the stream also fails (unless a different supervision strategy is applied)

Examples

Imagine you are consuming messages from a broker. These messages represent business events produced on a service upstream. In that case, you want to consume the messages in order and one at a time:

Scala
source
val events: Source[Event, NotUsed] = Consumer.plainSource(settings, subscription).throttle(1, 50.millis) def eventHandler(event: Event): Future[Int] = { println(s"Processing event $event...") } events .mapAsync(1) { in => eventHandler(in) } .map { in => println(s"`mapAsync` emitted event number: $in") }
Java
source
private final Source<Event, NotUsed> events = Consumer.plainSource(settings, subscription).throttle(1, Duration.ofMillis(50)); public CompletionStage<Integer> eventHandler(Event in) throws InterruptedException { System.out.println("Processing event number " + in + "..."); // ... } events .mapAsync(1, this::eventHandler) .map(in -> "`mapSync` emitted event number " + in.intValue()) .runWith(Sink.foreach(str -> System.out.println(str)), system);

When running the stream above the logging output would look like:

[...]
Processing event number Event(33)...
Completed processing 33
`mapAsync` emitted event number: 33
Processing event number Event(34)...
Completed processing 34
`mapAsync` emitted event number: 34
[...]

If, instead, you may process information concurrently, but still emit the messages downstream in order, you may increase the parallelism. In this case, the events could some IoT payload with weather metrics, for example, where processing the data in strict ordering is not critical:

Scala
source
val events: Source[Event, NotUsed] = Consumer.plainSource(settings, subscription).throttle(1, 50.millis) def eventHandler(event: Event): Future[Int] = { println(s"Processing event $event...") } events .mapAsync(3) { in => eventHandler(in) } .map { in => println(s"`mapAsync` emitted event number: $in") }
Java
source
private final Source<Event, NotUsed> events = Consumer.plainSource(settings, subscription).throttle(1, Duration.ofMillis(50)); public CompletionStage<Integer> eventHandler(Event in) throws InterruptedException { System.out.println("Processing event number " + in + "..."); // ... } events .mapAsync(10, this::eventHandler) .map(in -> "`mapSync` emitted event number " + in.intValue()) .runWith(Sink.foreach(str -> System.out.println(str)), system);

In this case, the logging soon shows how processing of the events happens concurrently which may break the ordering. Still, the stage emits the events back in the correct order:

[...]
Processing event number Event(15)...
Processing event number Event(16)...
Completed processing 16
Processing event number Event(17)...
Completed processing 17
Completed processing 15
`mapAsync` emitted event number: 15
`mapAsync` emitted event number: 16
Processing event number Event(18)...
`mapAsync` emitted event number: 17
[...]

See also mapAsyncUnordered.

Reactive Streams semantics

emits when the Future CompletionStage returned by the provided function finishes for the next element in sequence

backpressures when the number of Future s CompletionStage s reaches the configured parallelism and the downstream backpressures

completes when upstream completes and all Future s CompletionStage s has been completed and all elements has been emitted

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.