Source.lazySource
Defers creation and materialization of a Source
until there is demand.
Signature
Source.lazySource
Source.lazySource
Description
Defers creation and materialization of a Source
until there is demand, then emits the elements from the source downstream just like if it had been created up front. If the stream fails or cancels before there is demand the factory will not be invoked.
Note that asynchronous boundaries and many other operators in the stream may do pre-fetching or trigger demand earlier than you would expect.
The materialized value of the lazy
is a Future
CompletionStage
that is completed with the materialized value of the nested source once that is constructed.
See also:
Example
In this example you might expect this sample to not construct the expensive source until .pull
is called. However, since Sink.queue
has a buffer and will ask for that immediately on materialization the expensive source is in created quickly after the stream has been materialized:
- Scala
-
source
val source = Source.lazySource { () => println("Creating the actual source") createExpensiveSource() } val queue = source.runWith(Sink.queue()) // ... time passes ... // at some point in time we pull the first time // but the source creation may already have been triggered queue.pull()
- Java
-
source
Source<String, CompletionStage<NotUsed>> source = Source.lazySource( () -> { System.out.println("Creating the actual source"); return createExpensiveSource(); }); SinkQueueWithCancel<String> queue = source.runWith(Sink.queue(), system); // ... time passes ... // at some point in time we pull the first time // but the source creation may already have been triggered queue.pull();
Instead the most useful aspect of the operator is that the factory is called once per stream materialization which means that it can be used to safely construct a mutable object to use with the actual deferred source.
In this example we make use of that by unfolding a mutable object that works like an iterator with a method to say if there are more elements and one that produces the next and moves to the next element.
If the IteratorLikeThing
was used directly in a Source.unfold
the same instance would end up being unsafely shared across all three materializations of the stream, but wrapping it with Source.lazy
ensures we create a separate instance for each of the started streams:
- Scala
-
source
val stream = Source .lazySource { () => val iteratorLike = new IteratorLikeThing Source.unfold(iteratorLike) { iteratorLike => if (iteratorLike.thereAreMore) Some((iteratorLike, iteratorLike.extractNext)) else None } } .to(Sink.foreach(println)) // each of the three materializations will have their own instance of IteratorLikeThing stream.run() stream.run() stream.run()
- Java
-
source
RunnableGraph<CompletionStage<NotUsed>> stream = Source.lazySource( () -> { IteratorLikeThing instance = new IteratorLikeThing(); return Source.unfold( instance, sameInstance -> { if (sameInstance.thereAreMore()) return Optional.of(Pair.create(sameInstance, sameInstance.extractNext())); else return Optional.empty(); }); }) .to(Sink.foreach(System.out::println)); // each of the three materializations will have their own instance of IteratorLikeThing stream.run(system); stream.run(system); stream.run(system);
Note though that you can often also achieve the same using unfoldResource. If you have an actual Iterator
you should prefer fromIterator.
Reactive Streams semantics
emits depends on the wrapped Source
completes depends on the wrapped Source