Source.unfoldAsync
Just like unfold, but the fold function returns a Future.
Signature
Description
Just like unfold, but the fold function returns a Future. The source emits an element or completes when that Future completes.
This can be used to implement many stateful sources without having to touch the lower-level GraphStage API.
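As a minimal sketch of the description above, assuming Akka Streams 2.6 or later is on the classpath (with Apache Pekko the packages would be `org.apache.pekko` instead of `akka`), the state can be a simple counter. The countdown and the names `CountdownExample` and `run` are illustrative and not part of the library:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object CountdownExample {
  def run(): immutable.Seq[Int] = {
    // Assumption: a classic ActorSystem, which also supplies the implicit materializer
    implicit val system: ActorSystem = ActorSystem("countdown")
    import system.dispatcher

    // The state is the current counter; each step decides asynchronously
    // whether to emit an element or to stop.
    val countdown = Source.unfoldAsync(3) { n =>
      Future {
        if (n > 0) Some((n - 1, n)) // (next state, element to emit)
        else None                   // completes the stream
      }
    }

    try Await.result(countdown.runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Each invocation receives the state returned by the previous one, so no mutable variable is needed outside the function.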
Examples
In this example we are asking an imaginary actor for chunks of bytes from an offset with a protocol like this:
object DataActor {
  sealed trait Command
  case class FetchChunk(offset: Long, replyTo: ActorRef[Chunk]) extends Command
  case class Chunk(bytes: ByteString)
}
The actor will reply with the Chunk message. If we ask for an offset beyond the end of the data, the actor will respond with an empty ByteString.
We want to represent this as a stream of ByteStrings that completes when we reach the end. To achieve this, we use the offset as the state passed between unfoldAsync invocations:
// actor we can query for data with an offset
val dataActor: ActorRef[DataActor.Command] = ???
import system.executionContext

implicit val askTimeout: Timeout = 3.seconds
val startOffset = 0L
val byteSource: Source[ByteString, NotUsed] =
  Source.unfoldAsync(startOffset) { currentOffset =>
    // ask for next chunk
    val nextChunkFuture: Future[DataActor.Chunk] =
      dataActor.ask(DataActor.FetchChunk(currentOffset, _))

    nextChunkFuture.map { chunk =>
      val bytes = chunk.bytes

      if (bytes.isEmpty) None // end of data
      else Some((currentOffset + bytes.length, bytes))
    }
  }
Reactive Streams semantics
emits when there is demand and the future returned by the unfold function completes with some value
completes when the future returned by the unfold function completes with an empty value
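
The two rules above can be illustrated with a plain-Scala model that uses only the standard library. This is a rough sketch and not how the operator is implemented: it ignores demand entirely and collects the elements into a list. The names `UnfoldAsyncModel`, `unfoldAsyncModel` and `demo` are hypothetical.

```scala
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object UnfoldAsyncModel {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical model, not the library implementation: it ignores demand and
  // keeps calling f until the returned future completes with None.
  def unfoldAsyncModel[S, E](state: S)(f: S => Future[Option[(S, E)]]): Future[List[E]] =
    f(state).flatMap {
      // "emits": the future completed with some value, continue with the next state
      case Some((next, elem)) => unfoldAsyncModel(next)(f).map(elem :: _)
      // "completes": the future completed with an empty value
      case None => Future.successful(Nil)
    }

  // Doubles the state on every step and stops once it exceeds 8
  def demo(): List[Int] = {
    val result = unfoldAsyncModel(1) { n =>
      Future.successful(if (n <= 8) Some((n * 2, n)) else None)
    }
    Await.result(result, 1.second)
  }
}
```

In the real operator the next call to the unfold function is only made when downstream signals demand, which this model leaves out.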