limit

Limit the number of elements from upstream to a given max number.

Simple operators

Signature

Flow.limit

Description

Limits the number of elements from upstream to a given max number. If upstream emits more elements than that, the operator fails the stream with a StreamLimitReachedException.

See also limitWeighted, which assigns a weight to each element and counts those weights toward a total max weight. take is also closely related, but it completes the stream instead of failing it after a certain number of elements.
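The difference between limit and take can be modelled in plain Java without any stream library. The sketch below is only an illustration of the behaviour described above, not the operator's implementation: the class LimitVsTake, its two static methods and the LimitReached exception are hypothetical names that stand in for the stream operators and for StreamLimitReachedException.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model of the two behaviours; not the library's code.
public class LimitVsTake {

    // Hypothetical stand-in for StreamLimitReachedException.
    public static class LimitReached extends RuntimeException {
        public LimitReached(long max) {
            super("limit of " + max + " reached");
        }
    }

    // Models limit(max): passes up to max elements and fails
    // as soon as upstream produces one element more than max.
    public static <T> List<T> limit(Iterator<T> upstream, long max) {
        List<T> out = new ArrayList<>();
        while (upstream.hasNext()) {
            T next = upstream.next();
            if (out.size() >= max) {
                throw new LimitReached(max);
            }
            out.add(next);
        }
        return out;
    }

    // Models take(max): passes up to max elements and then completes
    // normally, ignoring whatever upstream still has to offer.
    public static <T> List<T> take(Iterator<T> upstream, long max) {
        List<T> out = new ArrayList<>();
        while (out.size() < max && upstream.hasNext()) {
            out.add(upstream.next());
        }
        return out;
    }
}
```

With three upstream elements and a max of 2, the take model returns the first two elements, while the limit model throws. With a max of 3 or more, both return all three elements.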

Example

limit can protect a stream that collects elements from an untrusted source into an in-memory aggregate, which grows with the number of elements, from filling the heap and causing an out-of-memory error. In this sample we take at most 10 000 of the untrusted source elements into the aggregated sequence of elements. If the untrusted source emits more elements, the stream and the materialized Future[Seq[String]] (Scala) or CompletionStage<List<String>> (Java) will be failed:

Scala
val untrustedSource: Source[String, NotUsed] = Source.repeat("element")

val elements: Future[Seq[String]] =
  untrustedSource.limit(10000).runWith(Sink.seq)
Java
Source<String, NotUsed> untrustedSource = Source.repeat("element");

CompletionStage<List<String>> elements =
    untrustedSource.limit(10000).runWith(Sink.seq(), system);

Reactive Streams semantics

emits when upstream emits and the number of emitted elements has not reached max

backpressures when downstream backpressures

completes when upstream completes and the number of emitted elements has not reached max
