sliding

Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.

Simple operators

Signature

Flow.sliding

Description

Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.

Note: the last window might be smaller than the requested size due to end of stream.

Examples

This first sample shows the behavior of the operator itself, with a window of 2 elements and a step of 1 (the default in the Scala API, passed explicitly in the Java API).

Scala
val source = Source(1 to 4)
source.sliding(2).runForeach(println)
// prints:
// Vector(1, 2)
// Vector(2, 3)
// Vector(3, 4)
Java
Source<Integer, NotUsed> source = Source.range(1, 4);
source.sliding(2, 1).runForeach(n -> System.out.println(n), system);
// prints:
// [1, 2]
// [2, 3]
// [3, 4]

If the stream stops without having seen enough elements to fill a window, the last window will contain as many elements as were emitted before the stream ended. Here we also provide a step that moves two elements forward for each window:

Scala
val source = Source(1 to 4)
source.sliding(n = 3, step = 2).runForeach(println)
// prints:
// Vector(1, 2, 3)
// Vector(3, 4) - shorter because stream ended before we got 3 elements
Java
Source<Integer, NotUsed> source = Source.range(1, 4);
source.sliding(3, 2).runForeach(n -> System.out.println(n), system);
// prints:
// [1, 2, 3]
// [3, 4] - shorter because stream ended before we got 3 elements
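
To make the interaction between the window size and the step easier to follow, here is a minimal plain-Java sketch that models the windowing on a finite list. It is not the operator's implementation and does not use the streams API; the class `SlidingModel` and the helper `windows` are hypothetical names introduced only for illustration. The sketch assumes that each window starts `step` elements after the previous one and that windowing stops once a window reaches the end of the input, which reproduces the outputs of the two examples above. Behavior for other inputs is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingModel {

    // Hypothetical helper, not part of any library: computes the windows for a
    // finite input, given the window size n and the step between window starts.
    static <T> List<List<T>> windows(List<T> input, int n, int step) {
        if (n <= 0 || step <= 0) {
            throw new IllegalArgumentException("n and step must be greater than 0");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < input.size(); start += step) {
            // The last window may be shorter than n if the input ends early.
            int end = Math.min(start + n, input.size());
            result.add(new ArrayList<>(input.subList(start, end)));
            if (end == input.size()) {
                break; // this window reached the end of the input
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4);
        System.out.println(windows(input, 2, 1)); // [[1, 2], [2, 3], [3, 4]]
        System.out.println(windows(input, 3, 2)); // [[1, 2, 3], [3, 4]]
    }
}
```

With a step of 2 and a window of 3, the second window starts at element 3 and only one more element is available, which is why it holds two elements instead of three.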

One use case for sliding is implementing a moving average. Here we do that with a “period” of 5:

Scala
val numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil)
val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size)
movingAverage.runForeach(println)
// prints:
// 3.8 = average of 1, 3, 10, 2, 3
// 4.4 = average of 3, 10, 2, 3, 4
// 4.2 = average of 10, 2, 3, 4, 2
// 4.2 = average of 2, 3, 4, 2, 10
// 6.0 = average of 3, 4, 2, 10, 11
Java
Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 3, 10, 2, 3, 4, 2, 10, 11));
Source<Float, NotUsed> movingAverage =
    numbers
        .sliding(5, 1)
        .map(window -> ((float) window.stream().mapToInt(i -> i).sum()) / window.size());
movingAverage.runForeach(n -> System.out.println(n), system);
// prints:
// 3.8 = average of 1, 3, 10, 2, 3
// 4.4 = average of 3, 10, 2, 3, 4
// 4.2 = average of 10, 2, 3, 4, 2
// 4.2 = average of 2, 3, 4, 2, 10
// 6.0 = average of 3, 4, 2, 10, 11

sliding can also be used for simple windowing; see splitAfter.

Reactive Streams semantics

emits when the specified number of elements has been accumulated or upstream completed

backpressures when a group has been assembled and downstream backpressures

completes when upstream completes
