StreamConverters.asJavaStream

Create a sink which materializes into Java 8 Stream that can be run to trigger demand through the sink.

Additional Sink and Source converters

Signature

StreamConvertersStreamConverters

Description

Create a sink which materializes into Java 8 Stream that can be run to trigger demand through the sink. Elements emitted through the stream will be available for reading through the Java 8 Stream.

The Java 8 Stream will be ended when the stream flowing into this Sink completes, and closing the Java Stream will cancel the inflow of this Sink. If the Java Stream throws an exception, the Akka stream is cancelled.

Be aware that Java Stream blocks current thread while waiting on next element from downstream.

Example

Here is an example of a SinkSink that materializes into a java.util.stream.Stream.

Scala
import java.util.stream
import java.util.stream.IntStream

import akka.NotUsed
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.StreamConverters
val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0)

val sink: Sink[Int, stream.Stream[Int]] = StreamConverters.asJavaStream[Int]()

val jStream: java.util.stream.Stream[Int] = source.runWith(sink)
Java
import akka.japi.function.Creator;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.StreamConverters;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Source<Integer, NotUsed> source = Source.range(0, 9).filter(i -> i % 2 == 0);

Sink<Integer, java.util.stream.Stream<Integer>> sink = StreamConverters.<Integer>asJavaStream();

Stream<Integer> jStream = source.runWith(sink, system);

Reactive Streams semantics

cancels when the Java Stream is closed

backpressures when no read is pending on the Java Stream

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.