Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.

Simple operators


def collectType[T](implicit tag: ClassTag[T]): Repr[T]


Filter elements that is of a given type.


Given stream element classes Message, Ping, and Pong, where Ping extends Message and Pong is an unrelated class.

trait Message
final case class Ping(id: Int) extends Message
final case class Pong(id: Int)
static interface Message {}

static class Ping implements Message {
  final int id;

  Ping(int id) { = id;

static class Pong {
  final int id;

  Pong(int id) { = id;

From a stream of Message elements we would like to collect all elements of type Ping that have an id != 0, and then covert to Pong with same id.

val flow: Flow[Message, Pong, NotUsed] =
  Flow[Message].collectType[Ping].filter( != 0).map(p => Pong(
Flow<Message, Pong, NotUsed> flow =
        .filter(p -> != 0)
        .map(p -> new Pong(;

Reactive Streams semantics

emits when the element is of the given type

backpressures the element is of the given type and downstream backpressures

completes when upstream completes

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.