public final class Source<Out,Mat> extends java.lang.Object implements FlowOpsMat<Out,Mat>, Graph<SourceShape<Out>,Mat>
Source is a set of stream processing steps that has one open output. It can comprise any number of internal sources and transformations that are wired together, or it can be an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into a Reactive Streams Publisher (at least conceptually).
Constructor and Description |
---|
Source(StreamLayout.Module module) |
Modifier and Type | Method and Description |
---|---|
static <U,M> FlowOps |
$plus$plus(Graph<SourceShape<U>,M> that) |
static <T> Source<T,ActorRef> |
actorPublisher(Props props)
|
static <T> Source<T,ActorRef> |
actorRef(int bufferSize,
OverflowStrategy overflowStrategy)
Creates a
Source that is materialized as an ActorRef . |
Source<Out,Mat> |
addAttributes(Attributes attr)
Add the given attributes to this Source.
|
static FlowOps |
alsoTo(Graph<SinkShape<Out>,?> that) |
protected static <M> Graph<FlowShape<Out,Out>,M> |
alsoToGraph(Graph<SinkShape<Out>,M> that) |
static <Mat2,Mat3> |
alsoToMat(Graph<SinkShape<Out>,Mat2> that,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <T> FlowOps |
andThen(Stages.SymbolicStage<Out,T> op) |
static <T> Source<T,NotUsed> |
apply(scala.collection.immutable.Iterable<T> iterable)
Helper to create
Source from Iterable . |
Source<Out,Mat> |
asJava()
Converts this Scala DSL element to its Java DSL counterpart.
|
static <T> Source<T,org.reactivestreams.Subscriber<T>> |
asSubscriber()
Creates a
Source that is materialized as a Subscriber |
Source<Out,Mat> |
async()
Put an asynchronous boundary around this
Source |
static FlowOps |
backpressureTimeout(scala.concurrent.duration.FiniteDuration timeout) |
static <S> FlowOps |
batch(long max,
scala.Function1<Out,S> seed,
scala.Function2<S,Out,S> aggregate) |
static <S> FlowOps |
batchWeighted(long max,
scala.Function1<Out,java.lang.Object> costFn,
scala.Function1<Out,S> seed,
scala.Function2<S,Out,S> aggregate) |
static FlowOps |
buffer(int size,
OverflowStrategy overflowStrategy) |
static <T> FlowOps |
collect(scala.PartialFunction<Out,T> pf) |
<T,U> Source<U,NotUsed> |
combine(Source<T,?> first,
Source<T,?> second,
scala.collection.Seq<Source<T,?>> rest,
scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> strategy)
Combines several sources with a fan-in strategy like
Merge or Concat and returns Source. |
static FlowOps |
completionTimeout(scala.concurrent.duration.FiniteDuration timeout) |
static <U,Mat2> FlowOps |
concat(Graph<SourceShape<U>,Mat2> that) |
protected static <U,Mat2> Graph<FlowShape<Out,U>,Mat2> |
concatGraph(Graph<SourceShape<U>,Mat2> that) |
static <U,Mat2,Mat3> |
concatMat(Graph<SourceShape<U>,Mat2> that,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <O2> FlowOps |
conflate(scala.Function2<O2,O2,O2> aggregate) |
static <S> FlowOps |
conflateWithSeed(scala.Function1<Out,S> seed,
scala.Function2<S,Out,S> aggregate) |
static <T> Source<T,NotUsed> |
cycle(scala.Function0<scala.collection.Iterator<T>> f)
Create
Source that will continually produce given elements in specified order. |
static FlowOps |
delay(scala.concurrent.duration.FiniteDuration of,
DelayOverflowStrategy strategy) |
static DelayOverflowStrategy |
delay$default$2() |
static FlowOps |
detach() |
static FlowOps |
drop(long n) |
static FlowOps |
dropWhile(scala.Function1<Out,java.lang.Object> p) |
static FlowOps |
dropWithin(scala.concurrent.duration.FiniteDuration d) |
static <T> Source<T,NotUsed> |
empty()
A
Source with no elements, i.e. an empty stream that is completed immediately for every connected Sink. |
static <U> FlowOps |
expand(scala.Function1<Out,scala.collection.Iterator<U>> extrapolate) |
static <T> Source<T,NotUsed> |
failed(java.lang.Throwable cause)
Create a
Source that immediately ends the stream with the cause error to every connected Sink . |
static FlowOps |
filter(scala.Function1<Out,java.lang.Object> p) |
static FlowOps |
filterNot(scala.Function1<Out,java.lang.Object> p) |
static <T,M> FlowOps |
flatMapConcat(scala.Function1<Out,Graph<SourceShape<T>,M>> f) |
static <T,M> FlowOps |
flatMapMerge(int breadth,
scala.Function1<Out,Graph<SourceShape<T>,M>> f) |
static <T> FlowOps |
fold(T zero,
scala.Function2<T,Out,T> f) |
static <T> FlowOps |
foldAsync(T zero,
scala.Function2<T,Out,scala.concurrent.Future<T>> f) |
static <T> Source<T,NotUsed> |
fromCompletionStage(java.util.concurrent.CompletionStage<T> future)
Start a new
Source from the given CompletionStage. |
static <T> Source<T,NotUsed> |
fromFuture(scala.concurrent.Future<T> future)
Start a new
Source from the given Future . |
static <T,M> Source<T,M> |
fromGraph(Graph<SourceShape<T>,M> g)
A graph with the shape of a source logically is a source, this method makes
it so also in type.
|
static <T> Source<T,NotUsed> |
fromIterator(scala.Function0<scala.collection.Iterator<T>> f)
Helper to create
Source from Iterator . |
static <T> Source<T,NotUsed> |
fromPublisher(org.reactivestreams.Publisher<T> publisher)
Helper to create
Source from Publisher . |
static <K> SubFlow<Out,Mat,FlowOps,java.lang.Object> |
groupBy(int maxSubstreams,
scala.Function1<Out,K> f) |
static FlowOps |
grouped(int n) |
static FlowOps |
groupedWithin(int n,
scala.concurrent.duration.FiniteDuration d) |
static FlowOps |
idleTimeout(scala.concurrent.duration.FiniteDuration timeout) |
static FlowOps |
initialDelay(scala.concurrent.duration.FiniteDuration delay) |
static FlowOps |
initialTimeout(scala.concurrent.duration.FiniteDuration timeout) |
static <U> FlowOps |
interleave(Graph<SourceShape<U>,?> that,
int segmentSize) |
protected static <U,M> Graph<FlowShape<Out,U>,M> |
interleaveGraph(Graph<SourceShape<U>,M> that,
int segmentSize) |
static <U,Mat2,Mat3> |
interleaveMat(Graph<SourceShape<U>,Mat2> that,
int request,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <T> FlowOps |
intersperse(T inject) |
static <T> FlowOps |
intersperse(T start,
T inject,
T end) |
static <U> FlowOps |
keepAlive(scala.concurrent.duration.FiniteDuration maxIdle,
scala.Function0<U> injectedElem) |
static <T,M> Source<T,scala.concurrent.Future<M>> |
lazily(scala.Function0<Source<T,M>> create)
Creates a
Source that is not materialized until there is downstream demand. When the source gets materialized,
the materialized future is completed with its value. If downstream cancels or fails without any demand, the
create factory is never called and the materialized Future is failed. |
static FlowOps |
limit(long max) |
static <T> FlowOps |
limitWeighted(long max,
scala.Function1<Out,java.lang.Object> costFn) |
static FlowOps |
log(java.lang.String name,
scala.Function1<Out,java.lang.Object> extract,
LoggingAdapter log) |
static scala.Function1<Out,java.lang.Object> |
log$default$2() |
static LoggingAdapter |
log$default$3(java.lang.String name,
scala.Function1<Out,java.lang.Object> extract) |
static <T> FlowOps |
map(scala.Function1<Out,T> f) |
static <T> FlowOps |
mapAsync(int parallelism,
scala.Function1<Out,scala.concurrent.Future<T>> f) |
static <T> FlowOps |
mapAsyncUnordered(int parallelism,
scala.Function1<Out,scala.concurrent.Future<T>> f) |
static <T> FlowOps |
mapConcat(scala.Function1<Out,scala.collection.immutable.Iterable<T>> f) |
static FlowOps |
mapError(scala.PartialFunction<java.lang.Throwable,java.lang.Throwable> pf) |
<Mat2> Source<Out,Mat2> |
mapMaterializedValue(scala.Function1<Mat,Mat2> f)
Transform only the materialized value of this Source, leaving all other properties as they were.
|
static <T> Source<T,scala.concurrent.Promise<scala.Option<T>>> |
maybe()
Create a
Source which materializes a Promise which controls what element
will be emitted by the Source. |
static <U,M> FlowOps |
merge(Graph<SourceShape<U>,M> that,
boolean eagerComplete) |
static <U,M> boolean |
merge$default$2() |
protected static <U,M> Graph<FlowShape<Out,U>,M> |
mergeGraph(Graph<SourceShape<U>,M> that,
boolean eagerComplete) |
static <U,Mat2,Mat3> |
mergeMat(Graph<SourceShape<U>,Mat2> that,
boolean eagerComplete,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <U,Mat2,Mat3> |
mergeMat$default$2() |
static <U,M> FlowOps |
mergeSorted(Graph<SourceShape<U>,M> that,
scala.math.Ordering<U> ord) |
protected static <U,M> Graph<FlowShape<Out,U>,M> |
mergeSortedGraph(Graph<SourceShape<U>,M> that,
scala.math.Ordering<U> ord) |
static <U,Mat2,Mat3> |
mergeSortedMat(Graph<SourceShape<U>,Mat2> that,
scala.Function2<Mat,Mat2,Mat3> matF,
scala.math.Ordering<U> ord) |
StreamLayout.Module |
module()
INTERNAL API.
|
static <Mat2> FlowOpsMat |
monitor(scala.Function2<Mat,FlowMonitor<Out>,Mat2> combine) |
Source<Out,Mat> |
named(java.lang.String name)
Add a
name attribute to this Source. |
static <U,Mat2> FlowOps |
orElse(Graph<SourceShape<U>,Mat2> secondary) |
protected static <U,Mat2> Graph<FlowShape<Out,U>,Mat2> |
orElseGraph(Graph<SourceShape<U>,Mat2> secondary) |
static <U,Mat2,Mat3> |
orElseMat(Graph<SourceShape<U>,Mat2> secondary,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <U> FlowOps |
prefixAndTail(int n) |
static <U,Mat2> FlowOps |
prepend(Graph<SourceShape<U>,Mat2> that) |
protected static <U,Mat2> Graph<FlowShape<Out,U>,Mat2> |
prependGraph(Graph<SourceShape<U>,Mat2> that) |
static <U,Mat2,Mat3> |
prependMat(Graph<SourceShape<U>,Mat2> that,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <T> Source<T,SourceQueueWithComplete<T>> |
queue(int bufferSize,
OverflowStrategy overflowStrategy)
Creates a
Source that is materialized as a SourceQueue. |
static <T> FlowOps |
recover(scala.PartialFunction<java.lang.Throwable,T> pf) |
static <T> FlowOps |
recoverWith(scala.PartialFunction<java.lang.Throwable,Graph<SourceShape<T>,NotUsed>> pf) |
static <T> FlowOps |
recoverWithRetries(int attempts,
scala.PartialFunction<java.lang.Throwable,Graph<SourceShape<T>,NotUsed>> pf) |
static <T> FlowOps |
reduce(scala.Function2<T,T,T> f) |
static <T> Source<T,NotUsed> |
repeat(T element)
Create a
Source that will continually emit the given element. |
<U> scala.concurrent.Future<U> |
runFold(U zero,
scala.Function2<U,Out,U> f,
Materializer materializer)
Shortcut for running this
Source with a fold function. |
<U> scala.concurrent.Future<U> |
runFoldAsync(U zero,
scala.Function2<U,Out,scala.concurrent.Future<U>> f,
Materializer materializer)
Shortcut for running this
Source with a foldAsync function. |
scala.concurrent.Future<Done> |
runForeach(scala.Function1<Out,scala.runtime.BoxedUnit> f,
Materializer materializer)
Shortcut for running this
Source with a foreach procedure. |
<U> scala.concurrent.Future<U> |
runReduce(scala.Function2<U,U,U> f,
Materializer materializer)
Shortcut for running this
Source with a reduce function. |
<Mat2> Mat2 |
runWith(Graph<SinkShape<Out>,Mat2> sink,
Materializer materializer)
Connect this
Source to a Sink and run it. |
static <T> FlowOps |
scan(T zero,
scala.Function2<T,Out,T> f) |
static <T> FlowOps |
scanAsync(T zero,
scala.Function2<T,Out,scala.concurrent.Future<T>> f) |
SourceShape<Out> |
shape()
The shape of a graph is all that is externally visible: its inlets and outlets.
|
static <T> Source<T,NotUsed> |
single(T element)
Create a
Source with one element. |
static FlowOps |
sliding(int n,
int step) |
static int |
sliding$default$2() |
static SubFlow<Out,Mat,FlowOps,java.lang.Object> |
splitAfter(scala.Function1<Out,java.lang.Object> p) |
static SubFlow<Out,Mat,FlowOps,java.lang.Object> |
splitAfter(SubstreamCancelStrategy substreamCancelStrategy,
scala.Function1<Out,java.lang.Object> p) |
static SubFlow<Out,Mat,FlowOps,java.lang.Object> |
splitWhen(scala.Function1<Out,java.lang.Object> p) |
static SubFlow<Out,Mat,FlowOps,java.lang.Object> |
splitWhen(SubstreamCancelStrategy substreamCancelStrategy,
scala.Function1<Out,java.lang.Object> p) |
static <T> FlowOps |
statefulMapConcat(scala.Function0<scala.Function1<Out,scala.collection.immutable.Iterable<T>>> f) |
static FlowOps |
take(long n) |
static FlowOps |
takeWhile(scala.Function1<Out,java.lang.Object> p) |
static FlowOps |
takeWhile(scala.Function1<Out,java.lang.Object> p,
boolean inclusive) |
static FlowOps |
takeWithin(scala.concurrent.duration.FiniteDuration d) |
static FlowOps |
throttle(int cost,
scala.concurrent.duration.FiniteDuration per,
int maximumBurst,
scala.Function1<Out,java.lang.Object> costCalculation,
ThrottleMode mode) |
static FlowOps |
throttle(int elements,
scala.concurrent.duration.FiniteDuration per,
int maximumBurst,
ThrottleMode mode) |
static <T> Source<T,Cancellable> |
tick(scala.concurrent.duration.FiniteDuration initialDelay,
scala.concurrent.duration.FiniteDuration interval,
T tick)
Elements are emitted periodically with the specified interval.
|
<Mat2> RunnableGraph<Mat> |
to(Graph<SinkShape<Out>,Mat2> sink)
|
<Mat2,Mat3> |
toMat(Graph<SinkShape<Out>,Mat2> sink,
scala.Function2<Mat,Mat2,Mat3> combine)
|
java.lang.String |
toString() |
static <T> FlowOps |
transform(scala.Function0<Stage<Out,T>> mkStage) |
static <T,M> FlowOpsMat |
transformMaterializing(scala.Function0<scala.Tuple2<Stage<Out,T>,M>> mkStageAndMaterialized) |
static <S,E> Source<E,NotUsed> |
unfold(S s,
scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f)
Create a
Source that will unfold a value of type S into
a pair of the next state S and output elements of type E . |
static <S,E> Source<E,NotUsed> |
unfoldAsync(S s,
scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f)
Same as
unfold , but uses an async function to generate the next state-element tuple. |
static <T,S> Source<T,NotUsed> |
unfoldResource(scala.Function0<S> create,
scala.Function1<S,scala.Option<T>> read,
scala.Function1<S,scala.runtime.BoxedUnit> close)
Start a new
Source from some resource which can be opened, read and closed. |
static <T,S> Source<T,NotUsed> |
unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create,
scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read,
scala.Function1<S,scala.concurrent.Future<Done>> close)
Start a new
Source from some resource which can be opened, read and closed. |
<T,Mat2> Source<T,Mat> |
via(Graph<FlowShape<Out,T>,Mat2> flow) |
<T,Mat2,Mat3> |
viaMat(Graph<FlowShape<Out,T>,Mat2> flow,
scala.Function2<Mat,Mat2,Mat3> combine)
Transform this
Source by appending the given processing steps. |
static <Mat2> FlowOpsMat |
watchTermination(scala.Function2<Mat,scala.concurrent.Future<Done>,Mat2> matF) |
Source<Out,Mat> |
withAttributes(Attributes attr)
Change the attributes of this
Source to the given ones and seal the list
of attributes. |
static <U> FlowOps |
zip(Graph<SourceShape<U>,?> that) |
protected static <U,M> Graph<FlowShape<Out,scala.Tuple2<Out,U>>,M> |
zipGraph(Graph<SourceShape<U>,M> that) |
static <U,Mat2,Mat3> |
zipMat(Graph<SourceShape<U>,Mat2> that,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <T> Source<scala.collection.immutable.Seq<T>,NotUsed> |
zipN(scala.collection.immutable.Seq<Source<T,?>> sources)
Combine the elements of multiple streams into a stream of sequences.
|
static <Out2,Out3> FlowOps |
zipWith(Graph<SourceShape<Out2>,?> that,
scala.Function2<Out,Out2,Out3> combine) |
protected static <Out2,Out3,M> Graph<FlowShape<Out,Out3>,M> |
zipWithGraph(Graph<SourceShape<Out2>,M> that,
scala.Function2<Out,Out2,Out3> combine) |
static FlowOps |
zipWithIndex() |
static <Out2,Out3,Mat2,Mat3> |
zipWithMat(Graph<SourceShape<Out2>,Mat2> that,
scala.Function2<Out,Out2,Out3> combine,
scala.Function2<Mat,Mat2,Mat3> matF) |
static <T,O> Source<O,NotUsed> |
zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper,
scala.collection.immutable.Seq<Source<T,?>> sources) |
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface FlowOpsMat: alsoToMat, concatMat, interleaveMat, mergeMat, mergeSortedMat, monitor, orElseMat, prependMat, transformMaterializing, watchTermination, zipMat, zipWithMat
Methods inherited from interface FlowOps: alsoTo, alsoToGraph, andThen, backpressureTimeout, batch, batchWeighted, buffer, collect, completionTimeout, concat, concatGraph, conflate, conflateWithSeed, delay, detach, drop, dropWhile, dropWithin, expand, filter, filterNot, flatMapConcat, flatMapMerge, fold, foldAsync, groupBy, grouped, groupedWithin, idleTimeout, initialDelay, initialTimeout, interleave, interleaveGraph, intersperse, intersperse, keepAlive, limit, limitWeighted, log, map, mapAsync, mapAsyncUnordered, mapConcat, mapError, merge, mergeGraph, mergeSorted, mergeSortedGraph, orElse, orElseGraph, prefixAndTail, prepend, prependGraph, recover, recoverWith, recoverWithRetries, reduce, scan, scanAsync, sliding, splitAfter, splitAfter, splitWhen, splitWhen, statefulMapConcat, take, takeWhile, takeWhile, takeWithin, throttle, throttle, transform, zip, zipGraph, zipWith, zipWithGraph, zipWithIndex
public Source(StreamLayout.Module module)
public static <T> Source<T,NotUsed> fromPublisher(org.reactivestreams.Publisher<T> publisher)
Helper to create Source from Publisher.
Construct a transformation starting with the given publisher. The transformation steps are executed by a series of Processor instances that mediate the flow of elements downstream and the propagation of back-pressure upstream.
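The following is a minimal sketch of wrapping a Reactive Streams Publisher with fromPublisher. It assumes the ActorMaterializer-based API of the Akka version documented here; the object name FromPublisherExample, the system name, and the 3-second timeout are illustrative choices, and the Publisher is simply obtained from another stream via Sink.asPublisher.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object FromPublisherExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("from-publisher-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Any Reactive Streams Publisher works; this one comes from another stream.
      val publisher: Publisher[Int] =
        Source(List(1, 2, 3)).runWith(Sink.asPublisher[Int](fanout = false))
      // Wrap the Publisher and continue with ordinary operators.
      val doubled = Source.fromPublisher(publisher).map(_ * 2).runWith(Sink.seq)
      Await.result(doubled, 3.seconds)
    } finally system.terminate()
  }
}
```

Calling FromPublisherExample.run() should yield the doubled elements 2, 4, 6.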
publisher - (undocumented)
public static <T> Source<T,NotUsed> fromIterator(scala.Function0<scala.collection.Iterator<T>> f)
Helper to create Source from Iterator.
Example usage: Source.fromIterator(() => Iterator.from(0))
Start a new Source from the given function that produces an Iterator. The produced stream of elements will continue until the iterator runs empty or fails during evaluation of the next() method. Elements are pulled out of the iterator in accordance with the demand coming from the downstream transformation steps.
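As a runnable sketch of the usage above, the following bounds the infinite iterator with take so the stream completes. The object name FromIteratorExample and the timeout are assumptions for illustration, and the ActorMaterializer setup assumes the Akka version documented here.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object FromIteratorExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("from-iterator-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Iterator.from(0) is infinite; elements are only pulled on demand,
      // so take(5) ends the stream after five elements.
      val firstFive = Source.fromIterator(() => Iterator.from(0)).take(5).runWith(Sink.seq)
      Await.result(firstFive, 3.seconds)
    } finally system.terminate()
  }
}
```

Because the function is invoked on each materialization, every run starts from a fresh iterator.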
f - (undocumented)
public static <T> Source<T,NotUsed> cycle(scala.Function0<scala.collection.Iterator<T>> f)
Create a Source that will continually produce the given elements in the specified order.
Start a new 'cycled' Source from the given elements. The produced stream of elements will continue infinitely by repeating the sequence of elements provided by the function parameter.
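A small sketch of the cycling behavior, assuming the same ActorMaterializer-based setup as this API version; CycleExample and the timeout are illustrative names and values, not part of the library.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object CycleExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("cycle-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The iterator factory is called again whenever the previous iterator
      // is exhausted, so the sequence 1, 2, 3 repeats until take(7) stops it.
      val cycled = Source.cycle(() => List(1, 2, 3).iterator).take(7).runWith(Sink.seq)
      Await.result(cycled, 3.seconds)
    } finally system.terminate()
  }
}
```

The result should contain 1, 2, 3, 1, 2, 3, 1.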
f - (undocumented)
public static <T,M> Source<T,M> fromGraph(Graph<SourceShape<T>,M> g)
A graph with the shape of a source logically is a source, this method makes it so also in type.
g - (undocumented)
public static <T> Source<T,NotUsed> apply(scala.collection.immutable.Iterable<T> iterable)
Helper to create Source from Iterable.
Example usage: Source(Seq(1,2,3))
Starts a new Source from the given Iterable. This is like starting from an Iterator, but every Subscriber directly attached to the Publisher of this stream will see an individual flow of elements (always starting from the beginning) regardless of when they subscribed.
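The "individual flow of elements" property can be sketched by materializing the same Source twice. This assumes the ActorMaterializer-based setup of this API version; ApplyExample and the timeout are hypothetical names and values chosen for the illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object ApplyExample {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("apply-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val numbers = Source(List(1, 2, 3))
      // Each materialization iterates the collection from the beginning.
      val first = Await.result(numbers.runWith(Sink.seq), 3.seconds)
      val second = Await.result(numbers.runWith(Sink.seq), 3.seconds)
      (first, second)
    } finally system.terminate()
  }
}
```

Both runs should see the full sequence 1, 2, 3.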
iterable - (undocumented)
public static <T> Source<T,NotUsed> fromFuture(scala.concurrent.Future<T> future)
Start a new Source from the given Future. The stream will consist of one element when the Future is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the Future is completed with a failure.
future - (undocumented)
public static <T> Source<T,NotUsed> fromCompletionStage(java.util.concurrent.CompletionStage<T> future)
Start a new Source from the given CompletionStage. The stream will consist of one element when the CompletionStage is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the CompletionStage is completed with a failure.
future - (undocumented)
public static <T> Source<T,Cancellable> tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, T tick)
Elements are emitted periodically with the specified interval.
initialDelay - (undocumented)
interval - (undocumented)
tick - (undocumented)
public static <T> Source<T,NotUsed> single(T element)
Create a Source with one element. Every connected Sink of this stream will see an individual stream consisting of one element.
element - (undocumented)
public static <T> Source<T,NotUsed> repeat(T element)
Create a Source that will continually emit the given element.
element - (undocumented)
public static <S,E> Source<E,NotUsed> unfold(S s, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f)
Create a Source that will unfold a value of type S into a pair of the next state S and output elements of type E.
For example, all the Fibonacci numbers under 10M:
  Source.unfold(0 → 1) {
    case (a, _) if a > 10000000 ⇒ None
    case (a, b) ⇒ Some((b → (a + b)) → a)
  }
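A simpler, self-contained sketch of the same state-and-element convention is a countdown: the function returns the next state first and the emitted element second, and None ends the stream. UnfoldExample and the timeout are hypothetical, and the ActorMaterializer setup assumes this API version.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object UnfoldExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("unfold-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // State is the current counter; the tuple is (next state, emitted element).
      val countdown = Source.unfold[Int, Int](5) { n =>
        if (n <= 0) None else Some((n - 1, n))
      }
      Await.result(countdown.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }
}
```

The stream should emit 5, 4, 3, 2, 1 and then complete.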
s - (undocumented)
f - (undocumented)
public static <S,E> Source<E,NotUsed> unfoldAsync(S s, scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f)
Same as unfold, but uses an async function to generate the next state-element tuple.
Async Fibonacci example:
  Source.unfoldAsync(0 → 1) {
    case (a, _) if a > 10000000 ⇒ Future.successful(None)
    case (a, b) ⇒ Future {
      Thread.sleep(1000)
      Some((b → (a + b)) → a)
    }
  }
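As a shorter runnable sketch, the following emits powers of two with each step computed in a Future. It assumes the ActorMaterializer-based setup of this API version and uses the actor system's dispatcher as the ExecutionContext; UnfoldAsyncExample and the timeout are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object UnfoldAsyncExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("unfold-async-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the Futures below
    try {
      // Each step asynchronously yields (next state, emitted element) or None to stop.
      val powers = Source.unfoldAsync[Int, Int](1) { n =>
        Future {
          if (n > 16) None else Some((n * 2, n))
        }
      }
      Await.result(powers.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }
}
```

The next step is only requested after the previous Future completes, so the elements 1, 2, 4, 8, 16 arrive in order.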
s - (undocumented)
f - (undocumented)
public static <T> Source<T,NotUsed> empty()
A Source with no elements, i.e. an empty stream that is completed immediately for every connected Sink.
public static <T> Source<T,scala.concurrent.Promise<scala.Option<T>>> maybe()
Create a Source which materializes a Promise which controls what element will be emitted by the Source.
If the materialized promise is completed with a Some, that value will be produced downstream, followed by completion.
If the materialized promise is completed with a None, no value will be produced downstream and completion will be signalled immediately.
If the materialized promise is completed with a failure, then the returned source will terminate with that error.
If the downstream of this source cancels before the promise has been completed, then the promise will be completed with None.
public static <T> Source<T,NotUsed> failed(java.lang.Throwable cause)
Create a Source that immediately ends the stream with the cause error to every connected Sink.
cause - (undocumented)
public static <T,M> Source<T,scala.concurrent.Future<M>> lazily(scala.Function0<Source<T,M>> create)
Creates a Source that is not materialized until there is downstream demand. When the source gets materialized, the materialized future is completed with its value. If downstream cancels or fails without any demand, the create factory is never called and the materialized Future is failed.
create - (undocumented)
public static <T> Source<T,org.reactivestreams.Subscriber<T>> asSubscriber()
Creates a Source that is materialized as a Subscriber.
public static <T> Source<T,ActorRef> actorPublisher(Props props)
Creates a Source that is materialized to an ActorRef which points to an Actor created according to the passed in Props. The Actor created by the props must be an ActorPublisher.
props - (undocumented)
public static <T> Source<T,ActorRef> actorRef(int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure is not supported, and an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize of 0 and then received messages are dropped if there is no demand from downstream. When bufferSize is 0 the overflowStrategy does not matter. An async boundary is added after this Source; as such, it is never safe to assume the downstream will always generate demand.
The stream can be completed successfully by sending the actor reference a Status.Success (whose content will be ignored) in which case already buffered elements will be signaled before signaling completion, or by sending PoisonPill in which case completion will be signaled immediately.
The stream can be completed with failure by sending a Status.Failure to the actor reference. In case the Actor is still draining its internal buffer (after having received a Status.Success) before signaling completion and it receives a Status.Failure, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.scaladsl.Source.queue.
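The message protocol described above can be sketched as follows: elements are sent as plain messages and a Status.Success ends the stream after the buffer has drained. This assumes the ActorMaterializer-based setup of this API version; ActorRefSourceExample, the buffer size of 8, the dropHead strategy, and the timeout are illustrative choices.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object ActorRefSourceExample {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-source-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep both materialized values: the ActorRef feeding the stream
      // and the Future holding everything the sink collected.
      val (ref, collected) =
        Source.actorRef[String](bufferSize = 8, overflowStrategy = OverflowStrategy.dropHead)
          .toMat(Sink.seq)(Keep.both)
          .run()
      ref ! "a"
      ref ! "b"
      // Completes the stream once already buffered elements have been emitted.
      ref ! Status.Success("done")
      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

With a buffer large enough for both messages, the sink is expected to receive "a" and "b" before completion; with a smaller buffer the chosen OverflowStrategy decides which elements survive.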
bufferSize - The size of the buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
public static <T> Source<scala.collection.immutable.Seq<T>,NotUsed> zipN(scala.collection.immutable.Seq<Source<T,?>> sources)
Combine the elements of multiple streams into a stream of sequences.
sources - (undocumented)
public static <T,O> Source<O,NotUsed> zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper, scala.collection.immutable.Seq<Source<T,?>> sources)
public static <T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as a SourceQueue. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded if downstream is terminated.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
Acknowledgement mechanism is available. akka.stream.scaladsl.SourceQueue.offer returns Future[QueueOfferResult] which completes with QueueOfferResult.Enqueued if the element was added to the buffer or sent downstream. It completes with QueueOfferResult.Dropped if the element was dropped. It can also complete with QueueOfferResult.Failure when the stream failed or QueueOfferResult.QueueClosed when downstream is completed.
The strategy akka.stream.OverflowStrategy.backpressure will not complete the last offer():Future call when the buffer is full.
You can watch accessibility of the stream with akka.stream.scaladsl.SourceQueue.watchCompletion. It returns a future that completes with success when the stream is completed or fails when the stream is failed.
The buffer can be disabled by using bufferSize of 0 and then a received message will wait for downstream demand unless there is another message waiting for downstream demand, in that case the offer result will be completed according to the overflow strategy.
The SourceQueue that the current source is materialized to is for single thread usage only.
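The acknowledgement mechanism can be sketched by offering elements one at a time and waiting for each offer's Future before the next. This assumes the ActorMaterializer-based setup of this API version and that the materialized SourceQueueWithComplete provides complete(); QueueSourceExample, the buffer size, and the timeout are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object QueueSourceExample {
  def run(): (Seq[QueueOfferResult], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("queue-source-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for chaining the offer Futures
    try {
      val (queue, collected) =
        Source.queue[Int](bufferSize = 4, overflowStrategy = OverflowStrategy.backpressure)
          .toMat(Sink.seq)(Keep.both)
          .run()
      // Offer sequentially: the second offer starts only after the first is acknowledged.
      val offers = for {
        first <- queue.offer(1)
        second <- queue.offer(2)
      } yield Seq(first, second)
      val results = Await.result(offers, 3.seconds)
      queue.complete() // signal that no more elements will be offered
      (results, Await.result(collected, 3.seconds))
    } finally system.terminate()
  }
}
```

Chaining the offers keeps usage within the single-threaded contract noted above and avoids issuing a new offer while a previous one is still pending.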
bufferSize - size of buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
public static <T,S> Source<T,NotUsed> unfoldResource(scala.Function0<S> create, scala.Function1<S,scala.Option<T>> read, scala.Function1<S,scala.runtime.BoxedUnit> close)
Start a new Source from some resource which can be opened, read and closed. Interaction with the resource happens in a blocking way.
Example:
  Source.unfoldResource(
    () => new BufferedReader(new FileReader("...")),
    reader => Option(reader.readLine()),
    reader => reader.close())
You can use the supervision strategy to handle exceptions for the read function. All exceptions thrown by create or close will fail the stream.
The Restart supervision strategy will close and create blocking IO again. The default strategy is Stop, which means that the stream will be terminated on error in the read function by default.
You can configure the default dispatcher for this Source by changing the akka.stream.blocking-io-dispatcher or set it for a given Source by using ActorAttributes.
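A self-contained variant of the example above can be sketched with a temporary file, so the create, read and close functions can be run end to end. The ActorMaterializer setup assumes this API version; UnfoldResourceExample, the temp-file name prefix, and the timeout are hypothetical choices.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.io.BufferedReader
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the Akka API.
object UnfoldResourceExample {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("unfold-resource-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    // Temporary input file holding three lines.
    val path = Files.createTempFile("unfold-resource", ".txt")
    Files.write(path, "a\nb\nc\n".getBytes(StandardCharsets.UTF_8))
    try {
      val lines = Source.unfoldResource[String, BufferedReader](
        () => Files.newBufferedReader(path, StandardCharsets.UTF_8), // create: open the resource
        reader => Option(reader.readLine()),                         // read: None at end of file
        reader => reader.close())                                    // close: release the resource
      Await.result(lines.runWith(Sink.seq), 3.seconds)
    } finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }
}
```

Wrapping readLine() in Option turns the null returned at end of file into None, which is what ends the stream and triggers close.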
create - function that is called on stream start and creates/opens resource.
read - function that reads data from opened resource. It is called each time backpressure signal is received. Stream calls close and completes when read returns None.
close - function that closes resource
public static <T,S> Source<T,NotUsed> unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create, scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read, scala.Function1<S,scala.concurrent.Future<Done>> close)
Start a new Source from some resource which can be opened, read and closed. It's similar to unfoldResource but takes functions that return Futures instead of plain values.
You can use the supervision strategy to handle exceptions for the read function or failures of produced Futures.
All exceptions thrown by create or close, as well as failures of returned futures, will fail the stream.
The Restart supervision strategy will close and create the resource. The default strategy is Stop, which means that the stream will be terminated on error in the read function (or future) by default.
You can configure the default dispatcher for this Source by changing the akka.stream.blocking-io-dispatcher or set it for a given Source by using ActorAttributes.
create - function that is called on stream start and creates/opens resource.
read - function that reads data from opened resource. It is called each time backpressure signal is received. Stream calls close and completes when Future from read function returns None.
close - function that closes resource
public static <T> FlowOps recover(scala.PartialFunction<java.lang.Throwable,T> pf)
public static <T> FlowOps recoverWith(scala.PartialFunction<java.lang.Throwable,Graph<SourceShape<T>,NotUsed>> pf)
public static <T> FlowOps recoverWithRetries(int attempts, scala.PartialFunction<java.lang.Throwable,Graph<SourceShape<T>,NotUsed>> pf)
public static FlowOps mapError(scala.PartialFunction<java.lang.Throwable,java.lang.Throwable> pf)
public static <T> FlowOps map(scala.Function1<Out,T> f)
public static <T> FlowOps mapConcat(scala.Function1<Out,scala.collection.immutable.Iterable<T>> f)
public static <T> FlowOps statefulMapConcat(scala.Function0<scala.Function1<Out,scala.collection.immutable.Iterable<T>>> f)
public static <T> FlowOps mapAsync(int parallelism, scala.Function1<Out,scala.concurrent.Future<T>> f)
public static <T> FlowOps mapAsyncUnordered(int parallelism, scala.Function1<Out,scala.concurrent.Future<T>> f)
public static FlowOps filter(scala.Function1<Out,java.lang.Object> p)
public static FlowOps filterNot(scala.Function1<Out,java.lang.Object> p)
public static FlowOps takeWhile(scala.Function1<Out,java.lang.Object> p)
public static FlowOps takeWhile(scala.Function1<Out,java.lang.Object> p, boolean inclusive)
public static FlowOps dropWhile(scala.Function1<Out,java.lang.Object> p)
public static <T> FlowOps collect(scala.PartialFunction<Out,T> pf)
public static FlowOps grouped(int n)
public static FlowOps limit(long max)
public static <T> FlowOps limitWeighted(long max, scala.Function1<Out,java.lang.Object> costFn)
public static FlowOps sliding(int n, int step)
public static <T> FlowOps scan(T zero, scala.Function2<T,Out,T> f)
public static <T> FlowOps scanAsync(T zero, scala.Function2<T,Out,scala.concurrent.Future<T>> f)
public static <T> FlowOps fold(T zero, scala.Function2<T,Out,T> f)
public static <T> FlowOps foldAsync(T zero, scala.Function2<T,Out,scala.concurrent.Future<T>> f)
public static <T> FlowOps reduce(scala.Function2<T,T,T> f)
public static <T> FlowOps intersperse(T start, T inject, T end)
public static <T> FlowOps intersperse(T inject)
public static FlowOps groupedWithin(int n, scala.concurrent.duration.FiniteDuration d)
public static FlowOps delay(scala.concurrent.duration.FiniteDuration of, DelayOverflowStrategy strategy)
public static FlowOps drop(long n)
public static FlowOps dropWithin(scala.concurrent.duration.FiniteDuration d)
public static FlowOps take(long n)
public static FlowOps takeWithin(scala.concurrent.duration.FiniteDuration d)
public static <S> FlowOps conflateWithSeed(scala.Function1<Out,S> seed, scala.Function2<S,Out,S> aggregate)
public static <O2> FlowOps conflate(scala.Function2<O2,O2,O2> aggregate)
public static <S> FlowOps batch(long max, scala.Function1<Out,S> seed, scala.Function2<S,Out,S> aggregate)
public static <S> FlowOps batchWeighted(long max, scala.Function1<Out,java.lang.Object> costFn, scala.Function1<Out,S> seed, scala.Function2<S,Out,S> aggregate)
public static <U> FlowOps expand(scala.Function1<Out,scala.collection.Iterator<U>> extrapolate)
public static FlowOps buffer(int size, OverflowStrategy overflowStrategy)
public static <U> FlowOps prefixAndTail(int n)
public static <K> SubFlow<Out,Mat,FlowOps,java.lang.Object> groupBy(int maxSubstreams, scala.Function1<Out,K> f)
public static SubFlow<Out,Mat,FlowOps,java.lang.Object> splitWhen(SubstreamCancelStrategy substreamCancelStrategy, scala.Function1<Out,java.lang.Object> p)
public static SubFlow<Out,Mat,FlowOps,java.lang.Object> splitWhen(scala.Function1<Out,java.lang.Object> p)
public static SubFlow<Out,Mat,FlowOps,java.lang.Object> splitAfter(SubstreamCancelStrategy substreamCancelStrategy, scala.Function1<Out,java.lang.Object> p)
public static SubFlow<Out,Mat,FlowOps,java.lang.Object> splitAfter(scala.Function1<Out,java.lang.Object> p)
public static <T,M> FlowOps flatMapConcat(scala.Function1<Out,Graph<SourceShape<T>,M>> f)
public static <T,M> FlowOps flatMapMerge(int breadth, scala.Function1<Out,Graph<SourceShape<T>,M>> f)
public static FlowOps initialTimeout(scala.concurrent.duration.FiniteDuration timeout)
public static FlowOps completionTimeout(scala.concurrent.duration.FiniteDuration timeout)
public static FlowOps idleTimeout(scala.concurrent.duration.FiniteDuration timeout)
public static FlowOps backpressureTimeout(scala.concurrent.duration.FiniteDuration timeout)
public static <U> FlowOps keepAlive(scala.concurrent.duration.FiniteDuration maxIdle, scala.Function0<U> injectedElem)
public static FlowOps throttle(int elements, scala.concurrent.duration.FiniteDuration per, int maximumBurst, ThrottleMode mode)
public static FlowOps throttle(int cost, scala.concurrent.duration.FiniteDuration per, int maximumBurst, scala.Function1<Out,java.lang.Object> costCalculation, ThrottleMode mode)
public static FlowOps detach()
public static FlowOps initialDelay(scala.concurrent.duration.FiniteDuration delay)
public static FlowOps log(java.lang.String name, scala.Function1<Out,java.lang.Object> extract, LoggingAdapter log)
public static <U> FlowOps zip(Graph<SourceShape<U>,?> that)
protected static <U,M> Graph<FlowShape<Out,scala.Tuple2<Out,U>>,M> zipGraph(Graph<SourceShape<U>,M> that)
public static <Out2,Out3> FlowOps zipWith(Graph<SourceShape<Out2>,?> that, scala.Function2<Out,Out2,Out3> combine)
protected static <Out2,Out3,M> Graph<FlowShape<Out,Out3>,M> zipWithGraph(Graph<SourceShape<Out2>,M> that, scala.Function2<Out,Out2,Out3> combine)
public static FlowOps zipWithIndex()
public static <U> FlowOps interleave(Graph<SourceShape<U>,?> that, int segmentSize)
protected static <U,M> Graph<FlowShape<Out,U>,M> interleaveGraph(Graph<SourceShape<U>,M> that, int segmentSize)
public static <U,M> FlowOps merge(Graph<SourceShape<U>,M> that, boolean eagerComplete)
protected static <U,M> Graph<FlowShape<Out,U>,M> mergeGraph(Graph<SourceShape<U>,M> that, boolean eagerComplete)
public static <U,M> FlowOps mergeSorted(Graph<SourceShape<U>,M> that, scala.math.Ordering<U> ord)
protected static <U,M> Graph<FlowShape<Out,U>,M> mergeSortedGraph(Graph<SourceShape<U>,M> that, scala.math.Ordering<U> ord)
public static <U,Mat2> FlowOps concat(Graph<SourceShape<U>,Mat2> that)
protected static <U,Mat2> Graph<FlowShape<Out,U>,Mat2> concatGraph(Graph<SourceShape<U>,Mat2> that)
public static <U,Mat2> FlowOps prepend(Graph<SourceShape<U>,Mat2> that)
protected static <U,Mat2> Graph<FlowShape<Out,U>,Mat2> prependGraph(Graph<SourceShape<U>,Mat2> that)
public static <U,Mat2> FlowOps orElse(Graph<SourceShape<U>,Mat2> secondary)
protected static <U,Mat2> Graph<FlowShape<Out,U>,Mat2> orElseGraph(Graph<SourceShape<U>,Mat2> secondary)
public static <U,M> FlowOps $plus$plus(Graph<SourceShape<U>,M> that)
protected static <M> Graph<FlowShape<Out,Out>,M> alsoToGraph(Graph<SinkShape<Out>,M> that)
public static <T> FlowOps andThen(Stages.SymbolicStage<Out,T> op)
public static int sliding$default$2()
public static DelayOverflowStrategy delay$default$2()
public static <U,M> boolean merge$default$2()
public static scala.Function1<Out,java.lang.Object> log$default$2()
public static LoggingAdapter log$default$3(java.lang.String name, scala.Function1<Out,java.lang.Object> extract)
public static <U,Mat2,Mat3> FlowOpsMat zipMat(Graph<SourceShape<U>,Mat2> that, scala.Function2<Mat,Mat2,Mat3> matF)
public static <Out2,Out3,Mat2,Mat3> FlowOpsMat zipWithMat(Graph<SourceShape<Out2>,Mat2> that, scala.Function2<Out,Out2,Out3> combine, scala.Function2<Mat,Mat2,Mat3> matF)
public static <U,Mat2,Mat3> FlowOpsMat mergeMat(Graph<SourceShape<U>,Mat2> that, boolean eagerComplete, scala.Function2<Mat,Mat2,Mat3> matF)
public static <U,Mat2,Mat3> FlowOpsMat interleaveMat(Graph<SourceShape<U>,Mat2> that, int request, scala.Function2<Mat,Mat2,Mat3> matF)
public static <U,Mat2,Mat3> FlowOpsMat mergeSortedMat(Graph<SourceShape<U>,Mat2> that, scala.Function2<Mat,Mat2,Mat3> matF, scala.math.Ordering<U> ord)
public static <U,Mat2,Mat3> FlowOpsMat concatMat(Graph<SourceShape<U>,Mat2> that, scala.Function2<Mat,Mat2,Mat3> matF)
public static <U,Mat2,Mat3> FlowOpsMat prependMat(Graph<SourceShape<U>,Mat2> that, scala.Function2<Mat,Mat2,Mat3> matF)
public static <U,Mat2,Mat3> FlowOpsMat orElseMat(Graph<SourceShape<U>,Mat2> secondary, scala.Function2<Mat,Mat2,Mat3> matF)
public static <Mat2,Mat3> FlowOpsMat alsoToMat(Graph<SinkShape<Out>,Mat2> that, scala.Function2<Mat,Mat2,Mat3> matF)
public static <Mat2> FlowOpsMat watchTermination(scala.Function2<Mat,scala.concurrent.Future<Done>,Mat2> matF)
public static <Mat2> FlowOpsMat monitor(scala.Function2<Mat,FlowMonitor<Out>,Mat2> combine)
public static <T,M> FlowOpsMat transformMaterializing(scala.Function0<scala.Tuple2<Stage<Out,T>,M>> mkStageAndMaterialized)
public static <U,Mat2,Mat3> boolean mergeMat$default$2()
public StreamLayout.Module module()
Description copied from interface: Graph
Every materializable element must be backed by a stream layout module.
Specified by: module in interface Graph<SourceShape<Out>,Mat>
public SourceShape<Out> shape()
Description copied from interface: Graph
Specified by: shape in interface Graph<SourceShape<Out>,Mat>
public java.lang.String toString()
Overrides: toString in class java.lang.Object
public <T,Mat2,Mat3> Source<T,Mat3> viaMat(Graph<FlowShape<Out,T>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
Description copied from interface: FlowOpsMat
Transform this Flow by appending the given processing steps.
     +----------------------------+
     | Resulting Flow             |
     |                            |
     |  +------+        +------+  |
     |  |      |        |      |  |
 In ~~> | this | ~Out~> | flow | ~~> T
     |  |      |        |      |  |
     |  +------+        +------+  |
     +----------------------------+
The combine function is used to compose the materialized values of this flow and that flow into the materialized value of the resulting Flow.
It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
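The role of the combine function may be easier to see in a short Scala sketch. It assumes the Akka 2.4-era Scala DSL that this page documents (ActorSystem plus ActorMaterializer); the stage names numbers, doubler and summing are hypothetical and exist only for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ViaMatSketch {
  // Hypothetical stages, chosen only for illustration.
  val numbers: Source[Int, NotUsed] = Source(1 to 5)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val summing: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Keep.left passes through the materialized value of `numbers`.
  val doubled: Source[Int, NotUsed] = numbers.viaMat(doubler)(Keep.left)

  // Keep.right passes through the materialized value of the Sink.
  val graph: RunnableGraph[Future[Int]] = doubled.toMat(summing)(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("via-mat-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try println(Await.result(graph.run(), 3.seconds)) // 2 + 4 + 6 + 8 + 10 = 30
    finally system.terminate()
  }
}
```

Writing `(left, _) => left` by hand would type-check as well, but the Keep combiners are what the description above recommends.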
Specified by: viaMat in interface FlowOpsMat<Out,Mat>
Parameters:
flow - (undocumented)
combine - (undocumented)
public <Mat2> RunnableGraph<Mat> to(Graph<SinkShape<Out>,Mat2> sink)
public <Mat2,Mat3> RunnableGraph<Mat3> toMat(Graph<SinkShape<Out>,Mat2> sink, scala.Function2<Mat,Mat2,Mat3> combine)
Specified by: toMat in interface FlowOpsMat<Out,Mat>
Parameters:
sink - (undocumented)
combine - (undocumented)
public <Mat2> Source<Out,Mat2> mapMaterializedValue(scala.Function1<Mat,Mat2> f)
Specified by: mapMaterializedValue in interface FlowOpsMat<Out,Mat>
Parameters:
f - (undocumented)
public <Mat2> Mat2 runWith(Graph<SinkShape<Out>,Mat2> sink, Materializer materializer)
Connect this Source to a Sink and run it. The returned value is the materialized value of the Sink, e.g. the Publisher of an akka.stream.scaladsl.Sink#publisher.
Parameters:
sink - (undocumented)
materializer - (undocumented)
public <U> scala.concurrent.Future<U> runFold(U zero, scala.Function2<U,Out,U> f, Materializer materializer)
Shortcut for running this Source with a fold function.
The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input.
The returned Future will be completed with the value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
Parameters:
zero - (undocumented)
f - (undocumented)
materializer - (undocumented)
public <U> scala.concurrent.Future<U> runFoldAsync(U zero, scala.Function2<U,Out,scala.concurrent.Future<U>> f, Materializer materializer)
Shortcut for running this Source with a foldAsync function.
The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input.
The returned Future will be completed with the value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
Parameters:
zero - (undocumented)
f - (undocumented)
materializer - (undocumented)
public <U> scala.concurrent.Future<U> runReduce(scala.Function2<U,U,U> f, Materializer materializer)
Shortcut for running this Source with a reduce function.
The given function is invoked for every received element, giving it its previous output (from the second element) and the element as input.
The returned Future will be completed with the value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
If the stream is empty (i.e. completes before signalling any elements), the reduce stage will fail its downstream with a NoSuchElementException, which is semantically in line with what Scala's standard library collections do in such situations.
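The difference between folding and reducing an empty stream can be sketched as follows. This is a minimal illustration that assumes the Akka 2.4-era Scala DSL; the helper names maxOf and sumOf are hypothetical and not part of the library.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object RunReduceSketch {
  // Hypothetical helper: reduce with max; the first element seeds the reduction.
  def maxOf(xs: List[Int])(implicit mat: Materializer): Future[Int] =
    Source(xs).runReduce[Int](_ max _)

  // Hypothetical helper: fold with an explicit zero value.
  def sumOf(xs: List[Int])(implicit mat: Materializer): Future[Int] =
    Source(xs).runFold(0)(_ + _)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("run-reduce-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      println(Await.result(maxOf(List(3, 7, 5)), 3.seconds)) // 7
      // An empty stream still folds to its zero value ...
      println(Await.result(sumOf(Nil), 3.seconds)) // 0
      // ... whereas reduce has no starting value, so the Future fails.
      println(Try(Await.result(maxOf(Nil), 3.seconds)).isFailure) // true
    } finally system.terminate()
  }
}
```

When an empty stream is a legitimate input, runFold with a sensible zero value is usually the safer choice.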
Parameters:
f - (undocumented)
materializer - (undocumented)
public scala.concurrent.Future<Done> runForeach(scala.Function1<Out,scala.runtime.BoxedUnit> f, Materializer materializer)
Shortcut for running this Source with a foreach procedure. The given procedure is invoked for each received element.
The returned Future will be completed with Success when reaching the normal end of the stream, or completed with Failure if there is a failure signaled in the stream.
Parameters:
f - (undocumented)
materializer - (undocumented)
public Source<Out,Mat> withAttributes(Attributes attr)
Change the attributes of this Source to the given ones and seal the list of attributes. This means that further calls will not be able to remove these attributes, but instead add new ones. Note that this operation has no effect on an empty Flow (because the attributes apply only to the contained processing stages).
Specified by: withAttributes in interface Graph<SourceShape<Out>,Mat>
Specified by: withAttributes in interface FlowOps<Out,Mat>
Parameters:
attr - (undocumented)
public Source<Out,Mat> addAttributes(Attributes attr)
Add the given attributes to this Source. Further calls to withAttributes will not remove these attributes. Note that this operation has no effect on an empty Flow (because the attributes apply only to the contained processing stages).
Specified by: addAttributes in interface Graph<SourceShape<Out>,Mat>
Specified by: addAttributes in interface FlowOps<Out,Mat>
Parameters:
attr - (undocumented)
public Source<Out,Mat> asJava()
Converts this Scala DSL element to its Java DSL counterpart.
public <T,U> Source<U,NotUsed> combine(Source<T,?> first, Source<T,?> second, scala.collection.Seq<Source<T,?>> rest, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> strategy)
Combines several sources with a fan-in strategy like Merge or Concat and returns a Source.
Parameters:
first - (undocumented)
second - (undocumented)
rest - (undocumented)
strategy - (undocumented)
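As a rough illustration of the two strategies named above, the following Scala sketch combines two small sources with Concat and with Merge. It assumes the Akka 2.4-era Scala DSL, in which the strategy function receives the number of inputs; the source names first and second are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object CombineSketch {
  // Hypothetical inputs, chosen only for illustration.
  val first: Source[Int, NotUsed] = Source(List(1, 2))
  val second: Source[Int, NotUsed] = Source(List(3, 4))

  // Concat drains `first` completely before it pulls from `second`.
  val concatenated: Source[Int, NotUsed] =
    Source.combine(first, second)(Concat[Int](_))

  // Merge emits elements as they arrive, so their relative order is not fixed.
  val merged: Source[Int, NotUsed] =
    Source.combine(first, second)(Merge[Int](_))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("combine-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val inOrder = Await.result(concatenated.runWith(Sink.seq), 3.seconds)
      println(inOrder.mkString(",")) // 1,2,3,4
      val total = Await.result(merged.runFold(0)(_ + _), 3.seconds)
      println(total) // 10, whatever order the merge produced
    } finally system.terminate()
  }
}
```

The merged result is summed rather than printed element by element because Merge gives no ordering guarantee between its inputs.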