Flow.completionStageFlow
Streams the elements through the given future flow once it successfully completes.
Signature
Flow.completionStageFlow
Flow.completionStageFlow
Description
Streams the elements through the given flow once the CompletionStage
successfully completes. If the future fails the stream fails.
Examples
A deferred creation of the stream based on the initial element by combining completionStageFlow
with prefixAndTail
like so:
- Scala
-
source
CompletionStage<Flow<Integer, String, NotUsed>> processingFlow(int id) { return CompletableFuture.completedFuture( Flow.of(Integer.class).map(n -> "id: " + id + " value: " + n)); } Source<String, NotUsed> source = Source.range(1, 10) .prefixAndTail(1) .flatMapConcat( (pair) -> { List<Integer> head = pair.first(); Source<Integer, NotUsed> tail = pair.second(); int id = head.get(0); return tail.via(Flow.completionStageFlow(processingFlow(id))); });
Reactive Streams semantics
emits when the internal flow is successfully created and it emits
backpressures when the internal flow is successfully created and it backpressures
completes when upstream completes and all elements have been emitted from the internal flow
completes when upstream completes and all futures have been completed and all elements have been emitted