groupBy

Demultiplex the incoming stream into separate output streams.

Nesting and flattening operators

Signature

Source.groupBy, Flow.groupBy

Description

This operation demultiplexes the incoming stream into separate output streams (substreams), one for each element key. The key is computed for each element using the given function. When a key is encountered for the first time, a new substream is opened and subsequently fed with all elements belonging to that key.
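As a rough mental model only, the routing rule can be pictured over a finite list with plain java.util collections. This is a hypothetical sketch, not the operator's implementation (which is streaming and backpressured); the class `GroupByRoutingModel` and method `demultiplex` are illustrative names. Each map entry stands in for one substream, opened the first time its key is seen.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of groupBy's routing rule over a finite list.
// Each map entry stands in for one substream; LinkedHashMap keeps the
// order in which substreams were opened (first occurrence of each key).
final class GroupByRoutingModel {
    static <T, K> Map<K, List<T>> demultiplex(List<T> elements, Function<T, K> keyOf) {
        Map<K, List<T>> substreams = new LinkedHashMap<>();
        for (T element : elements) {
            K key = keyOf.apply(element);
            // First time this key is seen: "open" a new substream.
            // Afterwards, elements with this key are fed into the existing one.
            substreams.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
        return substreams;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Same key function as the Java example below: odd -> false, even -> true
        System.out.println(demultiplex(input, i -> i % 2 == 0));
        // prints {false=[1, 3, 5, 7, 9], true=[2, 4, 6, 8, 10]}
    }
}
```

Unlike the real operator, this model materializes every group in memory and has no maxSubstreams limit; it only shows which element ends up in which substream.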

Note: If allowClosedSubstreamRecreation is set to true, substream completion and incoming elements are subject to race conditions: elements that arrive for a substream that is in the process of closing might get lost.

Warning

If allowClosedSubstreamRecreation is set to false (the default), the operator keeps track of the keys of all substreams that have already been closed. If you expect an unbounded number of keys, this set keeps growing and can cause memory issues. Elements belonging to those keys are drained directly and not sent to any substream.
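The effect of allowClosedSubstreamRecreation on closed keys can be pictured with the following single-threaded model. It is a hypothetical sketch (the class `ClosedSubstreamModel` and its methods are illustrative, not library API), and because nothing in it runs concurrently it cannot reproduce the race condition described in the note above; it only shows why the default mode must remember every closed key, and what happens to a later element for such a key in each mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-threaded model of how groupBy treats keys whose
// substream has already been closed. Not the library's implementation.
final class ClosedSubstreamModel<K, T> {
    private final boolean allowClosedSubstreamRecreation;
    private final Map<K, List<T>> open = new HashMap<>();
    // Only filled when recreation is disallowed; it grows with every closed
    // key, which is the memory concern for an unbounded key space.
    private final Set<K> closedKeys = new HashSet<>();
    final List<T> drained = new ArrayList<>();
    int substreamsOpened = 0;

    ClosedSubstreamModel(boolean allowClosedSubstreamRecreation) {
        this.allowClosedSubstreamRecreation = allowClosedSubstreamRecreation;
    }

    void push(K key, T element) {
        if (!allowClosedSubstreamRecreation && closedKeys.contains(key)) {
            drained.add(element); // dropped: never reaches a substream
            return;
        }
        // Opens a (new) substream if none is currently open for this key.
        open.computeIfAbsent(key, k -> {
            substreamsOpened++;
            return new ArrayList<>();
        }).add(element);
    }

    // Models a substream finishing early, e.g. because it only takes one element.
    void closeSubstream(K key) {
        open.remove(key);
        if (!allowClosedSubstreamRecreation) {
            closedKeys.add(key);
        }
    }

    int rememberedClosedKeys() {
        return closedKeys.size();
    }
}
```

With `false`, a second element for a closed key lands in `drained` and the key stays in memory; with `true`, the same element opens a second substream for that key and nothing is remembered.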

Example

Scala
Source(1 to 10)
  .groupBy(maxSubstreams = 2, _ % 2) // create two sub-streams with odd and even numbers
  .reduce(_ + _) // for each sub-stream, sum its elements
  .mergeSubstreams // merge back into a stream
  .runForeach(println)
//30
//25
Java
Source.range(1, 10)
    .groupBy(2, i -> i % 2 == 0) // create two sub-streams with odd and even numbers
    .reduce(Integer::sum) // for each sub-stream, sum its elements
    .mergeSubstreams() // merge back into a stream
    .runForeach(System.out::println, system);
// 25
// 30

Note: The groupBy operator partitions a stream by a key. This is not the same as adding an async boundary, which lets different parts of a stream run concurrently. If a stream can be partitioned exclusively, so that each element belongs to exactly one group, groupBy can be used to maximize parallel processing, because each substream can be processed independently of the others.

Example with an async boundary, so that the substreams run concurrently:

Scala
Source(1 to 10)
  .groupBy(maxSubstreams = 2, _ % 2) // create two sub-streams with odd and even numbers
  .via(Flow[Int].reduce(_ + _).async) // for each sub-stream, sum its elements asynchronously
  .mergeSubstreams // merge back into a stream
  .runForeach(println)
//30
//25
Java
Source.range(1, 10)
        .groupBy(2, i -> i % 2 == 0) // create two sub-streams with odd and even numbers
        .via(Flow.of(Integer.class).reduce(Integer::sum).async()) // for each sub-stream, sum its elements asynchronously
        .mergeSubstreams() // merge back into a stream
        .runForeach(System.out::println, system);
// 25
// 30
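Why exclusive partitioning enables parallelism can be shown with a loose analogy using only java.util.concurrent. This is a hypothetical sketch, not how the stream library schedules work: the input is split into disjoint partitions by the same key as the example, and each partition is reduced as an independent task. Since the tasks share no state, they may run in parallel, which is the role an async boundary plays for each substream. The class and method names are illustrative.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

// Hypothetical analogy: partition by key, then reduce each partition as an
// independent task. Partitions share no state, so tasks can run in parallel.
final class PartitionedParallelSum {
    static Map<Boolean, Integer> sumByParity(List<Integer> input) {
        Map<Boolean, List<Integer>> partitions =
            input.stream().collect(Collectors.groupingBy(i -> i % 2 == 0));
        // At least one thread, so an empty input does not break the pool.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            Map<Boolean, Future<Integer>> pending = new TreeMap<>();
            for (Map.Entry<Boolean, List<Integer>> partition : partitions.entrySet()) {
                List<Integer> elements = partition.getValue();
                // Each task only reads its own partition (like one substream).
                pending.put(partition.getKey(),
                    pool.submit(() -> elements.stream().mapToInt(Integer::intValue).sum()));
            }
            Map<Boolean, Integer> results = new TreeMap<>();
            for (Map.Entry<Boolean, Future<Integer>> entry : pending.entrySet()) {
                try {
                    results.put(entry.getKey(), entry.getValue().get()); // wait for that partition
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumByParity(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
        // prints {false=25, true=30}
    }
}
```

The merge step here waits for every partition before returning, whereas mergeSubstreams emits each substream's result as soon as it is available, so the output order of the stream examples is not fixed in the same way.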

Reactive Streams semantics

emits when an element arrives for which the grouping function returns a group that has not yet been created; the new group (substream) is emitted

backpressures when there is an element pending for a group whose substream backpressures

completes when upstream completes (until the end of the stream it is not possible to know whether new substreams will be needed or not)
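The backpressure rule implies head-of-line blocking: while one element waits for a substream that has no demand, groupBy stops pulling from upstream, so the other substreams receive nothing either, even if they still have demand. The following hypothetical single-threaded model illustrates this; demand is a plain counter per key rather than asynchronous Reactive Streams request signals, the emission of new substreams is not modelled, and the names are illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of groupBy's backpressure rule. Demand is a plain
// counter per key (how many more elements that substream will accept);
// a key missing from the map is treated as a substream with no demand.
final class GroupByBackpressureModel {
    // Returns how many upstream elements were delivered before groupBy would
    // stop pulling, or the input size if nothing ever blocked.
    static <T, K> int deliverUntilBlocked(List<T> upstream, Function<T, K> keyOf, Map<K, Integer> demand) {
        Map<K, Integer> remaining = new HashMap<>(demand);
        int delivered = 0;
        for (T element : upstream) {
            K key = keyOf.apply(element);
            int available = remaining.getOrDefault(key, 0);
            if (available == 0) {
                // This element is pending for a backpressuring substream, so
                // nothing more is pulled from upstream, even if other
                // substreams still have demand.
                return delivered;
            }
            remaining.put(key, available - 1);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // The odd substream accepts 5 elements, the even substream only 1.
        Map<Boolean, Integer> demand = Map.of(false, 5, true, 1);
        System.out.println(deliverUntilBlocked(input, i -> i % 2 == 0, demand)); // prints 3
    }
}
```

After delivering 1, 2 and 3, the element 4 is pending for the even substream, which has no demand left, so the odd substream's remaining demand goes unused until the even substream requests more.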
