mapAsyncUnordered

Like mapAsync, but the Future (Scala) or CompletionStage (Java) results are passed downstream as they arrive, regardless of the order of the elements that triggered them.

Asynchronous operators

Signature

Source.mapAsyncUnordered Flow.mapAsyncUnordered

Description

Like mapAsync, but the Future (Scala) or CompletionStage (Java) results are passed downstream as they arrive, regardless of the order of the elements that triggered them.

If a Future or CompletionStage completes with null, it is ignored and the next element is processed. If a Future or CompletionStage fails, the stream also fails (unless a different supervision strategy is applied).
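The two rules above can be modeled with plain JDK futures. The following is a minimal sketch, not Akka code: the class `NullAndFailureSketch` and its `drain` method are hypothetical names used only for illustration, supervision strategies are not modeled, and the futures are joined in input order because only the null and failure handling is being shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not part of the Akka API.
final class NullAndFailureSketch {

    // Collects results the way the description states:
    // a null result is skipped, and the first failed future aborts the whole run.
    static List<String> drain(List<CompletableFuture<String>> futures) {
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            // join() throws CompletionException if the future failed,
            // which stands in for "the stream also fails".
            String value = future.join();
            if (value != null) {
                emitted.add(value);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = List.of(
            CompletableFuture.completedFuture("a"),
            CompletableFuture.<String>completedFuture(null),
            CompletableFuture.completedFuture("b"));
        System.out.println(drain(futures)); // prints [a, b]
    }
}
```

In this model a failure surfaces as an exception from `drain`, so nothing after the failed future is emitted.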

Examples

Imagine you are consuming messages from a source and you prioritize throughput over order (for example, the messages are uncorrelated, so order is irrelevant). You can use mapAsyncUnordered (so messages are emitted as soon as they have been processed) with some parallelism (so processing happens concurrently):

Scala
val events: Source[Event, NotUsed] =
  Consumer.plainSource(settings, subscription).throttle(1, 50.millis)

def eventHandler(event: Event): Future[Int] = {
  println(s"Processing event $event...")
  // ...
}

events
  .mapAsyncUnordered(3) { in =>
    eventHandler(in)
  }
  .map { in =>
    println(s"`mapAsyncUnordered` emitted event number: $in")
  }

Java
private final Source<Event, NotUsed> events =
    Consumer.plainSource(settings, subscription).throttle(1, Duration.ofMillis(50));

public CompletionStage<Integer> eventHandler(Event in) throws InterruptedException {
  System.out.println("Processing event number " + in + "...");
  // ...
}

events
    .mapAsyncUnordered(10, this::eventHandler)
    .map(in -> "`mapAsyncUnordered` emitted event number " + in.intValue())
    .runWith(Sink.foreach(str -> System.out.println(str)), system);

When running the stream above, the logging output would look like this:

[...]
Processing event number Event(27)...
Completed processing 27
`mapAsyncUnordered` emitted event number: 27
Processing event number Event(28)...
Completed processing 22
`mapAsyncUnordered` emitted event number: 22
Processing event number Event(29)...
Completed processing 26
`mapAsyncUnordered` emitted event number: 26
Processing event number Event(30)...
Completed processing 30
`mapAsyncUnordered` emitted event number: 30
Processing event number Event(31)...
Completed processing 31
`mapAsyncUnordered` emitted event number: 31
[...]

See mapAsync for a variant with ordering guarantees.
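The difference between the two emission orders can be illustrated without a running stream. The sketch below uses only JDK futures and is an assumption-laden model, not how Akka implements either operator: `EmissionOrderSketch`, `emitAsCompleted`, and `emitInInputOrder` are hypothetical names. Each element is represented by its input index, and the futures are completed by hand in a chosen order so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not part of the Akka API.
final class EmissionOrderSketch {

    // Models mapAsyncUnordered: each result is emitted the moment its future completes.
    static List<Integer> emitAsCompleted(int[] completionOrder) {
        List<Integer> emitted = new ArrayList<>();
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (int i = 0; i < completionOrder.length; i++) {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            // The callback runs on the thread that calls complete(), here the caller.
            future.thenAccept(emitted::add);
            inFlight.add(future);
        }
        for (int index : completionOrder) {
            inFlight.get(index).complete(index);
        }
        return emitted;
    }

    // Models mapAsync: a completed result waits until all earlier elements were emitted.
    static List<Integer> emitInInputOrder(int[] completionOrder) {
        List<Integer> emitted = new ArrayList<>();
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (int i = 0; i < completionOrder.length; i++) {
            inFlight.add(new CompletableFuture<>());
        }
        int next = 0; // index of the oldest element not yet emitted
        for (int index : completionOrder) {
            inFlight.get(index).complete(index);
            while (next < inFlight.size() && inFlight.get(next).isDone()) {
                emitted.add(inFlight.get(next).join());
                next++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] completionOrder = {2, 0, 1}; // element 2 finishes first, then 0, then 1
        System.out.println(emitAsCompleted(completionOrder));  // prints [2, 0, 1]
        System.out.println(emitInInputOrder(completionOrder)); // prints [0, 1, 2]
    }
}
```

In the ordered model, element 2 finishes first but is held back until elements 0 and 1 are done, which is the latency cost that the unordered variant avoids.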

Reactive Streams semantics

emits when any of the Futures (Scala) or CompletionStages (Java) returned by the provided function completes

backpressures when the number of Futures or CompletionStages reaches the configured parallelism and the downstream backpressures

completes when upstream completes, all Futures or CompletionStages have completed, and all elements have been emitted
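The parallelism limit can be pictured as a fixed number of slots. The following is a rough JDK-only sketch under stated assumptions, not Akka's implementation: `BoundedParallelismSketch`, `Result`, and `run` are hypothetical names, the "upstream" is a plain loop that blocks when no slot is free, the doubling function is a placeholder for real work, and downstream demand is not modeled (results are emitted immediately).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not part of the Akka API.
final class BoundedParallelismSketch {

    static final class Result {
        final List<Integer> emitted; // results in completion order
        final int maxInFlight;       // highest number of futures pending at once

        Result(List<Integer> emitted, int maxInFlight) {
            this.emitted = emitted;
            this.maxInFlight = maxInFlight;
        }
    }

    static Result run(List<Integer> elements, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        Semaphore slots = new Semaphore(parallelism);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        Queue<Integer> emitted = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Integer>> started = new ArrayList<>();
        try {
            for (Integer element : elements) {
                // Stands in for backpressure: no new element is taken until a slot frees up.
                slots.acquireUninterruptibly();
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                started.add(CompletableFuture
                    .supplyAsync(() -> element * 2, pool)
                    .whenComplete((value, error) -> {
                        if (error == null) {
                            emitted.add(value); // emitted as soon as it completes
                        }
                        inFlight.decrementAndGet();
                        slots.release();
                    }));
            }
            // Stands in for completion: wait until every started future has finished.
            CompletableFuture.allOf(started.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return new Result(new ArrayList<>(emitted), maxInFlight.get());
    }

    public static void main(String[] args) {
        Result result = run(List.of(1, 2, 3, 4, 5, 6, 7, 8), 3);
        System.out.println("emitted (completion order): " + result.emitted);
        System.out.println("max in flight: " + result.maxInFlight);
    }
}
```

The order of `emitted` can differ between runs, but `maxInFlight` never exceeds the `parallelism` argument because a slot is acquired before a future starts and released only after it completes.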
