Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.
Note: the last window might be smaller than the requested size due to end of stream.
The first sample shows the behavior of the operator itself, with a window of 2 elements and the default step of 1.
- Scala
val source = Source(1 to 4)
source.sliding(2).runForeach(println)
// prints:
// Vector(1, 2)
// Vector(2, 3)
// Vector(3, 4)
If the stream stops without having seen enough elements to fill a window, the last window will contain as many elements as were emitted before the stream ended. Here we also provide a step, so that each window starts two elements after the previous one:
- Scala
val source = Source(1 to 4)
source.sliding(n = 3, step = 2).runForeach(println)
// prints:
// Vector(1, 2, 3)
// Vector(3, 4) - shorter because stream ended before we got 3 elements
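When the step equals the window size, consecutive windows share no elements. The following is a minimal sketch of that case, not part of the original samples. It assumes Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer); the object and method names are hypothetical, and `Await` is used only to keep the example short.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the library.
object NonOverlappingWindows {

  // Collects all windows of Source(1 to 5) using n = 2 and step = 2.
  def collectWindows(): Seq[Seq[Int]] = {
    // Assumption: Akka 2.6+, the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("sliding-example")
    try {
      Await.result(
        Source(1 to 5)
          .sliding(n = 2, step = 2) // step == n, so windows do not overlap
          .runWith(Sink.seq),
        3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    collectWindows().foreach(window => println(window.mkString(", ")))
    // prints:
    // 1, 2
    // 3, 4
    // 5 - shorter because the stream ended before a second element arrived
}
```

Blocking with `Await` is only reasonable in a small demo like this one; in application code the returned `Future` would normally be composed instead.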
One use case for sliding is to implement a moving average. Here we do that with a “period” of 5:
- Scala
val numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil)
val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size)
movingAverage.runForeach(println)
// prints
// 3.8 = average of 1, 3, 10, 2, 3
// 4.4 = average of 3, 10, 2, 3, 4
// 4.2 = average of 10, 2, 3, 4, 2
// 4.2 = average of 2, 3, 4, 2, 10
// 6.0 = average of 3, 4, 2, 10, 11
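Because a window can be shorter than requested when the stream ends early, a moving average over such a window would cover fewer elements than the period. One possible way to handle this, sketched below, is to wrap the logic in a reusable `Flow` that drops any window shorter than the period. The names `MovingAverageExample`, `movingAverage` and `run` are hypothetical, and the sketch assumes Akka 2.6 or later.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the library.
object MovingAverageExample {

  // Averages each full window of `period` elements and drops shorter windows.
  def movingAverage(period: Int): Flow[Int, Double, NotUsed] =
    Flow[Int]
      .sliding(period)
      .filter(_.size == period) // skip a window cut short by end of stream
      .map(window => window.sum.toDouble / period)

  // Runs the flow over `input` with a period of 5 and collects the averages.
  def run(input: List[Int]): Seq[Double] = {
    // Assumption: Akka 2.6+, the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("moving-average")
    try {
      Await.result(Source(input).via(movingAverage(5)).runWith(Sink.seq), 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    println(run(List(1, 3, 10, 2, 3, 4)).mkString(", ")) // prints: 3.8, 4.4
    println(run(List(1, 3)).isEmpty)                     // prints: true
  }
}
```

Whether to drop a short window or average it over its actual size depends on the use case; the original sample above divides by `window.size`, which keeps it.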
Sliding can also be used to do simple windowing; see splitAfter.
Reactive Streams semantics
emits when the specified number of elements has been accumulated or upstream completed
backpressures when a group has been assembled and downstream backpressures
completes when upstream completes