limitWeighted
Limit the total weight of incoming elements
Signature
Flow.limitWeighted
Flow.limitWeighted
Description
A weight function returns the weight of each element, then the total accumulated weight is compared to a max and if it has passed the max the stream is failed with a StreamLimitReachedException
StreamLimitReachedException
.
See also limit which puts a limit on the number of elements instead (the same as always returning 1
from the weight function).
Examples
limitWeighted
can protect a stream coming from an untrusted source into an in-memory aggregate that grows with the number of elements from filling the heap and causing an out-of-memory error. In this sample we use the number of bytes in each ByteString
element as weight and accept at most a total of 10 000 bytes from the untrusted source elements into the aggregated ByteString
of all bytes, if the untrusted source emits more elements the stream and the materialized Future[ByteString]
CompletionStage<ByteString>
will be failed:
- Scala
-
source
val untrustedSource: Source[ByteString, NotUsed] = Source.repeat(ByteString("element")) val allBytes: Future[ByteString] = untrustedSource.limitWeighted(max = 10000)(_.length).runReduce(_ ++ _)
- Java
-
source
Source<ByteString, NotUsed> untrustedSource = Source.repeat(ByteString.fromString("element")); CompletionStage<ByteString> allBytes = untrustedSource .limitWeighted( 10000, // max bytes bytes -> (long) bytes.length() // bytes of each chunk ) .runReduce(ByteString::concat, system);
Reactive Streams semantics
emits when upstream emits and the number of emitted elements has not reached max
backpressures when downstream backpressures
completes when upstream completes and the number of emitted elements has not reached max