sliding

Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.

Simple operators

Signature

Flow.slidingFlow.sliding

Description

Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.

Note: the last window might be smaller than the requested size due to end of stream.

Examples

In this first sample we just see the behavior of the operator itself, first with a window of 2 elements and the default step which is 1a step value of 1.

Scala
sourceval source = Source(1 to 4)
source.sliding(2).runForeach(println)
// prints:
// Vector(1, 2)
// Vector(2, 3)
// Vector(3, 4)
Java
sourceSource<Integer, NotUsed> source = Source.range(1, 4);
source.sliding(2, 1).runForeach(n -> System.out.println(n), system);
// prints:
// [1, 2]
// [2, 3]
// [3, 4]

If the stream stops without having seen enough elements to fill a window, the last window will have as many elements was emitted before the stream ended. Here we also provide a step to move two elements forward for each window:

Scala
sourceval source = Source(1 to 4)
source.sliding(n = 3, step = 2).runForeach(println)
// prints:
// Vector(1, 2, 3)
// Vector(3, 4) - shorter because stream ended before we got 3 elements
Java
sourceSource<Integer, NotUsed> source = Source.range(1, 4);
source.sliding(3, 2).runForeach(n -> System.out.println(n), system);
// prints:
// Vector(1, 2, 3)
// [1, 2, 3]
// [3, 4] - shorter because stream ended before we got 3 elements

One use case for sliding is to implement a moving average, here we do that with a “period” of 5:

Scala
sourceval numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil)
val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size)
movingAverage.runForeach(println)
// prints
// 3.8 = average of 1, 3, 10, 2, 3
// 4.4 = average of 3, 10, 2, 3, 4
// 4.2 = average of 10, 2, 3, 4, 2
// 4.2 = average of 2, 3, 4, 2, 10
// 6.0 = average of 3, 4, 2, 10, 11
Java
sourceSource<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 3, 10, 2, 3, 4, 2, 10, 11));
Source<Float, NotUsed> movingAverage =
    numbers
        .sliding(5, 1)
        .map(window -> ((float) window.stream().mapToInt(i -> i).sum()) / window.size());
movingAverage.runForeach(n -> System.out.println(n), system);
// prints
// 3.8 = average of 1, 3, 10, 2, 3
// 4.4 = average of 3, 10, 2, 3, 4
// 4.2 = average of 10, 2, 3, 4, 2
// 4.2 = average of 2, 3, 4, 2, 10
// 6.0 = average of 3, 4, 2, 10, 11

Sliding can also be used to do simple windowing, see splitAfter.

Reactive Streams semantics

emits the specified number of elements has been accumulated or upstream completed

backpressures when a group has been assembled and downstream backpressures

completes when upstream completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.