sliding
Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Signature
Description
Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Note: the last window might be smaller than the requested size due to end of stream.
Examples
In this first sample we just see the behavior of the operator itself, first with a window of 2 elements and the default step
which is 1 .
- Scala
-
source
val source = Source(1 to 4) source.sliding(2).runForeach(println) // prints: // Vector(1, 2) // Vector(2, 3) // Vector(3, 4)
- Java
If the stream stops without having seen enough elements to fill a window, the last window will have as many elements was emitted before the stream ended. Here we also provide a step to move two elements forward for each window:
- Scala
-
source
val source = Source(1 to 4) source.sliding(n = 3, step = 2).runForeach(println) // prints: // Vector(1, 2, 3) // Vector(3, 4) - shorter because stream ended before we got 3 elements
- Java
One use case for sliding is to implement a moving average, here we do that with a “period” of 5
:
- Scala
-
source
val numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil) val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size) movingAverage.runForeach(println) // prints // 3.8 = average of 1, 3, 10, 2, 3 // 4.4 = average of 3, 10, 2, 3, 4 // 4.2 = average of 10, 2, 3, 4, 2 // 4.2 = average of 2, 3, 4, 2, 10 // 6.0 = average of 3, 4, 2, 10, 11
- Java
Sliding can also be used to do simple windowing, see splitAfter.
Reactive Streams semantics
emits the specified number of elements has been accumulated or upstream completed
backpressures when a group has been assembled and downstream backpressures
completes when upstream completes