Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.

Simple operators


Source.collectTypeSource.collectType Flow.collectTypeFlow.collectType


Filter elements that is of a given type.


Given stream element classes Message, Ping, and Pong, where Ping extends Message and Pong is an unrelated class.

sourcetrait Message
final case class Ping(id: Int) extends Message
final case class Pong(id: Int)
sourcestatic interface Message {}

static class Ping implements Message {
  final int id;

  Ping(int id) { = id;

static class Pong {
  final int id;

  Pong(int id) { = id;

From a stream of Message elements we would like to collect all elements of type Ping that have an id != 0, and then covert to Pong with same id.

sourceval flow: Flow[Message, Pong, NotUsed] =
  Flow[Message].collectType[Ping].filter( != 0).map(p => Pong(
sourceFlow<Message, Pong, NotUsed> flow =
        .filter(p -> != 0)
        .map(p -> new Pong(;

Reactive Streams semantics

emits when the element is of the given type

backpressures the element is of the given type and downstream backpressures

completes when upstream completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.