throttle

Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.

Simple operators

Signature

Source.throttle Flow.throttle

Description

Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.

The throttle operator combines well with the queue operator to adapt the speeds on both ends of the queue-throttle pair.

See also Buffers and working with rate for related operators.

Example

Imagine the server end of a streaming platform. When a client connects and requests video content, the server should return the content. Instead of serving a complete video as fast as bandwidth allows, throttle can be used to limit the network usage to 24 frames per second (let’s imagine this streaming platform stores frames, not bytes).

Scala
val framesPerSecond = 24

// val frameSource: Source[Frame,_]
val videoThrottling = frameSource.throttle(framesPerSecond, 1.second)
// serialize `Frame` and send over the network.
Java
int framesPerSecond = 24;

Source<Frame, NotUsed> videoThrottling =
    frameSource.throttle(framesPerSecond, Duration.ofSeconds(1));
// serialize `Frame` and send over the network.

The problem in the example above is that a network hiccup will interrupt video playback. This can be improved by sending more content than necessary ahead of time and letting the client buffer it. So, throttle can be used to burst the first 30 seconds of content and then send a constant 24 frames per second. This way, when a request comes in, a good chunk of content is downloaded right away, and after that the server activates the throttling.

Scala
// val frameSource: Source[Frame,_]
val videoThrottlingWithBurst = frameSource.throttle(
  framesPerSecond,
  1.second,
  framesPerSecond * 30, // maximumBurst
  ThrottleMode.Shaping)
// serialize `Frame` and send over the network.
Java
Source<Frame, NotUsed> throttlingWithBurst =
    frameSource.throttle(
        framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping());
// serialize `Frame` and send over the network.

The extra argument setting the ThrottleMode to shaping tells throttle to pause as needed to avoid exceeding the maximum rate. Alternatively, we could set the throttling mode to enforcing, which fails the stream when upstream is faster than the throttle rate.
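To make the difference between pausing and failing concrete, the following Java sketch models a throttle as a token bucket with a burst allowance. It rests on simplifying assumptions: time is a virtual nanosecond counter supplied by the caller, the bucket starts full, and the class ThrottleModeSketch, its Mode enum, and the offer method are hypothetical names chosen for illustration. It is not the library's implementation, and exact timings in a real stream may differ.

```java
/**
 * Simplified token-bucket model of a throttle with a burst allowance.
 * Hypothetical illustration only; this is not the library's implementation.
 */
class ThrottleModeSketch {

    /** Hypothetical stand-ins for the pausing and failing behaviors. */
    enum Mode { SHAPING, ENFORCING }

    private final long nanosPerToken; // time needed to earn one token (one element)
    private final long capacity;      // maximumBurst: how many tokens can be saved up
    private final Mode mode;
    private long available;           // tokens currently in the bucket
    private long lastRefillNanos;     // virtual time up to which tokens are accounted for

    ThrottleModeSketch(int elements, long perNanos, int maximumBurst, Mode mode) {
        this.nanosPerToken = perNanos / elements;
        this.capacity = maximumBurst;
        this.mode = mode;
        this.available = maximumBurst; // assumption: the bucket starts full
        this.lastRefillNanos = 0L;
    }

    /**
     * Offers one element at virtual time nowNanos.
     * Returns the delay in nanoseconds before it may be emitted (0 means immediately).
     */
    long offer(long nowNanos) {
        // Refill: one token per nanosPerToken elapsed, capped at the burst capacity.
        long earned = (nowNanos - lastRefillNanos) / nanosPerToken;
        if (earned > 0) {
            available = Math.min(capacity, available + earned);
            lastRefillNanos += earned * nanosPerToken;
        }
        if (available > 0) {
            available--;
            return 0L;
        }
        if (mode == Mode.ENFORCING) {
            // Enforcing: an element arriving without a token fails the stream.
            throw new IllegalStateException("Maximum throttle rate exceeded");
        }
        // Shaping: reserve the next token to arrive and wait for it.
        long delay = lastRefillNanos + nanosPerToken - nowNanos;
        lastRefillNanos += nanosPerToken;
        return delay;
    }

    public static void main(String[] args) {
        int framesPerSecond = 24;
        int burst = framesPerSecond * 30;

        // Upstream offers burst + 1 frames at virtual time 0, i.e. as fast as it can.
        ThrottleModeSketch shaping =
            new ThrottleModeSketch(framesPerSecond, 1_000_000_000L, burst, Mode.SHAPING);
        long lastDelay = 0L;
        for (int frame = 0; frame <= burst; frame++) {
            lastDelay = shaping.offer(0L);
        }
        System.out.println("Shaping: delay of the frame after the burst (ns) = " + lastDelay);

        ThrottleModeSketch enforcing =
            new ThrottleModeSketch(framesPerSecond, 1_000_000_000L, burst, Mode.ENFORCING);
        try {
            for (int frame = 0; frame <= burst; frame++) {
                enforcing.offer(0L);
            }
        } catch (IllegalStateException e) {
            System.out.println("Enforcing: " + e.getMessage());
        }
    }
}
```

In this model the first 720 frames (30 seconds of video) pass immediately. The 721st frame is delayed by roughly 41.7 ms in the shaping variant, while the enforcing variant rejects it.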

The examples above don’t cover all the parameters supported by throttle (e.g. cost-based throttling). See the API documentation for all the details.
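As a rough illustration of the idea behind cost-based throttling, the Java sketch below computes when each element could be emitted if every element consumes part of a cost budget per time unit. It is a conceptual model, not the operator itself: the CostThrottleSketch class, the Frame type with its sizeInBytes field, and the emissionTimes method are hypothetical, and the model assumes upstream is always ready and the full burst allowance is available at the start.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

/** Conceptual model of cost-based throttling; hypothetical, not the library's code. */
class CostThrottleSketch {

    /** Hypothetical frame type: only its encoded size matters here. */
    static final class Frame {
        final int sizeInBytes;

        Frame(int sizeInBytes) {
            this.sizeInBytes = sizeInBytes;
        }
    }

    /**
     * Returns the earliest emission time of each element, relative to stream start,
     * given a budget of `cost` units per `per` and an initial allowance of `maximumBurst` units.
     */
    static <T> List<Duration> emissionTimes(
            List<T> elements,
            int cost,
            Duration per,
            int maximumBurst,
            ToIntFunction<T> costCalculation) {
        long nanosPerCostUnit = per.toNanos() / cost;
        long spent = 0L;
        List<Duration> times = new ArrayList<>();
        for (T element : elements) {
            // Each element is charged its individual cost against the running total.
            spent += costCalculation.applyAsInt(element);
            // Cost beyond the burst allowance has to be earned by waiting.
            long toEarn = Math.max(0L, spent - maximumBurst);
            times.add(Duration.ofNanos(toEarn * nanosPerCostUnit));
        }
        return times;
    }

    public static void main(String[] args) {
        List<Frame> frames =
            Arrays.asList(new Frame(400), new Frame(600), new Frame(500), new Frame(250));
        // Budget: 1000 bytes per second, with 1000 bytes allowed as an initial burst.
        List<Duration> times =
            emissionTimes(frames, 1000, Duration.ofSeconds(1), 1000, f -> f.sizeInBytes);
        for (int i = 0; i < frames.size(); i++) {
            System.out.println(
                "frame " + i + " (" + frames.get(i).sizeInBytes + " bytes) at "
                    + times.get(i).toMillis() + " ms");
        }
    }
}
```

Here the cost function is the frame size, so large frames use up the budget faster than small ones. With frames as the unit instead, every element would simply have a cost of 1.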

Reactive Streams semantics

emits when upstream emits an element and the configured time per element has elapsed

backpressures when downstream backpressures

completes when upstream completes
