merge

Merge multiple sources.

Fan-in operators

Signature

def merge[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[U]
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) => Mat3): ReprMat[U, Mat3]
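
As a rough illustration of how mergeMat combines materialized values, the sketch below merges a finite source with a Source.tick source and keeps the Cancellable of the tick source via Keep.right. It assumes Akka 2.6 or later, where an implicit ActorSystem provides the materializer. The names numbers, ticks and merged, the tick interval, and the timeout are illustrative only and do not come from the original page.

```scala
import akka.Done
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumption: Akka 2.6+, where the implicit ActorSystem supplies the materializer
implicit val system: ActorSystem = ActorSystem("merge-mat-example")

val numbers = Source(List(1, 2, 3))
// Source.tick materializes a Cancellable that stops the ticks when cancelled
val ticks: Source[Int, Cancellable] = Source.tick(0.seconds, 50.millis, 0)

// Keep.right selects the materialized value of `ticks` (the Cancellable)
val merged: Source[Int, Cancellable] = numbers.mergeMat(ticks)(Keep.right)

val (cancellable, done) = merged.toMat(Sink.ignore)(Keep.both).run()

// Cancelling completes the tick source; with the default eagerComplete = false
// the merged stream completes once both upstreams have completed
cancellable.cancel()
val result: Done = Await.result(done, 5.seconds)

system.terminate()
```

Keep.left would instead retain the materialized value of the source that mergeMat is called on, and Keep.both would retain the pair.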

Description

Merges multiple sources. If all sources have elements ready, the next element is picked randomly among them.

emits when one of the inputs has an element available

backpressures when downstream backpressures

completes when all upstreams complete (set eagerComplete=true to complete as soon as any upstream completes instead)
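
The completion rule above can be sketched as follows: a finite source is merged with a source that never completes, and eagerComplete = true lets the merged stream finish once the finite one is done. This is a minimal sketch assuming Akka 2.6 or later (implicit ActorSystem as materializer). The names finite, endless and elements, and the timeout, are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumption: Akka 2.6+, where the implicit ActorSystem supplies the materializer
implicit val system: ActorSystem = ActorSystem("merge-eager-example")

val finite = Source(List(1, 2, 3))
// Source.repeat never completes on its own
val endless = Source.repeat(0)

// With eagerComplete = true the merged stream completes (and cancels `endless`)
// as soon as `finite` completes; with the default it would never complete here
val future = finite.merge(endless, eagerComplete = true).runWith(Sink.seq)
val elements = Await.result(future, 5.seconds)

// The number and position of the zeros varies between runs
println(elements)

system.terminate()
```

With the default eagerComplete = false, the same pipeline would keep emitting zeros until the Await times out, because the merged stream waits for every upstream to complete.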

Example

Scala
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink

val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))

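// requires an implicit Materializer in scope (from Akka 2.6, an implicit ActorSystem is sufficient)
sourceA.merge(sourceB).runWith(Sink.foreach(println))
// merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40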
Java
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import java.util.Arrays;

Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
// `materializer` is assumed to be a Materializer that is already in scope
sourceA.merge(sourceB).runWith(Sink.foreach(System.out::println), materializer);
// merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40