public final class Source<Out,Mat> extends java.lang.Object implements Graph<SourceShape<Out>,Mat>
A Source
is a set of stream processing steps that has one open output and an attached input.
Can be used as a Publisher
Modifier and Type | Method and Description |
---|---|
static <T> Source<T,ActorRef> |
actorPublisher(Props props)
|
static <T> Source<T,ActorRef> |
actorRef(int bufferSize,
OverflowStrategy overflowStrategy)
Creates a
Source that is materialized as an ActorRef . |
Source<Out,Mat> |
addAttributes(Attributes attr)
Add the given attributes to this Source.
|
Source<Out,Mat> |
alsoTo(Graph<SinkShape<Out>,?> that)
|
<M2,M3> Source<Out,M3> |
alsoToMat(Graph<SinkShape<Out>,M2> that,
Function2<Mat,M2,M3> matF)
|
Source<Out,Mat> |
asScala()
Converts this Java DSL element to its Scala DSL counterpart.
|
static <T> Source<T,org.reactivestreams.Subscriber<T>> |
asSubscriber()
Creates a
Source that is materialized as a Subscriber |
Source<Out,Mat> |
async()
Put an asynchronous boundary around this
Source |
Source<Out,Mat> |
backpressureTimeout(scala.concurrent.duration.FiniteDuration timeout)
If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
the stream is failed with a
TimeoutException . |
<S> Source<S,Mat> |
batch(long max,
Function<Out,S> seed,
Function2<S,Out,S> aggregate)
Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches
until the subscriber is ready to accept them.
|
<S> Source<S,Mat> |
batchWeighted(long max,
Function<Out,java.lang.Object> costFn,
Function<Out,S> seed,
Function2<S,Out,S> aggregate)
Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches
until the subscriber is ready to accept them.
|
Source<Out,Mat> |
buffer(int size,
OverflowStrategy overflowStrategy)
Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.
|
<T> Source<T,Mat> |
collect(scala.PartialFunction<Out,T> pf)
Transform this stream by applying the given partial function to each of the elements
on which the function is defined as they pass through this processing step.
|
static <T,U> Source<U,NotUsed> |
combine(Source<T,?> first,
Source<T,?> second,
java.util.List<Source<T,?>> rest,
Function<java.lang.Integer,? extends Graph<UniformFanInShape<T,U>,NotUsed>> strategy)
Combines several sources with fan-in strategy like
Merge or Concat and returns Source . |
Source<Out,Mat> |
completionTimeout(scala.concurrent.duration.FiniteDuration timeout)
If the completion of the stream does not happen until the provided timeout, the stream is failed
with a
TimeoutException . |
<T,M> Source<T,Mat> |
concat(Graph<SourceShape<T>,M> that)
Concatenate this
Source with the given one, meaning that once current
is exhausted and all result elements have been generated,
the given source elements will be produced. |
<T,M,M2> Source<T,M2> |
concatMat(Graph<SourceShape<T>,M> that,
Function2<Mat,M,M2> matF)
Concatenate this
Source with the given one, meaning that once current
is exhausted and all result elements have been generated,
the given source elements will be produced. |
<O2> Source<O2,Mat> |
conflate(Function2<O2,O2,O2> aggregate)
Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
until the subscriber is ready to accept them.
|
<S> Source<S,Mat> |
conflateWithSeed(Function<Out,S> seed,
Function2<S,Out,S> aggregate)
Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
until the subscriber is ready to accept them.
|
static <O> Source<O,NotUsed> |
cycle(Creator<java.util.Iterator<O>> f)
Helper to create 'cycled'
Source from iterator provider. |
Source<Out,Mat> |
delay(scala.concurrent.duration.FiniteDuration of,
DelayOverflowStrategy strategy)
Shifts elements emission in time by a specified amount.
|
Source<Out,Mat> |
detach()
Detaches upstream demand from downstream demand without detaching the
stream rates; in other words acts like a buffer of size 1.
|
Source<Out,Mat> |
drop(long n)
Discard the given number of elements at the beginning of the stream.
|
Source<Out,Mat> |
dropWhile(Predicate<Out> p)
Discard elements at the beginning of the stream while predicate is true.
|
Source<Out,Mat> |
dropWithin(scala.concurrent.duration.FiniteDuration d)
Discard the elements received within the given duration at beginning of the stream.
|
static <O> Source<O,NotUsed> |
empty()
Create a
Source with no elements, i.e. |
<U> Source<U,Mat> |
expand(Function<Out,java.util.Iterator<U>> extrapolate)
Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older
element until new element comes from the upstream.
|
static <T> Source<T,NotUsed> |
failed(java.lang.Throwable cause)
Create a
Source that immediately ends the stream with the cause failure to every connected Sink . |
Source<Out,Mat> |
filter(Predicate<Out> p)
Only pass on those elements that satisfy the given predicate.
|
Source<Out,Mat> |
filterNot(Predicate<Out> p)
Only pass on those elements that NOT satisfy the given predicate.
|
<T,M> Source<T,Mat> |
flatMapConcat(Function<Out,? extends Graph<SourceShape<T>,M>> f)
Transform each input element into a
Source of output elements that is
then flattened into the output stream by concatenation,
fully consuming one Source after the other. |
<T,M> Source<T,Mat> |
flatMapMerge(int breadth,
Function<Out,? extends Graph<SourceShape<T>,M>> f)
Transform each input element into a
Source of output elements that is
then flattened into the output stream by merging, where at most breadth
substreams are being consumed at any given time. |
<T> Source<T,Mat> |
fold(T zero,
Function2<T,Out,T> f)
Similar to
scan but only emits its result when the upstream completes,
after which it also completes. |
<T> Source<T,Mat> |
foldAsync(T zero,
Function2<T,Out,java.util.concurrent.CompletionStage<T>> f)
Similar to
fold but with an asynchronous function. |
static <O> Source<O,NotUsed> |
from(java.lang.Iterable<O> iterable)
Helper to create
Source from Iterable . |
static <O> Source<O,NotUsed> |
fromCompletionStage(java.util.concurrent.CompletionStage<O> future)
Start a new
Source from the given CompletionStage . |
static <O> Source<O,NotUsed> |
fromFuture(scala.concurrent.Future<O> future)
Start a new
Source from the given Future . |
static <T,M> Source<T,M> |
fromGraph(Graph<SourceShape<T>,M> g)
A graph with the shape of a source logically is a source, this method makes
it so also in type.
|
static <O> Source<O,NotUsed> |
fromIterator(Creator<java.util.Iterator<O>> f)
Helper to create
Source from Iterator . |
static <O> Source<O,NotUsed> |
fromPublisher(org.reactivestreams.Publisher<O> publisher)
Helper to create
Source from Publisher . |
<K> SubSource<Out,Mat> |
groupBy(int maxSubstreams,
Function<Out,K> f)
This operation demultiplexes the incoming stream into separate output
streams, one for each element key.
|
Source<java.util.List<Out>,Mat> |
grouped(int n)
Chunk up this stream into groups of the given size, with the last group
possibly smaller than requested due to end-of-stream.
|
Source<java.util.List<Out>,Mat> |
groupedWithin(int n,
scala.concurrent.duration.FiniteDuration d)
Chunk up this stream into groups of elements received within a time window,
or limited by the given number of elements, whatever happens first.
|
Source<Out,Mat> |
idleTimeout(scala.concurrent.duration.FiniteDuration timeout)
If the time between two processed elements exceeds the provided timeout, the stream is failed
with a
TimeoutException . |
Source<Out,Mat> |
initialDelay(scala.concurrent.duration.FiniteDuration delay)
Delays the initial element by the specified duration.
|
Source<Out,Mat> |
initialTimeout(scala.concurrent.duration.FiniteDuration timeout)
If the first element has not passed through this stage before the provided timeout, the stream is failed
with a
TimeoutException . |
<T> Source<T,Mat> |
interleave(Graph<SourceShape<T>,?> that,
int segmentSize)
|
<T,M,M2> Source<T,M2> |
interleaveMat(Graph<SourceShape<T>,M> that,
int segmentSize,
Function2<Mat,M,M2> matF)
|
<T> Source<T,Mat> |
intersperse(T inject)
Intersperses stream with provided element, similar to how
scala.collection.immutable.List.mkString
injects a separator between a List's elements. |
<T> Source<T,Mat> |
intersperse(T start,
T inject,
T end)
Intersperses stream with provided element, similar to how
scala.collection.immutable.List.mkString
injects a separator between a List's elements. |
<U> Source<U,Mat> |
keepAlive(scala.concurrent.duration.FiniteDuration maxIdle,
Creator<U> injectedElem)
Injects additional elements if upstream does not emit for a configured amount of time.
|
static <T,M> Source<T,java.util.concurrent.CompletionStage<M>> |
lazily(Creator<Source<T,M>> create)
Creates a
Source that is not materialized until there is downstream demand, when the source gets materialized
the materialized future is completed with its value, if downstream cancels or fails without any demand the
create factory is never called and the materialized CompletionStage is failed. |
Source<Out,Mat> |
limit(int n)
Ensure stream boundedness by limiting the number of elements from upstream.
|
Source<Out,Mat> |
limitWeighted(long n,
Function<Out,java.lang.Object> costFn)
Ensure stream boundedness by evaluating the cost of incoming elements
using a cost function.
|
Source<Out,Mat> |
log(java.lang.String name)
Logs elements flowing through the stream as well as completion and erroring.
|
Source<Out,Mat> |
log(java.lang.String name,
Function<Out,java.lang.Object> extract)
Logs elements flowing through the stream as well as completion and erroring.
|
Source<Out,Mat> |
log(java.lang.String name,
Function<Out,java.lang.Object> extract,
LoggingAdapter log)
Logs elements flowing through the stream as well as completion and erroring.
|
Source<Out,Mat> |
log(java.lang.String name,
LoggingAdapter log)
Logs elements flowing through the stream as well as completion and erroring.
|
<T> Source<T,Mat> |
map(Function<Out,T> f)
Transform this stream by applying the given function to each of the elements
as they pass through this processing step.
|
<T> Source<T,Mat> |
mapAsync(int parallelism,
Function<Out,java.util.concurrent.CompletionStage<T>> f)
Transform this stream by applying the given function to each of the elements
as they pass through this processing step.
|
<T> Source<T,Mat> |
mapAsyncUnordered(int parallelism,
Function<Out,java.util.concurrent.CompletionStage<T>> f)
Transform this stream by applying the given function to each of the elements
as they pass through this processing step.
|
<T> Source<T,Mat> |
mapConcat(Function<Out,? extends java.lang.Iterable<T>> f)
Transform each input element into an
Iterable of output elements that is
then flattened into the output stream. |
Source<Out,Mat> |
mapError(scala.PartialFunction<java.lang.Throwable,java.lang.Throwable> pf)
While similar to
recover this stage can be used to transform an error signal to a different one *without* logging
it as an error in the process. |
<Mat2> Source<Out,Mat2> |
mapMaterializedValue(Function<Mat,Mat2> f)
Transform only the materialized value of this Source, leaving all other properties as they were.
|
static <T> Source<T,java.util.concurrent.CompletableFuture<java.util.Optional<T>>> |
maybe()
Create a
Source which materializes a CompletableFuture which controls what element
will be emitted by the Source. |
<T> Source<T,Mat> |
merge(Graph<SourceShape<T>,?> that)
Merge the given
Source to the current one, taking elements as they arrive from input streams,
picking randomly when several elements ready. |
<T,M,M2> Source<T,M2> |
mergeMat(Graph<SourceShape<T>,M> that,
Function2<Mat,M,M2> matF)
Merge the given
Source to the current one, taking elements as they arrive from input streams,
picking randomly when several elements ready. |
<U,M> Source<U,Mat> |
mergeSorted(Graph<SourceShape<U>,M> that,
java.util.Comparator<U> comp)
|
<U,Mat2,Mat3> |
mergeSortedMat(Graph<SourceShape<U>,Mat2> that,
java.util.Comparator<U> comp,
Function2<Mat,Mat2,Mat3> matF)
|
StreamLayout.Module |
module()
INTERNAL API.
|
<M> Source<Out,M> |
monitor(Function2<Mat,FlowMonitor<Out>,M> combine)
Materializes to
FlowMonitor[Out] that allows monitoring of the current flow. |
Source<Out,Mat> |
named(java.lang.String name)
Add a
name attribute to this Source. |
<T,M> Source<T,Mat> |
orElse(Graph<SourceShape<T>,M> secondary)
Provides a secondary source that will be consumed if this source completes without any
elements passing by.
|
<T,M,M2> Source<T,M2> |
orElseMat(Graph<SourceShape<T>,M> secondary,
Function2<Mat,M,M2> matF)
Provides a secondary source that will be consumed if this source completes without any
elements passing by.
|
Source<Pair<java.util.List<Out>,Source<Out,NotUsed>>,Mat> |
prefixAndTail(int n)
Takes up to
n elements from the stream (less than n if the upstream completes before emitting n elements)
and returns a pair containing a strict sequence of the taken element
and a stream representing the remaining elements. |
<T,M> Source<T,Mat> |
prepend(Graph<SourceShape<T>,M> that)
Prepend the given
Source to this one, meaning that once the given source
is exhausted and all result elements have been generated, the current source's
elements will be produced. |
<T,M,M2> Source<T,M2> |
prependMat(Graph<SourceShape<T>,M> that,
Function2<Mat,M,M2> matF)
Prepend the given
Source to this one, meaning that once the given source
is exhausted and all result elements have been generated, the current source's
elements will be produced. |
static <T> Source<T,SourceQueueWithComplete<T>> |
queue(int bufferSize,
OverflowStrategy overflowStrategy)
Creates a
Source that is materialized as an SourceQueue . |
static Source<java.lang.Integer,NotUsed> |
range(int start,
int end)
Creates
Source that represents integer values in range ''[start;end]'', step equals to 1. |
static Source<java.lang.Integer,NotUsed> |
range(int start,
int end,
int step)
Creates
Source that represents integer values in range ''[start;end]'', with the given step. |
<T> Source<T,Mat> |
recover(scala.PartialFunction<java.lang.Throwable,T> pf)
Deprecated.
Use recoverWithRetries instead. Since 2.4.4.
|
<T> Source<T,Mat> |
recoverWith(scala.PartialFunction<java.lang.Throwable,? extends Graph<SourceShape<T>,NotUsed>> pf)
RecoverWith allows to switch to alternative Source on flow failure.
|
<T> Source<T,Mat> |
recoverWithRetries(int attempts,
scala.PartialFunction<java.lang.Throwable,? extends Graph<SourceShape<T>,NotUsed>> pf)
RecoverWithRetries allows to switch to alternative Source on flow failure.
|
Source<Out,Mat> |
reduce(Function2<Out,Out,Out> f)
Similar to
fold but uses first element as zero element. |
static <T> Source<T,NotUsed> |
repeat(T element)
Create a
Source that will continually emit the given element. |
<U> java.util.concurrent.CompletionStage<U> |
runFold(U zero,
Function2<U,Out,U> f,
Materializer materializer)
Shortcut for running this
Source with a fold function. |
<U> java.util.concurrent.CompletionStage<U> |
runFoldAsync(U zero,
Function2<U,Out,java.util.concurrent.CompletionStage<U>> f,
Materializer materializer)
Shortcut for running this
Source with an asynchronous fold function. |
java.util.concurrent.CompletionStage<Done> |
runForeach(Procedure<Out> f,
Materializer materializer)
Shortcut for running this
Source with a foreach procedure. |
<U> java.util.concurrent.CompletionStage<U> |
runReduce(Function2<U,U,U> f,
Materializer materializer)
Shortcut for running this
Source with a reduce function. |
<M> M |
runWith(Graph<SinkShape<Out>,M> sink,
Materializer materializer)
Connect this
Source to a Sink and run it. |
<T> Source<T,Mat> |
scan(T zero,
Function2<T,Out,T> f)
Similar to
fold but is not a terminal operation,
emits its current value which starts at zero and then
applies the current and next value to the given function f ,
emitting the next current value. |
<T> Source<T,Mat> |
scanAsync(T zero,
Function2<T,Out,java.util.concurrent.CompletionStage<T>> f)
Similar to
scan but with a asynchronous function,
emits its current value which starts at zero and then
applies the current and next value to the given function f ,
emitting a Future that resolves to the next current value. |
SourceShape<Out> |
shape()
The shape of a graph is all that is externally visible: its inlets and outlets.
|
static <T> Source<T,NotUsed> |
single(T element)
Create a
Source with one element. |
Source<java.util.List<Out>,Mat> |
sliding(int n,
int step)
Apply a sliding window over the stream and return the windows as groups of elements, with the last group
possibly smaller than requested due to end-of-stream.
|
<U> SubSource<Out,Mat> |
splitAfter(Predicate<Out> p)
This operation applies the given predicate to all incoming elements and
emits them to a stream of output streams.
|
SubSource<Out,Mat> |
splitWhen(Predicate<Out> p)
This operation applies the given predicate to all incoming elements and
emits them to a stream of output streams, always beginning a new one with
the current element if the given predicate returns true for it.
|
<T> Source<T,Mat> |
statefulMapConcat(Creator<Function<Out,java.lang.Iterable<T>>> f)
Transform each input element into an
Iterable of output elements that is
then flattened into the output stream. |
Source<Out,Mat> |
take(long n)
Terminate processing (and cancel the upstream publisher) after the given
number of elements.
|
Source<Out,Mat> |
takeWhile(Predicate<Out> p)
Terminate processing (and cancel the upstream publisher) after predicate
returns false for the first time.
|
Source<Out,Mat> |
takeWithin(scala.concurrent.duration.FiniteDuration d)
Terminate processing (and cancel the upstream publisher) after the given
duration.
|
Source<Out,Mat> |
throttle(int cost,
scala.concurrent.duration.FiniteDuration per,
int maximumBurst,
Function<Out,java.lang.Integer> costCalculation,
ThrottleMode mode)
Sends elements downstream with speed limited to
cost/per . |
Source<Out,Mat> |
throttle(int elements,
scala.concurrent.duration.FiniteDuration per,
int maximumBurst,
ThrottleMode mode)
Sends elements downstream with speed limited to
elements/per . |
static <O> Source<O,Cancellable> |
tick(scala.concurrent.duration.FiniteDuration initialDelay,
scala.concurrent.duration.FiniteDuration interval,
O tick)
Elements are emitted periodically with the specified interval.
|
<M> RunnableGraph<Mat> |
to(Graph<SinkShape<Out>,M> sink)
|
<M,M2> RunnableGraph<M2> |
toMat(Graph<SinkShape<Out>,M> sink,
Function2<Mat,M,M2> combine)
|
java.lang.String |
toString() |
<U> Source<U,Mat> |
transform(Creator<Stage<Out,U>> mkStage)
Deprecated.
Use via(GraphStage) instead. Since 2.4.3.
|
static <S,E> Source<E,NotUsed> |
unfold(S s,
Function<S,java.util.Optional<Pair<S,E>>> f)
Create a
Source that will unfold a value of type S into
a pair of the next state S and output elements of type E . |
static <S,E> Source<E,NotUsed> |
unfoldAsync(S s,
Function<S,java.util.concurrent.CompletionStage<java.util.Optional<Pair<S,E>>>> f)
Same as
unfold , but uses an async function to generate the next state-element tuple. |
static <T,S> Source<T,NotUsed> |
unfoldResource(Creator<S> create,
Function<S,java.util.Optional<T>> read,
Procedure<S> close)
Start a new
Source from some resource which can be opened, read and closed. |
static <T,S> Source<T,NotUsed> |
unfoldResourceAsync(Creator<java.util.concurrent.CompletionStage<S>> create,
Function<S,java.util.concurrent.CompletionStage<java.util.Optional<T>>> read,
Function<S,java.util.concurrent.CompletionStage<Done>> close)
Start a new
Source from some resource which can be opened, read and closed. |
<T,M> Source<T,Mat> |
via(Graph<FlowShape<Out,T>,M> flow)
Transform this
Source by appending the given processing stages. |
<T,M,M2> Source<T,M2> |
viaMat(Graph<FlowShape<Out,T>,M> flow,
Function2<Mat,M,M2> combine)
Transform this
Source by appending the given processing stages. |
<M> Source<Out,M> |
watchTermination(Function2<Mat,java.util.concurrent.CompletionStage<Done>,M> matF)
Materializes to
Future[Done] that completes on getting termination message. |
Source<Out,Mat> |
withAttributes(Attributes attr)
Change the attributes of this
Source to the given ones and seal the list
of attributes. |
<T> Source<Pair<Out,T>,Mat> |
zip(Graph<SourceShape<T>,?> that)
Combine the elements of current
Source and the given one into a stream of tuples. |
<T,M,M2> Source<Pair<Out,T>,M2> |
zipMat(Graph<SourceShape<T>,M> that,
Function2<Mat,M,M2> matF)
Combine the elements of current
Source and the given one into a stream of tuples. |
static <T> Source<java.util.List<T>,NotUsed> |
zipN(java.util.List<Source<T,?>> sources)
Combine the elements of multiple streams into a stream of lists.
|
<Out2,Out3> |
zipWith(Graph<SourceShape<Out2>,?> that,
Function2<Out,Out2,Out3> combine)
Put together the elements of current
Source and the given one
into a stream of combined elements using a combiner function. |
Source<Pair<Out,java.lang.Object>,Mat> |
zipWithIndex()
Combine the elements of current
Source into a stream of tuples consisting
of all elements paired with their index. |
<Out2,Out3,M,M2> |
zipWithMat(Graph<SourceShape<Out2>,M> that,
Function2<Out,Out2,Out3> combine,
Function2<Mat,M,M2> matF)
Put together the elements of current
Source and the given one
into a stream of combined elements using a combiner function. |
static <T,O> Source<O,NotUsed> |
zipWithN(Function<java.util.List<T>,O> zipper,
java.util.List<Source<T,?>> sources) |
public static <O> Source<O,NotUsed> empty()
Source
with no elements, i.e. an empty stream that is completed immediately
for every connected Sink
.public static <T> Source<T,java.util.concurrent.CompletableFuture<java.util.Optional<T>>> maybe()
Source
which materializes a CompletableFuture
which controls what element
will be emitted by the Source.
If the materialized promise is completed with a filled Optional, that value will be produced downstream,
followed by completion.
If the materialized promise is completed with an empty Optional, no value will be produced downstream and completion will
be signalled immediately.
If the materialized promise is completed with a failure, then the returned source will terminate with that error.
If the downstream of this source cancels before the promise has been completed, then the promise will be completed
with an empty Optional.public static <O> Source<O,NotUsed> fromPublisher(org.reactivestreams.Publisher<O> publisher)
Source
from Publisher
.
Construct a transformation starting with given publisher. The transformation steps
are executed by a series of Processor
instances
that mediate the flow of elements downstream and the propagation of
back-pressure upstream.
publisher
- (undocumented)public static <O> Source<O,NotUsed> fromIterator(Creator<java.util.Iterator<O>> f)
Source
from Iterator
.
Example usage:
List<Integer> data = new ArrayList<Integer>();
data.add(1);
data.add(2);
data.add(3);
Source.from(() -> data.iterator());
Start a new Source
from the given Iterator. The produced stream of elements
will continue until the iterator runs empty or fails during evaluation of
the next()
method. Elements are pulled out of the iterator
in accordance with the demand coming from the downstream transformation
steps.
f
- (undocumented)public static <O> Source<O,NotUsed> cycle(Creator<java.util.Iterator<O>> f)
Source
from iterator provider.
Example usage:
Source.cycle(() -> Arrays.asList(1, 2, 3).iterator());
Start a new 'cycled' Source
from the given elements. The producer stream of elements
will continue infinitely by repeating the sequence of elements provided by function parameter.
f
- (undocumented)public static <O> Source<O,NotUsed> from(java.lang.Iterable<O> iterable)
Source
from Iterable
.
Example usage:
List<Integer> data = new ArrayList<Integer>();
data.add(1);
data.add(2);
data.add(3);
Source.from(data);
Starts a new Source
from the given Iterable
. This is like starting from an
Iterator, but every Subscriber directly attached to the Publisher of this
stream will see an individual flow of elements (always starting from the
beginning) regardless of when they subscribed.
Make sure that the Iterable
is immutable or at least not modified after
being used as a Source
. Otherwise the stream may fail with
ConcurrentModificationException
or other more subtle errors may occur.
iterable
- (undocumented)public static Source<java.lang.Integer,NotUsed> range(int start, int end)
Source
that represents integer values in range ''[start;end]'', step equals to 1.
It allows to create Source
out of range as simply as on Scala Source(1 to N)
Uses scala.collection.immutable.Range.inclusive(Int, Int)
internally
start
- (undocumented)end
- (undocumented)scala.collection.immutable.Range.inclusive(Int, Int)
public static Source<java.lang.Integer,NotUsed> range(int start, int end, int step)
Source
that represents integer values in range ''[start;end]'', with the given step.
It allows to create Source
out of range as simply as on Scala Source(1 to N)
Uses scala.collection.immutable.Range.inclusive(Int, Int, Int)
internally
start
- (undocumented)end
- (undocumented)step
- (undocumented)scala.collection.immutable.Range.inclusive(Int, Int, Int)
public static <O> Source<O,NotUsed> fromFuture(scala.concurrent.Future<O> future)
Source
from the given Future
. The stream will consist of
one element when the Future
is completed with a successful value, which
may happen before or after materializing the Flow
.
The stream terminates with a failure if the Future
is completed with a failure.future
- (undocumented)public static <O> Source<O,NotUsed> fromCompletionStage(java.util.concurrent.CompletionStage<O> future)
Source
from the given CompletionStage
. The stream will consist of
one element when the CompletionStage
is completed with a successful value, which
may happen before or after materializing the Flow
.
The stream terminates with a failure if the CompletionStage
is completed with a failure.future
- (undocumented)public static <O> Source<O,Cancellable> tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, O tick)
initialDelay
- (undocumented)interval
- (undocumented)tick
- (undocumented)public static <T> Source<T,NotUsed> single(T element)
Source
with one element.
Every connected Sink
of this stream will see an individual stream consisting of one element.element
- (undocumented)public static <T> Source<T,NotUsed> repeat(T element)
Source
that will continually emit the given element.element
- (undocumented)public static <S,E> Source<E,NotUsed> unfold(S s, Function<S,java.util.Optional<Pair<S,E>>> f)
Source
that will unfold a value of type S
into
a pair of the next state S
and output elements of type E
.s
- (undocumented)f
- (undocumented)public static <S,E> Source<E,NotUsed> unfoldAsync(S s, Function<S,java.util.concurrent.CompletionStage<java.util.Optional<Pair<S,E>>>> f)
unfold
, but uses an async function to generate the next state-element tuple.s
- (undocumented)f
- (undocumented)public static <T> Source<T,NotUsed> failed(java.lang.Throwable cause)
Source
that immediately ends the stream with the cause
failure to every connected Sink
.cause
- (undocumented)public static <T,M> Source<T,java.util.concurrent.CompletionStage<M>> lazily(Creator<Source<T,M>> create)
Source
that is not materialized until there is downstream demand, when the source gets materialized
the materialized future is completed with its value, if downstream cancels or fails without any demand the
create
factory is never called and the materialized CompletionStage
is failed.create
- (undocumented)public static <T> Source<T,org.reactivestreams.Subscriber<T>> asSubscriber()
Source
that is materialized as a Subscriber
public static <T> Source<T,ActorRef> actorPublisher(Props props)
Source
that is materialized to an ActorRef
which points to an Actor
created according to the passed in Props
. Actor created by the props
should
be ActorPublisher
.props
- (undocumented)public static <T> Source<T,ActorRef> actorRef(int bufferSize, OverflowStrategy overflowStrategy)
Source
that is materialized as an ActorRef
.
Messages sent to this actor will be emitted to the stream if there is demand from downstream,
otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy
it might drop elements if
there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure
is not supported, and an
IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize
of 0 and then received messages are dropped if there is no demand
from downstream. When bufferSize
is 0 the overflowStrategy
does not matter. An async boundary is added after
this Source; as such, it is never safe to assume the downstream will always generate demand.
The stream can be completed successfully by sending the actor reference a Status.Success
(whose content will be ignored) in which case already buffered elements will be signaled before signaling
completion, or by sending PoisonPill
in which case completion will be signaled immediately.
The stream can be completed with failure by sending a Status.Failure
to the
actor reference. In case the Actor is still draining its internal buffer (after having received
a Status.Success
) before signaling completion and it receives a Status.Failure
,
the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.javadsl.Source.queue
.
bufferSize
- The size of the buffer in element countoverflowStrategy
- Strategy that is used when incoming elements cannot fit inside the bufferpublic static <T,M> Source<T,M> fromGraph(Graph<SourceShape<T>,M> g)
g
- (undocumented)public static <T,U> Source<U,NotUsed> combine(Source<T,?> first, Source<T,?> second, java.util.List<Source<T,?>> rest, Function<java.lang.Integer,? extends Graph<UniformFanInShape<T,U>,NotUsed>> strategy)
Merge
or Concat
and returns Source
.first
- (undocumented)second
- (undocumented)rest
- (undocumented)strategy
- (undocumented)public static <T> Source<java.util.List<T>,NotUsed> zipN(java.util.List<Source<T,?>> sources)
sources
- (undocumented)public static <T,O> Source<O,NotUsed> zipWithN(Function<java.util.List<T>,O> zipper, java.util.List<Source<T,?>> sources)
public static <T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy)
Source
that is materialized as an SourceQueue
.
You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
if downstream is terminated.
Depending on the defined OverflowStrategy
it might drop elements if
there is no space available in the buffer.
Acknowledgement mechanism is available.
akka.stream.javadsl.SourceQueue.offer
returns CompletionStage
which completes with
QueueOfferResult.enqueued
if element was added to buffer or sent downstream. It completes with
QueueOfferResult.dropped
if element was dropped. Can also complete with QueueOfferResult.Failure
-
when stream failed or QueueOfferResult.QueueClosed
when downstream is completed.
The strategy akka.stream.OverflowStrategy.backpressure
will not complete last offer():CompletionStage
call when buffer is full.
You can watch accessibility of stream with akka.stream.javadsl.SourceQueue.watchCompletion
.
It returns future that completes with success when stream is completed or fail when stream is failed.
The buffer can be disabled by using bufferSize
of 0 and then received message will wait
for downstream demand unless there is another message waiting for downstream demand, in that case
offer result will be completed according to the overflow strategy.
SourceQueue that current source is materialized to is for single thread usage only.
bufferSize
- size of buffer in element countoverflowStrategy
- Strategy that is used when incoming elements cannot fit inside the bufferpublic static <T,S> Source<T,NotUsed> unfoldResource(Creator<S> create, Function<S,java.util.Optional<T>> read, Procedure<S> close)
Source
from some resource which can be opened, read and closed.
Interaction with resource happens in a blocking way.
Example:
Source.unfoldResource(
() -> new BufferedReader(new FileReader("...")),
reader -> reader.readLine(),
reader -> reader.close())
You can use the supervision strategy to handle exceptions for read
function. All exceptions thrown by create
or close
will fail the stream.
Restart
supervision strategy will close and create blocking IO again. Default strategy is Stop
which means
that stream will be terminated on error in read
function by default.
You can configure the default dispatcher for this Source by changing the akka.stream.blocking-io-dispatcher
or
set it for a given Source by using ActorAttributes
.
create
- - function that is called on stream start and creates/opens resource.read
- - function that reads data from opened resource. It is called each time backpressure signal
is received. Stream calls close and completes when read
returns None.close
- - function that closes resourcepublic static <T,S> Source<T,NotUsed> unfoldResourceAsync(Creator<java.util.concurrent.CompletionStage<S>> create, Function<S,java.util.concurrent.CompletionStage<java.util.Optional<T>>> read, Function<S,java.util.concurrent.CompletionStage<Done>> close)
Source
from some resource which can be opened, read and closed.
It's similar to unfoldResource
but takes functions that return CompletionStage
instead of plain values.
You can use the supervision strategy to handle exceptions for read
function or failures of produced Futures
.
All exceptions thrown by create
or close
as well as fails of returned futures will fail the stream.
Restart
supervision strategy will close and create resource. Default strategy is Stop
which means
that stream will be terminated on error in read
function (or future) by default.
You can configure the default dispatcher for this Source by changing the akka.stream.blocking-io-dispatcher
or
set it for a given Source by using ActorAttributes
.
create
- - function that is called on stream start and creates/opens resource.read
- - function that reads data from opened resource. It is called each time backpressure signal
is received. Stream calls close and completes when CompletionStage
from read function returns None.close
- - function that closes resourcepublic SourceShape<Out> shape()
Graph
shape
in interface Graph<SourceShape<Out>,Mat>
public StreamLayout.Module module()
Graph
Every materializable element must be backed by a stream layout module
module
in interface Graph<SourceShape<Out>,Mat>
public java.lang.String toString()
toString
in class java.lang.Object
public Source<Out,Mat> asScala()
public <Mat2> Source<Out,Mat2> mapMaterializedValue(Function<Mat,Mat2> f)
f
- (undocumented)public <T,M> Source<T,Mat> via(Graph<FlowShape<Out,T>,M> flow)
Source
by appending the given processing stages.
+----------------------------+
| Resulting Source |
| |
| +------+ +------+ |
| | | | | |
| | this | ~Out~> | flow | ~~> T
| | | | | |
| +------+ +------+ |
+----------------------------+
The materialized value of the combined Flow
will be the materialized
value of the current flow (ignoring the other Flow’s value), use
viaMat
if a different strategy is needed.flow
- (undocumented)public <T,M,M2> Source<T,M2> viaMat(Graph<FlowShape<Out,T>,M> flow, Function2<Mat,M,M2> combine)
Source
by appending the given processing stages.
+----------------------------+
| Resulting Source |
| |
| +------+ +------+ |
| | | | | |
| | this | ~Out~> | flow | ~~> T
| | | | | |
| +------+ +------+ |
+----------------------------+
The combine
function is used to compose the materialized values of this flow and that
flow into the materialized value of the resulting Flow.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
flow
- (undocumented)combine
- (undocumented)public <M> RunnableGraph<Mat> to(Graph<SinkShape<Out>,M> sink)
Source
to a Sink
, concatenating the processing steps of both.
+----------------------------+
| Resulting RunnableGraph |
| |
| +------+ +------+ |
| | | | | |
| | this | ~Out~> | sink | |
| | | | | |
| +------+ +------+ |
+----------------------------+
The materialized value of the combined Sink
will be the materialized
value of the current flow (ignoring the given Sink’s value), use
toMat
if a different strategy is needed.sink
- (undocumented)public <M,M2> RunnableGraph<M2> toMat(Graph<SinkShape<Out>,M> sink, Function2<Mat,M,M2> combine)
Source
to a Sink
, concatenating the processing steps of both.
+----------------------------+
| Resulting RunnableGraph |
| |
| +------+ +------+ |
| | | | | |
| | this | ~Out~> | sink | |
| | | | | |
| +------+ +------+ |
+----------------------------+
The combine
function is used to compose the materialized values of this flow and that
Sink into the materialized value of the resulting Sink.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
sink
- (undocumented)combine
- (undocumented)public <M> M runWith(Graph<SinkShape<Out>,M> sink, Materializer materializer)
Source
to a Sink
and run it. The returned value is the materialized value
of the Sink
, e.g. the Publisher
of a Sink.asPublisher
.sink
- (undocumented)materializer
- (undocumented)public <U> java.util.concurrent.CompletionStage<U> runFold(U zero, Function2<U,Out,U> f, Materializer materializer)
Source
with a fold function.
The given function is invoked for every received element, giving it its previous
output (or the given zero
value) and the element as input.
The returned CompletionStage
will be completed with value of the final
function evaluation when the input stream ends, or completed with Failure
if there is a failure is signaled in the stream.zero
- (undocumented)f
- (undocumented)materializer
- (undocumented)public <U> java.util.concurrent.CompletionStage<U> runFoldAsync(U zero, Function2<U,Out,java.util.concurrent.CompletionStage<U>> f, Materializer materializer)
Source
with an asynchronous fold function.
The given function is invoked for every received element, giving it its previous
output (or the given zero
value) and the element as input.
The returned CompletionStage
will be completed with value of the final
function evaluation when the input stream ends, or completed with Failure
if there is a failure is signaled in the stream.zero
- (undocumented)f
- (undocumented)materializer
- (undocumented)public <U> java.util.concurrent.CompletionStage<U> runReduce(Function2<U,U,U> f, Materializer materializer)
Source
with a reduce function.
The given function is invoked for every received element, giving it its previous
output (from the second ones) an the element as input.
The returned CompletionStage
will be completed with value of the final
function evaluation when the input stream ends, or completed with Failure
if there is a failure is signaled in the stream.
If the stream is empty (i.e. completes before signalling any elements),
the reduce stage will fail its downstream with a NoSuchElementException
,
which is semantically in-line with that Scala's standard library collections
do in such situations.
f
- (undocumented)materializer
- (undocumented)public <T,M> Source<T,Mat> concat(Graph<SourceShape<T>,M> that)
Source
with the given one, meaning that once current
is exhausted and all result elements have been generated,
the given source elements will be produced.
Note that given Source
is materialized together with this Flow and just kept
from producing elements by asserting back-pressure until its time comes.
If this Source
gets upstream error - no elements from the given Source
will be pulled.
'''Emits when''' element is available from current source or from the given Source
when current is completed
'''Backpressures when''' downstream backpressures
'''Completes when''' given Source
completes
'''Cancels when''' downstream cancels
that
- (undocumented)public <T,M,M2> Source<T,M2> concatMat(Graph<SourceShape<T>,M> that, Function2<Mat,M,M2> matF)
Source
with the given one, meaning that once current
is exhausted and all result elements have been generated,
the given source elements will be produced.
Note that given Source
is materialized together with this Flow and just kept
from producing elements by asserting back-pressure until its time comes.
If this Source
gets upstream error - no elements from the given Source
will be pulled.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)matF
- (undocumented)#concat}.
public <T,M> Source<T,Mat> prepend(Graph<SourceShape<T>,M> that)
Source
to this one, meaning that once the given source
is exhausted and all result elements have been generated, the current source's
elements will be produced.
Note that the current Source
is materialized together with this Flow and just kept
from producing elements by asserting back-pressure until its time comes.
If the given Source
gets upstream error - no elements from this Source
will be pulled.
'''Emits when''' element is available from current source or from the given Source
when current is completed
'''Backpressures when''' downstream backpressures
'''Completes when''' given Source
completes
'''Cancels when''' downstream cancels
that
- (undocumented)public <T,M,M2> Source<T,M2> prependMat(Graph<SourceShape<T>,M> that, Function2<Mat,M,M2> matF)
Source
to this one, meaning that once the given source
is exhausted and all result elements have been generated, the current source's
elements will be produced.
Note that the current Source
is materialized together with this Flow and just kept
from producing elements by asserting back-pressure until its time comes.
If the given Source
gets upstream error - no elements from this Source
will be pulled.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)matF
- (undocumented)#prepend}.
public <T,M> Source<T,Mat> orElse(Graph<SourceShape<T>,M> secondary)
Note that this Flow will be materialized together with the Source
and just kept
from producing elements by asserting back-pressure until its time comes or it gets
cancelled.
On errors the stage is failed regardless of source of the error.
'''Emits when''' element is available from first stream or first stream closed without emitting any elements and an element is available from the second stream
'''Backpressures when''' downstream backpressures
'''Completes when''' the primary stream completes after emitting at least one element, when the primary stream completes without emitting and the secondary stream already has completed or when the secondary stream completes
'''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes by from this stream.
secondary
- (undocumented)public <T,M,M2> Source<T,M2> orElseMat(Graph<SourceShape<T>,M> secondary, Function2<Mat,M,M2> matF)
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
secondary
- (undocumented)matF
- (undocumented)orElse(akka.stream.Graph<akka.stream.SourceShape<T>, M>)
public Source<Out,Mat> alsoTo(Graph<SinkShape<Out>,?> that)
Sink
to this Flow
, meaning that elements that passes
through will also be sent to the Sink
.
'''Emits when''' element is available and demand exists both from the Sink and the downstream.
'''Backpressures when''' downstream or Sink backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
that
- (undocumented)public <M2,M3> Source<Out,M3> alsoToMat(Graph<SinkShape<Out>,M2> that, Function2<Mat,M2,M3> matF)
Sink
to this Flow
, meaning that elements that passes
through will also be sent to the Sink
.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)matF
- (undocumented)alsoTo(akka.stream.Graph<akka.stream.SinkShape<Out>, ?>)
public <T> Source<T,Mat> interleave(Graph<SourceShape<T>,?> that, int segmentSize)
Source
with elements of this Source
.
It first emits segmentSize
number of elements from this flow to downstream, then - same amount for that
source,
then repeat process.
Example:
Source.from(Arrays.asList(1, 2, 3)).interleave(Source.from(Arrays.asList(4, 5, 6, 7), 2)
// 1, 2, 4, 5, 3, 6, 7
After one of sources is complete than all the rest elements will be emitted from the second one
If one of sources gets upstream error - stream completes with failure.
'''Emits when''' element is available from the currently consumed upstream
'''Backpressures when''' downstream backpressures. Signal to current
upstream, switch to next upstream when received segmentSize
elements
'''Completes when''' this Source
and given one completes
'''Cancels when''' downstream cancels
that
- (undocumented)segmentSize
- (undocumented)public <T,M,M2> Source<T,M2> interleaveMat(Graph<SourceShape<T>,M> that, int segmentSize, Function2<Mat,M,M2> matF)
Source
with elements of this Source
.
It first emits segmentSize
number of elements from this flow to downstream, then - same amount for that
source,
then repeat process.
After one of sources is complete than all the rest elements will be emitted from the second one
If one of sources gets upstream error - stream completes with failure.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)segmentSize
- (undocumented)matF
- (undocumented)#interleave}.
public <T> Source<T,Mat> merge(Graph<SourceShape<T>,?> that)
Source
to the current one, taking elements as they arrive from input streams,
picking randomly when several elements ready.
'''Emits when''' one of the inputs has an element available
'''Backpressures when''' downstream backpressures
'''Completes when''' all upstreams complete
'''Cancels when''' downstream cancels
that
- (undocumented)public <T,M,M2> Source<T,M2> mergeMat(Graph<SourceShape<T>,M> that, Function2<Mat,M,M2> matF)
Source
to the current one, taking elements as they arrive from input streams,
picking randomly when several elements ready.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)matF
- (undocumented)#merge}.
public <U,M> Source<U,Mat> mergeSorted(Graph<SourceShape<U>,M> that, java.util.Comparator<U> comp)
Source
to this Source
, taking elements as they arrive from input streams,
picking always the smallest of the available elements (waiting for one element from each side
to be available). This means that possible contiguity of the input streams is not exploited to avoid
waiting for elements, this merge will block when one of the inputs does not have more elements (and
does not complete).
'''Emits when''' all of the inputs have an element available
'''Backpressures when''' downstream backpressures
'''Completes when''' all upstreams complete
'''Cancels when''' downstream cancels
that
- (undocumented)comp
- (undocumented)public <U,Mat2,Mat3> Source<U,Mat3> mergeSortedMat(Graph<SourceShape<U>,Mat2> that, java.util.Comparator<U> comp, Function2<Mat,Mat2,Mat3> matF)
Source
to this Source
, taking elements as they arrive from input streams,
picking always the smallest of the available elements (waiting for one element from each side
to be available). This means that possible contiguity of the input streams is not exploited to avoid
waiting for elements, this merge will block when one of the inputs does not have more elements (and
does not complete).
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)comp
- (undocumented)matF
- (undocumented)#mergeSorted}.
public <T> Source<Pair<Out,T>,Mat> zip(Graph<SourceShape<T>,?> that)
Source
and the given one into a stream of tuples.
'''Emits when''' all of the inputs has an element available
'''Backpressures when''' downstream backpressures
'''Completes when''' any upstream completes
'''Cancels when''' downstream cancels
that
- (undocumented)public <T,M,M2> Source<Pair<Out,T>,M2> zipMat(Graph<SourceShape<T>,M> that, Function2<Mat,M,M2> matF)
Source
and the given one into a stream of tuples.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)matF
- (undocumented)#zip}.
public <Out2,Out3> Source<Out3,Mat> zipWith(Graph<SourceShape<Out2>,?> that, Function2<Out,Out2,Out3> combine)
Source
and the given one
into a stream of combined elements using a combiner function.
'''Emits when''' all of the inputs has an element available
'''Backpressures when''' downstream backpressures
'''Completes when''' any upstream completes
'''Cancels when''' downstream cancels
that
- (undocumented)combine
- (undocumented)public <Out2,Out3,M,M2> Source<Out3,M2> zipWithMat(Graph<SourceShape<Out2>,M> that, Function2<Out,Out2,Out3> combine, Function2<Mat,M,M2> matF)
Source
and the given one
into a stream of combined elements using a combiner function.
It is recommended to use the internally optimized Keep.left
and Keep.right
combiners
where appropriate instead of manually writing functions that pass through one of the values.
that
- (undocumented)combine
- (undocumented)matF
- (undocumented)#zipWith}.
public Source<Pair<Out,java.lang.Object>,Mat> zipWithIndex()
Source
into a stream of tuples consisting
of all elements paired with their index. Indices start at 0.
'''Emits when''' upstream emits an element and is paired with their index
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
public java.util.concurrent.CompletionStage<Done> runForeach(Procedure<Out> f, Materializer materializer)
Source
with a foreach procedure. The given procedure is invoked
for each received element.
The returned CompletionStage
will be completed normally when reaching the
normal end of the stream, or completed exceptionally if there is a failure is signaled in
the stream.f
- (undocumented)materializer
- (undocumented)public <T> Source<T,Mat> map(Function<Out,T> f)
'''Emits when''' the mapping function returns an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
f
- (undocumented)public <T> Source<T,Mat> recover(scala.PartialFunction<java.lang.Throwable,T> pf)
Throwing an exception inside recover
_will_ be logged on ERROR level automatically.
'''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or upstream failed with exception pf can handle
'''Cancels when''' downstream cancels
pf
- (undocumented)public Source<Out,Mat> mapError(scala.PartialFunction<java.lang.Throwable,java.lang.Throwable> pf)
recover
this stage can be used to transform an error signal to a different one *without* logging
it as an error in the process. So in that sense it is NOT exactly equivalent to recover(t => throw t2)
since recover
would log the t2
error.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Similarily to recover
throwing an exception inside mapError
_will_ be logged.
'''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or upstream failed with exception pf can handle
'''Cancels when''' downstream cancels
pf
- (undocumented)public <T> Source<T,Mat> recoverWith(scala.PartialFunction<java.lang.Throwable,? extends Graph<SourceShape<T>,NotUsed>> pf)
pf
and a new
Source may be materialized.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Throwing an exception inside recoverWith
_will_ be logged on ERROR level automatically.
'''Emits when''' element is available from the upstream or upstream is failed and element is available from alternative Source
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or upstream failed with exception pf can handle
'''Cancels when''' downstream cancels
pf
- (undocumented)public <T> Source<T,Mat> recoverWithRetries(int attempts, scala.PartialFunction<java.lang.Throwable,? extends Graph<SourceShape<T>,NotUsed>> pf)
attempts
number of times so that each time there is a failure
it is fed into the pf
and a new Source may be materialized. Note that if you pass in 0, this won't
attempt to recover at all. Passing in a negative number will behave exactly the same as recoverWith
.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Throwing an exception inside recoverWithRetries
_will_ be logged on ERROR level automatically.
'''Emits when''' element is available from the upstream or upstream is failed and element is available from alternative Source
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or upstream failed with exception pf can handle
'''Cancels when''' downstream cancels
attempts
- (undocumented)pf
- (undocumented)public <T> Source<T,Mat> mapConcat(Function<Out,? extends java.lang.Iterable<T>> f)
Iterable
of output elements that is
then flattened into the output stream.
Make sure that the Iterable
is immutable or at least not modified after
being used as an output sequence. Otherwise the stream may fail with
ConcurrentModificationException
or other more subtle errors may occur.
The returned Iterable
MUST NOT contain null
values,
as they are illegal as stream elements - according to the Reactive Streams specification.
'''Emits when''' the mapping function returns an element or there are still remaining elements from the previously calculated collection
'''Backpressures when''' downstream backpressures or there are still remaining elements from the previously calculated collection
'''Completes when''' upstream completes and all remaining elements has been emitted
'''Cancels when''' downstream cancels
f
- (undocumented)public <T> Source<T,Mat> statefulMapConcat(Creator<Function<Out,java.lang.Iterable<T>>> f)
Iterable
of output elements that is
then flattened into the output stream. The transformation is meant to be stateful,
which is enabled by creating the transformation function anew for every materialization —
the returned function will typically close over mutable objects to store state between
invocations. For the stateless variant see mapConcat(akka.japi.function.Function<Out, ? extends java.lang.Iterable<T>>)
.
Make sure that the Iterable
is immutable or at least not modified after
being used as an output sequence. Otherwise the stream may fail with
ConcurrentModificationException
or other more subtle errors may occur.
The returned Iterable
MUST NOT contain null
values,
as they are illegal as stream elements - according to the Reactive Streams specification.
'''Emits when''' the mapping function returns an element or there are still remaining elements from the previously calculated collection
'''Backpressures when''' downstream backpressures or there are still remaining elements from the previously calculated collection
'''Completes when''' upstream completes and all remaining elements has been emitted
'''Cancels when''' downstream cancels
f
- (undocumented)public <T> Source<T,Mat> mapAsync(int parallelism, Function<Out,java.util.concurrent.CompletionStage<T>> f)
CompletionStage
and the
value of that future will be emitted downstream. The number of CompletionStages
that shall run in parallel is given as the first argument to
mapAsync
.
These CompletionStages may complete in any order, but the elements that
are emitted downstream are in the same order as received from upstream.
If the function f
throws an exception or if the CompletionStage
is completed
with failure and the supervision decision is Supervision.stop()
the stream will be completed with failure.
If the function f
throws an exception or if the CompletionStage
is completed
with failure and the supervision decision is Supervision.resume()
or
Supervision.restart()
the element is dropped and the stream continues.
The function f
is always invoked on the elements in the order they arrive.
'''Emits when''' the CompletionStage returned by the provided function finishes for the next element in sequence
'''Backpressures when''' the number of CompletionStages reaches the configured parallelism and the downstream backpressures or the first CompletionStage is not completed
'''Completes when''' upstream completes and all CompletionStages has been completed and all elements has been emitted
'''Cancels when''' downstream cancels
parallelism
- (undocumented)f
- (undocumented)mapAsyncUnordered(int, akka.japi.function.Function<Out, java.util.concurrent.CompletionStage<T>>)
public <T> Source<T,Mat> mapAsyncUnordered(int parallelism, Function<Out,java.util.concurrent.CompletionStage<T>> f)
CompletionStage
and the
value of that future will be emitted downstream. The number of CompletionStages
that shall run in parallel is given as the first argument to
mapAsyncUnordered
.
Each processed element will be emitted downstream as soon as it is ready, i.e. it is possible
that the elements are not emitted downstream in the same order as received from upstream.
If the function f
throws an exception or if the CompletionStage
is completed
with failure and the supervision decision is Supervision.stop()
the stream will be completed with failure.
If the function f
throws an exception or if the CompletionStage
is completed
with failure and the supervision decision is Supervision.resume()
or
Supervision.restart()
the element is dropped and the stream continues.
The function f
is always invoked on the elements in the order they arrive (even though the result of the CompletionStages
returned by f
might be emitted in a different order).
'''Emits when''' any of the CompletionStages returned by the provided function complete
'''Backpressures when''' the number of CompletionStages reaches the configured parallelism and the downstream backpressures
'''Completes when''' upstream completes and all CompletionStages has been completed and all elements has been emitted
'''Cancels when''' downstream cancels
parallelism
- (undocumented)f
- (undocumented)mapAsync(int, akka.japi.function.Function<Out, java.util.concurrent.CompletionStage<T>>)
public Source<Out,Mat> filter(Predicate<Out> p)
'''Emits when''' the given predicate returns true for the element
'''Backpressures when''' the given predicate returns true for the element and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
p
- (undocumented)public Source<Out,Mat> filterNot(Predicate<Out> p)
'''Emits when''' the given predicate returns false for the element
'''Backpressures when''' the given predicate returns false for the element and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
p
- (undocumented)public <T> Source<T,Mat> collect(scala.PartialFunction<Out,T> pf)
'''Emits when''' the provided partial function is defined for the element
'''Backpressures when''' the partial function is defined for the element and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
pf
- (undocumented)public Source<java.util.List<Out>,Mat> grouped(int n)
n
must be positive, otherwise IllegalArgumentException is thrown.
'''Emits when''' the specified number of elements has been accumulated or upstream completed
'''Backpressures when''' a group has been assembled and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
n
- (undocumented)public Source<Out,Mat> limit(int n)
StreamLimitException
downstream.
Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
The stream will be completed without producing any elements if n
is zero
or negative.
'''Emits when''' the specified number of elements to take has not yet been reached
'''Backpressures when''' downstream backpressures
'''Completes when''' the defined number of elements has been taken or upstream completes
'''Cancels when''' the defined number of elements has been taken or downstream cancels
See also Flow.take
, Flow.takeWithin
, Flow.takeWhile
n
- (undocumented)public Source<Out,Mat> limitWeighted(long n, Function<Out,java.lang.Object> costFn)
StreamLimitException
downstream.
Due to input buffering some elements may have been requested from upstream publishers that will then not be processed downstream of this step.
The stream will be completed without producing any elements if n
is zero
or negative.
'''Emits when''' the specified number of elements to take has not yet been reached
'''Backpressures when''' downstream backpressures
'''Completes when''' the defined number of elements has been taken or upstream completes
'''Cancels when''' the defined number of elements has been taken or downstream cancels
See also Flow.take
, Flow.takeWithin
, Flow.takeWhile
n
- (undocumented)costFn
- (undocumented)public Source<java.util.List<Out>,Mat> sliding(int n, int step)
n
must be positive, otherwise IllegalArgumentException is thrown.
step
must be positive, otherwise IllegalArgumentException is thrown.
'''Emits when''' enough elements have been collected within the window or upstream completed
'''Backpressures when''' a window has been assembled and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
n
- (undocumented)step
- (undocumented)public <T> Source<T,Mat> scan(T zero, Function2<T,Out,T> f)
fold
but is not a terminal operation,
emits its current value which starts at zero
and then
applies the current and next value to the given function f
,
emitting the next current value.
If the function f
throws an exception and the supervision decision is
Supervision.restart()
current value starts at zero
again
the stream will continue.
'''Emits when''' the function scanning the element returns a new element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
zero
- (undocumented)f
- (undocumented)public <T> Source<T,Mat> scanAsync(T zero, Function2<T,Out,java.util.concurrent.CompletionStage<T>> f)
scan
but with a asynchronous function,
emits its current value which starts at zero
and then
applies the current and next value to the given function f
,
emitting a Future
that resolves to the next current value.
If the function f
throws an exception and the supervision decision is
akka.stream.Supervision.Restart
current value starts at zero
again
the stream will continue.
If the function f
throws an exception and the supervision decision is
akka.stream.Supervision.Resume
current value starts at the previous
current value, or zero when it doesn't have one, and the stream will continue.
'''Emits when''' the future returned by f completes
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes and the last future returned by f
completes
'''Cancels when''' downstream cancels
See also FlowOps.scan
zero
- (undocumented)f
- (undocumented)public <T> Source<T,Mat> fold(T zero, Function2<T,Out,T> f)
scan
but only emits its result when the upstream completes,
after which it also completes. Applies the given function f
towards its current and next value,
yielding the next current value.
If the function f
throws an exception and the supervision decision is
Supervision.restart()
current value starts at zero
again
the stream will continue.
'''Emits when''' upstream completes
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
zero
- (undocumented)f
- (undocumented)public <T> Source<T,Mat> foldAsync(T zero, Function2<T,Out,java.util.concurrent.CompletionStage<T>> f)
fold
but with an asynchronous function.
Applies the given function towards its current and next value,
yielding the next current value.
If the function f
returns a failure and the supervision decision is
akka.stream.Supervision.Restart
current value starts at zero
again
the stream will continue.
'''Emits when''' upstream completes
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
zero
- (undocumented)f
- (undocumented)public Source<Out,Mat> reduce(Function2<Out,Out,Out> f)
fold
but uses first element as zero element.
Applies the given function towards its current and next value,
yielding the next current value.
'''Emits when''' upstream completes
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
f
- (undocumented)public <T> Source<T,Mat> intersperse(T start, T inject, T end)
scala.collection.immutable.List.mkString
injects a separator between a List's elements.
Additionally can inject start and end marker elements to stream.
Examples:
Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
nums.intersperse(","); // 1 , 2 , 3
nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ]
In case you want to only prepend or only append an element (yet still use the intercept
feature
to inject a separator between elements, you may want to use the following pattern instead of the 3-argument
version of intersperse (See Source.concat
for semantics details):
Source.single(">> ").concat(list.intersperse(","))
list.intersperse(",").concat(Source.single("END"))
'''Emits when''' upstream emits (or before with the start
element if provided)
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
start
- (undocumented)inject
- (undocumented)end
- (undocumented)public <T> Source<T,Mat> intersperse(T inject)
scala.collection.immutable.List.mkString
injects a separator between a List's elements.
Additionally can inject start and end marker elements to stream.
Examples:
Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
nums.intersperse(","); // 1 , 2 , 3
nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ]
'''Emits when''' upstream emits (or before with the start
element if provided)
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
inject
- (undocumented)public Source<java.util.List<Out>,Mat> groupedWithin(int n, scala.concurrent.duration.FiniteDuration d)
'''Emits when''' the configured time elapses since the last group has been emitted
'''Backpressures when''' the configured time elapses since the last group has been emitted
'''Completes when''' upstream completes (emits last group)
'''Cancels when''' downstream completes
n
must be positive, and d
must be greater than 0 seconds, otherwise
IllegalArgumentException is thrown.
n
- (undocumented)d
- (undocumented)public Source<Out,Mat> delay(scala.concurrent.duration.FiniteDuration of, DelayOverflowStrategy strategy)
DelayOverflowStrategy
it might drop elements or backpressure the upstream if
there is no space available in the buffer.
Delay precision is 10ms to avoid unnecessary timer scheduling cycles
Internal buffer has default capacity 16. You can set buffer size by calling withAttributes(inputBuffer)
'''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * EmitEarly - strategy do not wait to emit element if buffer is full
'''Backpressures when''' depending on OverflowStrategy * Backpressure - backpressures when buffer is full * DropHead, DropTail, DropBuffer - never backpressures * Fail - fails the stream if buffer gets full
'''Completes when''' upstream completes and buffered elements has been drained
'''Cancels when''' downstream cancels
of
- time to shift all messagesstrategy
- Strategy that is used when incoming elements cannot fit inside the bufferpublic Source<Out,Mat> drop(long n)
n
is zero or negative.
'''Emits when''' the specified number of elements has been dropped already
'''Backpressures when''' the specified number of elements has been dropped and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
n
- (undocumented)public Source<Out,Mat> dropWithin(scala.concurrent.duration.FiniteDuration d)
'''Emits when''' the specified time elapsed and a new upstream element arrives
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
d
- (undocumented)public Source<Out,Mat> takeWhile(Predicate<Out> p)
The stream will be completed without producing any elements if predicate is false for the first stream element.
'''Emits when''' the predicate is true
'''Backpressures when''' downstream backpressures
'''Completes when''' predicate returned false or upstream completes
'''Cancels when''' predicate returned false or downstream cancels
p
- (undocumented)public Source<Out,Mat> dropWhile(Predicate<Out> p)
'''Emits when''' predicate returned false and for all following stream elements
'''Backpressures when''' predicate returned false and downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
p
- predicate is evaluated for each new element until first time returns falsepublic Source<Out,Mat> take(long n)
The stream will be completed without producing any elements if n
is zero
or negative.
'''Emits when''' the specified number of elements to take has not yet been reached
'''Backpressures when''' downstream backpressures
'''Completes when''' the defined number of elements has been taken or upstream completes
'''Cancels when''' the defined number of elements has been taken or downstream cancels
n
- (undocumented)public Source<Out,Mat> takeWithin(scala.concurrent.duration.FiniteDuration d)
Note that this can be combined with take(long)
to limit the number of elements
within the duration.
'''Emits when''' an upstream element arrives
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or timer fires
'''Cancels when''' downstream cancels or timer fires
d
- (undocumented)public <S> Source<S,Mat> conflateWithSeed(Function<Out,S> seed, Function2<S,Out,S> aggregate)
This version of conflate allows to derive a seed from the first element and change the aggregated type to be
different than the input type. See Flow.conflate
for a simpler version that does not change types.
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
'''Emits when''' downstream stops backpressuring and there is a conflated element available
'''Backpressures when''' never
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
see also Source.conflate
Source.batch
Source.batchWeighted
seed
- Provides the first state for a conflated value using the first unconsumed element as a startaggregate
- Takes the currently aggregated value and the current pending element to produce a new aggregatepublic <O2> Source<O2,Mat> conflate(Function2<O2,O2,O2> aggregate)
Source.conflateWithSeed
for a
more flexible version that can take a seed function and transform elements while rolling up.
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
'''Emits when''' downstream stops backpressuring and there is a conflated element available
'''Backpressures when''' never
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
see also Source.conflateWithSeed
Source.batch
Source.batchWeighted
aggregate
- Takes the currently aggregated value and the current pending element to produce a new aggregatepublic <S> Source<S,Mat> batch(long max, Function<Out,S> seed, Function2<S,Out,S> aggregate)
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
'''Emits when''' downstream stops backpressuring and there is an aggregated element available
'''Backpressures when''' there are max
batched elements and 1 pending element and downstream backpressures
'''Completes when''' upstream completes and there is no batched/pending element waiting
'''Cancels when''' downstream cancels
See also Source.conflate
, Source.batchWeighted
max
- maximum number of elements to batch before backpressuring upstream (must be positive non-zero)seed
- Provides the first state for a batched value using the first unconsumed element as a startaggregate
- Takes the currently batched value and the current pending element to produce a new aggregatepublic <S> Source<S,Mat> batchWeighted(long max, Function<Out,java.lang.Object> costFn, Function<Out,S> seed, Function2<S,Out,S> aggregate)
ByteString
elements up to the allowed max limit if the upstream publisher is faster.
This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not duplicate elements.
Batching will apply for all elements, even if a single element cost is greater than the total allowed limit.
In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after
being applied with the seed
function) without batching further elements with it, and then the rest of the
incoming elements are batched.
'''Emits when''' downstream stops backpressuring and there is a batched element available
'''Backpressures when''' there are max
weighted batched elements + 1 pending element and downstream backpressures
'''Completes when''' upstream completes and there is no batched/pending element waiting
'''Cancels when''' downstream cancels
See also Source.conflate
, Source.batch
max
- maximum weight of elements to batch before backpressuring upstream (must be positive non-zero)costFn
- a function to compute a single element weightseed
- Provides the first state for a batched value using the first unconsumed element as a startaggregate
- Takes the currently batched value and the current pending element to produce a new batchpublic <U> Source<U,Mat> expand(Function<Out,java.util.Iterator<U>> extrapolate)
This element will never "drop" upstream elements as all elements go through at least one extrapolation step. This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber.
Expand does not support Supervision.restart()
and Supervision.resume()
.
Exceptions from the seed
or extrapolate
functions will complete the stream with failure.
'''Emits when''' downstream stops backpressuring
'''Backpressures when''' downstream backpressures or iterator runs empty
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
extrapolate
- Takes the current extrapolation state to produce an output element and the next extrapolation
state.public Source<Out,Mat> buffer(int size, OverflowStrategy overflowStrategy)
OverflowStrategy
it might drop elements or backpressure the upstream if
there is no space available
'''Emits when''' downstream stops backpressuring and there is a pending element in the buffer
'''Backpressures when''' depending on OverflowStrategy * Backpressure - backpressures when buffer is full * DropHead, DropTail, DropBuffer - never backpressures * Fail - fails the stream if buffer gets full
'''Completes when''' upstream completes and buffered elements has been drained
'''Cancels when''' downstream cancels
size
- The size of the buffer in element countoverflowStrategy
- Strategy that is used when incoming elements cannot fit inside the bufferpublic <U> Source<U,Mat> transform(Creator<Stage<Out,U>> mkStage)
Stage
.
This operator makes it possible to extend the Flow
API when there is no specialized
operator that performs the transformation.mkStage
- (undocumented)public Source<Pair<java.util.List<Out>,Source<Out,NotUsed>>,Mat> prefixAndTail(int n)
n
elements from the stream (less than n
if the upstream completes before emitting n
elements)
and returns a pair containing a strict sequence of the taken element
and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
of an empty collection and a stream containing the whole upstream unchanged.
In case of an upstream error, depending on the current state
- the master stream signals the error if less than n
elements has been seen, and therefore the substream
has not yet been emitted
- the tail substream signals the error after the prefix and tail has been emitted by the main stream
(at that point the main stream has already completed)
'''Emits when''' the configured number of prefix elements are available. Emits this prefix, and the rest as a substream
'''Backpressures when''' downstream backpressures or substream backpressures
'''Completes when''' prefix elements has been consumed and substream has been consumed
'''Cancels when''' downstream cancels or substream cancels
n
- (undocumented)public <K> SubSource<Out,Mat> groupBy(int maxSubstreams, Function<Out,K> f)
The object returned from this method is not a normal Flow
,
it is a SubSource
. This means that after this combinator all transformations
are applied to all encountered substreams in the same fashion. Substream mode
is exited either by closing the substream (i.e. connecting it to a Sink
)
or by merging the substreams back together; see the to
and mergeBack
methods
on SubSource
for more information.
It is important to note that the substreams also propagate back-pressure as
any other stream, which means that blocking one substream will block the groupBy
operator itself—and thereby all substreams—once all internal or
explicit buffers are filled.
If the group by function f
throws an exception and the supervision decision
is Supervision.stop()
the stream and substreams will be completed
with failure.
If the group by function f
throws an exception and the supervision decision
is Supervision.resume()
or Supervision.restart()
the element is dropped and the stream and substreams continue.
'''Emits when''' an element for which the grouping function returns a group that has not yet been created. Emits the new group
'''Backpressures when''' there is an element pending for a group whose substream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels and all substreams cancel
maxSubstreams
- configures the maximum number of substreams (keys)
that are supported; if more distinct keys are encountered then the stream failsf
- (undocumented)public SubSource<Out,Mat> splitWhen(Predicate<Out> p)
false, // element goes into first substream
true, false, // elements go into second substream
true, false, false // elements go into third substream
In case the *first* element of the stream matches the predicate, the first substream emitted by splitWhen will start from that element. For example:
true, false, false // first substream starts from the split-by element
true, false // subsequent substreams operate the same way
The object returned from this method is not a normal Flow
,
it is a SubSource
. This means that after this combinator all transformations
are applied to all encountered substreams in the same fashion. Substream mode
is exited either by closing the substream (i.e. connecting it to a Sink
)
or by merging the substreams back together; see the to
and mergeBack
methods
on SubSource
for more information.
It is important to note that the substreams also propagate back-pressure as
any other stream, which means that blocking one substream will block the splitWhen
operator itself—and thereby all substreams—once all internal or
explicit buffers are filled.
If the split predicate p
throws an exception and the supervision decision
is akka.stream.Supervision.Stop
the stream and substreams will be completed
with failure.
If the split predicate p
throws an exception and the supervision decision
is akka.stream.Supervision.Resume
or akka.stream.Supervision.Restart
the element is dropped and the stream and substreams continue.
'''Emits when''' an element for which the provided predicate is true, opening and emitting a new substream for subsequent element
'''Backpressures when''' there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels and substreams cancel
See also Source.splitAfter
.
p
- (undocumented)public <U> SubSource<Out,Mat> splitAfter(Predicate<Out> p)
false, true, // elements go into first substream
false, true, // elements go into second substream
false, false, true // elements go into third substream
The object returned from this method is not a normal Flow
,
it is a SubSource
. This means that after this combinator all transformations
are applied to all encountered substreams in the same fashion. Substream mode
is exited either by closing the substream (i.e. connecting it to a Sink
)
or by merging the substreams back together; see the to
and mergeBack
methods
on SubSource
for more information.
It is important to note that the substreams also propagate back-pressure as
any other stream, which means that blocking one substream will block the splitAfter
operator itself—and thereby all substreams—once all internal or
explicit buffers are filled.
If the split predicate p
throws an exception and the supervision decision
is akka.stream.Supervision.Stop
the stream and substreams will be completed
with failure.
If the split predicate p
throws an exception and the supervision decision
is akka.stream.Supervision.Resume
or akka.stream.Supervision.Restart
the element is dropped and the stream and substreams continue.
'''Emits when''' an element passes through. When the provided predicate is true it emits the element and opens a new substream for subsequent element
'''Backpressures when''' there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels and substreams cancel
See also Source.splitWhen
.
p
- (undocumented)public <T,M> Source<T,Mat> flatMapConcat(Function<Out,? extends Graph<SourceShape<T>,M>> f)
Source
of output elements that is
then flattened into the output stream by concatenation,
fully consuming one Source after the other.
'''Emits when''' a currently consumed substream has an element available
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes and all consumed substreams complete
'''Cancels when''' downstream cancels
f
- (undocumented)public <T,M> Source<T,Mat> flatMapMerge(int breadth, Function<Out,? extends Graph<SourceShape<T>,M>> f)
Source
of output elements that is
then flattened into the output stream by merging, where at most breadth
substreams are being consumed at any given time.
'''Emits when''' a currently consumed substream has an element available
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes and all consumed substreams complete
'''Cancels when''' downstream cancels
breadth
- (undocumented)f
- (undocumented)public Source<Out,Mat> initialTimeout(scala.concurrent.duration.FiniteDuration timeout)
TimeoutException
.
'''Emits when''' upstream emits an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or fails if timeout elapses before first element arrives
'''Cancels when''' downstream cancels
timeout
- (undocumented)public Source<Out,Mat> completionTimeout(scala.concurrent.duration.FiniteDuration timeout)
TimeoutException
.
'''Emits when''' upstream emits an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or fails if timeout elapses before upstream completes
'''Cancels when''' downstream cancels
timeout
- (undocumented)public Source<Out,Mat> idleTimeout(scala.concurrent.duration.FiniteDuration timeout)
TimeoutException
. The timeout is checked periodically,
so the resolution of the check is one period (equals to timeout value).
'''Emits when''' upstream emits an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
'''Cancels when''' downstream cancels
timeout
- (undocumented)public Source<Out,Mat> backpressureTimeout(scala.concurrent.duration.FiniteDuration timeout)
TimeoutException
. The timeout is checked periodically,
so the resolution of the check is one period (equals to timeout value).
'''Emits when''' upstream emits an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
'''Cancels when''' downstream cancels
timeout
- (undocumented)public <U> Source<U,Mat> keepAlive(scala.concurrent.duration.FiniteDuration maxIdle, Creator<U> injectedElem)
If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements do not accumulate during this period.
Upstream elements are always preferred over injected elements.
'''Emits when''' upstream emits an element or if the upstream was idle for the configured period
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
maxIdle
- (undocumented)injectedElem
- (undocumented)public Source<Out,Mat> throttle(int elements, scala.concurrent.duration.FiniteDuration per, int maximumBurst, ThrottleMode mode)
elements/per
. In other words, this stage set the maximum rate
for emitting messages. This combinator works for streams where all elements have the same cost or length.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
Tokens drops into the bucket at a given rate and can be spared
for later use up to bucket capacity
to allow some burstiness. Whenever stream wants to send an element, it takes as many
tokens from the bucket as number of elements. If there isn't any, throttle waits until the
bucket accumulates enough tokens. Bucket is full when stream just materialized and started.
Parameter mode
manages behaviour when upstream is faster than throttle rate:
- akka.stream.ThrottleMode.Shaping
makes pauses before emitting messages to meet throttle rate
- akka.stream.ThrottleMode.Enforcing
fails with exception when upstream is faster than throttle rate
It is recommended to use non-zero burst sizes as they improve both performance and throttling precision by allowing the implementation to avoid using the scheduler when input rates fall below the enforced limit and to reduce most of the inaccuracy caused by the scheduler resolution (which is in the range of milliseconds).
Throttler always enforces the rate limit, but in certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. This can be also mitigated by increasing the burst size.
'''Emits when''' upstream emits an element and configured time per each element elapsed
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
elements
- (undocumented)per
- (undocumented)maximumBurst
- (undocumented)mode
- (undocumented)public Source<Out,Mat> throttle(int cost, scala.concurrent.duration.FiniteDuration per, int maximumBurst, Function<Out,java.lang.Integer> costCalculation, ThrottleMode mode)
cost/per
. Cost is
calculating for each element individually by calling calculateCost
function.
This combinator works for streams when elements have different cost(length).
Streams of ByteString
for example.
Throttle implements the token bucket model. There is a bucket with a given token capacity (burst size or maximumBurst).
Tokens drops into the bucket at a given rate and can be spared
for later use up to bucket capacity
to allow some burstiness. Whenever stream wants to send an element, it takes as many
tokens from the bucket as element cost. If there isn't any, throttle waits until the
bucket accumulates enough tokens. Elements that costs more than the allowed burst will be delayed proportionally
to their cost minus available tokens, meeting the target rate.
Parameter mode
manages behaviour when upstream is faster than throttle rate:
- akka.stream.ThrottleMode.Shaping
makes pauses before emitting messages to meet throttle rate
- akka.stream.ThrottleMode.Enforcing
fails with exception when upstream is faster than throttle rate. Enforcing
cannot emit elements that cost more than the maximumBurst
'''Emits when''' upstream emits an element and configured time per each element elapsed
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
cost
- (undocumented)per
- (undocumented)maximumBurst
- (undocumented)costCalculation
- (undocumented)mode
- (undocumented)public Source<Out,Mat> detach()
'''Emits when''' upstream emits an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
public <M> Source<Out,M> watchTermination(Function2<Mat,java.util.concurrent.CompletionStage<Done>,M> matF)
Future[Done]
that completes on getting termination message.
The Future completes with success when received complete message from upstream or cancel
from downstream. It fails with the same error when received error message from
downstream.matF
- (undocumented)public <M> Source<Out,M> monitor(Function2<Mat,FlowMonitor<Out>,M> combine)
FlowMonitor[Out]
that allows monitoring of the current flow. All events are propagated
by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
event, and may therefor affect performance.
The combine
function is used to combine the FlowMonitor
with this flow's materialized value.combine
- (undocumented)public Source<Out,Mat> initialDelay(scala.concurrent.duration.FiniteDuration delay)
'''Emits when''' upstream emits an element if the initial delay is already elapsed
'''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
delay
- (undocumented)public Source<Out,Mat> withAttributes(Attributes attr)
Source
to the given ones and seal the list
of attributes. This means that further calls will not be able to remove these
attributes, but instead add new ones. Note that this
operation has no effect on an empty Flow (because the attributes apply
only to the contained processing stages).withAttributes
in interface Graph<SourceShape<Out>,Mat>
attr
- (undocumented)public Source<Out,Mat> addAttributes(Attributes attr)
withAttributes
will not remove these attributes. Note that this
operation has no effect on an empty Flow (because the attributes apply
only to the contained processing stages).addAttributes
in interface Graph<SourceShape<Out>,Mat>
attr
- (undocumented)public Source<Out,Mat> named(java.lang.String name)
name
attribute to this Source.named
in interface Graph<SourceShape<Out>,Mat>
name
- (undocumented)public Source<Out,Mat> async()
Source
async
in interface Graph<SourceShape<Out>,Mat>
public Source<Out,Mat> log(java.lang.String name, Function<Out,java.lang.Object> extract, LoggingAdapter log)
By default element and completion signals are logged on debug level, and errors are logged on Error level.
This can be adjusted according to your needs by providing a custom Attributes.LogLevels
attribute on the given Flow:
The extract
function will be applied to each element before logging, so it is possible to log only those fields
of a complex object flowing through this element.
Uses the given LoggingAdapter
for logging.
'''Emits when''' the mapping function returns an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
name
- (undocumented)extract
- (undocumented)log
- (undocumented)public Source<Out,Mat> log(java.lang.String name, Function<Out,java.lang.Object> extract)
By default element and completion signals are logged on debug level, and errors are logged on Error level.
This can be adjusted according to your needs by providing a custom Attributes.LogLevels
attribute on the given Flow:
The extract
function will be applied to each element before logging, so it is possible to log only those fields
of a complex object flowing through this element.
Uses an internally created LoggingAdapter
which uses akka.stream.Log
as it's source (use this class to configure slf4j loggers).
'''Emits when''' the mapping function returns an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
name
- (undocumented)extract
- (undocumented)public Source<Out,Mat> log(java.lang.String name, LoggingAdapter log)
By default element and completion signals are logged on debug level, and errors are logged on Error level.
This can be adjusted according to your needs by providing a custom Attributes.LogLevels
attribute on the given Flow:
Uses the given LoggingAdapter
for logging.
'''Emits when''' the mapping function returns an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
name
- (undocumented)log
- (undocumented)public Source<Out,Mat> log(java.lang.String name)
By default element and completion signals are logged on debug level, and errors are logged on Error level.
This can be adjusted according to your needs by providing a custom Attributes.LogLevels
attribute on the given Flow:
Uses an internally created LoggingAdapter
which uses akka.stream.Log
as it's source (use this class to configure slf4j loggers).
'''Emits when''' the mapping function returns an element
'''Backpressures when''' downstream backpressures
'''Completes when''' upstream completes
'''Cancels when''' downstream cancels
name
- (undocumented)