Source.unfoldAsync
Just like unfold
but the fold function returns a Future
CompletionStage
.
Signature
Source.unfoldAsync
Source.unfoldAsync
Description
Just like unfold
but the fold function returns a Future
CompletionStage
which will cause the source to complete or emit when it completes.
Can be used to implement many stateful sources without having to touch the more low level GraphStage
API.
Examples
In this example we are asking an imaginary actor for chunks of bytes from an offset with a protocol like this:
- Scala
-
source
object DataActor { sealed trait Command case class FetchChunk(offset: Long, replyTo: ActorRef[Chunk]) extends Command case class Chunk(bytes: ByteString)
- Java
-
source
class DataActor { interface Command {} static final class FetchChunk implements Command { public final long offset; public final ActorRef<Chunk> replyTo; public FetchChunk(long offset, ActorRef<Chunk> replyTo) { this.offset = offset; this.replyTo = replyTo; } } static final class Chunk { public final ByteString bytes; public Chunk(ByteString bytes) { this.bytes = bytes; } }
The actor will reply with the Chunk
message, if we ask for an offset outside of the end of the data the actor will respond with an empty ByteString
We want to represent this as a stream of ByteString
s that complete when we reach the end, to achieve this we use the offset as the state passed between unfoldAsync
invocations:
- Scala
-
source
// actor we can query for data with an offset val dataActor: ActorRef[DataActor.Command] = ??? import system.executionContext implicit val askTimeout: Timeout = 3.seconds val startOffset = 0L val byteSource: Source[ByteString, NotUsed] = Source.unfoldAsync(startOffset) { currentOffset => // ask for next chunk val nextChunkFuture: Future[DataActor.Chunk] = dataActor.ask(DataActor.FetchChunk(currentOffset, _)) nextChunkFuture.map { chunk => val bytes = chunk.bytes if (bytes.isEmpty) None // end of data else Some((currentOffset + bytes.length, bytes)) } }
- Java
-
source
ActorRef<DataActor.Command> dataActor = null; // let's say we got it from somewhere Duration askTimeout = Duration.ofSeconds(3); long startOffset = 0L; Source<ByteString, NotUsed> byteSource = Source.unfoldAsync( startOffset, currentOffset -> { // ask for next chunk CompletionStage<DataActor.Chunk> nextChunkCS = AskPattern.ask( dataActor, (ActorRef<DataActor.Chunk> ref) -> new DataActor.FetchChunk(currentOffset, ref), askTimeout, system.scheduler()); return nextChunkCS.thenApply( chunk -> { ByteString bytes = chunk.bytes; if (bytes.isEmpty()) return Optional.empty(); else return Optional.of(Pair.create(currentOffset + bytes.size(), bytes)); }); });
Reactive Streams semantics
emits when there is demand and unfold state returned future completes with some value
completes when the future CompletionStage returned by the unfold function completes with an empty value