final class Source[+Out, +Mat] extends Graph[SourceShape[Out], Mat]

Java API

A Source is a set of stream processing steps that has one open output and an attached input. It can be used as a Publisher.

Source
Source.scala

Instance Constructors

  1. new Source(delegate: scaladsl.Source[Out, Mat])

Type Members

  1. type Shape = SourceShape[Out]

    Type-level accessor for the shape parameter of this graph.

    Definition Classes
    Graph

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. def +(other: String): String
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to any2stringadd[Source[Out, Mat]] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (Source[Out, Mat], B)
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to ArrowAssoc[Source[Out, Mat]] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. def addAttributes(attr: Attributes): Source[Out, Mat]

    Add the given attributes to this Source. Further calls to withAttributes will not remove these attributes. Note that this operation has no effect on an empty Flow (because the attributes apply only to the contained processing stages).

    Definition Classes
    SourceGraph
  7. def alsoTo(that: Graph[SinkShape[Out], _]): Source[Out, Mat]

    Attaches the given Sink to this Flow, meaning that elements that pass through will also be sent to the Sink.

    Emits when element is available and demand exists both from the Sink and the downstream.

    Backpressures when downstream or Sink backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  8. def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2], matF: Function2[Mat, M2, M3]): Source[Out, M3]

    Attaches the given Sink to this Flow, meaning that elements that pass through will also be sent to the Sink.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also

    #alsoTo

  9. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  10. def asScala: scaladsl.Source[Out, Mat]

    Converts this Java DSL element to its Scala DSL counterpart.

  11. def async: Source[Out, Mat]

    Put an asynchronous boundary around this Source.

    Definition Classes
    SourceGraph
  12. def backpressureTimeout(timeout: FiniteDuration): Source[Out, Mat]

    If the time between the emission of an element and the following downstream demand exceeds the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException. The timeout is checked periodically, so the resolution of the check is one period (equal to the timeout value).

    Emits when upstream emits an element

    Backpressures when downstream backpressures

    Completes when upstream completes or fails if timeout elapses between element emission and downstream demand.

    Cancels when downstream cancels

  13. def batch[S](max: Long, seed: Function[Out, S], aggregate: Function2[S, Out, S]): Source[S, Mat]

    Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches until the subscriber is ready to accept them. For example a batch step might store received elements in an array up to the allowed max limit if the upstream publisher is faster.

    This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.

    Emits when downstream stops backpressuring and there is an aggregated element available

    Backpressures when there are max batched elements and 1 pending element and downstream backpressures

    Completes when upstream completes and there is no batched/pending element waiting

    Cancels when downstream cancels

    See also Source.conflate, Source.batchWeighted

    max

    maximum number of elements to batch before backpressuring upstream (must be positive non-zero)

    seed

    Provides the first state for a batched value using the first unconsumed element as a start

    aggregate

    Takes the currently batched value and the current pending element to produce a new aggregate

  14. def batchWeighted[S](max: Long, costFn: Function[Out, Long], seed: Function[Out, S], aggregate: Function2[S, Out, S]): Source[S, Mat]

    Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches until the subscriber is ready to accept them. For example a batch step might concatenate ByteString elements up to the allowed max limit if the upstream publisher is faster.

    This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.

    Batching applies to all elements, even if the cost of a single element is greater than the total allowed limit. In this case, the previously batched elements are emitted first, then the "heavy" element is emitted (after the seed function has been applied to it) without batching further elements with it, and then the rest of the incoming elements are batched.

    Emits when downstream stops backpressuring and there is a batched element available

    Backpressures when there are max weighted batched elements + 1 pending element and downstream backpressures

    Completes when upstream completes and there is no batched/pending element waiting

    Cancels when downstream cancels

    See also Source.conflate, Source.batch

    max

    maximum weight of elements to batch before backpressuring upstream (must be positive non-zero)

    costFn

    a function to compute a single element weight

    seed

    Provides the first state for a batched value using the first unconsumed element as a start

    aggregate

    Takes the currently batched value and the current pending element to produce a new batch

  15. def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]

    Adds a fixed-size buffer to the flow that stores elements from a faster upstream until it becomes full. Depending on the defined akka.stream.OverflowStrategy, it might drop elements or backpressure the upstream if there is no space available.

    Emits when downstream stops backpressuring and there is a pending element in the buffer

    Backpressures depending on the OverflowStrategy:
      * Backpressure - backpressures when the buffer is full
      * DropHead, DropTail, DropBuffer - never backpressures
      * Fail - fails the stream if the buffer gets full

    Completes when upstream completes and buffered elements have been drained

    Cancels when downstream cancels

    size

    The size of the buffer in element count

    overflowStrategy

    Strategy that is used when incoming elements cannot fit inside the buffer

  16. def clone(): AnyRef
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  17. def collect[T](pf: PartialFunction[Out, T]): Source[T, Mat]

    Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step. Non-matching elements are filtered out.

    Emits when the provided partial function is defined for the element

    Backpressures when the partial function is defined for the element and downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  18. def completionTimeout(timeout: FiniteDuration): Source[Out, Mat]

    If the stream does not complete before the provided timeout elapses, the stream is failed with a java.util.concurrent.TimeoutException.

    Emits when upstream emits an element

    Backpressures when downstream backpressures

    Completes when upstream completes or fails if timeout elapses before upstream completes

    Cancels when downstream cancels

  19. def concat[T >: Out, M](that: Graph[SourceShape[T], M]): Source[T, Mat]

    Concatenate this Source with the given one, meaning that once the current Source is exhausted and all result elements have been generated, the given Source's elements will be produced.

    Note that the given Source is materialized together with this Flow and is kept from producing elements by asserting back-pressure until its time comes.

    If this Source receives an upstream error, no elements from the given Source will be pulled.

    Emits when element is available from current source or from the given Source when current is completed

    Backpressures when downstream backpressures

    Completes when given Source completes

    Cancels when downstream cancels

  20. def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: Function2[Mat, M, M2]): Source[T, M2]

    Concatenate this Source with the given one, meaning that once the current Source is exhausted and all result elements have been generated, the given Source's elements will be produced.

    Note that the given Source is materialized together with this Flow and is kept from producing elements by asserting back-pressure until its time comes.

    If this Source receives an upstream error, no elements from the given Source will be pulled.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also

    #concat.

  21. def conflate[O2 >: Out](aggregate: Function2[O2, O2, O2]): Source[O2, Mat]

    Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the upstream publisher is faster. This version of conflate does not change the output type of the stream. See Source.conflateWithSeed for a more flexible version that can take a seed function and transform elements while rolling up.

    This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.

    Emits when downstream stops backpressuring and there is a conflated element available

    Backpressures when never

    Completes when upstream completes

    Cancels when downstream cancels

    See also Source.conflateWithSeed, Source.batch, Source.batchWeighted

    aggregate

    Takes the currently aggregated value and the current pending element to produce a new aggregate
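
    The rate-decoupling behaviour described above can be sketched without Akka. The following is a simplified plain-Java model, not the library's implementation: the class ConflateSketch and its methods onUpstreamElement and onDownstreamDemand are hypothetical names introduced only for illustration. It shows that elements are rolled up only while the downstream is not asking, and that a fast downstream never receives duplicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Simplified model of conflate semantics; it does not use Akka Streams.
class ConflateSketch<T> {
    private final BinaryOperator<T> aggregate;
    private T summary;          // null means nothing is pending
    private boolean demand;     // true while downstream waits for an element
    final List<T> emitted = new ArrayList<>();

    ConflateSketch(BinaryOperator<T> aggregate) {
        this.aggregate = aggregate;
    }

    void onUpstreamElement(T elem) {
        // Roll the new element into the pending summary, if there is one.
        summary = (summary == null) ? elem : aggregate.apply(summary, elem);
        if (demand) {
            emit();
        }
    }

    void onDownstreamDemand() {
        if (summary != null) {
            emit();
        } else {
            demand = true; // remember the request; nothing is duplicated
        }
    }

    private void emit() {
        emitted.add(summary);
        summary = null;
        demand = false;
    }

    public static void main(String[] args) {
        ConflateSketch<Integer> c = new ConflateSketch<>(Integer::sum);
        c.onUpstreamElement(1);
        c.onUpstreamElement(2);
        c.onUpstreamElement(3);
        c.onDownstreamDemand(); // receives the summary of 1, 2, 3
        c.onDownstreamDemand(); // nothing pending, so it waits
        c.onUpstreamElement(4); // passed through unchanged
        System.out.println(c.emitted); // [6, 4]
    }
}
```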

  22. def conflateWithSeed[S](seed: Function[Out, S], aggregate: Function2[S, Out, S]): Source[S, Mat]

    Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the upstream publisher is faster.

    This version of conflate derives a seed from the first element and allows the aggregated type to differ from the input type. See Source.conflate for a simpler version that does not change types.

    This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.

    Emits when downstream stops backpressuring and there is a conflated element available

    Backpressures when never

    Completes when upstream completes

    Cancels when downstream cancels

    See also Source.conflate, Source.batch, Source.batchWeighted

    seed

    Provides the first state for a conflated value using the first unconsumed element as a start

    aggregate

    Takes the currently aggregated value and the current pending element to produce a new aggregate

  23. def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): Source[Out, Mat]

    Shifts element emission in time by a specified amount. Elements are stored in an internal buffer while waiting for the next element to be emitted. Depending on the defined akka.stream.DelayOverflowStrategy, it might drop elements or backpressure the upstream if there is no space available in the buffer.

    Delay precision is 10ms to avoid unnecessary timer scheduling cycles.

    The internal buffer has a default capacity of 16. You can set the buffer size by calling withAttributes(inputBuffer).

    Emits when there is a pending element in the buffer and the configured time for this element has elapsed
      * EmitEarly - the strategy does not wait to emit an element if the buffer is full

    Backpressures depending on the OverflowStrategy:
      * Backpressure - backpressures when the buffer is full
      * DropHead, DropTail, DropBuffer - never backpressures
      * Fail - fails the stream if the buffer gets full

    Completes when upstream completes and buffered elements have been drained

    Cancels when downstream cancels

    of

    time to shift all messages

    strategy

    Strategy that is used when incoming elements cannot fit inside the buffer

  24. def detach: Source[Out, Mat]

    Detaches upstream demand from downstream demand without detaching the stream rates; in other words acts like a buffer of size 1.

    Emits when upstream emits an element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  25. def drop(n: Long): Source[Out, Mat]

    Discard the given number of elements at the beginning of the stream. No elements will be dropped if n is zero or negative.

    Emits when the specified number of elements has been dropped already

    Backpressures when the specified number of elements has been dropped and downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  26. def dropWhile(p: Predicate[Out]): Source[Out, Mat]

    Discard elements at the beginning of the stream while the predicate is true. No elements will be dropped after the predicate first returns false.

    Emits when predicate returned false and for all following stream elements

    Backpressures when predicate returned false and downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

    p

    the predicate is evaluated for each new element until it first returns false
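
    The "until it first returns false" rule is easy to misread, so here is a minimal plain-Java sketch of it over a list. It is a model of the semantics only and does not use Akka; the class DropWhileSketch is a hypothetical name, and java.util.function.Predicate stands in for the Predicate type in the signature above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the dropWhile semantics; it does not use Akka Streams.
class DropWhileSketch {
    static <T> List<T> dropWhile(List<T> in, Predicate<T> p) {
        List<T> out = new ArrayList<>();
        boolean dropping = true;
        for (T elem : in) {
            // The predicate is consulted only until it first returns false.
            if (dropping && p.test(elem)) {
                continue;
            }
            dropping = false;
            out.add(elem);
        }
        return out;
    }

    public static void main(String[] args) {
        // The trailing 1 and 2 satisfy the predicate but are still emitted.
        System.out.println(dropWhile(Arrays.asList(1, 2, 5, 1, 2), x -> x < 3)); // [5, 1, 2]
    }
}
```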

  27. def dropWithin(d: FiniteDuration): Source[Out, Mat]

    Discard the elements received within the given duration at the beginning of the stream.

    Emits when the specified time elapsed and a new upstream element arrives

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  28. def ensuring(cond: (Source[Out, Mat]) ⇒ Boolean, msg: ⇒ Any): Source[Out, Mat]
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to Ensuring[Source[Out, Mat]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  29. def ensuring(cond: (Source[Out, Mat]) ⇒ Boolean): Source[Out, Mat]
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to Ensuring[Source[Out, Mat]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  30. def ensuring(cond: Boolean, msg: ⇒ Any): Source[Out, Mat]
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to Ensuring[Source[Out, Mat]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  31. def ensuring(cond: Boolean): Source[Out, Mat]
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to Ensuring[Source[Out, Mat]] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  32. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  33. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  34. def expand[U](extrapolate: Function[Out, Iterator[U]]): Source[U, Mat]

    Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older element until a new element comes from the upstream. For example an expand step might repeat the last element for the subscriber until it receives an update from upstream.

    This element will never "drop" upstream elements as all elements go through at least one extrapolation step. This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream subscriber.

    Expand does not support akka.stream.Supervision#restart and akka.stream.Supervision#resume. Exceptions from the seed or extrapolate functions will complete the stream with failure.

    Emits when downstream stops backpressuring

    Backpressures when downstream backpressures or iterator runs empty

    Completes when upstream completes

    Cancels when downstream cancels

    extrapolate

    Takes the current extrapolation state to produce an output element and the next extrapolation state.

  35. def filter(p: Predicate[Out]): Source[Out, Mat]

    Only pass on those elements that satisfy the given predicate.

    Emits when the given predicate returns true for the element

    Backpressures when the given predicate returns true for the element and downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  36. def filterNot(p: Predicate[Out]): Source[Out, Mat]

    Only pass on those elements that do NOT satisfy the given predicate.

    Emits when the given predicate returns false for the element

    Backpressures when the given predicate returns false for the element and downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  37. def finalize(): Unit
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  38. def flatMapConcat[T, M](f: Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat]

    Transform each input element into a Source of output elements that is then flattened into the output stream by concatenation, fully consuming one Source after the other.

    Emits when a currently consumed substream has an element available

    Backpressures when downstream backpressures

    Completes when upstream completes and all consumed substreams complete

    Cancels when downstream cancels

  39. def flatMapMerge[T, M](breadth: Int, f: Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat]

    Transform each input element into a Source of output elements that is then flattened into the output stream by merging, where at most breadth substreams are being consumed at any given time.

    Emits when a currently consumed substream has an element available

    Backpressures when downstream backpressures

    Completes when upstream completes and all consumed substreams complete

    Cancels when downstream cancels

  40. def fold[T](zero: T)(f: Function2[T, Out, T]): Source[T, Mat]

    Similar to scan but only emits its result when the upstream completes, after which it also completes. Applies the given function f towards its current and next value, yielding the next current value.

    If the function f throws an exception and the supervision decision is akka.stream.Supervision#restart, the current value starts at zero again and the stream will continue.

    Emits when upstream completes

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  41. def foldAsync[T](zero: T)(f: Function2[T, Out, CompletionStage[T]]): Source[T, Mat]

    Similar to fold but with an asynchronous function. Applies the given function towards its current and next value, yielding the next current value.

    If the function f returns a failure and the supervision decision is akka.stream.Supervision.Restart, the current value starts at zero again and the stream will continue.

    Emits when upstream completes

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  42. def formatted(fmtstr: String): String
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to StringFormat[Source[Out, Mat]] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @inline()
  43. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
  44. def groupBy[K](maxSubstreams: Int, f: Function[Out, K]): SubSource[Out, Mat]

    This operation demultiplexes the incoming stream into separate output streams, one for each element key. The key is computed for each element using the given function. When a new key is encountered for the first time a new substream is opened and subsequently fed with all elements belonging to that key.

    The object returned from this method is not a normal Flow, it is a SubSource. This means that after this combinator all transformations are applied to all encountered substreams in the same fashion. Substream mode is exited either by closing the substream (i.e. connecting it to a Sink) or by merging the substreams back together; see the to and mergeBack methods on SubSource for more information.

    It is important to note that the substreams also propagate back-pressure as any other stream, which means that blocking one substream will block the groupBy operator itself—and thereby all substreams—once all internal or explicit buffers are filled.

    If the group by function f throws an exception and the supervision decision is akka.stream.Supervision#stop the stream and substreams will be completed with failure.

    If the group by function f throws an exception and the supervision decision is akka.stream.Supervision#resume or akka.stream.Supervision#restart the element is dropped and the stream and substreams continue.

    Emits when an element arrives for which the grouping function returns a group that has not yet been created; emits the new group.

    Backpressures when there is an element pending for a group whose substream backpressures

    Completes when upstream completes

    Cancels when downstream cancels and all substreams cancel

    maxSubstreams

    configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails
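
    The demultiplexing and the maxSubstreams limit can be pictured with a small plain-Java model over a list. This is only a sketch of the semantics and does not use Akka: GroupBySketch is a hypothetical name, each map entry stands in for one substream, and IllegalStateException is an assumed stand-in for whatever failure the real operator signals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of groupBy demultiplexing; it does not use Akka Streams.
class GroupBySketch {
    static <T, K> Map<K, List<T>> groupBy(List<T> in, int maxSubstreams, Function<T, K> f) {
        Map<K, List<T>> substreams = new LinkedHashMap<>();
        for (T elem : in) {
            K key = f.apply(elem);
            if (!substreams.containsKey(key)) {
                // A new key opens a new substream, up to the configured limit.
                if (substreams.size() >= maxSubstreams) {
                    // Assumption: the exception type here is illustrative only.
                    throw new IllegalStateException(
                        "more than " + maxSubstreams + " distinct keys");
                }
                substreams.put(key, new ArrayList<>());
            }
            substreams.get(key).add(elem);
        }
        return substreams;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> groups =
            groupBy(Arrays.asList(1, 2, 3, 4, 5), 2, x -> x % 2);
        System.out.println(groups); // {1=[1, 3, 5], 0=[2, 4]}
    }
}
```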

  45. def grouped(n: Int): Source[List[Out], Mat]

    Chunk up this stream into groups of the given size, with the last group possibly smaller than requested due to end-of-stream.

    n must be positive, otherwise IllegalArgumentException is thrown.

    Emits when the specified number of elements has been accumulated or upstream completed

    Backpressures when a group has been assembled and downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels
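
    As a rough illustration of the chunking rule, including the smaller final group and the argument check, here is a plain-Java model over a list. It does not use Akka, and GroupedSketch is a hypothetical name introduced for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the grouped semantics; it does not use Akka Streams.
class GroupedSketch {
    static <T> List<List<T>> grouped(List<T> in, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < in.size(); i += n) {
            // The last group may be shorter when the input ends early.
            int end = Math.min(i + n, in.size());
            groups.add(new ArrayList<>(in.subList(i, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(grouped(Arrays.asList(1, 2, 3, 4, 5), 2)); // [[1, 2], [3, 4], [5]]
    }
}
```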

  46. def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat]

    Chunk up this stream into groups of elements received within a time window, or limited by the given number of elements, whichever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.

    Emits when the configured time elapses since the last group has been emitted

    Backpressures when the configured time elapses since the last group has been emitted

    Completes when upstream completes (emits last group)

    Cancels when downstream completes

    n must be positive, and d must be greater than 0 seconds, otherwise IllegalArgumentException is thrown.

  47. def hashCode(): Int
    Definition Classes
    AnyRef → Any
  48. def idleTimeout(timeout: FiniteDuration): Source[Out, Mat]

    If the time between two processed elements exceeds the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException. The timeout is checked periodically, so the resolution of the check is one period (equal to the timeout value).

    Emits when upstream emits an element

    Backpressures when downstream backpressures

    Completes when upstream completes or fails if timeout elapses between two emitted elements

    Cancels when downstream cancels

  49. def initialDelay(delay: FiniteDuration): Source[Out, Mat]

    Delays the initial element by the specified duration.

    Emits when upstream emits an element if the initial delay is already elapsed

    Backpressures when downstream backpressures or initial delay is not yet elapsed

    Completes when upstream completes

    Cancels when downstream cancels

  50. def initialTimeout(timeout: FiniteDuration): Source[Out, Mat]

    If the first element has not passed through this stage before the provided timeout, the stream is failed with a java.util.concurrent.TimeoutException.

    Emits when upstream emits an element

    Backpressures when downstream backpressures

    Completes when upstream completes or fails if timeout elapses before first element arrives

    Cancels when downstream cancels

  51. def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): Source[T, Mat]

    Interleave is a deterministic merge of the given Source with elements of this Source. It first emits segmentSize elements from this flow to downstream, then the same number from that source, and then repeats the process.

    Example:

    Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7)), 2)
    // 1, 2, 4, 5, 3, 6, 7

    After one of the sources completes, all remaining elements are emitted from the other one.

    If one of the sources receives an upstream error, the stream completes with failure.

    Emits when element is available from the currently consumed upstream

    Backpressures when downstream backpressures. Signal to current upstream, switch to next upstream when received segmentSize elements

    Completes when this Source and given one completes

    Cancels when downstream cancels
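
    The segment-by-segment ordering can be checked with a small plain-Java model over two lists. It is only a sketch of the ordering rule, not of backpressure or materialization, and it does not use Akka; InterleaveSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the interleave ordering; it does not use Akka Streams.
class InterleaveSketch {
    static <T> List<T> interleave(List<T> first, List<T> second, int segmentSize) {
        if (segmentSize <= 0) {
            throw new IllegalArgumentException("segmentSize must be positive");
        }
        List<T> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        boolean takeFirst = true;
        while (i < first.size() || j < second.size()) {
            if (takeFirst) {
                // Take up to segmentSize elements from the first input.
                for (int k = 0; k < segmentSize && i < first.size(); k++) {
                    out.add(first.get(i++));
                }
            } else {
                // Take up to segmentSize elements from the second input.
                for (int k = 0; k < segmentSize && j < second.size(); k++) {
                    out.add(second.get(j++));
                }
            }
            // An exhausted input contributes nothing on its turn,
            // so the remaining elements all come from the other input.
            takeFirst = !takeFirst;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(
            interleave(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6, 7), 2)); // [1, 2, 4, 5, 3, 6, 7]
    }
}
```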

  52. def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, matF: Function2[Mat, M, M2]): Source[T, M2]

    Interleave is a deterministic merge of the given Source with elements of this Source. It first emits segmentSize elements from this flow to downstream, then the same number from that source, and then repeats the process.

    After one of the sources completes, all remaining elements are emitted from the other one.

    If one of the sources receives an upstream error, the stream completes with failure.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also

    #interleave.

  53. def intersperse[T >: Out](inject: T): Source[T, Mat]

    Intersperses the stream with the provided element, similar to how scala.collection.immutable.List.mkString injects a separator between a List's elements.

    The 3-argument version can additionally inject start and end marker elements into the stream.

    Examples:

    Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
    nums.intersperse(",");            //   0 , 1 , 2 , 3
    nums.intersperse("[", ",", "]");  // [ 0 , 1 , 2 , 3 ]

    Emits when upstream emits (or before with the start element if provided)

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  54. def intersperse[T >: Out](start: T, inject: T, end: T): Source[T, Mat]

    Intersperses stream with provided element, similar to how scala.collection.immutable.List.mkString injects a separator between a List's elements.

    It can additionally inject start and end marker elements into the stream.

    Examples:

    Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
    nums.intersperse(",");            //   0 , 1 , 2 , 3
    nums.intersperse("[", ",", "]");  // [ 0 , 1 , 2 , 3 ]

    If you want to only prepend or only append an element (yet still use the intersperse feature to inject a separator between elements), you may want to use the following pattern instead of the 3-argument version of intersperse (see Source.concat for details on the semantics):

    Source.single(">> ").concat(list.intersperse(","))
    list.intersperse(",").concat(Source.single("END"))

    Emits when upstream emits (or before with the start element if provided)

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels
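
    The separator and marker placement can be modelled on plain lists. This is a sketch only, with the hypothetical names IntersperseSketch and intersperse. It works on strings rather than on a stream and does not reflect how Akka implements the stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of where intersperse places separators and markers; not Akka's implementation.
final class IntersperseSketch {

    // One-argument form: the separator goes between neighbouring elements only.
    static <T> List<T> intersperse(List<T> elems, T inject) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < elems.size(); i++) {
            if (i > 0) out.add(inject);
            out.add(elems.get(i));
        }
        return out;
    }

    // Three-argument form: additionally wraps the result in start and end markers.
    static <T> List<T> intersperse(List<T> elems, T start, T inject, T end) {
        List<T> out = new ArrayList<>();
        out.add(start);
        out.addAll(intersperse(elems, inject));
        out.add(end);
        return out;
    }

    public static void main(String[] args) {
        List<String> nums = Arrays.asList("0", "1", "2", "3");
        System.out.println(String.join(" ", intersperse(nums, ",")));           // 0 , 1 , 2 , 3
        System.out.println(String.join(" ", intersperse(nums, "[", ",", "]"))); // [ 0 , 1 , 2 , 3 ]
    }
}
```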

  55. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  56. def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: Creator[U]): Source[U, Mat]

    Injects additional elements if upstream does not emit for a configured amount of time. In other words, this stage attempts to maintain a base rate of emitted elements towards the downstream.

    If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements do not accumulate during this period.

    Upstream elements are always preferred over injected elements.

    Emits when upstream emits an element or if the upstream was idle for the configured period

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  57. def limit(n: Int): Source[Out, Mat]

    Ensure stream boundedness by limiting the number of elements from upstream. If the number of incoming elements exceeds n, the stream fails and StreamLimitException is signalled downstream.

    Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.

    The stream will be completed without producing any elements if n is zero or negative.

    Emits when the specified number of elements to take has not yet been reached

    Backpressures when downstream backpressures

    Completes when the defined number of elements has been taken or upstream completes

    Cancels when the defined number of elements has been taken or downstream cancels

    See also Flow.take, Flow.takeWithin, Flow.takeWhile

  58. def limitWeighted(n: Long)(costFn: Function[Out, Long]): Source[Out, Mat]

    Ensure stream boundedness by evaluating the cost of incoming elements using a cost function. Exactly how many elements will be allowed to travel downstream depends on the evaluated cost of each element. If the accumulated cost exceeds n, the stream fails and StreamLimitException is signalled downstream.

    Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.

    The stream will be completed without producing any elements if n is zero or negative.

    Emits when the specified number of elements to take has not yet been reached

    Backpressures when downstream backpressures

    Completes when the defined number of elements has been taken or upstream completes

    Cancels when the defined number of elements has been taken or downstream cancels

    See also Flow.take, Flow.takeWithin, Flow.takeWhile

  59. def log(name: String): Source[Out, Mat]

    Logs elements flowing through the stream as well as completion and erroring.

    By default, element and completion signals are logged on Debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.

    Uses an internally created LoggingAdapter which uses akka.stream.Log as its source (use this class to configure slf4j loggers).

    Emits when the mapping function returns an element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  60. def log(name: String, log: LoggingAdapter): Source[Out, Mat]

    Logs elements flowing through the stream as well as completion and erroring.

    By default, element and completion signals are logged on Debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.

    Uses the given LoggingAdapter for logging.

    Emits when the mapping function returns an element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  61. def log(name: String, extract: Function[Out, Any]): Source[Out, Mat]

    Logs elements flowing through the stream as well as completion and erroring.

    By default, element and completion signals are logged on Debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.

    The extract function will be applied to each element before logging, so it is possible to log only those fields of a complex object flowing through this element.

    Uses an internally created LoggingAdapter which uses akka.stream.Log as its source (use this class to configure slf4j loggers).

    Emits when the mapping function returns an element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  62. def log(name: String, extract: Function[Out, Any], log: LoggingAdapter): Source[Out, Mat]

    Logs elements flowing through the stream as well as completion and erroring.

    By default, element and completion signals are logged on Debug level, and errors are logged on Error level. This can be adjusted according to your needs by providing a custom Attributes.LogLevels attribute on the given Flow.

    The extract function will be applied to each element before logging, so it is possible to log only those fields of a complex object flowing through this element.

    Uses the given LoggingAdapter for logging.

    Emits when the mapping function returns an element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  63. def map[T](f: Function[Out, T]): Source[T, Mat]

    Transform this stream by applying the given function to each of the elements as they pass through this processing step.

    Emits when the mapping function returns an element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  64. def mapAsync[T](parallelism: Int, f: Function[Out, CompletionStage[T]]): Source[T, Mat]

    Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a CompletionStage and the value of that future will be emitted downstream. The number of CompletionStages that shall run in parallel is given as the first argument to mapAsync. These CompletionStages may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.

    If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is akka.stream.Supervision#stop the stream will be completed with failure.

    If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is akka.stream.Supervision#resume or akka.stream.Supervision#restart the element is dropped and the stream continues.

    The function f is always invoked on the elements in the order they arrive.

    Emits when the CompletionStage returned by the provided function finishes for the next element in sequence

    Backpressures when the number of CompletionStages reaches the configured parallelism and the downstream backpressures or the first CompletionStage is not completed

    Completes when upstream completes and all CompletionStages have been completed and all elements have been emitted

    Cancels when downstream cancels

    See also

    #mapAsyncUnordered
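
    The ordering guarantee can be illustrated without Akka by keeping a bounded queue of in-flight futures and always waiting on the oldest one. This is a rough sketch under that assumption: MapAsyncSketch, mapAsync and delayed are hypothetical names, the code blocks the calling thread, and it does not model supervision or cancellation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Plain-Java model of ordered emission with bounded parallelism; not Akka's implementation.
final class MapAsyncSketch {

    // Hypothetical helper: completes with `value` after roughly `millis` milliseconds.
    static <T> CompletionStage<T> delayed(T value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    static <A, B> List<B> mapAsync(List<A> elems, int parallelism, Function<A, CompletionStage<B>> f) {
        if (parallelism < 1) throw new IllegalArgumentException("parallelism must be at least 1");
        Deque<CompletableFuture<B>> inFlight = new ArrayDeque<>();
        List<B> out = new ArrayList<>();
        for (A e : elems) {
            // At the limit: wait for the oldest stage before starting another one.
            if (inFlight.size() == parallelism) out.add(inFlight.removeFirst().join());
            // f is invoked in arrival order.
            inFlight.addLast(f.apply(e).toCompletableFuture());
        }
        // Input is exhausted: drain the remaining stages in arrival order.
        while (!inFlight.isEmpty()) out.add(inFlight.removeFirst().join());
        return out;
    }

    public static void main(String[] args) {
        // Larger inputs finish later here, yet results keep the input order.
        List<Integer> result = mapAsync(Arrays.asList(3, 2, 1), 2, x -> delayed(x * 10, x * 20L));
        System.out.println(result); // [30, 20, 10]
    }
}
```

    An unordered variant would instead emit whichever in-flight stage completes first, which is the difference between mapAsync and mapAsyncUnordered described in this section.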

  65. def mapAsyncUnordered[T](parallelism: Int, f: Function[Out, CompletionStage[T]]): Source[T, Mat]

    Transform this stream by applying the given function to each of the elements as they pass through this processing step. The function returns a CompletionStage and the value of that future will be emitted downstream. The number of CompletionStages that shall run in parallel is given as the first argument to mapAsyncUnordered. Each processed element will be emitted downstream as soon as it is ready, i.e. it is possible that the elements are not emitted downstream in the same order as received from upstream.

    If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is akka.stream.Supervision#stop the stream will be completed with failure.

    If the function f throws an exception or if the CompletionStage is completed with failure and the supervision decision is akka.stream.Supervision#resume or akka.stream.Supervision#restart the element is dropped and the stream continues.

    The function f is always invoked on the elements in the order they arrive (even though the result of the CompletionStages returned by f might be emitted in a different order).

    Emits when any of the CompletionStages returned by the provided function complete

    Backpressures when the number of CompletionStages reaches the configured parallelism and the downstream backpressures

    Completes when upstream completes and all CompletionStages have been completed and all elements have been emitted

    Cancels when downstream cancels

    See also

    #mapAsync

  66. def mapConcat[T](f: Function[Out, _ <: Iterable[T]]): Source[T, Mat]

    Transform each input element into an Iterable of output elements that is then flattened into the output stream.

    Make sure that the Iterable is immutable or at least not modified after being used as an output sequence. Otherwise the stream may fail with ConcurrentModificationException or other more subtle errors may occur.

    The returned Iterable MUST NOT contain null values, as they are illegal as stream elements - according to the Reactive Streams specification.

    Emits when the mapping function returns an element or there are still remaining elements from the previously calculated collection

    Backpressures when downstream backpressures or there are still remaining elements from the previously calculated collection

    Completes when upstream completes and all remaining elements have been emitted

    Cancels when downstream cancels
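
    The flattening behaviour corresponds to a nested loop over the collections returned for each input. The sketch below models it on lists. MapConcatSketch and mapConcat are hypothetical names, and the null check is a stand-in for the rule that null is not a legal stream element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Plain-Java model of mapConcat's flattening; not Akka's implementation.
final class MapConcatSketch {

    static <A, B> List<B> mapConcat(List<A> elems, Function<A, List<B>> f) {
        List<B> out = new ArrayList<>();
        for (A e : elems) {
            // Every element of the returned collection is emitted before the next input is mapped.
            for (B b : f.apply(e)) {
                out.add(Objects.requireNonNull(b, "null is not a legal stream element"));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = mapConcat(Arrays.asList("a b", "c"), line -> Arrays.asList(line.split(" ")));
        System.out.println(words); // [a, b, c]
    }
}
```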

  67. def mapError(pf: PartialFunction[Throwable, Throwable]): Source[Out, Mat]

    While similar to recover, this stage can be used to transform an error signal into a different one *without* logging it as an error in the process. In that sense it is NOT exactly equivalent to recover(t => throw t2), since recover would log the t2 error.

    Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.

    Similarly to recover, throwing an exception inside mapError _will_ be logged.

    Emits when element is available from the upstream or upstream is failed and pf returns an element

    Backpressures when downstream backpressures

    Completes when upstream completes or upstream failed with exception pf can handle

    Cancels when downstream cancels

  68. def mapMaterializedValue[Mat2](f: Function[Mat, Mat2]): Source[Out, Mat2]

    Transform only the materialized value of this Source, leaving all other properties as they were.

  69. def merge[T >: Out](that: Graph[SourceShape[T], _]): Source[T, Mat]

    Merge the given Source into the current one, taking elements as they arrive from the input streams and picking randomly when several elements are ready.

    Emits when one of the inputs has an element available

    Backpressures when downstream backpressures

    Completes when all upstreams complete

    Cancels when downstream cancels

  70. def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: Function2[Mat, M, M2]): Source[T, M2]

    Merge the given Source into the current one, taking elements as they arrive from the input streams and picking randomly when several elements are ready.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also

    #merge.

  71. def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M], comp: Comparator[U]): Source[U, Mat]

    Merge the given Source into this Source, taking elements as they arrive from the input streams and always picking the smallest of the available elements (waiting for one element from each side to be available). This means that possible contiguity of the input streams is not exploited to avoid waiting for elements; this merge will block when one of the inputs does not have more elements (and does not complete).

    Emits when all of the inputs have an element available

    Backpressures when downstream backpressures

    Completes when all upstreams complete

    Cancels when downstream cancels
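
    For two finite, already sorted inputs, the selection rule is the classic two-way merge. The following sketch models it on lists. MergeSortedSketch and mergeSorted are hypothetical names, and preferring the left input on ties is a choice made for this sketch, not a documented property of the stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of merging two sorted inputs; not Akka's implementation.
final class MergeSortedSketch {

    static <T> List<T> mergeSorted(List<T> left, List<T> right, Comparator<T> comp) {
        List<T> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        // While both sides have a head element, emit the smaller one.
        while (i < left.size() && j < right.size()) {
            if (comp.compare(left.get(i), right.get(j)) <= 0) out.add(left.get(i++));
            else out.add(right.get(j++));
        }
        // One side is exhausted: the rest of the other side follows unchanged.
        while (i < left.size()) out.add(left.get(i++));
        while (j < right.size()) out.add(right.get(j++));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mergeSorted(
            Arrays.asList(1, 4, 6), Arrays.asList(2, 3, 7), Comparator.<Integer>naturalOrder()));
        // [1, 2, 3, 4, 6, 7]
    }
}
```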

  72. def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: Comparator[U], matF: Function2[Mat, Mat2, Mat3]): Source[U, Mat3]

    Merge the given Source into this Source, taking elements as they arrive from the input streams and always picking the smallest of the available elements (waiting for one element from each side to be available). This means that possible contiguity of the input streams is not exploited to avoid waiting for elements; this merge will block when one of the inputs does not have more elements (and does not complete).

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also

    #mergeSorted.

  73. def module: Module

    INTERNAL API.

    Every materializable element must be backed by a stream layout module

    Definition Classes
    SourceGraph
  74. def monitor[M]()(combine: Function2[Mat, FlowMonitor[Out], M]): Source[Out, M]

    Materializes to FlowMonitor[Out] that allows monitoring of the current flow. All events are propagated by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an event, and may therefore affect performance. The combine function is used to combine the FlowMonitor with this flow's materialized value.

  75. def named(name: String): Source[Out, Mat]

    Add a name attribute to this Source.

    Definition Classes
    SourceGraph
  76. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  77. final def notify(): Unit
    Definition Classes
    AnyRef
  78. final def notifyAll(): Unit
    Definition Classes
    AnyRef
  79. def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): Source[T, Mat]

    Provides a secondary source that will be consumed if this source completes without any elements passing by. As soon as the first element comes through this stream, the alternative will be cancelled.

    Note that this Flow will be materialized together with the Source and just kept from producing elements by asserting back-pressure until its time comes or it gets cancelled.

    On errors, the stage is failed regardless of the source of the error.

    Emits when element is available from first stream or first stream closed without emitting any elements and an element is available from the second stream

    Backpressures when downstream backpressures

    Completes when the primary stream completes after emitting at least one element, when the primary stream completes without emitting and the secondary stream already has completed or when the secondary stream completes

    Cancels when downstream cancels and additionally the alternative is cancelled as soon as an element passes by from this stream.

  80. def orElseMat[T >: Out, M, M2](secondary: Graph[SourceShape[T], M], matF: Function2[Mat, M, M2]): Source[T, M2]

    Provides a secondary source that will be consumed if this source completes without any elements passing by. As soon as the first element comes through this stream, the alternative will be cancelled.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also

    #orElse

  81. def prefixAndTail(n: Int): Source[Pair[List[Out], Source[Out, NotUsed]], Mat]

    Takes up to n elements from the stream (fewer than n if the upstream completes before emitting n elements) and returns a pair containing a strict sequence of the taken elements and a stream representing the remaining elements. If n is zero or negative, then this will return a pair of an empty collection and a stream containing the whole upstream unchanged.

    In case of an upstream error, the outcome depends on the current state:

    • the master stream signals the error if fewer than n elements have been seen, and therefore the substream has not yet been emitted
    • the tail substream signals the error after the prefix and tail have been emitted by the main stream (at that point the main stream has already completed)

    Emits when the configured number of prefix elements are available. Emits this prefix, and the rest as a substream

    Backpressures when downstream backpressures or substream backpressures

    Completes when prefix elements have been consumed and substream has been consumed

    Cancels when downstream cancels or substream cancels
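
    The split into a strict prefix and a remaining tail can be modelled on a list. In the sketch below the tail is a plain list rather than a Source, and PrefixAndTailSketch and prefixAndTail are hypothetical names. It covers only the element bookkeeping, including a short upstream and n being zero or negative.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model of the prefix/tail split; not Akka's implementation.
final class PrefixAndTailSketch {

    static <T> Map.Entry<List<T>, List<T>> prefixAndTail(List<T> elems, int n) {
        // Clamp n: zero or negative yields an empty prefix, a short input yields a short prefix.
        int cut = Math.max(0, Math.min(n, elems.size()));
        List<T> prefix = new ArrayList<>(elems.subList(0, cut));
        List<T> tail = new ArrayList<>(elems.subList(cut, elems.size()));
        return new AbstractMap.SimpleImmutableEntry<>(prefix, tail);
    }

    public static void main(String[] args) {
        Map.Entry<List<Integer>, List<Integer>> r = prefixAndTail(Arrays.asList(1, 2, 3, 4, 5), 2);
        System.out.println(r.getKey() + " " + r.getValue()); // [1, 2] [3, 4, 5]
    }
}
```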

  82. def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): Source[T, Mat]

    Prepend the given Source to this one, meaning that once the given source is exhausted and all result elements have been generated, the current source's elements will be produced.

    Note that the current Source is materialized together with this Flow and just kept from producing elements by asserting back-pressure until its time comes.

    If the given Source signals an upstream error, no elements from this Source will be pulled.

    Emits when element is available from current source or from the given Source when current is completed

    Backpressures when downstream backpressures

    Completes when given Source completes

    Cancels when downstream cancels

  83. def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: Function2[Mat, M, M2]): Source[T, M2]

    Prepend the given Source to this one, meaning that once the given source is exhausted and all result elements have been generated, the current source's elements will be produced.

    Note that the current Source is materialized together with this Flow and just kept from producing elements by asserting back-pressure until its time comes.

    If the given Source signals an upstream error, no elements from this Source will be pulled.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also

    #prepend.

  84. def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat]

    RecoverWith allows switching to an alternative Source on flow failure. It will stay in effect after a failure has been recovered so that each time there is a failure it is fed into the pf and a new Source may be materialized.

    Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.

    Throwing an exception inside recoverWith _will_ be logged on ERROR level automatically.

    Emits when element is available from the upstream or upstream is failed and element is available from alternative Source

    Backpressures when downstream backpressures

    Completes when upstream completes or upstream failed with exception pf can handle

    Cancels when downstream cancels

  85. def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat]

    RecoverWithRetries allows switching to an alternative Source on flow failure. It will stay in effect after a failure has been recovered, up to attempts number of times, so that each time there is a failure it is fed into the pf and a new Source may be materialized. Note that if you pass in 0, this won't attempt to recover at all. Passing in a negative number will behave exactly the same as recoverWith.

    Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.

    Throwing an exception inside recoverWithRetries _will_ be logged on ERROR level automatically.

    Emits when element is available from the upstream or upstream is failed and element is available from alternative Source

    Backpressures when downstream backpressures

    Completes when upstream completes or upstream failed with exception pf can handle

    Cancels when downstream cancels

  86. def reduce(f: Function2[Out, Out, Out]): Source[Out, Mat]

    Similar to fold but uses first element as zero element. Applies the given function towards its current and next value, yielding the next current value.

    Emits when upstream completes

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  87. def runFold[U](zero: U, f: Function2[U, Out, U], materializer: Materializer): CompletionStage[U]

    Shortcut for running this Source with a fold function. The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input. The returned java.util.concurrent.CompletionStage will be completed with the value of the final function evaluation when the input stream ends, or completed with failure if a failure is signaled in the stream.

  88. def runFoldAsync[U](zero: U, f: Function2[U, Out, CompletionStage[U]], materializer: Materializer): CompletionStage[U]

    Shortcut for running this Source with an asynchronous fold function. The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input. The returned java.util.concurrent.CompletionStage will be completed with the value of the final function evaluation when the input stream ends, or completed with failure if a failure is signaled in the stream.

  89. def runForeach(f: Procedure[Out], materializer: Materializer): CompletionStage[Done]

    Shortcut for running this Source with a foreach procedure. The given procedure is invoked for each received element. The returned java.util.concurrent.CompletionStage will be completed normally when reaching the normal end of the stream, or completed exceptionally if a failure is signaled in the stream.

  90. def runReduce[U >: Out](f: Function2[U, U, U], materializer: Materializer): CompletionStage[U]

    Shortcut for running this Source with a reduce function. The given function is invoked for every received element from the second one onwards, giving it its previous output and the element as input. The returned java.util.concurrent.CompletionStage will be completed with the value of the final function evaluation when the input stream ends, or completed with failure if a failure is signaled in the stream.

    If the stream is empty (i.e. completes before signalling any elements), the reduce stage will fail its downstream with a NoSuchElementException, which is semantically in line with what Scala's standard library collections do in such situations.
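
    The reduce semantics, including the empty-input case, can be modelled on a list. This is a sketch with the hypothetical names ReduceSketch and reduce. It returns the value directly instead of a CompletionStage and throws where the stream would fail.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;

// Plain-Java model of reduce using the first element as the initial value; not Akka's implementation.
final class ReduceSketch {

    static <T> T reduce(List<T> elems, BinaryOperator<T> f) {
        Iterator<T> it = elems.iterator();
        // An empty input has no first element to start from.
        if (!it.hasNext()) throw new NoSuchElementException("reduce over empty stream");
        T acc = it.next();
        // f is first applied when the second element arrives.
        while (it.hasNext()) acc = f.apply(acc, it.next());
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(reduce(Arrays.asList(1, 2, 3, 4), Integer::sum)); // 10
    }
}
```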

  91. def runWith[M](sink: Graph[SinkShape[Out], M], materializer: Materializer): M

    Connect this Source to a Sink and run it. The returned value is the materialized value of the Sink, e.g. the Publisher of a Sink.asPublisher.

  92. def scan[T](zero: T)(f: Function2[T, Out, T]): Source[T, Mat]

    Similar to fold but is not a terminal operation, emits its current value which starts at zero and then applies the current and next value to the given function f, emitting the next current value.

    If the function f throws an exception and the supervision decision is akka.stream.Supervision#restart, the current value starts at zero again and the stream will continue.

    Emits when the function scanning the element returns a new element

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels
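
    The difference from fold is that every intermediate value is emitted, starting with zero itself. A list-based sketch of that behaviour follows. ScanSketch and scan are hypothetical names, and supervision is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of scan's running values; not Akka's implementation.
final class ScanSketch {

    static <A, S> List<S> scan(List<A> elems, S zero, BiFunction<S, A, S> f) {
        List<S> out = new ArrayList<>();
        S acc = zero;
        out.add(acc); // the initial value is emitted first
        for (A e : elems) {
            acc = f.apply(acc, e);
            out.add(acc); // each new current value is emitted as well
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> sums = scan(Arrays.asList(1, 2, 3), 0, (acc, x) -> acc + x);
        System.out.println(sums); // [0, 1, 3, 6]
    }
}
```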

  93. def scanAsync[T](zero: T)(f: Function2[T, Out, CompletionStage[T]]): Source[T, Mat]

    Similar to scan but with an asynchronous function, emits its current value which starts at zero and then applies the current and next value to the given function f, emitting a Future that resolves to the next current value.

    If the function f throws an exception and the supervision decision is akka.stream.Supervision.Restart, the current value starts at zero again and the stream will continue.

    If the function f throws an exception and the supervision decision is akka.stream.Supervision.Resume, the current value starts at the previous current value, or zero when it doesn't have one, and the stream will continue.

    Emits when the future returned by f completes

    Backpressures when downstream backpressures

    Completes when upstream completes and the last future returned by f completes

    Cancels when downstream cancels

    See also FlowOps.scan

  94. def shape: SourceShape[Out]

    The shape of a graph is all that is externally visible: its inlets and outlets.

    Definition Classes
    SourceGraph
  95. def sliding(n: Int, step: Int): Source[List[Out], Mat]

    Apply a sliding window over the stream and return the windows as groups of elements, with the last group possibly smaller than requested due to end-of-stream.

    n must be positive, otherwise IllegalArgumentException is thrown. step must be positive, otherwise IllegalArgumentException is thrown.

    Emits when enough elements have been collected within the window or upstream completed

    Backpressures when a window has been assembled and downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels
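
    The windowing can be modelled on a finite list as below. This is a sketch under assumptions: SlidingSketch and sliding are hypothetical names, and the end-of-stream handling follows the usual collection-style sliding, so edge cases may differ from the actual stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of sliding windows over a finite input; not Akka's implementation.
final class SlidingSketch {

    static <T> List<List<T>> sliding(List<T> elems, int n, int step) {
        if (n <= 0 || step <= 0) throw new IllegalArgumentException("n and step must be positive");
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < elems.size(); start += step) {
            int end = Math.min(start + n, elems.size());
            // The last window may be shorter than n when the input ends.
            out.add(new ArrayList<>(elems.subList(start, end)));
            if (end == elems.size()) break; // this window already reached the end of the input
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sliding(Arrays.asList(1, 2, 3, 4, 5), 2, 2)); // [[1, 2], [3, 4], [5]]
        System.out.println(sliding(Arrays.asList(1, 2, 3, 4), 2, 1));    // [[1, 2], [2, 3], [3, 4]]
    }
}
```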

  96. def splitAfter[U >: Out](p: Predicate[Out]): SubSource[Out, Mat]

    This operation applies the given predicate to all incoming elements and emits them to a stream of output streams. It *ends* the current substream when the predicate is true. This means that for the following series of predicate values, three substreams will be produced with lengths 2, 2, and 3:

    false, true,        // elements go into first substream
    false, true,        // elements go into second substream
    false, false, true  // elements go into third substream

    The object returned from this method is not a normal Flow; it is a SubSource. This means that after this combinator all transformations are applied to all encountered substreams in the same fashion. Substream mode is exited either by closing the substream (i.e. connecting it to a Sink) or by merging the substreams back together; see the to and mergeBack methods on SubSource for more information.

    It is important to note that the substreams also propagate back-pressure like any other stream, which means that blocking one substream will block the splitAfter operator itself (and thereby all substreams) once all internal or explicit buffers are filled.

    If the split predicate p throws an exception and the supervision decision is akka.stream.Supervision.Stop, the stream and substreams will be completed with failure.

    If the split predicate p throws an exception and the supervision decision is akka.stream.Supervision.Resume or akka.stream.Supervision.Restart, the element is dropped and the stream and substreams continue.

    Emits when an element passes through. When the provided predicate is true, it emits the element and opens a new substream for subsequent elements

    Backpressures when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures

    Completes when upstream completes

    Cancels when downstream cancels and substreams cancel

    See also Source.splitWhen.
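
    The grouping rule of splitAfter can be illustrated on a finite list. The sketch below is a plain-Java model with hypothetical names (SplitAfterModel, splitAfter); it ignores back-pressure and supervision and only shows where substreams end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model: each inner list stands for one substream.
final class SplitAfterModel {
    static <T> List<List<T>> splitAfter(List<T> input, Predicate<T> p) {
        List<List<T>> substreams = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : input) {
            current.add(element);      // the element always joins the current substream
            if (p.test(element)) {     // a match ends the current substream
                substreams.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            substreams.add(current);   // trailing elements at end-of-stream
        }
        return substreams;
    }

    public static void main(String[] args) {
        List<Boolean> values = List.of(false, true, false, true, false, false, true);
        System.out.println(splitAfter(values, b -> b));
        // [[false, true], [false, true], [false, false, true]]
    }
}
```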

  97. def splitWhen(p: Predicate[Out]): SubSource[Out, Mat]

    This operation applies the given predicate to all incoming elements and emits them to a stream of output streams, always beginning a new one with the current element if the given predicate returns true for it. This means that for the following series of predicate values, three substreams will be produced with lengths 1, 2, and 3:

    false,             // element goes into first substream
    true, false,       // elements go into second substream
    true, false, false // elements go into third substream

    In case the *first* element of the stream matches the predicate, the first substream emitted by splitWhen will start from that element. For example:

    true, false, false // first substream starts from the split-by element
    true, false        // subsequent substreams operate the same way

    The object returned from this method is not a normal Flow; it is a SubSource. This means that after this combinator all transformations are applied to all encountered substreams in the same fashion. Substream mode is exited either by closing the substream (i.e. connecting it to a Sink) or by merging the substreams back together; see the to and mergeBack methods on SubSource for more information.

    It is important to note that the substreams also propagate back-pressure like any other stream, which means that blocking one substream will block the splitWhen operator itself (and thereby all substreams) once all internal or explicit buffers are filled.

    If the split predicate p throws an exception and the supervision decision is akka.stream.Supervision.Stop, the stream and substreams will be completed with failure.

    If the split predicate p throws an exception and the supervision decision is akka.stream.Supervision.Resume or akka.stream.Supervision.Restart, the element is dropped and the stream and substreams continue.

    Emits when the provided predicate is true for an element, opening and emitting a new substream for subsequent elements

    Backpressures when there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures

    Completes when upstream completes

    Cancels when downstream cancels and substreams cancel

    See also Source.splitAfter.
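
    For comparison with splitAfter, the grouping rule of splitWhen can be modelled the same way. This is a plain-Java sketch with hypothetical names (SplitWhenModel, splitWhen), not the operator itself: a matching element starts a new substream instead of ending the current one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model: each inner list stands for one substream.
final class SplitWhenModel {
    static <T> List<List<T>> splitWhen(List<T> input, Predicate<T> p) {
        List<List<T>> substreams = new ArrayList<>();
        List<T> current = null;
        for (T element : input) {
            boolean startsNew = p.test(element);
            if (current == null || startsNew) { // the first element always opens a substream
                current = new ArrayList<>();
                substreams.add(current);
            }
            current.add(element);               // a matching element is the head of the new substream
        }
        return substreams;
    }

    public static void main(String[] args) {
        System.out.println(splitWhen(List.of(false, true, false, true, false, false), b -> b));
        // [[false], [true, false], [true, false, false]]
        System.out.println(splitWhen(List.of(true, false, false, true, false), b -> b));
        // [[true, false, false], [true, false]]
    }
}
```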

  98. def statefulMapConcat[T](f: Creator[Function[Out, Iterable[T]]]): Source[T, Mat]

    Transform each input element into an Iterable of output elements that is then flattened into the output stream. The transformation is meant to be stateful, which is enabled by creating the transformation function anew for every materialization: the returned function will typically close over mutable objects to store state between invocations. For the stateless variant see #mapConcat.

    Make sure that the Iterable is immutable or at least not modified after being used as an output sequence. Otherwise the stream may fail with ConcurrentModificationException or other more subtle errors may occur.

    The returned Iterable MUST NOT contain null values, as they are illegal as stream elements according to the Reactive Streams specification.

    Emits when the mapping function returns an element or there are still remaining elements from the previously calculated collection

    Backpressures when downstream backpressures or there are still remaining elements from the previously calculated collection

    Completes when upstream completes and all remaining elements have been emitted

    Cancels when downstream cancels
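
    The "function created anew for every materialization" idea can be shown without Akka. In the sketch below, java.util.function.Supplier and Function are stand-ins for Creator and Function, runOnce is a hypothetical stand-in for a single materialization, and numbering is a hypothetical example factory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

final class StatefulMapConcatModel {
    // Models one materialization: the factory is invoked once, so every run gets fresh state.
    static <T, R> List<R> runOnce(List<T> input, Supplier<Function<T, Iterable<R>>> factory) {
        Function<T, Iterable<R>> f = factory.get();
        List<R> out = new ArrayList<>();
        for (T element : input) {
            for (R r : f.apply(element)) {
                out.add(Objects.requireNonNull(r, "null is not a legal stream element"));
            }
        }
        return out;
    }

    // Example factory: the returned function closes over a counter to number the elements.
    static Supplier<Function<String, Iterable<String>>> numbering() {
        return () -> {
            int[] counter = {0}; // mutable state, created anew on every factory call
            return s -> List.of(counter[0]++ + ":" + s);
        };
    }

    public static void main(String[] args) {
        Supplier<Function<String, Iterable<String>>> factory = numbering();
        System.out.println(runOnce(List.of("a", "b"), factory)); // [0:a, 1:b]
        System.out.println(runOnce(List.of("a", "b"), factory)); // [0:a, 1:b], state is not shared between runs
    }
}
```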

  99. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  100. def take(n: Long): Source[Out, Mat]

    Terminate processing (and cancel the upstream publisher) after the given number of elements. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.

    The stream will be completed without producing any elements if n is zero or negative.

    Emits when the specified number of elements to take has not yet been reached

    Backpressures when downstream backpressures

    Completes when the defined number of elements has been taken or upstream completes

    Cancels when the defined number of elements has been taken or downstream cancels

  101. def takeWhile(p: Predicate[Out]): Source[Out, Mat]

    Terminate processing (and cancel the upstream publisher) after predicate returns false for the first time. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.

    The stream will be completed without producing any elements if predicate is false for the first stream element.

    Emits when the predicate is true

    Backpressures when downstream backpressures

    Completes when predicate returned false or upstream completes

    Cancels when predicate returned false or downstream cancels
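
    As a quick illustration of the completion rule, here is a plain-Java model over a finite list (TakeWhileModel is a hypothetical name, not part of the API). The first element for which the predicate is false ends processing and is itself not emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class TakeWhileModel {
    static <T> List<T> takeWhile(List<T> input, Predicate<T> p) {
        List<T> out = new ArrayList<>();
        for (T element : input) {
            if (!p.test(element)) {
                break; // first false: complete; later elements are never examined
            }
            out.add(element);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(takeWhile(List.of(1, 2, 3, 1), x -> x < 3)); // [1, 2]
        System.out.println(takeWhile(List.of(5, 1), x -> x < 3));       // []
    }
}
```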

  102. def takeWithin(d: FiniteDuration): Source[Out, Mat]

    Terminate processing (and cancel the upstream publisher) after the given duration. Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.

    Note that this can be combined with #take to limit the number of elements within the duration.

    Emits when an upstream element arrives

    Backpressures when downstream backpressures

    Completes when upstream completes or timer fires

    Cancels when downstream cancels or timer fires

  103. def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, costCalculation: Function[Out, Integer], mode: ThrottleMode): Source[Out, Mat]

    Sends elements downstream with speed limited to cost/per. The cost is calculated for each element individually by calling the costCalculation function. This combinator is meant for streams whose elements have different costs (lengths), for example streams of ByteString.

    Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity, to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the element costs. If there aren't enough, throttle waits until the bucket accumulates enough tokens. Elements that cost more than the allowed burst will be delayed proportionally to their cost minus the available tokens, meeting the target rate.

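    Parameter mode manages behaviour when upstream is faster than the throttle rate: akka.stream.ThrottleMode.Shaping pauses before emitting elements to meet the throttle rate, whereas akka.stream.ThrottleMode.Enforcing fails the stream with an exception.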

    Emits when upstream emits an element and the configured time per element has elapsed

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels

  104. def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]

    Sends elements downstream with speed limited to elements/per. In other words, this stage sets the maximum rate for emitting messages. This combinator works for streams where all elements have the same cost or length.

    Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst). Tokens drop into the bucket at a given rate and can be saved for later use, up to the bucket capacity, to allow some burstiness. Whenever the stream wants to send an element, it takes as many tokens from the bucket as the number of elements. If there aren't enough, throttle waits until the bucket accumulates enough tokens. The bucket is full when the stream has just been materialized and started.

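    Parameter mode manages behaviour when upstream is faster than the throttle rate: akka.stream.ThrottleMode.Shaping pauses before emitting elements to meet the throttle rate, whereas akka.stream.ThrottleMode.Enforcing fails the stream with an exception.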

    It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).

    The throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. This can also be mitigated by increasing the burst size.

    Emits when upstream emits an element and the configured time per element has elapsed

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels
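
    The token bucket described above can be sketched as a small deterministic model. This is not Akka's throttle implementation: TokenBucketModel, offer and the explicit clock parameter now are hypothetical, and the model only computes how long an element would have to wait, roughly what a shaping throttle would do.

```java
// Hypothetical token bucket model with an explicit clock (nanoseconds) for determinism.
final class TokenBucketModel {
    private final long capacity;      // corresponds to maximumBurst
    private final long nanosPerToken; // time for one token to drop into the bucket
    private long tokens;
    private long lastRefill;

    TokenBucketModel(long capacity, long nanosPerToken, long now) {
        this.capacity = capacity;
        this.nanosPerToken = nanosPerToken;
        this.tokens = capacity;       // the bucket is full when the stream starts
        this.lastRefill = now;
    }

    /** Takes cost tokens at time now; returns the delay in nanoseconds before the element may pass (0 = at once). */
    long offer(long now, long cost) {
        long arrived = (now - lastRefill) / nanosPerToken; // whole tokens dropped in since the last refill
        lastRefill += arrived * nanosPerToken;
        tokens = Math.min(capacity, tokens + arrived);     // saved tokens never exceed the capacity
        tokens -= cost;                                    // may go negative: a debt that must be waited out
        return tokens >= 0 ? 0 : lastRefill - tokens * nanosPerToken - now;
    }

    public static void main(String[] args) {
        TokenBucketModel bucket = new TokenBucketModel(2, 100, 0);
        System.out.println(bucket.offer(0, 1));   // 0   (burst token)
        System.out.println(bucket.offer(0, 1));   // 0   (burst token)
        System.out.println(bucket.offer(0, 1));   // 100 (bucket empty, wait for one token)
        System.out.println(bucket.offer(300, 1)); // 0   (bucket refilled in the meantime)
    }
}
```

    An element whose cost exceeds the capacity simply produces a larger debt, which matches the statement that such elements are delayed proportionally to their cost minus the available tokens.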

  105. def to[M](sink: Graph[SinkShape[Out], M]): RunnableGraph[Mat]

    Connect this Source to a Sink, concatenating the processing steps of both.

    +----------------------------+
    | Resulting RunnableGraph    |
    |                            |
    |  +------+        +------+  |
    |  |      |        |      |  |
    |  | this | ~Out~> | sink |  |
    |  |      |        |      |  |
    |  +------+        +------+  |
    +----------------------------+

    The materialized value of the resulting RunnableGraph will be the materialized value of this Source (ignoring the given Sink's value); use toMat if a different strategy is needed.

  106. def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: Function2[Mat, M, M2]): RunnableGraph[M2]

    Connect this Source to a Sink, concatenating the processing steps of both.

    +----------------------------+
    | Resulting RunnableGraph    |
    |                            |
    |  +------+        +------+  |
    |  |      |        |      |  |
    |  | this | ~Out~> | sink |  |
    |  |      |        |      |  |
    |  +------+        +------+  |
    +----------------------------+

    The combine function is used to compose the materialized values of this Source and that Sink into the materialized value of the resulting RunnableGraph.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
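
    What a combine function does can be modelled with a plain java.util.function.BiFunction standing in for Function2. The helpers keepLeft and keepRight below are hypothetical stand-ins that mimic what Keep.left and Keep.right select; they are not the optimized Akka combiners.

```java
import java.util.function.BiFunction;

final class CombineModel {
    // Stand-in for Keep.left: keep the materialized value of the left-hand side (this Source).
    static <L, R> BiFunction<L, R, L> keepLeft() {
        return (left, right) -> left;
    }

    // Stand-in for Keep.right: keep the materialized value of the right-hand side (the Sink).
    static <L, R> BiFunction<L, R, R> keepRight() {
        return (left, right) -> right;
    }

    public static void main(String[] args) {
        String sourceMat = "source-value"; // pretend materialized value of the Source
        Integer sinkMat = 42;              // pretend materialized value of the Sink

        System.out.println(CombineModel.<String, Integer>keepLeft().apply(sourceMat, sinkMat));  // source-value
        System.out.println(CombineModel.<String, Integer>keepRight().apply(sourceMat, sinkMat)); // 42

        // A custom combiner can keep both values, here as a formatted string.
        BiFunction<String, Integer, String> both = (left, right) -> left + "/" + right;
        System.out.println(both.apply(sourceMat, sinkMat)); // source-value/42
    }
}
```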

  107. def toString(): String
    Definition Classes
    Source → AnyRef → Any
  108. def via[T, M](flow: Graph[FlowShape[Out, T], M]): Source[T, Mat]

    Transform this Source by appending the given processing stages.

    +----------------------------+
    | Resulting Source           |
    |                            |
    |  +------+        +------+  |
    |  |      |        |      |  |
    |  | this | ~Out~> | flow | ~~> T
    |  |      |        |      |  |
    |  +------+        +------+  |
    +----------------------------+

    The materialized value of the resulting Source will be the materialized value of this Source (ignoring the given Flow's value); use viaMat if a different strategy is needed.

  109. def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: Function2[Mat, M, M2]): Source[T, M2]

    Transform this Source by appending the given processing stages.

    +----------------------------+
    | Resulting Source           |
    |                            |
    |  +------+        +------+  |
    |  |      |        |      |  |
    |  | this | ~Out~> | flow | ~~> T
    |  |      |        |      |  |
    |  +------+        +------+  |
    +----------------------------+

    The combine function is used to compose the materialized values of this Source and that Flow into the materialized value of the resulting Source.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

  110. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  111. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  112. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  113. def watchTermination[M]()(matF: Function2[Mat, CompletionStage[Done], M]): Source[Out, M]

    Materializes to CompletionStage[Done] that completes on getting a termination message. The CompletionStage completes with success when a complete message is received from upstream or a cancel is received from downstream. It fails with the same error when an error message is received from upstream.

  114. def withAttributes(attr: Attributes): Source[Out, Mat]

    Change the attributes of this Source to the given ones and seal the list of attributes. This means that further calls will not be able to remove these attributes, but instead add new ones. Note that this operation has no effect on an empty Flow (because the attributes apply only to the contained processing stages).

    Definition Classes
    Source → Graph
  115. def zip[T](that: Graph[SourceShape[T], _]): Source[Pair[Out, T], Mat]

    Combine the elements of current Source and the given one into a stream of tuples.

    Emits when all of the inputs have an element available

    Backpressures when downstream backpressures

    Completes when any upstream completes

    Cancels when downstream cancels

  116. def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: Function2[Mat, M, M2]): Source[Pair[Out, T], M2]

    Combine the elements of current Source and the given one into a stream of tuples.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also #zip.

  117. def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _], combine: Function2[Out, Out2, Out3]): Source[Out3, Mat]

    Put together the elements of current Source and the given one into a stream of combined elements using a combiner function.

    Emits when all of the inputs have an element available

    Backpressures when downstream backpressures

    Completes when any upstream completes

    Cancels when downstream cancels
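
    The pairing and completion rules can be modelled on two finite lists. The sketch is plain Java with hypothetical names (ZipWithModel, zipWithModel), using BiFunction as a stand-in for Function2; output stops as soon as either input is exhausted, mirroring "completes when any upstream completes".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class ZipWithModel {
    static <A, B, C> List<C> zipWithModel(List<A> left, List<B> right, BiFunction<A, B, C> combine) {
        int n = Math.min(left.size(), right.size()); // the shorter input decides when output ends
        List<C> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combine.apply(left.get(i), right.get(i))); // one element from each input per output
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(zipWithModel(List.of(1, 2, 3), List.of("a", "b"), (n, s) -> n + s)); // [1a, 2b]
    }
}
```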

  118. def zipWithIndex: Source[Pair[Out, Long], Mat]

    Combine the elements of current Source into a stream of tuples consisting of all elements paired with their index. Indices start at 0.

    Emits when upstream emits an element, which is then paired with its index

    Backpressures when downstream backpressures

    Completes when upstream completes

    Cancels when downstream cancels
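
    A small plain-Java model of the indexing rule follows. java.util.Map.Entry is used here as a stand-in for Pair, and ZipWithIndexModel is a hypothetical name; the index is a Long that starts at 0 and increases by one per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ZipWithIndexModel {
    static <T> List<Map.Entry<T, Long>> zipWithIndex(List<T> input) {
        List<Map.Entry<T, Long>> out = new ArrayList<>();
        long index = 0L; // indices start at 0
        for (T element : input) {
            out.add(Map.entry(element, index++));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(zipWithIndex(List.of("a", "b", "c"))); // [a=0, b=1, c=2]
    }
}
```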

  119. def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M], combine: Function2[Out, Out2, Out3], matF: Function2[Mat, M, M2]): Source[Out3, M2]

    Put together the elements of current Source and the given one into a stream of combined elements using a combiner function.

    It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.

    See also #zipWith.

  120. def →[B](y: B): (Source[Out, Mat], B)
    Implicit
    This member is added by an implicit conversion from Source[Out, Mat] to ArrowAssoc[Source[Out, Mat]] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc

Deprecated Value Members

  1. def recover[T >: Out](pf: PartialFunction[Throwable, T]): Source[T, Mat]

    Recover allows sending a last element on failure and gracefully completing the stream. Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.

    Throwing an exception inside recover _will_ be logged on ERROR level automatically.

    Emits when an element is available from upstream, or upstream has failed and pf returns an element

    Backpressures when downstream backpressures

    Completes when upstream completes or upstream failed with an exception that pf can handle

    Cancels when downstream cancels

    Annotations
    @deprecated
    Deprecated

    (Since version 2.4.4) Use recoverWithRetries instead.

  2. def transform[U](mkStage: Creator[Stage[Out, U]]): Source[U, Mat]

    Generic transformation of a stream with a custom processing akka.stream.stage.Stage. This operator makes it possible to extend the Flow API when there is no specialized operator that performs the transformation.

    Annotations
    @deprecated
    Deprecated

    (Since version 2.4.3) Use via(GraphStage) instead.
