mergeSorted
Merge multiple sources.
Signature
Source.mergeSorted
Flow.mergeSorted
Description
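Merge multiple sources, each of which is expected to be sorted already. Waits for one element to be ready from each input stream and emits the smallest of them. The operator only compares the elements currently waiting at the head of each input, so if an input is not sorted, the output is not sorted either (see the second case in the example).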
Example
- Scala
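import akka.stream.scaladsl.{ Sink, Source }

// Assumes an implicit ActorSystem is in scope to run the streams.
val sourceA = Source(List(1, 3, 5, 7))
val sourceB = Source(List(2, 4, 6, 8))

// Both inputs are sorted, so the merged output is sorted.
sourceA.mergeSorted(sourceB).runWith(Sink.foreach(println))
// prints 1, 2, 3, 4, 5, 6, 7, 8

// sourceC is not sorted, so the merged output is not sorted either.
val sourceC = Source(List(20, 1, 1, 1))

sourceA.mergeSorted(sourceC).runWith(Sink.foreach(println))
// prints 1, 3, 5, 7, 20, 1, 1, 1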
- Java
import akka.NotUsed;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import java.util.*;

// Assumes `system` is an ActorSystem that is already in scope.
Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 3, 5, 7));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(2, 4, 6, 8));

// Both inputs are sorted, so the merged output is sorted.
sourceA
    .mergeSorted(sourceB, Comparator.<Integer>naturalOrder())
    .runForeach(System.out::println, system);
// prints 1, 2, 3, 4, 5, 6, 7, 8

// sourceC is not sorted, so the merged output is not sorted either.
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(20, 1, 1, 1));

sourceA
    .mergeSorted(sourceC, Comparator.<Integer>naturalOrder())
    .runForeach(System.out::println, system);
// prints 1, 3, 5, 7, 20, 1, 1, 1
Reactive Streams semantics
emits when all of the inputs have an element available
backpressures when downstream backpressures
completes when all upstreams complete
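To make the description and the semantics above concrete, here is a minimal plain-Java model of a two-input sorted merge over finite lists. It is only a sketch of the observable behavior and not Akka's implementation: the class `MergeSortedSketch` and its `mergeSorted` helper are hypothetical names introduced for illustration, there is no asynchrony or backpressure, and the choice to emit the left element on ties is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration class; not part of Akka.
class MergeSortedSketch {

    // Models a two-input sorted merge: compare the head of each input
    // and emit the smaller one, then drain whichever input remains.
    static <T> List<T> mergeSorted(List<T> left, List<T> right, Comparator<? super T> cmp) {
        List<T> out = new ArrayList<>();
        int i = 0;
        int j = 0;

        // "Emits when all of the inputs have an element available":
        // an element is emitted only while both inputs have a head to compare.
        while (i < left.size() && j < right.size()) {
            T a = left.get(i);
            T b = right.get(j);
            if (cmp.compare(a, b) <= 0) { // tie-breaking toward the left is an assumption
                out.add(a);
                i++;
            } else {
                out.add(b);
                j++;
            }
        }

        // Once one input is exhausted, the rest of the other passes through unchanged.
        while (i < left.size()) {
            out.add(left.get(i++));
        }
        while (j < right.size()) {
            out.add(right.get(j++));
        }

        // "Completes when all upstreams complete": both lists are fully consumed here.
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 3, 5, 7);
        List<Integer> b = Arrays.asList(2, 4, 6, 8);
        List<Integer> c = Arrays.asList(20, 1, 1, 1);

        System.out.println(mergeSorted(a, b, Comparator.<Integer>naturalOrder()));
        // prints [1, 2, 3, 4, 5, 6, 7, 8]

        System.out.println(mergeSorted(a, c, Comparator.<Integer>naturalOrder()));
        // prints [1, 3, 5, 7, 20, 1, 1, 1]
    }
}
```

The second call reproduces the unsorted case from the example: because 20 sits at the head of the second input, every element of the first input is emitted before it, and the remaining 1, 1, 1 follow in their original order.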