grouped

Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.

Simple operators

Signature

Source.grouped Flow.grouped

Description

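Accumulate incoming elements until the specified number has been collected, then pass the collection downstream as a single element. If upstream completes before a group is full, the remaining elements are emitted as a final, smaller group.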

Examples

The example below shows how grouped collects the accumulated elements into a Seq (Scala) or a List (Java), and how each group can then be transformed with another operator such as map.

Scala
Source(1 to 7).grouped(3).runForeach(println)
// Vector(1, 2, 3)
// Vector(4, 5, 6)
// Vector(7)

Source(1 to 7).grouped(3).map(_.sum).runForeach(println)
// 6   (= 1 + 2 + 3)
// 15  (= 4 + 5 + 6)
// 7   (= 7)
Java
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7))
    .grouped(3)
    .runForeach(System.out::println, system);
// [1, 2, 3]
// [4, 5, 6]
// [7]

Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7))
    .grouped(3)
    .map(g -> g.stream().reduce(0, Integer::sum))
    .runForeach(System.out::println, system);
// 6   (= 1 + 2 + 3)
// 15  (= 4 + 5 + 6)
// 7   (= 7)

Reactive Streams semantics

emits when the specified number of elements has been accumulated or upstream completes

backpressures when a group has been assembled and downstream backpressures

completes when upstream completes
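
The emit and completion rules above can be modelled in plain Java, without the streams library. This is only a rough sketch of the buffering logic, not the operator's actual implementation: the class `GroupedSketch` and its `grouped` helper are hypothetical names, a finite `List` stands in for upstream, and backpressure is not modelled at all.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupedSketch {

    // Hypothetical helper, not part of the streams API.
    // Models grouped(n) over a finite "upstream" represented as a List.
    public static <T> List<List<T>> grouped(List<T> upstream, int n) {
        if (n <= 0) {
            // Assumption: a group size must be positive to make sense.
            throw new IllegalArgumentException("n must be greater than 0");
        }
        List<List<T>> emitted = new ArrayList<>();
        List<T> buffer = new ArrayList<>(n);
        for (T element : upstream) {
            buffer.add(element);
            // "emits when the specified number of elements has been accumulated"
            if (buffer.size() == n) {
                emitted.add(List.copyOf(buffer));
                buffer.clear();
            }
        }
        // "... or upstream completes": flush a partial group, if there is one.
        if (!buffer.isEmpty()) {
            emitted.add(List.copyOf(buffer));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Seven elements in groups of three: the last group is smaller.
        System.out.println(grouped(List.of(1, 2, 3, 4, 5, 6, 7), 3));
        // Six elements divide evenly, so no partial group is emitted.
        System.out.println(grouped(List.of(1, 2, 3, 4, 5, 6), 3));
    }
}
```

The first call mirrors the `[1, 2, 3]`, `[4, 5, 6]`, `[7]` output of the examples: the trailing `[7]` appears only because the input ended with one element still buffered.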
