Invoke a given procedure asynchronously for each element received.
def foreachAsync[T](parallelism: Int)(f: T => Future[Unit]): Sink[T, Future[Done]]
Invoke a given procedure asynchronously for each element received. Note that if shared state is mutated from the procedure that must be done in a thread-safe way.
The sink materializes into a
CompletionStage<Done> which completes when the stream completes, or fails if the stream fails.
//def asyncProcessing(value: Int): Future[Unit] = _
Source(1 to 100).runWith(Sink.foreachAsync(10)(asyncProcessing))
// final Function<Integer, CompletionStage<Void>> asyncProcessing = _
final Source<Integer, NotUsed> numberSource = Source.range(1, 100);
numberSource.runWith(Sink.foreachAsync(10, asyncProcessing), system);
Reactive Streams semantics
cancels when a
backpressures when the number of
CompletionStages reaches the configured parallelism
Found an error in this documentation? The source code for this page can be found here
Please feel free to edit and contribute a pull request.