Broadcast

Emit each incoming element to each of n outputs.

Fan-out operators

Signature

Broadcast

Description

Emit each incoming element to each of n outputs.

Example

Here is an example that uses Broadcast to compute several aggregate values (count, minimum, and maximum) from a single Source of integers.

Scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.GraphDSL
import akka.stream.scaladsl.RunnableGraph
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

// A finite source of 100 pseudo-random integers, each drawn with nextInt(100).
val source: Source[Int, NotUsed] = {
  // A fresh, endless iterator is built every time the factory function is invoked.
  def randomInts: Iterator[Int] =
    Iterator.continually(ThreadLocalRandom.current().nextInt(100))

  Source.fromIterator(() => randomInts).take(100)
}

// Counts the elements: the fold ignores each element's value and increments the accumulator.
val countSink: Sink[Int, Future[Int]] = Flow[Int].toMat(Sink.fold(0)((acc, _) => acc + 1))(Keep.right)
// Tracks the smallest element seen. The fold is seeded with Int.MaxValue rather than 0:
// a seed of 0 would win against every non-negative element and the result would always be 0.
val minSink: Sink[Int, Future[Int]] =
  Flow[Int].toMat(Sink.fold(Int.MaxValue)((acc, elem) => math.min(acc, elem)))(Keep.right)
// Tracks the largest element seen. Seeded with Int.MinValue so that negative elements
// are handled correctly as well.
val maxSink: Sink[Int, Future[Int]] =
  Flow[Int].toMat(Sink.fold(Int.MinValue)((acc, elem) => math.max(acc, elem)))(Keep.right)

// Builds and runs a closed graph that feeds every element of `source` to all three sinks.
// The sinks' materialized values are combined with Tuple3.apply, so the result is the
// triple (count, min, max) of futures.
val (count: Future[Int], min: Future[Int], max: Future[Int]) =
  RunnableGraph
    .fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
      implicit builder => (countS, minS, maxS) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](3))
        source ~> broadcast
        // Each sink gets its own Broadcast outlet (0, 1, 2); wiring several sinks to the
        // same outlet would leave the remaining outlets unconnected.
        broadcast.out(0) ~> countS
        broadcast.out(1) ~> minS
        broadcast.out(2) ~> maxS
        ClosedShape
    })
    .run()
Java
import akka.NotUsed;
import akka.japi.tuple.Tuple3;
import akka.stream.ClosedShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

// Small finite source of integers shared by the sinks below.
Source<Integer, NotUsed> source = Source.range(1, 10);

// Counts the elements: the fold ignores each element's value and increments the accumulator.
Sink<Integer, CompletionStage<Integer>> countSink =
    Flow.of(Integer.class).toMat(Sink.fold(0, (acc, elem) -> acc + 1), Keep.right());
// Tracks the smallest element seen. Seeded with Integer.MAX_VALUE rather than 0: the source
// starts at 1, so a seed of 0 would always be reported as the minimum.
Sink<Integer, CompletionStage<Integer>> minSink =
    Flow.of(Integer.class).toMat(Sink.fold(Integer.MAX_VALUE, Math::min), Keep.right());
// Tracks the largest element seen. Seeded with Integer.MIN_VALUE so that negative elements
// are handled correctly as well.
Sink<Integer, CompletionStage<Integer>> maxSink =
    Flow.of(Integer.class).toMat(Sink.fold(Integer.MIN_VALUE, Math::max), Keep.right());

// Blueprint of a closed graph that sends every element of `source` through a three-way
// Broadcast, one outlet per sink. The sinks' materialized values are combined with
// Tuple3::create in the order (count, min, max).
final RunnableGraph<
        Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>>>
    aggregationGraph =
        RunnableGraph.fromGraph(
            GraphDSL.create3(
                countSink,
                minSink,
                maxSink,
                Tuple3::create,
                (b, countIn, minIn, maxIn) -> {
                  final UniformFanOutShape<Integer, Integer> fanOut =
                      b.add(Broadcast.create(3));
                  b.from(b.add(source)).viaFanOut(fanOut);
                  b.from(fanOut.out(0)).to(countIn);
                  b.from(fanOut.out(1)).to(minIn);
                  b.from(fanOut.out(2)).to(maxIn);
                  return ClosedShape.getInstance();
                }));

// Running the graph yields the three completion stages.
final Tuple3<CompletionStage<Integer>, CompletionStage<Integer>, CompletionStage<Integer>>
    result = aggregationGraph.run(system);

Note that an asynchronous boundary must be added explicitly to each output stream if you want them to run in parallel.

Scala
// Same fan-out as a plain Broadcast graph, but every branch passes through `Flow[Int].async`
// before reaching its sink, placing an asynchronous boundary on each output.
RunnableGraph.fromGraph(GraphDSL.create(countSink, minSink, maxSink)(Tuple3.apply) {
  implicit builder => (countS, minS, maxS) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](3))
    source ~> broadcast
    // Each branch uses its own Broadcast outlet (0, 1, 2); wiring several branches to the
    // same outlet would leave the remaining outlets unconnected.
    broadcast.out(0) ~> Flow[Int].async ~> countS
    broadcast.out(1) ~> Flow[Int].async ~> minS
    broadcast.out(2) ~> Flow[Int].async ~> maxS
    ClosedShape
})
Java
// Three-way Broadcast in which every branch passes through an async flow before its sink.
RunnableGraph.fromGraph(
    GraphDSL.create3(
        countSink,
        minSink,
        maxSink,
        Tuple3::create,
        (b, countIn, minIn, maxIn) -> {
          // Async pass-through flow; it is added to the builder once per branch below.
          final Flow<Integer, Integer, NotUsed> asyncBoundary =
              Flow.of(Integer.class).async();

          final UniformFanOutShape<Integer, Integer> fanOut = b.add(Broadcast.create(3));
          b.from(b.add(source)).viaFanOut(fanOut);

          b.from(fanOut.out(0)).via(b.add(asyncBoundary)).to(countIn);
          b.from(fanOut.out(1)).via(b.add(asyncBoundary)).to(minIn);
          b.from(fanOut.out(2)).via(b.add(asyncBoundary)).to(maxIn);
          return ClosedShape.getInstance();
        }));

Reactive Streams semantics

emits when all of the outputs stop backpressuring and there is an input element available

backpressures when any of the outputs backpressures

completes when upstream completes

cancels depending on the eagerCancel flag: if it is true, when any downstream cancels; if it is false, when all downstreams cancel.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.