extrapolate
Allow for a faster downstream by expanding the last emitted element to an Iterator.
Signature
Source.extrapolate
Flow.extrapolate
Description
Allow for a faster downstream by expanding the last emitted element to an Iterator. For example, an Iterator.continually(element) will cause extrapolate to keep repeating the last emitted element.
All original elements are always emitted unchanged; the Iterator is only used whenever there is downstream demand before upstream emits a new element.
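The rule above can be illustrated with a small, library-free model. This is only a sketch under simplifying assumptions, not the operator's implementation: the stream is reduced to a list of synchronous downstream demands, each paired with whatever upstream has ready at that moment, and the optional initial argument is left out. The names ExtrapolateModel, run and readyAtEachDemand are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// Simplified, synchronous model of the rule described above.
// Hypothetical helper: real streams are asynchronous and demand-driven.
final class ExtrapolateModel {

  /**
   * Each entry of readyAtEachDemand is what upstream has available when
   * downstream asks for one element: a value, or empty if upstream is late.
   */
  static <T> List<T> run(
      List<Optional<T>> readyAtEachDemand, Function<T, Iterator<T>> extrapolator) {
    List<T> emitted = new ArrayList<>();
    // Nothing can be extrapolated before upstream has emitted its first element.
    Iterator<T> extrapolated = Collections.emptyIterator();
    for (Optional<T> ready : readyAtEachDemand) {
      if (ready.isPresent()) {
        // Original elements always pass through unchanged.
        emitted.add(ready.get());
        // A new element replaces the iterator used to fill later gaps.
        extrapolated = extrapolator.apply(ready.get());
      } else if (extrapolated.hasNext()) {
        // Demand arrived before upstream emitted: use the iterator.
        emitted.add(extrapolated.next());
      }
      // Otherwise nothing is emitted; in this model downstream simply waits.
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<Optional<String>> upstream =
        List.of(
            Optional.of("f1"),
            Optional.empty(),
            Optional.empty(),
            Optional.of("f2"),
            Optional.empty());
    // Endlessly repeat a marked copy of the last element.
    List<String> out =
        run(upstream, last -> Stream.iterate(last + "-gray", f -> f).iterator());
    System.out.println(out); // prints [f1, f1-gray, f1-gray, f2, f2-gray]
  }
}
```

In the sample run, f1 and f2 appear exactly once and unchanged, while the "-gray" copies appear only in the slots where upstream had nothing ready.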
Includes an optional initial argument to prevent blocking the entire stream when there are multiple producers.
See Understanding extrapolate and expand for more information and examples.
Example
Imagine a videoconference client decoding a video feed from a colleague working remotely. The network bandwidth may be unreliable. That is fine: as long as the audio remains fluent, it doesn’t matter if we can’t decode a frame or two (or more). When a frame is dropped, though, we want the UI to keep showing the last decoded frame:
- Scala
// if upstream is too slow, produce copies of the last frame but grayed out.
val rateControl: Flow[Frame, Frame, NotUsed] =
  Flow[Frame].extrapolate((frame: Frame) => {
    val grayedOut = frame.withFilter(Gray)
    Iterator.continually(grayedOut)
  }, Some(Frame.blackFrame))

val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl)

// let's create a 25fps stream (a Frame every 40.millis)
val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick)

val videoAt25Fps: Source[Frame, Cancellable] =
  tickSource.zip(videoSource).map(_._2)
- Java
// if upstream is too slow, produce copies of the last frame but grayed out.
Flow<Frame, Frame, NotUsed> rateControl =
    Flow.of(Frame.class)
        .extrapolate(
            lastFrame -> {
              Frame gray =
                  new Frame(
                      ByteString.fromString(
                          "gray frame!! - " + lastFrame.pixels().utf8String()));
              return Stream.iterate(gray, i -> i).iterator();
            },
            BLACK_FRAME // initial value
            );

Source<Frame, NotUsed> videoSource = networkSource.via(decode).via(rateControl);

// let's create a 25fps stream (a Frame every 40.millis)
Source<String, Cancellable> tickSource =
    Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick");

Source<Frame, Cancellable> videoAt25Fps = tickSource.zip(videoSource).map(Pair::second);
Reactive Streams semantics
emits when downstream stops backpressuring
backpressures when downstream backpressures
completes when upstream completes