foreachAsync

Invoke a given procedure asynchronously for each element received.

Sink operators

Signature

def foreachAsync[T](parallelism: Int)(f: T => Future[Unit]): Sink[T, Future[Done]]

Description

Invoke a given procedure asynchronously for each element received. If the procedure mutates shared state, it must do so in a thread-safe way, because up to parallelism invocations can be in flight at the same time.
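The thread-safety note can be illustrated with a minimal sketch of a procedure that updates shared state through an AtomicLong rather than a plain field. This sketch uses only the JDK and does not run an Akka stream: the class name SharedStateSketch, the method sumConcurrently and the loop that stands in for the stream are hypothetical, and java.util.function.Function is used as a stand-in for whichever function type the Java DSL expects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical JDK-only sketch; not part of the Akka API.
final class SharedStateSketch {

  static long sumConcurrently(int upTo) {
    // Shared state: an atomic counter, safe to update from several threads.
    final AtomicLong total = new AtomicLong();

    // A procedure of the shape used in the Java example on this page.
    final Function<Integer, CompletionStage<Void>> asyncProcessing =
        value -> CompletableFuture.runAsync(() -> total.addAndGet(value));

    // Stand-in for the stream: start one invocation per element.
    final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    for (int i = 1; i <= upTo; i++) {
      inFlight.add(asyncProcessing.apply(i).toCompletableFuture());
    }

    // Wait until every invocation has completed before reading the result.
    CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
    return total.get();
  }

  public static void main(String[] args) {
    System.out.println(sumConcurrently(100)); // prints 5050
  }
}
```

Replacing the AtomicLong with an unsynchronized long field would make the result depend on thread interleaving, which is the situation the note warns about.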

The sink materializes into a Future[Done] (Scala) or CompletionStage<Done> (Java) that completes when the stream completes, or fails if the stream fails.

Example

Scala
// def asyncProcessing(value: Int): Future[Unit] = _

Source(1 to 100).runWith(Sink.foreachAsync(10)(asyncProcessing))
Java
// final Function<Integer, CompletionStage<Void>> asyncProcessing = _

final Source<Integer, NotUsed> numberSource = Source.range(1, 100);

numberSource.runWith(Sink.foreachAsync(10, asyncProcessing), mat);

cancels when a Future (Scala) or CompletionStage (Java) fails

backpressures when the number of in-flight Futures (Scala) or CompletionStages (Java) reaches the configured parallelism
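The effect of the parallelism limit can be modelled with a small JDK-only sketch. It is not how Akka implements the operator: a Semaphore blocks the producing loop here, whereas a stream signals demand without blocking a thread. The names ParallelismCapSketch, peakInFlight and pause are hypothetical. The sketch only shows the observable property that no more than parallelism invocations run at once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only model of a parallelism cap; not part of the Akka API.
final class ParallelismCapSketch {

  // Returns the highest number of simultaneously running tasks observed.
  static int peakInFlight(int elements, int parallelism) {
    final Semaphore permits = new Semaphore(parallelism);
    final AtomicInteger running = new AtomicInteger();
    final AtomicInteger peak = new AtomicInteger();

    for (int i = 0; i < elements; i++) {
      // The "upstream" loop waits here once `parallelism` tasks are in flight.
      permits.acquireUninterruptibly();
      CompletableFuture
          .runAsync(() -> {
            peak.accumulateAndGet(running.incrementAndGet(), Math::max);
            pause(5); // simulate asynchronous work
            running.decrementAndGet();
          })
          // Free the slot whether the task succeeded or failed.
          .whenComplete((ignored, error) -> permits.release());
    }

    // Taking every permit back means all tasks have finished.
    permits.acquireUninterruptibly(parallelism);
    return peak.get();
  }

  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println(peakInFlight(40, 4)); // never greater than 4
  }
}
```

This model does not cover the failure case: the operator cancels upstream when an invocation fails, while the loop here keeps going.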
