
Merge multiple sources.

Fan-in operators


Source.mergePreferred Flow.mergePreferred


Merge multiple sources. If all sources have elements ready, emit from the preferred source first. If the preferred source then pushes another element, emit that next; otherwise, emit from the secondary sources. Repeat until all streams are exhausted. With two sources, setting preferred to true prefers the right source (the one passed as the argument), and setting it to false prefers the left source (see examples).


Scala

import{ Sink, Source }

val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))

sourceA.mergePreferred(sourceB, false).runWith(Sink.foreach(println))
// prints 1, 10, ... since both sources have their first element ready and the left source is preferred

sourceA.mergePreferred(sourceB, true).runWith(Sink.foreach(println))
// prints 10, 1, ... since both sources have their first element ready and the right source is preferred

Java

Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));

sourceA.mergePreferred(sourceB, false, false).runForeach(System.out::println, system);
// prints 1, 10, ... since both sources have their first element ready and the left source is
// preferred

sourceA.mergePreferred(sourceB, true, false).runForeach(System.out::println, system);
// prints 10, 1, ... since both sources have their first element ready and the right source is
// preferred

Reactive Streams semantics

emits when one of the inputs has an element available, preferring a defined input if multiple have elements available

backpressures when downstream backpressures

completes when all upstreams complete (set eagerComplete=true to complete as soon as any upstream completes instead)
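
The emission and completion rules above can be restated as a small plain-Java model. This is only a sketch under stated assumptions: `MergePreferredRules`, `choose`, and `completes` are hypothetical names invented for illustration and are not part of the streams API. Backpressure, buffering, and element timing are deliberately not modelled, so the sketch says nothing about the interleaving a running stream produces.

```java
import java.util.Optional;

/** Hypothetical model of the rules listed above; not the library's implementation. */
final class MergePreferredRules {

    /**
     * Emission rule: something is emitted when at least one input has an element available.
     * If both inputs have one, the preferred input wins.
     * Each Optional stands for the element currently available on that input, if any.
     */
    static Optional<Integer> choose(Optional<Integer> left,
                                    Optional<Integer> right,
                                    boolean preferRight) {
        Optional<Integer> preferred = preferRight ? right : left;
        Optional<Integer> secondary = preferRight ? left : right;
        return preferred.isPresent() ? preferred : secondary;
    }

    /**
     * Completion rule: by default every upstream must have completed.
     * With eagerComplete, a single completed upstream is enough.
     */
    static boolean completes(boolean leftDone, boolean rightDone, boolean eagerComplete) {
        return eagerComplete ? (leftDone || rightDone) : (leftDone && rightDone);
    }

    public static void main(String[] args) {
        // Both inputs ready: the preferred side is chosen.
        System.out.println(choose(Optional.of(1), Optional.of(10), false)); // Optional[1]
        System.out.println(choose(Optional.of(1), Optional.of(10), true));  // Optional[10]
        // Preferred side has nothing ready: fall back to the secondary side.
        System.out.println(choose(Optional.empty(), Optional.of(10), false)); // Optional[10]
        // Only the left upstream has completed.
        System.out.println(completes(true, false, false)); // false
        System.out.println(completes(true, false, true));  // true
    }
}
```

In this model the preference only matters when both inputs have an element at the same moment; whenever just one input is ready, that element is chosen regardless of the flag.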
