sliding
Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Signature
Description
Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Note: the last window might be smaller than the requested size due to end of stream.
Examples
In this first sample we just see the behavior of the operator itself, first with a window of 2 elements and the default step
which is 1a step value of 1.
- Scala
-
source
val source = Source(1 to 4) source.sliding(2).runForeach(println) // prints: // Vector(1, 2) // Vector(2, 3) // Vector(3, 4)
- Java
-
source
Source<Integer, NotUsed> source = Source.range(1, 4); source.sliding(2, 1).runForeach(n -> System.out.println(n), system); // prints: // [1, 2] // [2, 3] // [3, 4]
If the stream stops without having seen enough elements to fill a window, the last window will have as many elements was emitted before the stream ended. Here we also provide a step to move two elements forward for each window:
- Scala
-
source
val source = Source(1 to 4) source.sliding(n = 3, step = 2).runForeach(println) // prints: // Vector(1, 2, 3) // Vector(3, 4) - shorter because stream ended before we got 3 elements
- Java
-
source
Source<Integer, NotUsed> source = Source.range(1, 4); source.sliding(3, 2).runForeach(n -> System.out.println(n), system); // prints: // Vector(1, 2, 3) // [1, 2, 3] // [3, 4] - shorter because stream ended before we got 3 elements
One use case for sliding is to implement a moving average, here we do that with a “period” of 5
:
- Scala
-
source
val numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil) val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size) movingAverage.runForeach(println) // prints // 3.8 = average of 1, 3, 10, 2, 3 // 4.4 = average of 3, 10, 2, 3, 4 // 4.2 = average of 10, 2, 3, 4, 2 // 4.2 = average of 2, 3, 4, 2, 10 // 6.0 = average of 3, 4, 2, 10, 11
- Java
-
source
Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 3, 10, 2, 3, 4, 2, 10, 11)); Source<Float, NotUsed> movingAverage = numbers .sliding(5, 1) .map(window -> ((float) window.stream().mapToInt(i -> i).sum()) / window.size()); movingAverage.runForeach(n -> System.out.println(n), system); // prints // 3.8 = average of 1, 3, 10, 2, 3 // 4.4 = average of 3, 10, 2, 3, 4 // 4.2 = average of 10, 2, 3, 4, 2 // 4.2 = average of 2, 3, 4, 2, 10 // 6.0 = average of 3, 4, 2, 10, 11
Sliding can also be used to do simple windowing, see splitAfter.
Reactive Streams semantics
emits the specified number of elements has been accumulated or upstream completed
backpressures when a group has been assembled and downstream backpressures
completes when upstream completes