Source.fromPublisher
Integration with Reactive Streams, subscribes to a Publisher
.
Signature
- Scala
-
source
def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] =
- Java
-
source
static <T> akka.stream.javadsl.Source<T, NotUsed> fromPublisher(Publisher<T> publisher)
Description
If you want to create a Source
Source
that gets its elements from another library that supports Reactive Streams, you can use JavaFlowSupport.Source.fromPublisher
. This source will produce the elements from the Publisher
, and coordinate backpressure as needed.
If the API you want to consume elements from accepts a Subscriber
instead of providing a Publisher
, see asSubscriber.
For JDK 8 users: since java.util.concurrent.Flow
was introduced in JDK version 9, if you are still on version 8 you may use the org.reactivestreams library with Source.fromPublisher
Source.fromPublisher
.
Example
Suppose we use a database client that supports Reactive Streams, we could create a Source
Source
that queries the database for its rows. That Source
Source
can then be used for further processing, for example creating a Source
Source
that contains the names of the rows.
Because both the database driver and Akka Streams support Reactive Streams, backpressure is applied throughout the stream, preventing us from running out of memory when the database rows are consumed slower than they are produced by the database.
- Scala
-
source
import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Publisher; import akka.NotUsed; import akka.stream.scaladsl.Source; import akka.stream.scaladsl.JavaFlowSupport; val names: Source[String, NotUsed] = // A new subscriber will subscribe to the supplied publisher for each // materialization, so depending on whether the database client supports // this the Source can be materialized more than once. JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows()) .map(row => row.name);
- Java
-
source
import java.util.concurrent.Flow.Publisher; import akka.NotUsed; import akka.stream.javadsl.Source; import akka.stream.javadsl.JavaFlowSupport; class Example { public Source<String, NotUsed> names() { // A new subscriber will subscribe to the supplied publisher for each // materialization, so depending on whether the database client supports // this the Source can be materialized more than once. return JavaFlowSupport.Source.<Row>fromPublisher(databaseClient.fetchRows()) .map(row -> row.getField("name")); } }