limit
Limit the number of elements from upstream to a given max number.
Signature
Description
Limits the number of elements from upstream to a given max number. If the limit is exceeded, the operator fails the stream with a StreamLimitReachedException.
See also limitWeighted, which lets you choose a weight for each element and counts those weights toward a total max weight. take is also closely related, but it completes the stream instead of failing it after a certain number of elements.
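The difference between failing and completing can be sketched in plain Java, without the streams library. This is only a model of the behaviour described above, not the library's implementation: `LimitVsTakeSketch`, its `limit` and `take` methods, and `LimitReachedException` (a stand-in for StreamLimitReachedException) are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LimitVsTakeSketch {

    // Hypothetical stand-in for the library's StreamLimitReachedException.
    public static class LimitReachedException extends RuntimeException {
        public LimitReachedException(long max) {
            super("limit of " + max + " reached");
        }
    }

    // Models limit: collects elements, but fails as soon as upstream
    // delivers one element more than max.
    public static <T> List<T> limit(Iterator<T> upstream, long max) {
        List<T> out = new ArrayList<>();
        while (upstream.hasNext()) {
            T next = upstream.next();
            if (out.size() >= max) {
                throw new LimitReachedException(max);
            }
            out.add(next);
        }
        return out;
    }

    // Models take: collects at most max elements, then completes normally
    // without looking at the rest of upstream.
    public static <T> List<T> take(Iterator<T> upstream, long max) {
        List<T> out = new ArrayList<>();
        while (out.size() < max && upstream.hasNext()) {
            out.add(upstream.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3);
        System.out.println(take(source.iterator(), 2));   // [1, 2]
        System.out.println(limit(source.iterator(), 3));  // [1, 2, 3]
        try {
            limit(source.iterator(), 2);
        } catch (LimitReachedException e) {
            System.out.println("failed: " + e.getMessage()); // failed: limit of 2 reached
        }
    }
}
```

In this model, a source with exactly max elements passes through `limit` untouched. Only the element after the max triggers the failure.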
Example
limit can protect an in-memory aggregate that grows with the number of elements from filling the heap and causing an out-of-memory error when the stream feeding it comes from an untrusted source. In this sample we take at most 10 000 elements from the untrusted source into the aggregated sequence. If the untrusted source emits more elements, the stream and the materialized Future[Seq[String]] (Scala) or CompletionStage<List<String>> (Java) will be failed:
- Scala
  val untrustedSource: Source[String, NotUsed] = Source.repeat("element")
  val elements: Future[Seq[String]] = untrustedSource.limit(10000).runWith(Sink.seq)
- Java
  Source<String, NotUsed> untrustedSource = Source.repeat("element");
  CompletionStage<List<String>> elements = untrustedSource.limit(10000).runWith(Sink.seq(), system);
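A caller usually needs to react to the failed materialized value. The following is a minimal sketch using only the JDK's `CompletionStage` API, with no streams library involved. It assumes a manually completed `CompletableFuture` in place of the stream's materialized value. `FailedStageSketch`, `describe`, and `LimitReachedException` (a stand-in for StreamLimitReachedException) are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class FailedStageSketch {

    // Hypothetical stand-in for the exception the operator fails the stream with.
    public static class LimitReachedException extends RuntimeException {
        public LimitReachedException(String message) {
            super(message);
        }
    }

    // Turns either outcome of the stage into a short description,
    // so the returned stage itself never fails.
    public static CompletionStage<String> describe(CompletionStage<List<String>> elements) {
        return elements.handle((list, failure) -> {
            if (failure == null) {
                return "collected " + list.size() + " elements";
            }
            // Dependent stages may wrap the original cause in a CompletionException.
            Throwable cause = failure;
            if (failure instanceof CompletionException && failure.getCause() != null) {
                cause = failure.getCause();
            }
            if (cause instanceof LimitReachedException) {
                return "rejected: " + cause.getMessage();
            }
            return "failed: " + cause;
        });
    }

    public static void main(String[] args) {
        // Simulates the materialized value of a stream that exceeded its limit.
        CompletableFuture<List<String>> elements = new CompletableFuture<>();
        elements.completeExceptionally(new LimitReachedException("too many elements"));
        System.out.println(describe(elements).toCompletableFuture().join());
    }
}
```

Using `handle` rather than `thenApply` keeps the success and failure paths in one place, which suits the case where hitting the limit is an expected outcome for untrusted input rather than a bug.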
Reactive Streams semantics
emits when upstream emits and the number of emitted elements has not reached max
backpressures when downstream backpressures
completes when upstream completes and the number of emitted elements has not reached max