collectType
Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.
Signature
Source.collectTypeSource.collectType Flow.collectTypeFlow.collectType
Description
Filter elements that is of a given type.
Example
Given stream element classes Message, Ping, and Pong, where Ping extends Message and Pong is an unrelated class.
- Scala
-
source
trait Message final case class Ping(id: Int) extends Message final case class Pong(id: Int) - Java
-
source
static interface Message {} static class Ping implements Message { final int id; Ping(int id) { this.id = id; } } static class Pong { final int id; Pong(int id) { this.id = id; } }
From a stream of Message elements we would like to collect all elements of type Ping that have an id != 0, and then covert to Pong with same id.
- Scala
-
source
val flow: Flow[Message, Pong, NotUsed] = Flow[Message].collectType[Ping].filter(_.id != 0).map(p => Pong(p.id)) - Java
-
source
Flow<Message, Pong, NotUsed> flow = Flow.of(Message.class) .collectType(Ping.class) .filter(p -> p.id != 0) .map(p -> new Pong(p.id));
Reactive Streams semantics
emits when the element is of the given type
backpressures the element is of the given type and downstream backpressures
completes when upstream completes