interleaveAll

Emits a specifiable number of elements from the original source, then from each of the provided sources in turn, and repeats.

Fan-in operators

Signature

Source.interleaveAll Flow.interleaveAll

Description

Emits a specifiable number of elements from the original source, then the same number from each of the provided sources in turn, and repeats. When one source completes, the remaining elements of the other sources are still emitted if eagerClose is false; if eagerClose is true, the stream completes as soon as any source completes.
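To make the emission order concrete, here is a minimal plain-Java sketch that models the behavior described above on in-memory lists. It is not Akka's implementation: the class `InterleaveAllModel` and its `interleaveAll` helper are hypothetical names introduced only for illustration, and the first list stands in for the original source. In a running stream, the moment at which completion is observed depends on when the upstream signals it, so the eager-close result should be read as illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of the interleaving order; this is NOT Akka's implementation.
class InterleaveAllModel {

    // Hypothetical helper: sources.get(0) plays the role of the original source.
    static <T> List<T> interleaveAll(List<List<T>> sources, int segmentSize, boolean eagerClose) {
        List<Iterator<T>> active = new ArrayList<>();
        for (List<T> source : sources) {
            active.add(source.iterator());
        }
        List<T> out = new ArrayList<>();
        int current = 0;
        while (!active.isEmpty()) {
            Iterator<T> it = active.get(current);
            // Take up to segmentSize elements from the current source.
            int taken = 0;
            while (taken < segmentSize && it.hasNext()) {
                out.add(it.next());
                taken++;
            }
            if (!it.hasNext()) {
                // Assumption of this model: a source counts as completed once it is exhausted.
                if (eagerClose) {
                    return out;
                }
                active.remove(current); // the next source slides into this index
                if (current >= active.size()) {
                    current = 0;
                }
            } else {
                current = (current + 1) % active.size();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = Arrays.asList(
            Arrays.asList(1, 2, 7, 8),
            Arrays.asList(3, 4, 9),
            Arrays.asList(5, 6));
        System.out.println(interleaveAll(sources, 2, false)); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(interleaveAll(sources, 2, true));  // [1, 2, 3, 4, 5, 6]
    }
}
```

With a segment size of 2 and eagerClose set to false, the model yields the same order as the example below; with eagerClose set to true, it stops once the shortest list runs out.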

Example

Scala
import akka.stream.scaladsl.{ Sink, Source }
import java.util.StringJoiner

val sourceA = Source(List(1, 2, 7, 8))
val sourceB = Source(List(3, 4, 9))
val sourceC = Source(List(5, 6))

sourceA
  .interleaveAll(List(sourceB, sourceC), 2, eagerClose = false)
  .fold(new StringJoiner(","))((joiner, input) => joiner.add(String.valueOf(input)))
  .runWith(Sink.foreach(println))
// prints 1,2,3,4,5,6,7,8,9
Java
import akka.NotUsed;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;

import java.util.*;

Source<Integer, NotUsed> sourceA = Source.from(Arrays.asList(1, 2, 7, 8));
Source<Integer, NotUsed> sourceB = Source.from(Arrays.asList(3, 4, 9));
Source<Integer, NotUsed> sourceC = Source.from(Arrays.asList(5, 6));
sourceA
    .interleaveAll(Arrays.asList(sourceB, sourceC), 2, false)
    .fold(new StringJoiner(","), (joiner, input) -> joiner.add(String.valueOf(input)))
    .runForeach(System.out::println, system);
// prints 1,2,3,4,5,6,7,8,9

Reactive Streams semantics

emits when element is available from the currently consumed upstream

backpressures when upstream backpressures

completes when all upstreams have completed if eagerClose is false, or when any upstream completes if eagerClose is true
