Transform each stream element with the help of a state.
The state creation function is invoked once when the stream is materialized and the returned state is passed to the mapping function for mapping the first element.
The mapping function returns a mapped element to emit downstream and a state to pass to the next invocation of the mapping function. The returned state can be the same instance on every call or a new immutable state; it is also safe to use a mutable state.
The on complete function is called once, when the first of upstream completion, downstream cancellation or stream failure happens. If the cause is upstream completion and the downstream is still accepting elements, the value returned from the function is passed downstream before the operator itself completes. In the other cases the returned value is ignored.
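The contract described above can be sketched in plain Scala, without any streams dependency. This is only a simplified model under stated assumptions: `StatefulMapModel.run` is a hypothetical helper that is not part of the streams API, it processes a finite `List` eagerly, and it covers only the upstream-completion path (no backpressure, cancellation, failure or supervision).

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model of the statefulMap contract.
// Not the library implementation: it only handles a finite input
// that ends with upstream completion.
object StatefulMapModel {
  def run[S, In, Out](elements: List[In])(create: () => S)(
      f: (S, In) => (S, Out),
      onComplete: S => Option[Out]): List[Out] = {
    // The state is created once, before the first element is mapped.
    var state = create()
    val emitted = List.newBuilder[Out]
    elements.foreach { elem =>
      // Each call returns the next state and the element to emit.
      val (nextState, out) = f(state, elem)
      state = nextState
      emitted += out
    }
    // On upstream completion an optional final element is emitted last.
    onComplete(state).foreach(emitted += _)
    emitted.result()
  }
}

object StatefulMapModelDemo {
  def main(args: Array[String]): Unit = {
    // A new immutable state (the next index) is returned on every call.
    val indexed = StatefulMapModel.run(List("A", "B", "C"))(() => 0L)(
      (index, elem) => (index + 1, (elem, index)),
      _ => None)
    println(indexed) // List((A,0), (B,1), (C,2))

    // The on complete function emits the final state as a last element.
    val withTotal = StatefulMapModel.run(List(1, 2, 3))(() => 0)(
      (sum, n) => (sum + n, n * 10),
      sum => Some(sum))
    println(withTotal) // List(10, 20, 30, 6)

    // The same mutable state instance is returned on every call.
    val firstSeen = StatefulMapModel.run(List("A", "B", "A"))(() => mutable.Set.empty[String])(
      (seen, elem) => (seen, seen.add(elem)),
      _ => None)
    println(firstSeen) // List(true, true, false)
  }
}
```

The three calls in the demo correspond to the three kinds of state mentioned above: a fresh immutable value per call, a state whose final value is emitted on completion, and a single mutable instance that is updated in place.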
The statefulMap operator adheres to the ActorAttributes.SupervisionStrategy attribute.
For mapping stream elements without keeping a state see map.
In the first example we implement an operator like zipWithIndex: it associates a unique index with each element of the stream, starting from 0.
- Scala
    Source(List("A", "B", "C", "D"))
      .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None)
      .runForeach(println)
    // prints
    // (A,0)
    // (B,1)
    // (C,2)
    // (D,3)
In the second example, the elements are buffered until the incoming element is different, and then emitted downstream. When upstream completes, if there are buffered elements, they are emitted before completing.
- Scala
    Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
      .statefulMap(() => List.empty[String])(
        (buffer, element) =>
          buffer match {
            case head :: _ if head != element => (element :: Nil, buffer)
            case _                            => (element :: buffer, Nil)
          },
        buffer => Some(buffer))
      .filter(_.nonEmpty)
      .runForeach(println)
    // prints
    // List(A)
    // List(B, B)
    // List(C, C, C)
    // List(D)
In the third example, repeated incoming elements are only emitted once and then dropped.
- Scala
    Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil)
      .statefulMap(() => Option.empty[String])(
        (lastElement, elem) =>
          lastElement match {
            case Some(head) if head == elem => (Some(elem), None)
            case _                          => (Some(elem), Some(elem))
          },
        _ => None)
      .collect { case Some(elem) => elem }
      .runForeach(println)
    // prints
    // A
    // B
    // C
    // D
In the fourth example, we combine statefulMap and mapConcat to implement statefulMapConcat-like behavior.
- Scala
    Source(1 to 10)
      .statefulMap(() => List.empty[Int])(
        (state, elem) => {
          // group every 3 elements into a list
          val newState = elem :: state
          if (newState.size == 3)
            (Nil, newState.reverse)
          else
            (newState, Nil)
        },
        state => Some(state.reverse))
      .mapConcat(identity)
      .runForeach(println)
    // prints
    // 1
    // 2
    // 3
    // 4
    // 5
    // 6
    // 7
    // 8
    // 9
    // 10
Reactive Streams semantics
emits when the mapping function returns an element and downstream is ready to consume it
backpressures when downstream backpressures
completes when upstream completes
cancels when downstream cancels