limitWeighted

Limit the total weight of incoming elements

Simple operators

Signature

Flow.limitWeightedFlow.limitWeighted

Description

A weight function returns the weight of each element, then the total accumulated weight is compared to a max and if it has passed the max the stream is failed with a StreamLimitReachedExceptionStreamLimitReachedException.

See also limit which puts a limit on the number of elements instead (the same as always returning 1 from the weight function).

Examples

limitWeighted can protect a stream coming from an untrusted source into an in-memory aggregate that grows with the number of elements from filling the heap and causing an out-of-memory error. In this sample we use the number of bytes in each ByteString element as weight and accept at most a total of 10 000 bytes from the untrusted source elements into the aggregated ByteString of all bytes, if the untrusted source emits more elements the stream and the materialized Future[ByteString]CompletionStage<ByteString> will be failed:

Scala
sourceval untrustedSource: Source[ByteString, NotUsed] = Source.repeat(ByteString("element"))

val allBytes: Future[ByteString] =
  untrustedSource.limitWeighted(max = 10000)(_.length).runReduce(_ ++ _)
Java
sourceSource<ByteString, NotUsed> untrustedSource = Source.repeat(ByteString.fromString("element"));

CompletionStage<ByteString> allBytes =
    untrustedSource
        .limitWeighted(
            10000, // max bytes
            bytes -> (long) bytes.length() // bytes of each chunk
            )
        .runReduce(ByteString::concat, system);

Reactive Streams semantics

emits when upstream emits and the number of emitted elements has not reached max

backpressures when downstream backpressures

completes when upstream completes and the number of emitted elements has not reached max

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.