mergeSorted
Merge multiple sources.
Signature
Source.mergeSorted
Flow.mergeSorted
Description
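Merges multiple sources. Waits for one element to be ready from each input stream, then emits the smallest of them. The inputs are expected to be sorted already: if one is not, the output is not sorted either, as the second example shows.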
Example
Scala:

import akka.stream.scaladsl.{ Sink, Source }

// assumes an implicit ActorSystem (or Materializer) is in scope
val sourceA = Source(List(1, 3, 5, 7))
val sourceB = Source(List(2, 4, 6, 8))

sourceA.mergeSorted(sourceB).runWith(Sink.foreach(println))
// prints 1, 2, 3, 4, 5, 6, 7, 8

// sourceC is not sorted, so the merged output is not sorted either
val sourceC = Source(List(20, 1, 1, 1))

sourceA.mergeSorted(sourceC).runWith(Sink.foreach(println))
// prints 1, 3, 5, 7, 20, 1, 1, 1

Java:

import akka.NotUsed;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import java.util.*;

// `system` is an ActorSystem created elsewhere
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8));

sourceA
    .mergeSorted(sourceB, Comparator.<Integer>naturalOrder())
    .runForeach(System.out::println, system);
// prints 1, 2, 3, 4, 5, 6, 7, 8

// sourceC is not sorted, so the merged output is not sorted either
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(20, 1, 1, 1));

sourceA
    .mergeSorted(sourceC, Comparator.<Integer>naturalOrder())
    .runForeach(System.out::println, system);
// prints 1, 3, 5, 7, 20, 1, 1, 1
Reactive Streams semantics
emits when all of the inputs have an element available
backpressures when downstream backpressures
completes when all upstreams complete
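
The emission and completion rules above can be modeled outside Akka with plain Java iterators. The following is only a rough sketch of the observable behavior, not Akka's implementation: the class `MergeSortedSketch` and its `mergeSorted` method are hypothetical names introduced for illustration, backpressure and asynchrony are not modeled, and the choice to prefer the left input on ties is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of Akka: models mergeSorted on plain iterators.
class MergeSortedSketch {

  // Assumes elements are never null (null is used here as the "exhausted" marker).
  static <T> List<T> mergeSorted(
      Iterator<T> left, Iterator<T> right, Comparator<? super T> cmp) {
    List<T> out = new ArrayList<>();

    // Hold one pending element from each input before emitting anything.
    T l = left.hasNext() ? left.next() : null;
    T r = right.hasNext() ? right.next() : null;

    // While both inputs have a pending element, emit the smaller one
    // and pull a replacement from the input it came from.
    while (l != null && r != null) {
      if (cmp.compare(l, r) <= 0) { // tie-break towards the left input (assumption)
        out.add(l);
        l = left.hasNext() ? left.next() : null;
      } else {
        out.add(r);
        r = right.hasNext() ? right.next() : null;
      }
    }

    // One input is exhausted: the rest of the other passes through unchanged.
    while (l != null) {
      out.add(l);
      l = left.hasNext() ? left.next() : null;
    }
    while (r != null) {
      out.add(r);
      r = right.hasNext() ? right.next() : null;
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> a = Arrays.asList(1, 3, 5, 7);

    // Both inputs sorted: the output is fully sorted.
    System.out.println(mergeSorted(
        a.iterator(), Arrays.asList(2, 4, 6, 8).iterator(),
        Comparator.<Integer>naturalOrder()));
    // prints [1, 2, 3, 4, 5, 6, 7, 8]

    // Second input unsorted: only the pending heads are compared.
    System.out.println(mergeSorted(
        a.iterator(), Arrays.asList(20, 1, 1, 1).iterator(),
        Comparator.<Integer>naturalOrder()));
    // prints [1, 3, 5, 7, 20, 1, 1, 1]
  }
}
```

Because only the two pending elements are ever compared, the leading 20 in the unsorted input holds back the 1s behind it until the other input is exhausted, which matches the second example above.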