collectType
Transform this stream by testing the type of each element as it passes through this processing step, emitting only the elements that are instances of the provided type.
Signature
Source.collectType
Flow.collectType
Description
Filter elements that are of a given type. Elements of any other type are dropped, and the emitted elements are typed as the given type.
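As a rough illustration of the filtering described above, the following sketch does the same "keep and cast" step on a plain `java.util.stream` pipeline. It is an analogy only and makes no claim about how the operator is implemented. The class `CollectTypeSketch` and its `collectType` helper are hypothetical names introduced for this example, and the sketch does not model backpressure or asynchronous execution.

```java
import java.util.List;
import java.util.stream.Collectors;

public class CollectTypeSketch {

    // Hypothetical helper (not part of any streaming library): keeps only the
    // elements that are instances of `type` and returns them typed as T.
    static <T> List<T> collectType(List<?> elements, Class<T> type) {
        return elements.stream()
            .filter(type::isInstance) // drop elements of any other type
            .map(type::cast)          // safe, the filter above already checked the type
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = List.of(1, "two", 3.0, 4, "five");
        System.out.println(collectType(mixed, Integer.class)); // [1, 4]
        System.out.println(collectType(mixed, String.class));  // [two, five]
    }
}
```

Because `Class.isInstance` also accepts subclasses, passing a supertype such as `Number.class` would keep the integers and the double alike.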
Example
Given stream element classes Message, Ping, and Pong, where Ping extends Message and Pong is an unrelated class.
- Scala
    trait Message
    final case class Ping(id: Int) extends Message
    final case class Pong(id: Int)
- Java
    static interface Message {}

    static class Ping implements Message {
      final int id;

      Ping(int id) {
        this.id = id;
      }
    }

    static class Pong {
      final int id;

      Pong(int id) {
        this.id = id;
      }
    }
From a stream of Message elements we would like to collect all elements of type Ping that have an id != 0, and then convert each of them to a Pong with the same id.
- Scala
    val flow: Flow[Message, Pong, NotUsed] =
      Flow[Message].collectType[Ping].filter(_.id != 0).map(p => Pong(p.id))
- Java
    Flow<Message, Pong, NotUsed> flow =
        Flow.of(Message.class)
            .collectType(Ping.class)
            .filter(p -> p.id != 0)
            .map(p -> new Pong(p.id));
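To trace which elements survive the pipeline above, here is a small sketch that applies the same three steps (collect Ping, drop id 0, map to Pong) to an in-memory list using only `java.util.stream`. It is not the streaming flow itself. The class `PingPongSketch`, the method `pongIds`, and the extra `Other` message type are assumptions added for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class PingPongSketch {
    interface Message {}

    static class Ping implements Message {
        final int id;
        Ping(int id) { this.id = id; }
    }

    // Hypothetical second Message subtype, present only to show what is dropped.
    static class Other implements Message {}

    static class Pong {
        final int id;
        Pong(int id) { this.id = id; }
    }

    // Returns the ids of the Pong elements the three steps would produce.
    static List<Integer> pongIds(List<Message> messages) {
        return messages.stream()
            .filter(Ping.class::isInstance) // the type test done by collectType
            .map(Ping.class::cast)
            .filter(p -> p.id != 0)         // same predicate as in the flow
            .map(p -> new Pong(p.id))       // convert to Pong with the same id
            .map(pong -> pong.id)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Message> in = List.of(new Ping(0), new Other(), new Ping(7), new Ping(42));
        System.out.println(pongIds(in)); // [7, 42]
    }
}
```

Here `Ping(0)` is removed by the id filter and `Other` is removed by the type test, so only the ids 7 and 42 reach the output.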
Reactive Streams semantics
emits when the element is of the given type
backpressures when the element is of the given type and downstream backpressures
completes when upstream completes