Source.zipWithN

Combine the elements of multiple streams into a stream of sequences using a combiner function.

Source operators

Signature

Source.zipWithNSource.zipWithN

Description

Combine the elements of multiple streams into a stream of sequences using a combiner function.

This operator is essentially the same as using zipN followed by map to turn the zipped sequence into an arbitrary object to emit downstream.

See also:

Example

In this sample we zip three streams of integers and for each zipped sequence of numbers we calculate the max value and send downstream:

Scala
sourceval numbers = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: Nil)
val otherNumbers = Source(5 :: 2 :: 1 :: 4 :: 10 :: 4 :: Nil)
val andSomeOtherNumbers = Source(3 :: 7 :: 2 :: 1 :: 1 :: Nil)

Source
  .zipWithN((seq: Seq[Int]) => seq.max)(numbers :: otherNumbers :: andSomeOtherNumbers :: Nil)
  .runForeach(println)
// prints:
// 5
// 7
// 3
// 4
// 10
Java
sourceSource<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6));
Source<Integer, NotUsed> otherNumbers = Source.from(Arrays.asList(5, 2, 1, 4, 10, 4));
Source<Integer, NotUsed> andSomeOtherNumbers = Source.from(Arrays.asList(3, 7, 2, 1, 1));

Source.zipWithN(
        (List<Integer> seq) -> seq.stream().mapToInt(i -> i).max().getAsInt(),
        Arrays.asList(numbers, otherNumbers, andSomeOtherNumbers))
    .runForeach(System.out::println, system);
// prints:
// 5
// 7
// 3
// 4
// 10

Note how it stops as soon as any of the original sources reaches its end.

Reactive Streams semantics

emits when all of the inputs has an element available

completes when any upstream completes

backpressures all upstreams when downstream backpressures but also on an upstream that has emitted an element until all other upstreams has emitted elements

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.