Source.maybe
Create a source that emits once the materialized Promise
CompletableFuture
is completed with a value.
Signature
Description
Create a source with a materialized Promise[Option[T]]
CompletableFuture<Optional<T>>
which controls what element will be emitted by the Source. This makes it possible to inject a value into a stream after creation.
- If the materialized promise is completed with a
Some
non-emptyOptional
, that value will be produced downstream, followed by completion. - If the materialized promise is completed with a
None
emptyOptional
, no value will be produced downstream and completion will be signalled immediately. - If the materialized promise is completed with a failure, then the source will fail with that error.
- If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed with
None
emptyOptional
.
Source.maybe
has some similarities with Source.fromFuture
Source.fromCompletionStage
. One difference is that a new Promise
CompletableFuture
is materialized from Source.maybe
each time the stream is run while the Future
CompletionStage
given to Source.fromFuture
Source.fromCompletionStage
can only be completed once.
Source.queue
is an alternative for emitting more than one element.
Example
- Scala
-
source
import akka.stream.scaladsl._ import scala.concurrent.Promise val source = Source.maybe[Int].to(Sink.foreach(elem => println(elem))) val promise1: Promise[Option[Int]] = source.run() promise1.success(Some(1)) // prints 1 // a new Promise is returned when the stream is materialized val promise2 = source.run() promise2.success(Some(2)) // prints 2
- Java
-
source
import akka.stream.javadsl.RunnableGraph; import java.util.concurrent.CompletableFuture; Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe(); RunnableGraph<CompletableFuture<Optional<Integer>>> runnable = source.to(Sink.foreach(System.out::println)); CompletableFuture<Optional<Integer>> completable1 = runnable.run(system); completable1.complete(Optional.of(1)); // prints 1 CompletableFuture<Optional<Integer>> completable2 = runnable.run(system); completable2.complete(Optional.of(2)); // prints 2
The Source.maybe[Int]
will return a Promise[Option[Int]]
CompletableFuture<Optional<Integer>>
materialized value. That Promise
CompletableFuture
can be completed later. Each time the stream is run a new Promise
CompletableFuture
is returned.
Reactive Streams semantics
emits when the returned promise is completed with some value
completes after emitting some value, or directly if the promise is completed with no value