mergeSorted

Merge multiple sources.

Fan-in operators

Signature

Source.mergeSorted, Flow.mergeSorted

Description

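Merge multiple sources. Waits for one element to be ready from each input stream and emits the smallest element. The inputs are expected to be sorted already: the operator never reorders elements within an input, so an unsorted input produces unsorted output, as the second example below shows.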
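The comparison step described above can be illustrated without any streaming machinery. The following is a minimal sketch over plain lists, not Akka's implementation: the class name `MergeSortedSketch` and its `mergeSorted` helper are hypothetical, and emitting the left element first on a tie is an assumption of this sketch rather than documented operator behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of the mergeSorted comparison step on plain lists.
public class MergeSortedSketch {

    static <T> List<T> mergeSorted(List<T> left, List<T> right, Comparator<? super T> cmp) {
        List<T> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        // While both inputs have a head element, emit the smaller head.
        // Assumption of this sketch: on a tie the left element goes first.
        while (i < left.size() && j < right.size()) {
            if (cmp.compare(left.get(i), right.get(j)) <= 0) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        // Once one input is exhausted, pass the rest of the other through unchanged.
        while (i < left.size()) {
            out.add(left.get(i++));
        }
        while (j < right.size()) {
            out.add(right.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        Comparator<Integer> natural = Comparator.naturalOrder();
        // Both inputs sorted: the output is sorted.
        System.out.println(mergeSorted(Arrays.asList(1, 3, 5, 7), Arrays.asList(2, 4, 6, 8), natural));
        // Second input unsorted: only the heads are compared, so the output is unsorted.
        System.out.println(mergeSorted(Arrays.asList(1, 3, 5, 7), Arrays.asList(20, 1, 1, 1), natural));
    }
}
```

Because only the current head of each input is compared, the `20` at the front of the unsorted list holds back the `1`s behind it until the other input is exhausted.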

Example

Scala
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

val sourceA = Source(List(1, 3, 5, 7))
val sourceB = Source(List(2, 4, 6, 8))

// Running the stream requires an implicit ActorSystem in scope.
sourceA.mergeSorted(sourceB).runWith(Sink.foreach(println))
// prints 1, 2, 3, 4, 5, 6, 7, 8 (one element per line)

val sourceC = Source(List(20, 1, 1, 1))

sourceA.mergeSorted(sourceC).runWith(Sink.foreach(println))
// sourceC is not sorted, so neither is the output:
// prints 1, 3, 5, 7, 20, 1, 1, 1 (one element per line)
Java
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import java.util.Arrays;
import java.util.Comparator;

Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8));
// `system` is an ActorSystem assumed to be in scope.
sourceA
    .mergeSorted(sourceB, Comparator.<Integer>naturalOrder())
    .runWith(Sink.foreach(System.out::println), system);
// prints 1, 2, 3, 4, 5, 6, 7, 8 (one element per line)

Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(20, 1, 1, 1));
sourceA
    .mergeSorted(sourceC, Comparator.<Integer>naturalOrder())
    .runWith(Sink.foreach(System.out::println), system);
// sourceC is not sorted, so neither is the output:
// prints 1, 3, 5, 7, 20, 1, 1, 1 (one element per line)

Reactive Streams semantics

emits when all of the inputs have an element available

backpressures when downstream backpressures

completes when all upstreams complete
