This documentation regards version 2.10.4+3-f7095f01-SNAPSHOT, however the current version is 2.10.4.
Broadcast
Emit each incoming element to each of n outputs.
Signature
Description
Emit each incoming element to each of n outputs.
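To make the fan-out behaviour concrete, here is a minimal plain-Java sketch that models it with lists instead of streams. It does not use Akka: the class `BroadcastModel` and its method `broadcast` are hypothetical names introduced only for illustration, and the model deliberately ignores backpressure, cancellation, and asynchrony.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastModel {
    // Hypothetical helper (not part of Akka): copies every input element,
    // in order, to each of n output lists.
    public static <T> List<List<T>> broadcast(List<T> input, int n) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T elem : input) {
            for (List<T> out : outputs) {
                out.add(elem); // every output sees every element
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
        System.out.println(broadcast(List.of(1, 2, 3), 3));
    }
}
```

Each output receives the same elements in the same order, which is why the outputs can feed independent aggregations.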
Example
The following example uses Broadcast to aggregate different values from a Source of integers.
```java
import akka.NotUsed;
import akka.japi.tuple.Tuple3;
import akka.stream.ClosedShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

Source<Integer, NotUsed> source = Source.range(1, 10);

// Counts the elements it receives.
Sink<Integer, CompletionStage<Integer>> countSink =
    Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right());
// Seeded with Integer.MAX_VALUE: a seed of 0 would always win against positive input.
Sink<Integer, CompletionStage<Integer>> minSink =
    Flow.of(Integer.class).toMat(Sink.fold(Integer.MAX_VALUE, Math::min), Keep.right());
// Seeded with Integer.MIN_VALUE so that negative input is handled as well.
Sink<Integer, CompletionStage<Integer>> maxSink =
    Flow.of(Integer.class).toMat(Sink.fold(Integer.MIN_VALUE, Math::max), Keep.right());

// `system` is an ActorSystem assumed to be in scope.
final Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>>
    result =
        RunnableGraph.fromGraph(
                GraphDSL.create3(
                    countSink,
                    minSink,
                    maxSink,
                    Tuple3::create,
                    (builder, countS, minS, maxS) -> {
                      final UniformFanOutShape<Integer, Integer> broadcast =
                          builder.add(Broadcast.create(3));
                      // Feed the source into the broadcast, then wire one output per sink.
                      builder.from(builder.add(source)).viaFanOut(broadcast);
                      builder.from(broadcast.out(0)).to(countS);
                      builder.from(broadcast.out(1)).to(minS);
                      builder.from(broadcast.out(2)).to(maxS);
                      return ClosedShape.getInstance();
                    }))
            .run(system);
```
Note that an asynchronous boundary must be added explicitly to each output stream if the outputs are meant to run in parallel.
```java
RunnableGraph.fromGraph(
    GraphDSL.create3(
        countSink.async(),
        minSink.async(),
        maxSink.async(),
        Tuple3::create,
        (builder, countS, minS, maxS) -> {
          final UniformFanOutShape<Integer, Integer> broadcast =
              builder.add(Broadcast.create(3));
          builder.from(builder.add(source)).viaFanOut(broadcast);
          builder.from(broadcast.out(0)).to(countS);
          builder.from(broadcast.out(1)).to(minS);
          builder.from(broadcast.out(2)).to(maxS);
          return ClosedShape.getInstance();
        }));
```
Reactive Streams semantics
emits when all of the outputs stop backpressuring and there is an input element available
backpressures when any of the outputs backpressures
completes when upstream completes
cancels depending on the eagerCancel flag: if it is true, when any downstream cancels; if it is false, when all downstreams cancel
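The rules above can be read as simple predicates over the state of the outputs. The following plain-Java sketch encodes them that way. It is an illustrative model only, not Akka's implementation: the class `BroadcastSemantics` and its methods are hypothetical names, and at least one output is assumed.

```java
import java.util.List;

public class BroadcastSemantics {
    // Emits only when no output is backpressuring and an input element is available.
    public static boolean canEmit(List<Boolean> outputBackpressuring, boolean inputAvailable) {
        return inputAvailable && !outputBackpressuring.contains(true);
    }

    // Backpressures upstream as soon as any single output backpressures.
    public static boolean backpressures(List<Boolean> outputBackpressuring) {
        return outputBackpressuring.contains(true);
    }

    // Cancels upstream when any output has cancelled (eagerCancel = true),
    // or only once every output has cancelled (eagerCancel = false).
    public static boolean cancels(List<Boolean> outputCancelled, boolean eagerCancel) {
        return eagerCancel
            ? outputCancelled.contains(true)
            : !outputCancelled.contains(false);
    }

    public static void main(String[] args) {
        List<Boolean> oneCancelled = List.of(true, false, false);
        System.out.println(cancels(oneCancelled, true));  // prints true
        System.out.println(cancels(oneCancelled, false)); // prints false
    }
}
```

With one of three outputs cancelled, the model cancels upstream only in the eager case. With the flag set to false, the remaining two outputs keep the stream alive.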