StreamConverters.fromInputStream
Create a source that wraps an InputStream.
Additional Sink and Source converters
Signature
StreamConverters.fromInputStream
Description
Creates a Source from a java.io.InputStream created by the given function. Emitted elements are ByteString chunks of up to chunkSize bytes. The actual size of each emitted element depends on how much data the underlying java.io.InputStream returns on each read invocation, but a chunk is never larger than chunkSize.
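To make the chunking behaviour concrete, the following plain-Java sketch reads an InputStream with a buffer of chunkSize bytes and records how many bytes each read call returns. It only illustrates how chunk sizes follow the results of read; it is not Akka's implementation, and the class name ChunkedReadSketch and the method chunkSizes are hypothetical names introduced for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Akka: shows how chunk sizes follow read().
final class ChunkedReadSketch {

  // Reads the stream to its end and returns the size of every non-empty chunk.
  static List<Integer> chunkSizes(InputStream in, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<Integer> sizes = new ArrayList<>();
    byte[] buffer = new byte[chunkSize];
    try {
      int read;
      // read(buffer) returns at most buffer.length bytes, or -1 at end of stream.
      while ((read = in.read(buffer)) != -1) {
        if (read > 0) {
          sizes.add(read);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sizes;
  }

  public static void main(String[] args) {
    // "Some random input" is 17 bytes in ASCII.
    byte[] bytes = "Some random input".getBytes(StandardCharsets.US_ASCII);
    System.out.println(chunkSizes(new ByteArrayInputStream(bytes), 8)); // prints [8, 8, 1]
  }
}
```

A ByteArrayInputStream always fills the buffer while data remains, so only the last chunk is short. A network or pipe stream may return fewer bytes on any read, which is why the emitted sizes can vary while staying at or below chunkSize.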
You can configure the default dispatcher for this Source by changing the akka.stream.materializer.blocking-io-dispatcher setting, or set it for a given Source by using akka.stream.ActorAttributes.
It materializes a Future (Scala) or CompletionStage (Java) of IOResult containing the number of bytes read from the InputStream upon completion, and a possible exception if the IO operation did not complete successfully. Note that bytes having been read by the source does not guarantee that they were seen by downstream stages.
The created InputStream will be closed when the Source is cancelled.
See also fromOutputStream
Example
Here is an example using both fromInputStream and fromOutputStream to read from a java.io.InputStream, uppercase the read content, and write it back out into a java.io.OutputStream.
- Scala
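import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{ Flow, Sink, Source, StreamConverters }
import akka.util.ByteString

// Assumes an implicit ActorSystem is in scope to materialize the stream.
val bytes = "Some random input".getBytes
val inputStream = new ByteArrayInputStream(bytes)
val outputStream = new ByteArrayOutputStream()

val source: Source[ByteString, Future[IOResult]] =
  StreamConverters.fromInputStream(() => inputStream)

// Uppercases every byte of each chunk.
val toUpperCase: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString].map(_.map(_.toChar.toUpper.toByte))

val sink: Sink[ByteString, Future[IOResult]] =
  StreamConverters.fromOutputStream(() => outputStream)

val eventualResult = source.via(toUpperCase).runWith(sink)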
- Java
java.io.InputStream inputStream = new ByteArrayInputStream(bytes);
Source<ByteString, CompletionStage<IOResult>> source =
    StreamConverters.fromInputStream(() -> inputStream);

// Given a ByteString produces a ByteString with the upperCase representation
// Removed from the sample for brevity.
// Flow<ByteString, ByteString, NotUsed> toUpperCase = ...

java.io.OutputStream outputStream = new ByteArrayOutputStream();
Sink<ByteString, CompletionStage<IOResult>> sink =
    StreamConverters.fromOutputStream(() -> outputStream);

CompletionStage<IOResult> ioResultCompletionStage =
    source.via(toUpperCase).runWith(sink, system);
// When the ioResultCompletionStage completes, the byte array backing the outputStream
// will contain the uppercase representation of the bytes read from the inputStream.
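The Java sample leaves out the toUpperCase flow. As a rough illustration of the per-byte transformation such a flow would need, here is a plain-Java sketch that mirrors the Scala expression `_.toChar.toUpper.toByte`. It is an assumption about what the omitted code does, not the original sample: the class UpperCaseBytesSketch and its toUpperCase method are hypothetical, and the sketch works on a byte array rather than on a ByteString.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not taken from the Akka sample.
final class UpperCaseBytesSketch {

  // Returns a copy of the input with every byte uppercased as a single character.
  // Only meaningful for single-byte encodings such as ASCII.
  static byte[] toUpperCase(byte[] input) {
    byte[] out = new byte[input.length];
    for (int i = 0; i < input.length; i++) {
      out[i] = (byte) Character.toUpperCase((char) input[i]);
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] in = "Some random input".getBytes(StandardCharsets.US_ASCII);
    // prints SOME RANDOM INPUT
    System.out.println(new String(toUpperCase(in), StandardCharsets.US_ASCII));
  }
}
```

Because the transformation is applied byte by byte, it gives the same result no matter how the input is split into chunks, which is what makes it safe to apply to each emitted element independently.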