Sink.lazySink

Defers creation and materialization of a Sink until there is a first element.

Sink operators

Signature

Sink.lazySink

Description

Defers Sink creation and materialization until the first element arrives from upstream at the lazySink. After that, the stream behaves as if the nested sink had replaced the lazySink. The nested Sink is not created if upstream completes or fails before any element arrives at the sink.

The materialized value of the Sink is a Future (Scala) or CompletionStage (Java) that is completed with the materialized value of the nested sink once that is constructed.
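
Because the nested sink often materializes a future of its own, the value seen by the caller can end up nested one level deep. The following is a minimal sketch of that situation, not taken from the library's own examples. It assumes the Akka artifact names (with Apache Pekko the imports would start with org.apache.pekko instead), and the object name LazySinkMatValue and the method collect are hypothetical:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Keep, Sink, Source }

import scala.collection.immutable
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical names, used for illustration only
object LazySinkMatValue {

  def collect()(implicit system: ActorSystem): Future[immutable.Seq[Int]] = {
    // Sink.seq materializes a Future of the collected elements, so wrapping
    // it in lazySink yields a future of that future
    val nested: Future[Future[immutable.Seq[Int]]] =
      Source(1 to 3)
        .toMat(Sink.lazySink(() => Sink.seq[Int]))(Keep.right)
        .run()

    // Collapse the two levels into a single future
    nested.flatten
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("lazy-sink-mat-value")
    try println(Await.result(collect(), 3.seconds))
    finally system.terminate()
  }
}
```

Flattening is a convenience here; keeping the outer future separate can be useful when the caller wants to know that the nested sink has been created before it finishes.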

Can be combined with prefixAndTail to base the sink on the first element.

Examples

In this example we side effect from Flow.map, the sink factory, and Sink.foreach so that the order becomes visible. The nested sink is only created once the element has passed map:

Scala
val matVal =
  Source
    .maybe[String]
    .map { element =>
      println(s"mapped $element")
      element
    }
    .toMat(Sink.lazySink { () =>
      println("Sink created")
      Sink.foreach(elem => println(s"foreach $elem"))
    })(Keep.left)
    .run()

// some time passes
// nothing has been printed
matVal.success(Some("one"))
// now prints:
// mapped one
// Sink created
// foreach one
Java
CompletionStage<Optional<String>> matVal =
    Source.<String>maybe()
        .map(
            element -> {
              System.out.println("mapped " + element);
              return element;
            })
        .toMat(
            Sink.lazySink(
                () -> {
                  System.out.println("Sink created");
                  return Sink.foreach(elem -> System.out.println("foreach " + elem));
                }),
            Keep.left())
        .run(system);

// some time passes
// nothing has been printed
matVal.toCompletableFuture().complete(Optional.of("one"));
// now prints:
// mapped one
// Sink created
// foreach one
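
The opposite case, an upstream that completes without emitting anything, can be checked in a similar way. This is a sketch under the same assumption about Akka artifact names; LazySinkEmptyUpstream and factoryInvokedOnEmpty are hypothetical names. How the materialized future resolves in this case is not described on this page, so the sketch only waits for it to finish, successfully or not, and then reports whether the factory ran:

```scala
import java.util.concurrent.atomic.AtomicBoolean

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names, used for illustration only
object LazySinkEmptyUpstream {

  // Returns true if the sink factory was invoked for an upstream
  // that completes without emitting any element
  def factoryInvokedOnEmpty()(implicit system: ActorSystem): Boolean = {
    val created = new AtomicBoolean(false)

    val matVal =
      Source
        .empty[String]
        .toMat(Sink.lazySink { () =>
          created.set(true)
          Sink.foreach[String](elem => println(s"foreach $elem"))
        })(Keep.right)
        .run()

    // Await.ready waits for the future to finish without rethrowing a failure
    Await.ready(matVal, 3.seconds)
    created.get()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("lazy-sink-empty-upstream")
    try println(s"factory invoked: ${factoryInvokedOnEmpty()}")
    finally system.terminate()
  }
}
```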

Reactive Streams semantics

cancels if the future fails or if the created sink cancels

backpressures when initialized and when created sink backpressures
