collect

Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.

Simple operators

Signature

Source.collect Flow.collect

Description

Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream. This can often replace filter followed by map to achieve the same in one single operator.

Example

Given stream element classes Message, Ping, and Pong, where Ping extends Message and Pong is an unrelated class.

Scala
sourcetrait Message
final case class Ping(id: Int) extends Message
final case class Pong(id: Int)
Java
sourcestatic interface Message {}

static class Ping implements Message {
  final int id;

  Ping(int id) {
    this.id = id;
  }
}

static class Pong {
  final int id;

  Pong(int id) {
    this.id = id;
  }
}

From a stream of Message elements we would like to collect all elements of type Ping that have an id != 0, and then covert to Pong with same id.

Scala
sourceval flow: Flow[Message, Pong, NotUsed] =
  Flow[Message].collect {
    case Ping(id) if id != 0 => Pong(id)
  }
Java
sourceFlow<Message, Pong, NotUsed> flow =
    Flow.of(Message.class)
        .collect(
            new PFBuilder<Message, Pong>()
                .match(Ping.class, p -> p.id != 0, p -> new Pong(p.id))
                .build());

Reactive Streams semantics

emits when the provided partial function is defined for the element

backpressures the partial function is defined for the element and downstream backpressures

completes when upstream completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.