akka.stream.scaladsl

Flow

trait Flow[+T] extends AnyRef

Scala API: The Flow DSL allows the formulation of stream transformations based on some input. The starting point can be a collection, an iterator, a block of code which is evaluated repeatedly, or an org.reactivestreams.api.Producer.

See Reactive Streams for details.

Each DSL element produces a new Flow that can be further transformed, building up a description of the complete transformation pipeline. In order to execute this pipeline the Flow must be materialized by calling the #toFuture, #consume, #onComplete, or #toProducer methods on it.

It should be noted that the streams modeled by this library are “hot”, meaning that they asynchronously flow through a series of processors without detailed control by the user. In particular it is not predictable how many elements a given transformation step might buffer before handing elements downstream, which means that transformation functions may be invoked more often than for corresponding transformations on strict collections like List. *An important consequence* is that elements that were produced into a stream may be discarded by later processors, e.g. when using the #take combinator.

By default every operation is executed within its own akka.actor.Actor to enable full pipelining of the chained set of computations. This behavior is determined by the akka.stream.FlowMaterializer which is required by those methods that materialize the Flow into a series of org.reactivestreams.api.Processor instances. The returned reactive stream is fully started and active.

Source
Flow.scala

Abstract Value Members

  1. abstract def append[U](duct: Duct[_ >: T, U]): Flow[U]

    Append the operations of a Duct to this flow.

  2. abstract def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T]

    Adds a fixed-size buffer to the flow that stores elements from a faster upstream until it becomes full. Depending on the defined OverflowStrategy, it might drop elements or backpressure the upstream if there is no space available.

    size

    The size of the buffer in element count

    overflowStrategy

    Strategy that is used when incoming elements cannot fit inside the buffer

  3. abstract def collect[U](pf: PartialFunction[T, U]): Flow[U]

    Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step. Non-matching elements are filtered out.

  4. abstract def concat[U >: T](next: Producer[U]): Flow[U]

    Concatenate the given other stream to this stream so that the first element emitted by the given producer is emitted after the last element of this stream.

  5. abstract def conflate[S](seed: (T) ⇒ S, aggregate: (S, T) ⇒ S): Flow[S]

    Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the upstream producer is faster.

    This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.

    seed

    Provides the first state for a conflated value using the first unconsumed element as a start

    aggregate

    Takes the currently aggregated value and the current pending element to produce a new aggregate
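
    As a rough illustration of the seed/aggregate contract, the sketch below models what a conflate step would hand downstream for a batch of elements that accumulated while the consumer was busy. It uses plain Scala collections rather than the Flow API; the object and method names are hypothetical and chosen only for this example.

    ```scala
    object ConflateModel {
      // Hypothetical helper, not part of akka.stream: summarizes the elements
      // that piled up while downstream was not ready. The first pending element
      // goes through `seed`; every later one is folded in with `aggregate`.
      def conflateBatch[T, S](pending: Seq[T])(seed: T => S, aggregate: (S, T) => S): Option[S] =
        pending.headOption.map(first => pending.tail.foldLeft(seed(first))(aggregate))
    }

    // Summing pending numbers: three pending elements collapse into one summary.
    val summed = ConflateModel.conflateBatch[Int, Int](Seq(1, 2, 3))(x => x, (s, t) => s + t)
    ```

    An empty batch yields None in this model, which mirrors the statement that the step does not duplicate or invent elements when downstream is the faster side.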

  6. abstract def consume(materializer: FlowMaterializer): Unit

    Attaches a consumer to this stream which will just discard all received elements. *This will materialize the flow and initiate its execution.*

    The given FlowMaterializer decides how the flow’s logical structure is broken down into individual processing steps.

  7. abstract def drop(n: Int): Flow[T]

    Discard the given number of elements at the beginning of the stream.

  8. abstract def dropWithin(d: FiniteDuration): Flow[T]

    Discard the elements received within the given duration at the beginning of the stream.

  9. abstract def expand[S, U](seed: (T) ⇒ S, extrapolate: (S) ⇒ (U, S)): Flow[U]

    Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older element until a new element arrives from upstream. For example, an expand step might repeat the last element for the consumer until it receives an update from upstream.

    This element will never "drop" upstream elements, as all elements go through at least one extrapolation step. This means that if the upstream is actually faster than the downstream, it will be backpressured by the downstream consumer.

    seed

    Provides the first state for extrapolation using the first unconsumed element

    extrapolate

    Takes the current extrapolation state to produce an output element and the next extrapolation state.
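
    The seed/extrapolate contract can be sketched on its own, without the Flow API: one upstream element is turned into an extrapolation state, and each downstream request then yields an output plus the next state. The names below are hypothetical and only model the described behavior for a single upstream element.

    ```scala
    import scala.annotation.tailrec

    object ExpandModel {
      // Hypothetical helper, not part of akka.stream: produces the outputs a
      // faster downstream would see if it requested `n` elements before the
      // next upstream element arrived.
      def extrapolateN[T, S, U](element: T, n: Int)(seed: T => S, extrapolate: S => (U, S)): List[U] = {
        @tailrec
        def loop(state: S, remaining: Int, acc: List[U]): List[U] =
          if (remaining <= 0) acc.reverse
          else {
            val (out, next) = extrapolate(state)
            loop(next, remaining - 1, out :: acc)
          }
        loop(seed(element), n, Nil)
      }
    }

    // "Repeat the last element": the state is the element itself and never changes.
    val repeated = ExpandModel.extrapolateN[Int, Int, Int](7, 3)(x => x, s => (s, s))
    ```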

  10. abstract def filter(p: (T) ⇒ Boolean): Flow[T]

    Only pass on those elements that satisfy the given predicate.

  11. abstract def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U]

    Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. This operation can be used on a stream of element type Producer.

  12. abstract def fold[U](zero: U)(f: (U, T) ⇒ U): Flow[U]

    Invoke the given function for every received element, giving it its previous output (or the given “zero” value) and the element as input. The returned stream will receive the return value of the final function evaluation when the input stream ends.

  13. abstract def foreach(c: (T) ⇒ Unit): Flow[Unit]

    Invoke the given procedure for each received element and produce a Unit value upon reaching the normal end of the stream. Please note that also in this case the Flow needs to be materialized (e.g. using #consume) to initiate its execution.

  14. abstract def groupBy[K](f: (T) ⇒ K): Flow[(K, Producer[T])]

    This operation demultiplexes the incoming stream into separate output streams, one for each element key. The key is computed for each element using the given function. When a new key is encountered for the first time it is emitted to the downstream consumer together with a fresh producer that will eventually produce all the elements of the substream for that key. Not consuming the elements from the created streams will stop this processor from processing more elements, therefore you must take care to unblock (or cancel) all of the produced streams even if you want to consume only one of them.
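
    The demultiplexing order can be approximated on a strict list: keys appear in the order they are first encountered, and each key is paired with all elements that map to it. This is only a sketch with hypothetical names; it deliberately ignores the asynchronous behavior and the back-pressure coupling between substreams described above.

    ```scala
    object GroupByModel {
      // Hypothetical strict-collection model of groupBy: one (key, elements)
      // pair per distinct key, in first-seen key order.
      def groupByInOrder[T, K](elems: List[T])(f: T => K): List[(K, List[T])] = {
        val keysInOrder = elems.map(f).distinct
        keysInOrder.map(k => k -> elems.filter(e => f(e) == k))
      }
    }

    // Odd numbers are seen first, so the key 1 is emitted before the key 0.
    val byParity = GroupByModel.groupByInOrder(List(1, 2, 3, 4, 5))(_ % 2)
    ```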

  15. abstract def grouped(n: Int): Flow[Seq[T]]

    Chunk up this stream into groups of the given size, with the last group possibly smaller than requested due to end-of-stream.

  16. abstract def groupedWithin(n: Int, d: FiniteDuration): Flow[Seq[T]]

    Chunk up this stream into groups of elements received within a time window, or limited by the given number of elements, whichever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.

  17. abstract def map[U](f: (T) ⇒ U): Flow[U]

    Transform this stream by applying the given function to each of the elements as they pass through this processing step.

  18. abstract def mapConcat[U](f: (T) ⇒ Seq[U]): Flow[U]

    Transform each input element into a sequence of output elements that is then flattened into the output stream.

  19. abstract def mapFuture[U](f: (T) ⇒ Future[U]): Flow[U]

    Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a Future of the element that will be emitted downstream. As many futures may run in parallel as there are elements requested by downstream, and they may complete in any order, but the elements are emitted downstream in the same order in which they arrived from upstream.

  20. abstract def merge[U >: T](other: Producer[U]): Flow[U]

    Merge this stream with the one emitted by the given producer, taking elements as they arrive from either side (picking randomly when both have elements ready).

  21. abstract def onComplete(materializer: FlowMaterializer)(callback: (Try[Unit]) ⇒ Unit): Unit

    When this flow is completed, either through an error or normal completion, apply the provided function with scala.util.Success or scala.util.Failure.

    *This operation materializes the flow and initiates its execution.*

  22. abstract def prefixAndTail(n: Int): Flow[(Seq[T], Producer[T])]

    Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken elements and a stream representing the remaining elements. If n is zero or negative, then this will return a pair of an empty collection and a stream containing the whole upstream unchanged.
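
    On a strict list, the same prefix/remainder split, including the zero and negative cases, corresponds to the standard library's splitAt. The wrapper below is a hypothetical model for illustration, not the Flow method itself, and it returns the remainder as a list instead of a Producer.

    ```scala
    object PrefixAndTailModel {
      // Hypothetical strict-collection model: the prefix holds up to `n`
      // elements, the second component holds whatever is left.
      def prefixAndTail[T](elems: List[T], n: Int): (List[T], List[T]) =
        elems.splitAt(n)
    }

    val (prefix, rest) = PrefixAndTailModel.prefixAndTail(List(1, 2, 3), 2)
    ```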

  23. abstract def produceTo(materializer: FlowMaterializer, consumer: Consumer[_ >: T]): Unit

    Attaches a consumer to this stream.

    *This will materialize the flow and initiate its execution.*

    The given FlowMaterializer decides how the flow’s logical structure is broken down into individual processing steps.

  24. abstract def splitWhen(p: (T) ⇒ Boolean): Flow[Producer[T]]

    This operation applies the given predicate to all incoming elements and emits them to a stream of output streams, always beginning a new one with the current element if the given predicate returns true for it. This means that for the following series of predicate values, three substreams will be produced with lengths 1, 2, and 3:

    false,             // element goes into first substream
    true, false,       // elements go into second substream
    true, false, false // elements go into third substream
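
    The grouping rule can be checked against a strict list. The following sketch uses hypothetical names and plain collections (no Producer substreams): it starts a new group whenever the predicate holds for the current element, and also for the very first element.

    ```scala
    object SplitWhenModel {
      // Hypothetical strict-collection model of splitWhen: an element opens a
      // new group if it is the first element or the predicate returns true for
      // it; otherwise it is appended to the most recent group.
      def splitWhen[T](elems: List[T])(p: T => Boolean): List[List[T]] = {
        val groups = elems.foldLeft(Vector.empty[Vector[T]]) { (acc, e) =>
          if (acc.isEmpty || p(e)) acc :+ Vector(e)
          else acc.init :+ (acc.last :+ e)
        }
        groups.map(_.toList).toList
      }
    }

    // The elements are the predicate values from the series above.
    val sizes = SplitWhenModel
      .splitWhen(List(false, true, false, true, false, false))(b => b)
      .map(_.size)
    ```
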
  25. abstract def take(n: Int): Flow[T]

    Terminate processing (and cancel the upstream producer) after the given number of elements. Due to input buffering some elements may have been requested from upstream producers that will then not be processed downstream of this step.

  26. abstract def takeWithin(d: FiniteDuration): Flow[T]

    Terminate processing (and cancel the upstream producer) after the given duration. Due to input buffering some elements may have been requested from upstream producers that will then not be processed downstream of this step.

    Note that this can be combined with #take to limit the number of elements within the duration.

  27. abstract def tee(other: Consumer[_ >: T]): Flow[T]

    Fan-out the stream to another consumer. Each element is produced to the other consumer as well as to downstream consumers. It will not shut down until the subscriptions for other and at least one downstream consumer have been established.

  28. abstract def toFuture(materializer: FlowMaterializer): Future[T]

    Returns a scala.concurrent.Future that will be fulfilled with the first thing that is signaled to this stream, which can be either an element (after which the upstream subscription is canceled), an error condition (putting the Future into the corresponding failed state) or the end-of-stream (failing the Future with a NoSuchElementException). *This operation materializes the flow and initiates its execution.*

    The given FlowMaterializer decides how the flow’s logical structure is broken down into individual processing steps.
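
    The element and end-of-stream outcomes can be modelled with scala.concurrent.Future alone. The helper below is a hypothetical sketch over a strict sequence; it does not materialize anything and does not cover the upstream-error case.

    ```scala
    import scala.concurrent.Future

    object ToFutureModel {
      // Hypothetical model: the first element completes the Future
      // successfully; an empty input fails it with NoSuchElementException,
      // as described for end-of-stream.
      def firstAsFuture[T](elems: Seq[T]): Future[T] = elems.headOption match {
        case Some(e) => Future.successful(e)
        case None    => Future.failed(new NoSuchElementException("empty stream"))
      }
    }

    val first = ToFutureModel.firstAsFuture(Seq(1, 2, 3))
    ```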

  29. abstract def toProducer(materializer: FlowMaterializer): Producer[T]

    Materialize this flow and return the downstream-most org.reactivestreams.api.Producer interface. The stream will not have any consumers attached at this point, which means that after prefetching elements to fill the internal buffers it will assert back-pressure until a consumer connects and creates demand for elements to be emitted.

    The given FlowMaterializer decides how the flow’s logical structure is broken down into individual processing steps.

  30. abstract def transform[U](transformer: Transformer[T, U]): Flow[U]

    Generic transformation of a stream: for each element the akka.stream.Transformer#onNext function is invoked, expecting a (possibly empty) sequence of output elements to be produced. After handing off the elements produced from one input element to the downstream consumers, the akka.stream.Transformer#isComplete predicate determines whether to end stream processing at this point; in that case the upstream subscription is canceled. Before signaling normal completion to the downstream consumers, the akka.stream.Transformer#onComplete function is invoked to produce a (possibly empty) sequence of elements in response to the end-of-stream event.

    akka.stream.Transformer#onError is called when failure is signaled from upstream.

    After normal completion or error the akka.stream.Transformer#cleanup function is called.

    It is possible to keep state in the concrete akka.stream.Transformer instance with ordinary instance variables. The akka.stream.Transformer is executed by an actor and therefore you do not have to add any additional thread safety or memory visibility constructs to access the state from the callback methods.

    Note that you can use akka.stream.TimerTransformer if you need support for scheduled events in the transformer.

  31. abstract def zip[U](other: Producer[U]): Flow[(T, U)]

    Zip this stream together with the one emitted by the given producer. This transformation finishes when either input stream reaches its end, cancelling the subscription to the other one.

Concrete Value Members

  1. final def !=(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  2. final def ##(): Int

    Definition Classes
    AnyRef → Any
  3. def +(other: String): String

    Implicit information
    This member is added by an implicit conversion from Flow[T] to any2stringadd[Flow[T]] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (Flow[T], B)

    Implicit information
    This member is added by an implicit conversion from Flow[T] to ArrowAssoc[Flow[T]] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  6. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  7. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  8. def ensuring(cond: (Flow[T]) ⇒ Boolean, msg: ⇒ Any): Flow[T]

    Implicit information
    This member is added by an implicit conversion from Flow[T] to Ensuring[Flow[T]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  9. def ensuring(cond: (Flow[T]) ⇒ Boolean): Flow[T]

    Implicit information
    This member is added by an implicit conversion from Flow[T] to Ensuring[Flow[T]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  10. def ensuring(cond: Boolean, msg: ⇒ Any): Flow[T]

    Implicit information
    This member is added by an implicit conversion from Flow[T] to Ensuring[Flow[T]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  11. def ensuring(cond: Boolean): Flow[T]

    Implicit information
    This member is added by an implicit conversion from Flow[T] to Ensuring[Flow[T]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  12. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  13. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  14. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  15. def formatted(fmtstr: String): String

    Implicit information
    This member is added by an implicit conversion from Flow[T] to StringFormat[Flow[T]] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @inline()
  16. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  17. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  18. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  19. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  20. final def notify(): Unit

    Definition Classes
    AnyRef
  21. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  22. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  23. def toString(): String

    Definition Classes
    AnyRef → Any
  24. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  25. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  26. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  27. def →[B](y: B): (Flow[T], B)

    Implicit information
    This member is added by an implicit conversion from Flow[T] to ArrowAssoc[Flow[T]] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
