Broadcast
Emit each incoming element to each of n
outputs.
Signature
Description
Emit each incoming element to each of n
outputs.
Example
Here is an example that is using Broadcast
to aggregate different values from a Source
of integers.
- Scala
-
import java.util.concurrent.ThreadLocalRandom

import scala.concurrent.Future

import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.Broadcast
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.GraphDSL
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RunnableGraph
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

// 100 random integers, each in the range [0, 100).
val source: Source[Int, NotUsed] =
  Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)

// Counts the elements; the materialized Future completes with the total.
val countSink: Sink[Int, Future[Int]] =
  Flow[Int].toMat(Sink.fold(0)((acc, _) => acc + 1))(Keep.right)

// Seeded with Int.MaxValue so the first element always replaces the seed.
// A seed of 0 would make the result 0 for any stream of non-negative numbers.
val minSink: Sink[Int, Future[Int]] =
  Flow[Int].toMat(Sink.fold(Int.MaxValue)((acc, elem) => math.min(acc, elem)))(Keep.right)

// Seeded with Int.MinValue for the same reason, mirrored.
val maxSink: Sink[Int, Future[Int]] =
  Flow[Int].toMat(Sink.fold(Int.MinValue)((acc, elem) => math.max(acc, elem)))(Keep.right)

// Fan the source out to the three sinks and keep all three materialized values.
// Note: run() needs an implicit materializer (e.g. an implicit ActorSystem) in scope.
val (count: Future[Int], min: Future[Int], max: Future[Int]) =
  RunnableGraph
    .fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
      implicit builder => (countS, minS, maxS) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](3))
        source ~> broadcast
        // Every sink must be attached to its own outlet: an outlet can only be
        // connected once, and all three outlets must be connected for a ClosedShape.
        broadcast.out(0) ~> countS
        broadcast.out(1) ~> minS
        broadcast.out(2) ~> maxS
        ClosedShape
    })
    .run()
- Java
-
import akka.NotUsed;
import akka.japi.tuple.Tuple3;
import akka.stream.ClosedShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

// The integers 1 to 10 (inclusive).
Source<Integer, NotUsed> source = Source.range(1, 10);

// Counts the elements; the materialized CompletionStage completes with the total.
Sink<Integer, CompletionStage<Integer>> countSink =
    Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right());

// Seeded with Integer.MAX_VALUE so the first element always replaces the seed.
// A seed of 0 would yield 0 here instead of the real minimum (1).
Sink<Integer, CompletionStage<Integer>> minSink =
    Flow.of(Integer.class).toMat(Sink.fold(Integer.MAX_VALUE, Math::min), Keep.right());

// Seeded with Integer.MIN_VALUE for the same reason, mirrored.
Sink<Integer, CompletionStage<Integer>> maxSink =
    Flow.of(Integer.class).toMat(Sink.fold(Integer.MIN_VALUE, Math::max), Keep.right());

// Fan the source out to the three sinks and keep all three materialized values.
final Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>>
    result =
        RunnableGraph.fromGraph(
                GraphDSL.create3(
                    countSink,
                    minSink,
                    maxSink,
                    Tuple3::create,
                    (builder, countS, minS, maxS) -> {
                      final UniformFanOutShape<Integer, Integer> broadcast =
                          builder.add(Broadcast.create(3));
                      builder.from(builder.add(source)).viaFanOut(broadcast);
                      // One outlet per sink; every outlet must be connected exactly once.
                      builder.from(broadcast.out(0)).to(countS);
                      builder.from(broadcast.out(1)).to(minS);
                      builder.from(broadcast.out(2)).to(maxS);
                      return ClosedShape.getInstance();
                    }))
            .run(system);
Note that an asynchronous boundary for the output streams must be added explicitly if it’s desired to run them in parallel.
- Scala
-
// Same wiring as the basic example, but each branch passes through its own
// asynchronous boundary so the three sinks can run in parallel.
RunnableGraph.fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
  implicit builder => (countS, minS, maxS) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](3))
    source ~> broadcast
    // Each branch uses a distinct outlet: an outlet can only be connected once,
    // and all three outlets must be connected for the graph to be closed.
    broadcast.out(0) ~> Flow[Int].async ~> countS
    broadcast.out(1) ~> Flow[Int].async ~> minS
    broadcast.out(2) ~> Flow[Int].async ~> maxS
    ClosedShape
})
- Java
-
// Same wiring as the basic example, but every branch is routed through its own
// asynchronous identity flow so the three sinks can run in parallel.
RunnableGraph.fromGraph(
    GraphDSL.create3(
        countSink,
        minSink,
        maxSink,
        Tuple3::create,
        (b, countShape, minShape, maxShape) -> {
          // Fan-out stage with three outlets, one per sink.
          final UniformFanOutShape<Integer, Integer> fanOut = b.add(Broadcast.create(3));
          b.from(b.add(source)).viaFanOut(fanOut);

          // outlet 0 -> async boundary -> element counter
          b.from(fanOut.out(0)).via(b.add(Flow.of(Integer.class).async())).to(countShape);
          // outlet 1 -> async boundary -> minimum
          b.from(fanOut.out(1)).via(b.add(Flow.of(Integer.class).async())).to(minShape);
          // outlet 2 -> async boundary -> maximum
          b.from(fanOut.out(2)).via(b.add(Flow.of(Integer.class).async())).to(maxShape);

          return ClosedShape.getInstance();
        }));
Reactive Streams semantics
emits when all of the outputs stop backpressuring and there is an input element available
backpressures when any of the outputs backpressures
completes when upstream completes
cancels depends on the eagerCancel
flag. If it is true, when any downstream cancels, if false, when all downstreams cancel.