mergeSorted
Merge multiple sources.
Signature
def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M])(implicit ord: Ordering[U]): Repr[U]
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) => Mat3)(implicit ord: Ordering[U]): ReprMat[U, Mat3]
Description
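Merges this source with another source. Waits for one element to be ready from each input stream and emits the smaller of the two according to the given ordering. The operator compares only the current head elements and does not sort its inputs, so the output is fully sorted only when both inputs are already sorted.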
emits when all of the inputs have an element available
backpressures when downstream backpressures
completes when all upstreams complete
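The merge rule described above can be illustrated without Akka. The following is a minimal sketch using plain Java iterators; `MergeSortedSketch` and its `merge` helper are hypothetical names introduced here, not part of the Akka API. The tie-breaking rule (left input first on equal elements) is an assumption of the sketch rather than documented operator behaviour, and elements are assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class MergeSortedSketch {

  // Hypothetical helper (not Akka API): merges two lists by repeatedly
  // comparing one buffered head element from each input.
  static <T> List<T> merge(List<T> left, List<T> right, Comparator<? super T> ord) {
    Iterator<T> l = left.iterator();
    Iterator<T> r = right.iterator();
    List<T> out = new ArrayList<>();

    // One buffered head per input; null means that input is exhausted.
    T lh = l.hasNext() ? l.next() : null;
    T rh = r.hasNext() ? r.next() : null;

    // While both inputs have an element available, emit the smaller one.
    while (lh != null && rh != null) {
      if (ord.compare(lh, rh) <= 0) { // ties go to the left input (sketch assumption)
        out.add(lh);
        lh = l.hasNext() ? l.next() : null;
      } else {
        out.add(rh);
        rh = r.hasNext() ? r.next() : null;
      }
    }

    // Once one input is exhausted, pass the rest of the other through unchanged.
    while (lh != null) {
      out.add(lh);
      lh = l.hasNext() ? l.next() : null;
    }
    while (rh != null) {
      out.add(rh);
      rh = r.hasNext() ? r.next() : null;
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> a = Arrays.asList(1, 3, 5, 7);
    // Both inputs sorted: the result is sorted.
    System.out.println(merge(a, Arrays.asList(2, 4, 6, 8), Comparator.<Integer>naturalOrder()));
    // prints [1, 2, 3, 4, 5, 6, 7, 8]

    // Second input not sorted: its elements keep their original relative order.
    System.out.println(merge(a, Arrays.asList(20, 1, 1, 1), Comparator.<Integer>naturalOrder()));
    // prints [1, 3, 5, 7, 20, 1, 1, 1]
  }
}
```

Because only the two head elements are ever compared, the leading 20 in the second input holds back the 1s behind it until the first input is exhausted.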
Example
- Scala
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink

// Assumes an implicit ActorSystem (or Materializer) is in scope.
val sourceA = Source(List(1, 3, 5, 7))
val sourceB = Source(List(2, 4, 6, 8))

sourceA.mergeSorted(sourceB).runWith(Sink.foreach(println))
// prints 1, 2, 3, 4, 5, 6, 7, 8

// sourceC is not sorted, so the merged output is not sorted either.
val sourceC = Source(List(20, 1, 1, 1))

sourceA.mergeSorted(sourceC).runWith(Sink.foreach(println))
// prints 1, 3, 5, 7, 20, 1, 1, 1
- Java
import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import java.util.Arrays;
import java.util.Comparator;

// Assumes a `materializer` is available in the enclosing scope.
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8));

sourceA
    .mergeSorted(sourceB, Comparator.<Integer>naturalOrder())
    .runWith(Sink.foreach(System.out::println), materializer);
// prints 1, 2, 3, 4, 5, 6, 7, 8

// sourceC is not sorted, so the merged output is not sorted either.
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(20, 1, 1, 1));

sourceA
    .mergeSorted(sourceC, Comparator.<Integer>naturalOrder())
    .runWith(Sink.foreach(System.out::println), materializer);
// prints 1, 3, 5, 7, 20, 1, 1, 1