mergeSorted

Merge this stream with another sorted source, emitting elements in sorted order.

Fan-in operators

Signature

def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M])(implicit ord: Ordering[U]): Repr[U]
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3)(implicit ord: Ordering[U]): ReprMat[U, Mat3]

Description

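Merges this stream with the given source according to the supplied ordering. The operator waits for one element to be ready from each input stream and emits the smaller of the two. Both inputs are assumed to be sorted already; if they are not, the output is not sorted either, as the second example below shows.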

emits when all of the inputs have an element available, or when one input has completed and the other has an element available

backpressures when downstream backpressures

completes when all upstreams complete
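
The behaviour described above can be modelled without any streaming machinery. The following is a minimal sketch in plain Java, not Akka's implementation: the class `MergeSortedModel` and its static `mergeSorted` helper are hypothetical names introduced here for illustration, lists stand in for streams, and the choice to take the left element on ties is an assumption of this sketch rather than documented operator behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the merge rule; lists stand in for the two input streams.
public class MergeSortedModel {

    static <T> List<T> mergeSorted(List<T> left, List<T> right, Comparator<T> cmp) {
        List<T> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        // While both inputs have a pending head element, emit the smaller one.
        // Ties go to the left input here (an arbitrary choice for this sketch).
        while (i < left.size() && j < right.size()) {
            if (cmp.compare(left.get(i), right.get(j)) <= 0) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        // Once one input is exhausted, the rest of the other passes through unchanged.
        while (i < left.size()) {
            out.add(left.get(i++));
        }
        while (j < right.size()) {
            out.add(right.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 3, 5, 7);
        // Sorted inputs give a sorted result.
        System.out.println(mergeSorted(a, Arrays.asList(2, 4, 6, 8), Comparator.<Integer>naturalOrder()));
        // An unsorted input is not re-sorted: only the current heads are compared.
        System.out.println(mergeSorted(a, Arrays.asList(20, 1, 1, 1), Comparator.<Integer>naturalOrder()));
    }
}
```

Because only the two head elements are ever compared, the `20` at the front of the unsorted list holds back the `1`s behind it until the other input is exhausted.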

Example

Scala
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink

// Assumes an implicit ActorSystem (or Materializer) is in scope for runWith.

val sourceA = Source(List(1, 3, 5, 7))
val sourceB = Source(List(2, 4, 6, 8))

sourceA.mergeSorted(sourceB).runWith(Sink.foreach(println))
//prints 1, 2, 3, 4, 5, 6, 7, 8

val sourceC = Source(List(20, 1, 1, 1))

sourceA.mergeSorted(sourceC).runWith(Sink.foreach(println))
//prints 1, 3, 5, 7, 20, 1, 1, 1
Java
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import java.util.Arrays;
import java.util.Comparator;

// `materializer` is assumed to be defined elsewhere.

Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8));
sourceA
    .mergeSorted(sourceB, Comparator.<Integer>naturalOrder())
    .runWith(Sink.foreach(System.out::print), materializer);
// prints 1, 2, 3, 4, 5, 6, 7, 8

Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(20, 1, 1, 1));
sourceA
    .mergeSorted(sourceC, Comparator.<Integer>naturalOrder())
    .runWith(Sink.foreach(System.out::print), materializer);
// prints 1, 3, 5, 7, 20, 1, 1, 1