asSubscriber

Integration with Reactive Streams, materializes into a java.util.concurrent.Flow.Subscriber.

Source operators

Signature

Scala
final def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]]
Java
public static <T> akka.stream.javadsl.Source<T, java.util.concurrent.Flow.Subscriber<T>> asSubscriber()

Description

If you want to create a Source that gets its elements from another library that supports Reactive Streams, you can use JavaFlowSupport.Source.asSubscriber. Each time this Source is materialized, it produces a materialized value of type java.util.concurrent.Flow.Subscriber. Subscribing this Subscriber to a Reactive Streams Publisher feeds the Publisher's elements into the Source.

Note

For JDK 8 users: java.util.concurrent.Flow was introduced in JDK 9. If you are still on JDK 8, you can use the org.reactivestreams library with Source.asSubscriber and Flow.asSubscriber instead.

Example

Suppose we use a database client that supports Reactive Streams. We could create a Source that queries the database for its rows. That Source can then be used for further processing, for example to create a Source that contains the name of each row.

Note that since the database is queried for each materialization, the rowSource can be safely re-used. Because both the database driver and Akka Streams support Reactive Streams, backpressure is applied throughout the stream, which prevents us from running out of memory when the rows are consumed more slowly than the database produces them.

Scala
import java.util.concurrent.Flow.Subscriber
import java.util.concurrent.Flow.Publisher

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.JavaFlowSupport

// Row and databaseClient stand for the types of the database client in use.
val rowSource: Source[Row, NotUsed] =
  JavaFlowSupport.Source.asSubscriber[Row]
    .mapMaterializedValue(
      (subscriber: Subscriber[Row]) => {
        // For each materialization, fetch the rows from the database:
        val rows: Publisher[Row] = databaseClient.fetchRows()
        rows.subscribe(subscriber)
        NotUsed
      })

val names: Source[String, NotUsed] =
  // rowSource can be re-used, since it will start a new
  // query for each materialization, fully supporting backpressure
  // for each materialized stream:
  rowSource.map(row => row.name)
Java
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Publisher;

import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.JavaFlowSupport;

class Example {
    Source<Row, NotUsed> rowSource =
            JavaFlowSupport.Source.<Row>asSubscriber()
                    .mapMaterializedValue(
                            subscriber -> {
                                // For each materialization, fetch the rows from the database:
                                Publisher<Row> rows = databaseClient.fetchRows();
                                rows.subscribe(subscriber);

                                return NotUsed.getInstance();
                            });

    public Source<String, NotUsed> names() {
        // rowSource can be re-used, since it will start a new
        // query for each materialization, fully supporting backpressure
        // for each materialized stream:
        return rowSource.map(row -> row.getField("name"));
    }
}
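
The backpressure handshake that the example relies on can be observed with the JDK alone. The following is a minimal sketch, not Akka's implementation: a java.util.concurrent.SubmissionPublisher stands in for the database driver's Publisher, and a hand-written Subscriber stands in for the one that asSubscriber materializes. The names BackpressureSketch, OneAtATimeSubscriber and consume are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureSketch {

    // Hypothetical slow consumer: it signals demand for one element at a time,
    // so the publisher can never push more than the consumer has asked for.
    static final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> consume(int count) throws InterruptedException {
        OneAtATimeSubscriber<Integer> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                // submit blocks once the subscriber's buffer is full
                publisher.submit(i);
            }
        } // close() completes the stream after buffered elements are delivered
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("stream did not complete in time");
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consume(5));
    }
}
```

In the database example, the Subscriber materialized by asSubscriber plays the role of OneAtATimeSubscriber: it requests elements only as downstream operators signal demand, which is what keeps a fast Publisher from overwhelming a slow consumer.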