throttle
Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.
Signature
Source.throttle
Source.throttle
Flow.throttle
Flow.throttle
Description
Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.
The throttle operator combines well with the queue
operator to adapt the speeds on both ends of the queue
-throttle
pair.
See also Buffers and working with rate for related operators.
Example
Imagine the server end of a streaming platform. When a client connects and request a video content, the server should return the content. Instead of serving a complete video as fast as bandwith allows, throttle
can be used to limit the network usage to 24 frames per second (let’s imagine this streaming platform stores frames, not bytes).
- Scala
-
source
val framesPerSecond = 24 // val frameSource: Source[Frame,_] val videoThrottling = frameSource.throttle(framesPerSecond, 1.second) // serialize `Frame` and send over the network.
- Java
-
source
int framesPerSecond = 24; Source<Frame, NotUsed> videoThrottling = frameSource.throttle(framesPerSecond, Duration.ofSeconds(1)); // serialize `Frame` and send over the network.
The problem in the example above is that when there’s a network hiccup, the video playback will interrupt. It can be improved by sending more content than the necessary ahead of time and let the client buffer that. So, throttle
can be used to burst the first 30 seconds and then send a constant of 24 frames per second. This way, when a request comes in a good chunk of content will be downloaded and after that the server will activate the throttling.
- Scala
-
source
// val frameSource: Source[Frame,_] val videoThrottlingWithBurst = frameSource.throttle( framesPerSecond, 1.second, framesPerSecond * 30, // maximumBurst ThrottleMode.Shaping) // serialize `Frame` and send over the network.
- Java
-
source
Source<Frame, NotUsed> throttlingWithBurst = frameSource.throttle( framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping()); // serialize `Frame` and send over the network.
The extra argument to set the ThrottleMode
to shaping
tells throttle
to make pauses to avoid exceeding the maximum rate. Alternatively we could set the throttling mode to cause a stream failure when upstream is faster than the throttle rate.
The examples above don’t cover all the parameters supported by throttle
(e.g. cost
-based throttling). See the api documentation
api documentation
for all the details.
Reactive Streams semantics
emits when upstream emits an element and configured time per each element elapsed
backpressures when downstream backpressures
completes when upstream completes