combine

Combine several sources, using a given strategy such as merge or concat, into one source.

Source operators

Signature

def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M]

Description

Provides a way to create a “fan-in” of multiple sources without having to use the more advanced GraphDSL.

The way the elements from the sources are combined is pluggable through the strategy parameter which accepts a function Int => Graph[FanInShape]Integer -> Graph<FanInShape> where the integer parameter specifies the number of sources that the graph must accept. This makes it possible to use combine with the built-in Concat and Merge by expanding their apply methods to functionsusing a method reference to their create methods, but also to use an arbitrary strategy.

Combine is most useful when you have more sources than 2 or want to use a custom operator, as there are more concise operators for 2-source concat and merge

Some of the built-in operators that can be used as strategy are:

Examples

In this example we Merge three different sources of integers. The three sources will immediately start contributing elements to the combined source. The individual elements from each source will be in order but the order compared to elements from other sources is not deterministic:

Scala
import akka.stream.scaladsl.{ Concat, Merge, Source }
// ...

val source1 = Source(1 to 3)
val source2 = Source(8 to 10)
val source3 = Source(12 to 14)
val combined = Source.combine(source1, source2, source3)(Merge(_))
combined.runForeach(println)
// could print (order between sources is not deterministic)
// 1
// 12
// 8
// 9
// 13
// 14
// 2
// 10
// 3
Java
import akka.stream.javadsl.Concat;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.Source;
// ...

Source<Integer, NotUsed> source1 = Source.range(1, 3);
Source<Integer, NotUsed> source2 = Source.range(8, 10);
Source<Integer, NotUsed> source3 = Source.range(12, 14);

final Source<Integer, NotUsed> combined =
    Source.combine(source1, source2, Collections.singletonList(source3), Merge::create);

combined.runForeach(System.out::println, system);
// could print (order between sources is not deterministic)
// 1
// 12
// 8
// 9
// 13
// 14
// 2
// 10
// 3

If we instead use Concat the first source will get to emit elements until it completes, then the second source until that completes and so on until all the sources has completed.

Scala
val source1 = Source(1 to 3)
val source2 = Source(8 to 10)
val source3 = Source(12 to 14)
val sources = Source.combine(source1, source2, source3)(Concat(_))
sources.runForeach(println)
// prints (order is deterministic)
// 1
// 2
// 3
// 8
// 9
// 10
// 12
// 13
// 14
Java
Source<Integer, NotUsed> source1 = Source.range(1, 3);
Source<Integer, NotUsed> source2 = Source.range(8, 10);
Source<Integer, NotUsed> source3 = Source.range(12, 14);

final Source<Integer, NotUsed> sources =
    Source.combine(source1, source2, Collections.singletonList(source3), Concat::create);

sources.runForeach(System.out::println, system);
// prints (order is deterministic)
// 1
// 2
// 3
// 8
// 9
// 10
// 12
// 13
// 14

Reactive Streams semantics

emits when there is demand, but depending on the strategy

completes depends on the strategy

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.