mergePrioritizedN

Merge multiple sources with priorities.

Fan-in operators

Description

Merge multiple sources. Prefer sources depending on priorities if all sources have elements ready. If a subset of all sources have elements ready the relative priorities for those sources are used to prioritize. For example, when used with only three sources `sourceA`, `sourceB` and `sourceC`, the `sourceA` has a probability of `(priorityOfA) / (priorityOfA + priorityOfB + priorityOfC)` of being prioritized and similarly for the rest of the sources. The priorities for each source must be positive integers.

Example

Scala
```source```import akka.stream.scaladsl.{ Sink, Source }

val sourceA = Source(List(1, 2, 3, 4))
val sourceB = Source(List(10, 20, 30, 40))
val sourceC = Source(List(100, 200, 300, 400))

Source
.mergePrioritizedN(List((sourceA, 9900), (sourceB, 99), (sourceC, 1)), eagerComplete = false)
.runWith(Sink.foreach(println))
// prints e.g. 1, 100, 2, 3, 4, 10, 20, 30, 40, 200, 300, 400  since both sources have their first element ready and
// the left sourceA has higher priority - if both sources have elements ready, sourceA has a 99% chance of being picked next
// while sourceB has a 0.99% chance and sourceC has a 0.01% chance``````
Java
```source```Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(10, 20, 30, 40));
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(100, 200, 300, 400));
List<Pair<Source<Integer, ?>, Integer>> sourcesAndPriorities =
Arrays.asList(new Pair<>(sourceA, 9900), new Pair<>(sourceB, 99), new Pair<>(sourceC, 1));
Source.mergePrioritizedN(sourcesAndPriorities, false).runForeach(System.out::println, system);
// prints e.g. 1, 100, 2, 3, 4, 10, 20, 30, 40, 200, 300, 400  since both sources have their
// first element ready and
// the left sourceA has higher priority - if both sources have elements ready, sourceA has a 99%
// chance of being picked next
// while sourceB has a 0.99% chance and sourceC has a 0.01% chance``````

Reactive Streams semantics

emits when one of the inputs has an element available, preferring inputs based on their priorities if multiple have elements available

backpressures when downstream backpressures

completes when all upstreams complete (or when any upstream completes if `eagerComplete=true`.)

Cancels when downstream cancels

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.