groupedWeighted
Accumulate incoming events until the combined weight of elements is greater than or equal to the minimum weight and then pass the collection of elements downstream.
Signature
Source.groupedWeighted
Flow.groupedWeighted
Description
Chunk up this stream into groups of elements that have a cumulative weight greater than or equal to minWeight, with the last group possibly smaller than the requested minWeight due to end-of-stream.
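The grouping rule described above can be modelled without any streaming machinery. The following is a minimal sketch in plain Java, assuming elements are added to the current group until the running weight reaches minWeight, at which point the group is emitted and the weight resets. The class `GroupedWeightedModel` and its static method are hypothetical names used for illustration only; they are not part of the streams API, and the sketch ignores backpressure, asynchrony, and argument validation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical helper, not part of the streams API: it models only the grouping rule.
public class GroupedWeightedModel {

    // Splits `elements` into groups whose cumulative weight is >= minWeight.
    // The final group may be lighter because the input ended.
    public static <T> List<List<T>> groupedWeighted(
            List<T> elements, long minWeight, ToLongFunction<T> costFn) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long weight = 0L;
        for (T element : elements) {
            current.add(element);
            weight += costFn.applyAsLong(element);
            if (weight >= minWeight) {
                // Threshold reached: emit the group and start a new one.
                groups.add(current);
                current = new ArrayList<>();
                weight = 0L;
            }
        }
        if (!current.isEmpty()) {
            // End of input: emit whatever was accumulated, even if under minWeight.
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<Integer>> input =
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6));
        // Each inner list weighs 2, so two of them reach minWeight = 4.
        System.out.println(groupedWeighted(input, 4, x -> (long) x.size()));
        // [[[1, 2], [3, 4]], [[5, 6]]]

        // Weights 1, 3, 2, 1 with minWeight = 3: a group may overshoot the threshold.
        System.out.println(
                groupedWeighted(Arrays.asList("a", "bbb", "cc", "d"), 3, s -> (long) s.length()));
        // [[a, bbb], [cc, d]]
    }
}
```

In the second call the first group has weight 4, which illustrates that minWeight is a lower bound for emitting a group rather than an exact size.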
See also:
- grouped for a variant that groups based on number of elements
- groupedWithin for a variant that groups based on number of elements and a time window
- groupedWeightedWithin for a variant that groups based on element weight and a time window
Examples
The example below demonstrates how groupedWeighted collects the accumulated elements into a Seq (Scala) or a List (Java), which downstream operators can then process.
- Scala
val collections = immutable.Iterable(Seq(1, 2), Seq(3, 4), Seq(5, 6))

Source[Seq[Int]](collections).groupedWeighted(4)(_.length).runForeach(println)
// Vector(Seq(1, 2), Seq(3, 4))
// Vector(Seq(5, 6))

Source[Seq[Int]](collections).groupedWeighted(3)(_.length).runForeach(println)
// Vector(Seq(1, 2), Seq(3, 4))
// Vector(Seq(5, 6))
- Java
Source.from(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6)))
    .groupedWeighted(4, x -> (long) x.size())
    .runForeach(System.out::println, system);
// [[1, 2], [3, 4]]
// [[5, 6]]

Source.from(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6)))
    .groupedWeighted(3, x -> (long) x.size())
    .runForeach(System.out::println, system);
// [[1, 2], [3, 4]]
// [[5, 6]]
Reactive Streams semantics
emits when the cumulative weight of elements is greater than or equal to the minimum weight, or when upstream completes
backpressures when a group has been assembled and downstream backpressures
completes when upstream completes