mapAsyncUnordered
Like mapAsync
but Future
CompletionStage
results are passed downstream as they arrive regardless of the order of the elements that triggered them.
Signature
Source.mapAsyncUnordered
Source.mapAsyncUnordered
Flow.mapAsyncUnordered
Flow.mapAsyncUnordered
Description
Like mapAsync
but Future
CompletionStage
results are passed downstream as they arrive regardless of the order of the elements that triggered them.
If a Future
CompletionStage
completes with null
, it is ignored and the next element is processed. If a Future
CompletionStage
fails, the stream also fails (unless a different supervision strategy is applied)
Examples
Imagine you are consuming messages from a source, and you prioritize throughput over order (this could be uncorrelated messages so order is irrelevant). You may use the mapAsyncUnordered
(so messages are emitted as soon as they’ve been processed) with some parallelism (so processing happens concurrently) :
- Scala
-
source
val events: Source[Event, NotUsed] = Consumer.plainSource(settings, subscription).throttle(1, 50.millis) def eventHandler(event: Event): Future[Int] = { println(s"Processing event $event...") } events .mapAsyncUnordered(3) { in => eventHandler(in) } .map { in => println(s"`mapAsyncUnordered` emitted event number: $in") } - Java
-
source
private final Source<Event, NotUsed> events = Consumer.plainSource(settings, subscription).throttle(1, Duration.ofMillis(50)); public CompletionStage<Integer> eventHandler(Event in) throws InterruptedException { System.out.println("Processing event number " + in + "..."); // ... } events .mapAsyncUnordered(10, this::eventHandler) .map(in -> "`mapSync` emitted event number " + in.intValue()) .runWith(Sink.foreach(str -> System.out.println(str)), system);
When running the stream above the logging output would look like:
[...]
Processing event numner Event(27)...
Completed processing 27
`mapAsyncUnordered` emitted event number: 27
Processing event numner Event(28)...
Completed processing 22
`mapAsyncUnordered` emitted event number: 22
Processing event numner Event(29)...
Completed processing 26
`mapAsyncUnordered` emitted event number: 26
Processing event numner Event(30)...
Completed processing 30
`mapAsyncUnordered` emitted event number: 30
Processing event numner Event(31)...
Completed processing 31
`mapAsyncUnordered` emitted event number: 31
[...]
See mapAsync for a variant with ordering guarantees.
Reactive Streams semantics
emits any of the Future
s CompletionStage
s returned by the provided function complete
backpressures when the number of Future
s CompletionStage
s reaches the configured parallelism and the downstream backpressures
completes upstream completes and all Future
s CompletionStage
s has been completed and all elements has been emitted