Flow.futureFlow

Streams the elements through the given future flow once it successfully completes.

Simple operators

Signature

def futureFlow[I, O, M](flow: Future[Flow[I, O, M]]): Flow[I, O, Future[M]]

Description

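Streams the elements through the given future flow once the future completes successfully. If the future fails, the stream fails. The materialized `Future[M]` completes with the materialized value of the inner flow once that flow has been materialized.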
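
The behaviour described above can be sketched as follows. This is a minimal illustration, not taken from the library's own tests: it assumes the `akka` package names (with Apache Pekko, substitute `org.apache.pekko`), and the object name `FutureFlowSketch`, the `"inner-mat"` value, and the `"boom"` exception are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.Try

object FutureFlowSketch {
  // Returns (materialized value of the inner flow, emitted elements,
  // whether the stream built on a failed future failed).
  def run(): (String, Seq[Int], Boolean) = {
    // The ActorSystem also provides the implicit materializer (assumes Akka 2.6+ style).
    implicit val system: ActorSystem = ActorSystem("future-flow-sketch")
    try {
      // Hypothetical inner flow with a recognisable materialized value.
      val inner: Flow[Int, Int, String] =
        Flow[Int].map(_ * 2).mapMaterializedValue(_ => "inner-mat")

      // Successful future: elements pass through the inner flow.
      val (matF, resultF) = Source(1 to 3)
        .viaMat(Flow.futureFlow(Future.successful(inner)))(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()

      // Failed future: the stream itself is failed.
      val failedF = Source(1 to 3)
        .via(Flow.futureFlow(Future.failed[Flow[Int, Int, NotUsed]](new RuntimeException("boom"))))
        .runWith(Sink.seq)

      (
        Await.result(matF, 3.seconds),
        Await.result(resultF, 3.seconds),
        Try(Await.result(failedF, 3.seconds)).isFailure)
    } finally system.terminate()
  }
}
```

Blocking with `Await` is used here only to keep the sketch short; in application code the futures would normally be composed instead.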

Examples

A deferred creation of the stream based on the initial element can be achieved by combining futureFlow with prefixAndTail like so:

Scala
def processingFlow(id: Int): Future[Flow[Int, String, NotUsed]] =
  Future {
    Flow[Int].map(n => s"id: $id, value: $n")
  }

val source: Source[String, NotUsed] =
  Source(1 to 10).prefixAndTail(1).flatMapConcat {
    case (Seq(id), tail) =>
      // prefixAndTail emits an immutable.Seq, so match on Seq rather than a concrete List
      // base the Future flow creation on the first element
      tail.via(Flow.futureFlow(processingFlow(id)))
  }
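
To try the snippet end to end, it can be wrapped as in the sketch below. The `akka` package names are again an assumption (with Apache Pekko, substitute `org.apache.pekko`), and the object name `PrefixAndTailSketch` and the fallback case for an empty prefix are illustrative additions. The prefix is matched with `Seq(id)` so the pattern does not depend on the concrete collection type.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Sink, Source }

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

object PrefixAndTailSketch {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("prefix-and-tail-sketch")
    // Future { ... } needs an ExecutionContext; the system dispatcher is used here.
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      def processingFlow(id: Int): Future[Flow[Int, String, NotUsed]] =
        Future(Flow[Int].map(n => s"id: $id, value: $n"))

      val source: Source[String, NotUsed] =
        Source(1 to 10).prefixAndTail(1).flatMapConcat {
          case (Seq(id), tail) =>
            // The first element becomes the id; only the tail goes through the flow.
            tail.via(Flow.futureFlow(processingFlow(id)))
          case _ =>
            // Empty upstream: there is no first element to base the flow on.
            Source.empty[String]
        }

      Await.result(source.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }
}
```

Because the first element (`1`) is consumed as the prefix, it is used only as the `id`; the resulting stream carries the remaining elements `2` to `10`, each formatted as `id: 1, value: n`.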

Reactive Streams semantics

emits when the internal flow is successfully created and it emits

backpressures when the internal flow is successfully created and it backpressures

completes when upstream completes and all elements have been emitted from the internal flow
