collectType

Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.

Simple operators

Signature

Source.collectTypeSource.collectType Flow.collectTypeFlow.collectType

Description

Filter elements that is of a given type.

Example

Given stream element classes Message, Ping, and Pong, where Ping extends Message and Pong is an unrelated class.

Scala
sourcetrait Message
final case class Ping(id: Int) extends Message
final case class Pong(id: Int)
Java
sourcestatic interface Message {}

static class Ping implements Message {
  final int id;

  Ping(int id) {
    this.id = id;
  }
}

static class Pong {
  final int id;

  Pong(int id) {
    this.id = id;
  }
}

From a stream of Message elements we would like to collect all elements of type Ping that have an id != 0, and then covert to Pong with same id.

Scala
sourceval flow: Flow[Message, Pong, NotUsed] =
  Flow[Message].collectType[Ping].filter(_.id != 0).map(p => Pong(p.id))
Java
sourceFlow<Message, Pong, NotUsed> flow =
    Flow.of(Message.class)
        .collectType(Ping.class)
        .filter(p -> p.id != 0)
        .map(p -> new Pong(p.id));

Reactive Streams semantics

emits when the element is of the given type

backpressures the element is of the given type and downstream backpressures

completes when upstream completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.