Class Source<Out,Mat>
- java.lang.Object
  - akka.stream.scaladsl.Source<Out,Mat>

All Implemented Interfaces:
Graph<SourceShape<Out>,Mat>, FlowOps<Out,Mat>, FlowOpsMat<Out,Mat>
public final class Source<Out,Mat> extends java.lang.Object implements FlowOpsMat<Out,Mat>, Graph<SourceShape<Out>,Mat>
A Source is a set of stream processing steps that has one open output. It can comprise any number of internal sources and transformations that are wired together, or it can be an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into a Reactive Streams Publisher (at least conceptually).
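A minimal sketch of the description above, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer). The object name and system name are illustrative only and not part of the API. It shows that a Source is just a blueprint until it is connected to a Sink and run:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, named for illustration only.
object BasicSourceSketch {
  // Builds a Source from a collection, transforms it, and materializes it with Sink.seq.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("basic-source-sketch")
    try {
      // Describing the stream does not run anything yet.
      val doubled = Source(1 to 5).map(_ * 2)
      // runWith materializes the stream; Sink.seq collects all elements into a Future.
      Await.result(doubled.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```

Calling `BasicSourceSketch.run()` collects the doubled elements in order.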
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface akka.stream.Graph
Graph.GraphMapMatVal<S extends Shape,M>
-
-
Constructor Summary
Constructor
Source(akka.stream.impl.LinearTraversalBuilder traversalBuilder, SourceShape<Out> shape)
-
Method Summary
Modifier and Type / Method / Description

static <T> Source<T,ActorRef>
actorRef(int bufferSize, OverflowStrategy overflowStrategy)
Deprecated. Use variant accepting completion and failure matchers instead.

static <T> Source<T,ActorRef>
actorRef(scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher, int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as an ActorRef.

static <T> Source<T,ActorRef>
actorRefWithAck(java.lang.Object ackMessage)
Deprecated. Use actorRefWithBackpressure accepting completion and failure matchers instead.

static <T> Source<T,ActorRef>
actorRefWithBackpressure(java.lang.Object ackMessage, scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher)
Creates a Source that is materialized as an ActorRef.

Source<Out,Mat>
addAttributes(Attributes attr)
Add the given attributes to this Source.

static <T> Source<T,NotUsed>
apply(scala.collection.immutable.Iterable<T> iterable)
Helper to create Source from Iterable.

Source<Out,Mat>
asJava()
Converts this Scala DSL element to its Java DSL counterpart.

<Ctx> SourceWithContext<Out,Ctx,Mat>
asSourceWithContext(scala.Function1<Out,Ctx> f)
Transform this source whose element is e into a source producing tuple (e, f(e)).

static <T> Source<T,org.reactivestreams.Subscriber<T>>
asSubscriber()
Creates a Source that is materialized as a Subscriber.

Source<Out,Mat>
async()
Put an asynchronous boundary around this Source.

Source<Out,Mat>
async(java.lang.String dispatcher)
Put an asynchronous boundary around this Graph.

Source<Out,Mat>
async(java.lang.String dispatcher, int inputBufferSize)
Put an asynchronous boundary around this Graph.

static <T,U> Source<U,NotUsed>
combine(Source<T,?> first, Source<T,?> second, scala.collection.immutable.Seq<Source<T,?>> rest, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)

static <T,U,M> Source<U,scala.collection.immutable.Seq<M>>
combine(scala.collection.immutable.Seq<Graph<SourceShape<T>,M>> sources, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)

static <T,U,M1,M2,M> Source<U,M>
combineMat(Source<T,M1> first, Source<T,M2> second, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy, scala.Function2<M1,M2,M> matF)

static <T> Source<T,NotUsed>
completionStage(java.util.concurrent.CompletionStage<T> completionStage)
Emits a single value when the given CompletionStage is successfully completed and then completes the stream.

static <T> Source<T,NotUsed>
cycle(scala.Function0<scala.collection.Iterator<T>> f)
Creates a Source that will continually produce given elements in specified order.

static <T> Source<T,NotUsed>
empty()
A Source with no elements, i.e. an empty stream that is completed immediately for every connected Sink.

static <T> Source<T,NotUsed>
failed(java.lang.Throwable cause)
Create a Source that immediately ends the stream with the cause error to every connected Sink.

static <T> Source<T,NotUsed>
fromCompletionStage(java.util.concurrent.CompletionStage<T> future)
Deprecated. Use 'Source.completionStage' instead.

static <T> Source<T,NotUsed>
fromFuture(scala.concurrent.Future<T> future)
Deprecated. Use 'Source.future' instead.

static <T,M> Source<T,scala.concurrent.Future<M>>
fromFutureSource(scala.concurrent.Future<Graph<SourceShape<T>,M>> future)
Deprecated. Use 'Source.futureSource' (potentially together with `Source.fromGraph`) instead.

static <T,M> Source<T,M>
fromGraph(Graph<SourceShape<T>,M> g)
A graph with the shape of a source logically is a source, this method makes it so also in type.

static <T> Source<T,NotUsed>
fromIterator(scala.Function0<scala.collection.Iterator<T>> f)
Helper to create Source from Iterator.

static <T,S extends java.util.stream.BaseStream<T,S>> Source<T,NotUsed>
fromJavaStream(scala.Function0<java.util.stream.BaseStream<T,S>> stream)
Creates a source that wraps a Java 8 Stream.

static <T,M> Source<T,scala.concurrent.Future<M>>
fromMaterializer(scala.Function2<Materializer,Attributes,Source<T,M>> factory)
Defers the creation of a Source until materialization.

static <T> Source<T,NotUsed>
fromPublisher(org.reactivestreams.Publisher<T> publisher)
Helper to create Source from Publisher.

static <T,M> Source<T,java.util.concurrent.CompletionStage<M>>
fromSourceCompletionStage(java.util.concurrent.CompletionStage<? extends Graph<SourceShape<T>,M>> completion)
Deprecated. Use scala-compat CompletionStage to future converter and 'Source.futureSource' instead.

static <T> Source<T,NotUsed>
future(scala.concurrent.Future<T> futureElement)
Emits a single value when the given Future is successfully completed and then completes the stream.

static <T,M> Source<T,scala.concurrent.Future<M>>
futureSource(scala.concurrent.Future<Source<T,M>> futureSource)
Turn a Future[Source] into a source that will emit the values of the source when the future completes successfully.

Attributes
getAttributes()

static <T,M> Source<T,scala.concurrent.Future<M>>
lazily(scala.Function0<Source<T,M>> create)
Deprecated. Use 'Source.lazySource' instead.

static <T> Source<T,scala.concurrent.Future<NotUsed>>
lazilyAsync(scala.Function0<scala.concurrent.Future<T>> create)
Deprecated. Use 'Source.lazyFuture' instead.

static <T> Source<T,NotUsed>
lazyFuture(scala.Function0<scala.concurrent.Future<T>> create)
Defers invoking the create function to create a future element until there is downstream demand.

static <T,M> Source<T,scala.concurrent.Future<M>>
lazyFutureSource(scala.Function0<scala.concurrent.Future<Source<T,M>>> create)
Defers invoking the create function to create a future source until there is downstream demand.

static <T> Source<T,NotUsed>
lazySingle(scala.Function0<T> create)
Defers invoking the create function to create a single element until there is downstream demand.

static <T,M> Source<T,scala.concurrent.Future<M>>
lazySource(scala.Function0<Source<T,M>> create)
Defers invoking the create function to create a source until there is downstream demand.

<Mat2> Source<Out,Mat2>
mapMaterializedValue(scala.Function1<Mat,Mat2> f)
Transform only the materialized value of this Source, leaving all other properties as they were.

static <T> Source<T,scala.concurrent.Promise<scala.Option<T>>>
maybe()
Create a Source which materializes a Promise which controls what element will be emitted by the Source.

static <T> Source<T,NotUsed>
mergePrioritizedN(scala.collection.immutable.Seq<scala.Tuple2<Source<T,?>,java.lang.Object>> sourcesAndPriorities, boolean eagerComplete)
Merge multiple Sources.

Source<Out,Mat>
named(java.lang.String name)
Add a name attribute to this Source.

static <T> Source<T,NotUsed>
never()
Never emits any elements, never completes and never fails.

scala.Tuple2<Mat,Source<Out,NotUsed>>
preMaterialize(Materializer materializer)
Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source that can be used to consume elements from the newly materialized Source.

static <T> Source<T,BoundedSourceQueue<T>>
queue(int bufferSize)
Creates a Source that is materialized as a BoundedSourceQueue.

static <T> Source<T,SourceQueueWithComplete<T>>
queue(int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as a SourceQueueWithComplete.

static <T> Source<T,SourceQueueWithComplete<T>>
queue(int bufferSize, OverflowStrategy overflowStrategy, int maxConcurrentOffers)
Creates a Source that is materialized as a SourceQueueWithComplete.

static <T> Source<T,NotUsed>
repeat(T element)
Create a Source that will continually emit the given element.

scala.concurrent.Future<Done>
run(Materializer materializer)
Connect this Source to the Sink.ignore and run it.

<U> scala.concurrent.Future<U>
runFold(U zero, scala.Function2<U,Out,U> f, Materializer materializer)
Shortcut for running this Source with a fold function.

<U> scala.concurrent.Future<U>
runFoldAsync(U zero, scala.Function2<U,Out,scala.concurrent.Future<U>> f, Materializer materializer)
Shortcut for running this Source with a foldAsync function.

scala.concurrent.Future<Done>
runForeach(scala.Function1<Out,scala.runtime.BoxedUnit> f, Materializer materializer)
Shortcut for running this Source with a foreach procedure.

<U> scala.concurrent.Future<U>
runReduce(scala.Function2<U,U,U> f, Materializer materializer)
Shortcut for running this Source with a reduce function.

<Mat2> Mat2
runWith(Graph<SinkShape<Out>,Mat2> sink, Materializer materializer)
Connect this Source to a Sink and run it.

static <T,M> Source<T,scala.concurrent.Future<M>>
setup(scala.Function2<ActorMaterializer,Attributes,Source<T,M>> factory)
Deprecated. Use 'fromMaterializer' instead.

SourceShape<Out>
shape()
The shape of a graph is all that is externally visible: its inlets and outlets.

static <T> Source<T,NotUsed>
single(T element)
Create a Source with one element.

static <T> Source<T,Cancellable>
tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, T tick)
Elements are emitted periodically with the specified interval.

<Mat2> RunnableGraph<Mat>
to(Graph<SinkShape<Out>,Mat2> sink)

<Mat2,Mat3> RunnableGraph<Mat3>
toMat(Graph<SinkShape<Out>,Mat2> sink, scala.Function2<Mat,Mat2,Mat3> combine)

java.lang.String
toString()

akka.stream.impl.LinearTraversalBuilder
traversalBuilder()
INTERNAL API.

static <S,E> Source<E,NotUsed>
unfold(S s, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f)
Create a Source that will unfold a value of type S into a pair of the next state S and output elements of type E.

static <S,E> Source<E,NotUsed>
unfoldAsync(S s, scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f)
Same as unfold(S, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>>), but uses an async function to generate the next state-element tuple.

static <T,S> Source<T,NotUsed>
unfoldResource(scala.Function0<S> create, scala.Function1<S,scala.Option<T>> read, scala.Function1<S,scala.runtime.BoxedUnit> close)
Start a new Source from some resource which can be opened, read and closed.

static <T,S> Source<T,NotUsed>
unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create, scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read, scala.Function1<S,scala.concurrent.Future<Done>> close)
Start a new Source from some resource which can be opened, read and closed.

<T,Mat2> Source<T,Mat>
via(Graph<FlowShape<Out,T>,Mat2> flow)

<T,Mat2,Mat3> Source<T,Mat3>
viaMat(Graph<FlowShape<Out,T>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
Transform this Flow by appending the given processing steps.

Source<Out,Mat>
withAttributes(Attributes attr)
Replace the attributes of this Source with the given ones.

static <T> Source<scala.collection.immutable.Seq<T>,NotUsed>
zipN(scala.collection.immutable.Seq<Source<T,?>> sources)
Combine the elements of multiple streams into a stream of sequences.

static <T,O> Source<O,NotUsed>
zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper, scala.collection.immutable.Seq<Source<T,?>> sources)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface akka.stream.scaladsl.FlowOps
$plus$plus, aggregateWithBoundary, alsoTo, alsoToAll, alsoToGraph, ask, ask, backpressureTimeout, batch, batchWeighted, buffer, collect, collectType, completionTimeout, concat, concatAllLazy, concatGraph, concatLazy, conflate, conflateWithSeed, delay, delay$default$2, delayWith, detach, divertTo, divertToGraph, drop, dropWhile, dropWithin, expand, extrapolate, extrapolate$default$2, filter, filterNot, flatMapConcat, flatMapMerge, flatMapPrefix, fold, foldAsync, groupBy, groupBy, grouped, groupedWeighted, groupedWeightedWithin, groupedWeightedWithin, groupedWithin, idleTimeout, initialDelay, initialTimeout, interleave, interleave, interleaveAll, interleaveGraph, interleaveGraph$default$3, internalConcat, internalConcatAll, intersperse, intersperse, keepAlive, limit, limitWeighted, log, log$default$2, log$default$3, logWithMarker, logWithMarker$default$3, logWithMarker$default$4, map, mapAsync, mapAsyncPartitioned, mapAsyncUnordered, mapConcat, mapError, mapWithResource, merge, merge$default$2, mergeAll, mergeGraph, mergeLatest, mergeLatest$default$2, mergeLatestGraph, mergePreferred, mergePreferred$default$3, mergePreferredGraph, mergePrioritized, mergePrioritized$default$4, mergePrioritizedGraph, mergeSorted, mergeSortedGraph, onErrorComplete, onErrorComplete, orElse, orElseGraph, prefixAndTail, prepend, prependGraph, prependLazy, recover, recoverWith, recoverWithRetries, reduce, scan, scanAsync, sliding, sliding$default$2, splitAfter, splitAfter, splitWhen, splitWhen, statefulMap, statefulMapConcat, take, takeWhile, takeWhile, takeWithin, throttle, throttle, throttle, throttle, watch, wireTap, wireTap, wireTapGraph, zip, zipAll, zipAllFlow, zipGraph, zipLatest, zipLatestGraph, zipLatestWith, zipLatestWith, zipLatestWithGraph, zipLatestWithGraph, zipWith, zipWithGraph, zipWithIndex
-
Methods inherited from interface akka.stream.scaladsl.FlowOpsMat
alsoToMat, concatLazyMat, concatMat, divertToMat, flatMapPrefixMat, interleaveMat, interleaveMat, mergeLatestMat, mergeMat, mergeMat$default$2, mergePreferredMat, mergePrioritizedMat, mergeSortedMat, monitor, monitorMat, orElseMat, prependLazyMat, prependMat, watchTermination, wireTapMat, zipAllMat, zipLatestMat, zipLatestWithMat, zipLatestWithMat, zipMat, zipWithMat
-
-
-
-
Constructor Detail
-
Source
public Source(akka.stream.impl.LinearTraversalBuilder traversalBuilder, SourceShape<Out> shape)
-
-
Method Detail
-
fromPublisher
public static <T> Source<T,NotUsed> fromPublisher(org.reactivestreams.Publisher<T> publisher)
Helper to create Source from Publisher.
Construct a transformation starting with given publisher. The transformation steps are executed by a series of Processor instances that mediate the flow of elements downstream and the propagation of back-pressure upstream.
-
fromIterator
public static <T> Source<T,NotUsed> fromIterator(scala.Function0<scala.collection.Iterator<T>> f)
Helper to create Source from Iterator. Example usage:
    Source.fromIterator(() => Iterator.from(0))
Start a new Source from the given function that produces an Iterator. The produced stream of elements will continue until the iterator runs empty or fails during evaluation of the next() method. Elements are pulled out of the iterator in accordance with the demand coming from the downstream transformation steps.
-
fromJavaStream
public static <T,S extends java.util.stream.BaseStream<T,S>> Source<T,NotUsed> fromJavaStream(scala.Function0<java.util.stream.BaseStream<T,S>> stream)
Creates a source that wraps a Java 8 Stream. Source uses a stream iterator to get all its elements and send them downstream on demand.
You can use Source.async to create asynchronous boundaries between the synchronous Java Stream and the rest of the flow.
-
cycle
public static <T> Source<T,NotUsed> cycle(scala.Function0<scala.collection.Iterator<T>> f)
Creates a Source that will continually produce the given elements in the specified order.
Starts a new 'cycled' Source from the given elements. The produced stream of elements will continue infinitely by repeating the sequence of elements provided by the function parameter.
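The repetition can be sketched as follows, assuming Akka 2.6 or later. The object and system names are illustrative. Because a cycled Source never completes on its own, the sketch bounds it with take:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, named for illustration only.
object CycleSketch {
  // Repeats the three-element sequence and keeps only the first seven elements.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("cycle-sketch")
    try {
      // The function is asked for a fresh iterator each time the previous one is exhausted.
      val cycled = Source.cycle(() => Iterator(1, 2, 3)).take(7)
      Await.result(cycled.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```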
-
fromGraph
public static <T,M> Source<T,M> fromGraph(Graph<SourceShape<T>,M> g)
A graph with the shape of a source logically is a source, this method makes it so also in type.
-
fromMaterializer
public static <T,M> Source<T,scala.concurrent.Future<M>> fromMaterializer(scala.Function2<Materializer,Attributes,Source<T,M>> factory)
-
setup
public static <T,M> Source<T,scala.concurrent.Future<M>> setup(scala.Function2<ActorMaterializer,Attributes,Source<T,M>> factory)
Deprecated. Use 'fromMaterializer' instead. Since 2.6.0.
-
apply
public static <T> Source<T,NotUsed> apply(scala.collection.immutable.Iterable<T> iterable)
Helper to create Source from Iterable. Example usage:
    Source(Seq(1,2,3))
Starts a new Source from the given Iterable. This is like starting from an Iterator, but every Subscriber directly attached to the Publisher of this stream will see an individual flow of elements (always starting from the beginning) regardless of when they subscribed.
-
fromFuture
public static <T> Source<T,NotUsed> fromFuture(scala.concurrent.Future<T> future)
Deprecated. Use 'Source.future' instead. Since 2.6.0.
Starts a new Source from the given Future. The stream will consist of one element when the Future is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the Future is completed with a failure.
-
fromCompletionStage
public static <T> Source<T,NotUsed> fromCompletionStage(java.util.concurrent.CompletionStage<T> future)
Deprecated. Use 'Source.completionStage' instead. Since 2.6.0.
Starts a new Source from the given Future. The stream will consist of one element when the Future is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the Future is completed with a failure.
-
fromFutureSource
public static <T,M> Source<T,scala.concurrent.Future<M>> fromFutureSource(scala.concurrent.Future<Graph<SourceShape<T>,M>> future)
Deprecated. Use 'Source.futureSource' (potentially together with `Source.fromGraph`) instead. Since 2.6.0.
Streams the elements of the given future source once it successfully completes. If the Future fails the stream is failed with the exception from the future. If downstream cancels before the stream completes the materialized Future will be failed with a StreamDetachedException.
-
fromSourceCompletionStage
public static <T,M> Source<T,java.util.concurrent.CompletionStage<M>> fromSourceCompletionStage(java.util.concurrent.CompletionStage<? extends Graph<SourceShape<T>,M>> completion)
Deprecated. Use scala-compat CompletionStage to future converter and 'Source.futureSource' instead. Since 2.6.0.
Streams the elements of an asynchronous source once its given completion operator completes. If the CompletionStage fails the stream is failed with the exception from the future. If downstream cancels before the stream completes the materialized Future will be failed with a StreamDetachedException.
-
tick
public static <T> Source<T,Cancellable> tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, T tick)
Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that have requested any elements. If a consumer has not requested any elements at the point in time when the tick element is produced, it will not receive that tick element later. It will receive new tick elements as soon as it has requested more elements.
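A minimal sketch of periodic emission, assuming Akka 2.6 or later. The object name, system name and the chosen 50 ms interval are illustrative. The materialized Cancellable can be used to stop the ticks; here the stream is bounded with take so that it completes by itself:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, named for illustration only.
object TickSketch {
  // Emits the same element every 50 ms and collects the first three ticks.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("tick-sketch")
    try {
      // Keep.both retains the Cancellable from tick and the Future from Sink.seq.
      val (cancellable, collected) =
        Source.tick(0.millis, 50.millis, "tick").take(3).toMat(Sink.seq)(Keep.both).run()
      val elements = Await.result(collected, 5.seconds)
      // Not required here because take(3) already cancelled upstream; shown for completeness.
      cancellable.cancel()
      elements
    } finally system.terminate()
  }
}
```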
-
single
public static <T> Source<T,NotUsed> single(T element)
Create a Source with one element. Every connected Sink of this stream will see an individual stream consisting of one element.
-
repeat
public static <T> Source<T,NotUsed> repeat(T element)
Create a Source that will continually emit the given element.
-
unfold
public static <S,E> Source<E,NotUsed> unfold(S s, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f)
Create a Source that will unfold a value of type S into a pair of the next state S and output elements of type E.
For example, all the Fibonacci numbers under 10M:
    Source.unfold(0 -> 1) {
      case (a, _) if a > 10000000 => None
      case (a, b)                 => Some((b -> (a + b)) -> a)
    }
-
unfoldAsync
public static <S,E> Source<E,NotUsed> unfoldAsync(S s, scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f)
Same as unfold(S, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>>), but uses an async function to generate the next state-element tuple.
Async Fibonacci example:
    Source.unfoldAsync(0 -> 1) {
      case (a, _) if a > 10000000 => Future.successful(None)
      case (a, b) =>
        Future {
          Thread.sleep(1000)
          Some((b -> (a + b)) -> a)
        }
    }
-
empty
public static <T> Source<T,NotUsed> empty()
A Source with no elements, i.e. an empty stream that is completed immediately for every connected Sink.
-
maybe
public static <T> Source<T,scala.concurrent.Promise<scala.Option<T>>> maybe()
Create a Source which materializes a Promise which controls what element will be emitted by the Source.
- If the materialized promise is completed with a Some, that value will be produced downstream, followed by completion.
- If the materialized promise is completed with a None, no value will be produced downstream and completion will be signalled immediately.
- If the materialized promise is completed with a failure, then the source will fail with that error.
- If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed with None.
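The first case (completing the promise with a Some) can be sketched like this, assuming Akka 2.6 or later. The object and system names are illustrative:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, named for illustration only.
object MaybeSketch {
  // Completes the materialized promise with Some(42) and collects what the stream emits.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("maybe-sketch")
    try {
      // Keep.both retains the Promise from Source.maybe and the Future from Sink.seq.
      val (promise, collected) = Source.maybe[Int].toMat(Sink.seq)(Keep.both).run()
      // Nothing is emitted until the promise is completed from the outside.
      promise.success(Some(42))
      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }
}
```

Completing the promise with `None` instead would complete the stream without emitting anything.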
-
failed
public static <T> Source<T,NotUsed> failed(java.lang.Throwable cause)
Create a Source that immediately ends the stream with the cause error to every connected Sink.
-
lazily
public static <T,M> Source<T,scala.concurrent.Future<M>> lazily(scala.Function0<Source<T,M>> create)
Deprecated. Use 'Source.lazySource' instead. Since 2.6.0.
Creates a Source that is not materialized until there is downstream demand. When the source gets materialized, the materialized future is completed with its value. If downstream cancels or fails without any demand, the create factory is never called and the materialized Future is failed.
-
lazilyAsync
public static <T> Source<T,scala.concurrent.Future<NotUsed>> lazilyAsync(scala.Function0<scala.concurrent.Future<T>> create)
Deprecated. Use 'Source.lazyFuture' instead. Since 2.6.0.
Creates a Source from supplied future factory that is not called until downstream demand. When source gets materialized the materialized future is completed with the value from the factory. If downstream cancels or fails without any demand the create factory is never called and the materialized Future is failed.
See Also: Source.lazily
-
future
public static <T> Source<T,NotUsed> future(scala.concurrent.Future<T> futureElement)
Emits a single value when the given Future is successfully completed and then completes the stream. The stream fails if the Future is completed with a failure.
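Both outcomes can be sketched as follows, assuming Akka 2.6 or later. The object name, system name and the "boom" exception are illustrative:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, named for illustration only.
object FutureSketch {
  // Returns the element of a successful future and whether a failed future failed the stream.
  def run(): (Int, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("future-sketch")
    try {
      // A successfully completed Future yields exactly one element.
      val element =
        Await.result(Source.future(Future.successful(7)).runWith(Sink.head), 5.seconds)
      // A failed Future fails the stream, so the materialized Future of Sink.head fails too.
      val failedStream = Source.future(Future.failed[Int](new RuntimeException("boom")))
      val streamFailed = Try(Await.result(failedStream.runWith(Sink.head), 5.seconds)).isFailure
      (element, streamFailed)
    } finally system.terminate()
  }
}
```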
-
never
public static <T> Source<T,NotUsed> never()
Never emits any elements, never completes and never fails. This stream could be useful in tests.
-
completionStage
public static <T> Source<T,NotUsed> completionStage(java.util.concurrent.CompletionStage<T> completionStage)
Emits a single value when the given CompletionStage is successfully completed and then completes the stream. If the CompletionStage is completed with a failure the stream is failed.
Here for Java interoperability, the normal use from Scala should be Source.future.
-
futureSource
public static <T,M> Source<T,scala.concurrent.Future<M>> futureSource(scala.concurrent.Future<Source<T,M>> futureSource)
Turn a Future[Source] into a source that will emit the values of the source when the future completes successfully. If the Future is completed with a failure the stream is failed.
-
lazySingle
public static <T> Source<T,NotUsed> lazySingle(scala.Function0<T> create)
Defers invoking the create function to create a single element until there is downstream demand.
If the create function fails when invoked the stream is failed.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and will trigger the factory immediately.
-
lazyFuture
public static <T> Source<T,NotUsed> lazyFuture(scala.Function0<scala.concurrent.Future<T>> create)
Defers invoking the create function to create a future element until there is downstream demand.
The returned future element will be emitted downstream when it completes, or fail the stream if the future is failed or the create function itself fails.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and will trigger the factory immediately.
-
lazySource
public static <T,M> Source<T,scala.concurrent.Future<M>> lazySource(scala.Function0<Source<T,M>> create)
Defers invoking the create function to create a source until there is downstream demand.
The returned source will emit downstream and behave just like it was the outer source. Downstream completes when the created source completes and fails when the created source fails.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and will trigger the factory immediately.
The materialized future value is completed with the materialized value of the created source when it has been materialized. If the function throws or the source materialization fails the future materialized value is failed with the thrown exception.
If downstream cancels or fails before the function is invoked the materialized value is failed with a NeverMaterializedException.
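A minimal sketch of the deferred creation, assuming Akka 2.6 or later. The object name, system name and the AtomicBoolean flag are illustrative and exist only to make the timing observable. The flag is checked once after the Source is defined and once after the stream has run:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, named for illustration only.
object LazySourceSketch {
  // Returns (create called before running?, collected elements, create called after running?).
  def run(): (Boolean, Seq[Int], Boolean) = {
    implicit val system: ActorSystem = ActorSystem("lazy-source-sketch")
    try {
      val created = new AtomicBoolean(false)
      // The create function records that it was invoked, then builds the inner Source.
      val source = Source.lazySource { () =>
        created.set(true)
        Source(List(1, 2, 3))
      }
      // Defining the Source does not invoke the create function.
      val calledBeforeRun = created.get()
      // Running with Sink.seq signals demand, which triggers the create function.
      val elements = Await.result(source.runWith(Sink.seq), 5.seconds)
      (calledBeforeRun, elements, created.get())
    } finally system.terminate()
  }
}
```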
-
lazyFutureSource
public static <T,M> Source<T,scala.concurrent.Future<M>> lazyFutureSource(scala.Function0<scala.concurrent.Future<Source<T,M>>> create)
Defers invoking the create function to create a future source until there is downstream demand.
The returned future source will emit downstream and behave just like it was the outer source when the future completes successfully. Downstream completes when the created source completes and fails when the created source fails. If the future or the create function fails the stream is failed.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and triggers the factory immediately.
The materialized future value is completed with the materialized value of the created source when it has been materialized. If the function throws or the source materialization fails the future materialized value is failed with the thrown exception.
If downstream cancels or fails before the function is invoked the materialized value is failed with a NeverMaterializedException.
-
asSubscriber
public static <T> Source<T,org.reactivestreams.Subscriber<T>> asSubscriber()
Creates a Source that is materialized as a Subscriber.
-
actorRef
public static <T> Source<T,ActorRef> actorRef(scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher, int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure is not supported, and an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize of 0 and then received messages are dropped if there is no demand from downstream. When bufferSize is 0 the overflowStrategy does not matter.
The stream can be completed successfully by sending the actor reference a message that is matched by completionMatcher in which case already buffered elements will be signaled before signaling completion.
The stream can be completed with failure by sending a message that is matched by failureMatcher. The extracted Throwable will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received a message matched by completionMatcher) before signaling completion and it receives a message matched by failureMatcher, the failure will be signaled downstream immediately (instead of the completion signal).
Note that terminating the actor without first completing it, either with a success or a failure, will prevent the actor triggering downstream completion and the stream will continue to run even though the source actor is dead. Therefore you should **not** attempt to manually terminate the actor such as with a PoisonPill.
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.scaladsl.Source.queue.
Parameters:
completionMatcher - catches the completion message to end the stream
failureMatcher - catches the failure message to fail the stream
bufferSize - The size of the buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
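The matcher-based variant can be sketched as follows, assuming Akka 2.6 or later. The object name, system name, the use of `Done` as the completion message, the buffer size of 16 and the dropHead strategy are all illustrative choices, not requirements of the API:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, named for illustration only.
object ActorRefSourceSketch {
  // Sends two elements and a completion message to the materialized ActorRef.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-source-sketch")
    try {
      val (ref, collected) = Source
        .actorRef[String](
          // Done completes the stream after already buffered elements have been emitted.
          completionMatcher = { case Done => CompletionStrategy.draining },
          // No message is treated as a failure in this sketch.
          failureMatcher = PartialFunction.empty,
          bufferSize = 16,
          overflowStrategy = OverflowStrategy.dropHead)
        .toMat(Sink.seq)(Keep.both)
        .run()
      ref ! "a"
      ref ! "b"
      ref ! Done
      Await.result(collected, 5.seconds)
    } finally system.terminate()
  }
}
```

The stream is completed through the matcher rather than by stopping the actor, in line with the note about PoisonPill above.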
-
actorRef
public static <T> Source<T,ActorRef> actorRef(int bufferSize, OverflowStrategy overflowStrategy)
Deprecated. Use variant accepting completion and failure matchers instead. Since 2.6.0.
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure is not supported, and an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize of 0 and then received messages are dropped if there is no demand from downstream. When bufferSize is 0 the overflowStrategy does not matter.
The stream can be completed successfully by sending the actor reference a Status.Success. If the content is akka.stream.CompletionStrategy.immediately the completion will be signaled immediately. Otherwise, if the content is akka.stream.CompletionStrategy.draining (or anything else) already buffered elements will be sent out before signaling completion. Using PoisonPill or akka.actor.ActorSystem.stop to stop the actor and complete the stream is *not supported*.
The stream can be completed with failure by sending a Status.Failure to the actor reference. In case the Actor is still draining its internal buffer (after having received a Status.Success) before signaling completion and it receives a Status.Failure, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.scaladsl.Source.queue.
Parameters:
bufferSize - The size of the buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
-
actorRefWithBackpressure
public static <T> Source<T,ActorRef> actorRefWithBackpressure(java.lang.Object ackMessage, scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher)
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, and a new message will only be accepted after the previous message has been consumed and acknowledged back. The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
The stream can be completed with failure by sending a message that is matched by failureMatcher. The extracted Throwable will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received a message matched by completionMatcher) before signaling completion and it receives a message matched by failureMatcher, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
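The acknowledgement protocol described above can be sketched as follows. This is a minimal illustration, assuming Akka 2.6.x on the classpath; the messages Ack, StreamCompleted and StreamFailed are hypothetical names made up for this example, and the ask pattern is just one way to receive the acknowledgement outside of an actor.

```scala
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.stream.CompletionStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object ActorRefWithBackpressureExample {
  // Hypothetical protocol messages, defined only for this sketch.
  case object Ack
  case object StreamCompleted
  final case class StreamFailed(cause: Throwable)

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-backpressure")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val (ref, result) = Source
        .actorRefWithBackpressure[String](
          Ack,
          { case StreamCompleted => CompletionStrategy.draining },
          { case StreamFailed(cause) => cause })
        .toMat(Sink.seq)(Keep.both)
        .run()

      // `ask` makes a temporary actor the sender, so the Ack is delivered to it.
      // Waiting for each Ack before sending the next element respects the protocol.
      Await.result(ref ? "a", 3.seconds)
      Await.result(ref ? "b", 3.seconds)

      ref ! StreamCompleted
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Inside an actor, the same protocol would normally be followed by sending the next element only after the Ack message has been received.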
-
actorRefWithAck
public static <T> Source<T,ActorRef> actorRefWithAck(java.lang.Object ackMessage)
Deprecated. Use actorRefWithBackpressure accepting completion and failure matchers instead. Since 2.6.0.
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, and a new message will only be accepted after the previous message has been consumed and acknowledged back. The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
The stream can be completed successfully by sending the actor reference a Status.Success. If the content is akka.stream.CompletionStrategy.immediately the completion will be signaled immediately. Otherwise, if the content is akka.stream.CompletionStrategy.draining (or anything else), already buffered elements will be sent out before signaling completion.
The stream can be completed with failure by sending a Status.Failure to the actor reference. In case the Actor is still draining its internal buffer (after having received a Status.Success) before signaling completion and it receives a Status.Failure, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
-
combine
public static <T,U> Source<U,NotUsed> combine(Source<T,?> first, Source<T,?> second, scala.collection.immutable.Seq<Source<T,?>> rest, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)
-
combine
public static <T,U,M> Source<U,scala.collection.immutable.Seq<M>> combine(scala.collection.immutable.Seq<Graph<SourceShape<T>,M>> sources, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)
-
combineMat
public static <T,U,M1,M2,M> Source<U,M> combineMat(Source<T,M1> first, Source<T,M2> second, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy, scala.Function2<M1,M2,M> matF)
-
zipN
public static <T> Source<scala.collection.immutable.Seq<T>,NotUsed> zipN(scala.collection.immutable.Seq<Source<T,?>> sources)
Combine the elements of multiple streams into a stream of sequences.
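As a small illustration of zipN, the sketch below zips three finite sources. It assumes Akka 2.6.x on the classpath; the object name and the sample values are made up for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object ZipNExample {
  def run(): immutable.Seq[immutable.Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("zip-n")
    try {
      val sources = List(
        Source(List(1, 2, 3)),
        Source(List(10, 20, 30)),
        Source(List(100, 200, 300)))
      // Each emitted sequence holds one element from every input, in input order.
      Await.result(Source.zipN(sources).runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```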
-
zipWithN
public static <T,O> Source<O,NotUsed> zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper, scala.collection.immutable.Seq<Source<T,?>> sources)
-
queue
public static <T> Source<T,BoundedSourceQueue<T>> queue(int bufferSize)
Creates a Source that is materialized as a BoundedSourceQueue. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. The buffer size is passed in as a parameter. Elements in the buffer will be discarded if downstream is terminated.
Pushed elements may be dropped if there is no space available in the buffer. Elements will also be dropped if the queue is failed through the materialized BoundedSourceQueue or the Source is cancelled by the downstream. An element that was reported to be enqueued is not guaranteed to be processed by the rest of the stream. If the queue is failed by calling BoundedSourceQueue.fail or the downstream cancels the stream, elements in the buffer are discarded.
Acknowledgement of pushed elements is immediate. akka.stream.BoundedSourceQueue.offer returns a QueueOfferResult, which is one of:
  QueueOfferResult.Enqueued: element was added to buffer, but may still be discarded later when the queue is failed or cancelled
  QueueOfferResult.Dropped: element was dropped
  QueueOfferResult.QueueClosed: the queue was completed with akka.stream.BoundedSourceQueue.complete
  QueueOfferResult.Failure: the queue was failed with akka.stream.BoundedSourceQueue.fail or if the stream failed
- Parameters:
bufferSize - size of the buffer in number of elements
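The synchronous acknowledgement can be sketched like this. It is a minimal example assuming Akka 2.6.11 or later (where this overload is available); the describe helper and the buffer size of 16 are choices made for this illustration only.

```scala
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BoundedQueueExample {
  // Hypothetical helper: turns each possible offer result into a readable label.
  def describe(result: QueueOfferResult): String = result match {
    case QueueOfferResult.Enqueued       => "enqueued"
    case QueueOfferResult.Dropped        => "dropped"
    case QueueOfferResult.QueueClosed    => "queue closed"
    case QueueOfferResult.Failure(cause) => s"failed: ${cause.getMessage}"
  }

  def run(): List[QueueOfferResult] = {
    implicit val system: ActorSystem = ActorSystem("bounded-queue")
    try {
      val (queue, done) =
        Source.queue[Int](16).toMat(Sink.foreach(println))(Keep.both).run()

      // offer returns immediately; the buffer (16) is larger than the number
      // of offers, so none of these can be dropped for lack of space.
      val results = List(1, 2, 3).map(queue.offer)

      queue.complete()
      Await.ready(done, 3.seconds)
      results
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().map(describe).foreach(println)
}
```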
-
queue
public static <T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as a SourceQueueWithComplete. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded if downstream is terminated.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
Acknowledgement mechanism is available. akka.stream.scaladsl.SourceQueueWithComplete.offer returns Future[QueueOfferResult], which completes with QueueOfferResult.Enqueued if the element was added to the buffer or sent downstream. It completes with QueueOfferResult.Dropped if the element was dropped. It can also complete with QueueOfferResult.Failure when the stream failed, or QueueOfferResult.QueueClosed when downstream is completed.
With the strategy akka.stream.OverflowStrategy.backpressure, the Future returned by the last offer() call is not completed while the buffer is full.
Instead of using the strategy akka.stream.OverflowStrategy.dropNew, it is recommended to use Source.queue(bufferSize), which returns a QueueOfferResult synchronously.
You can watch for termination of the stream with akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion. It returns a future that completes with success when the operator is completed, or fails when the stream is failed.
The buffer can be disabled by using a bufferSize of 0. A received message will then wait for downstream demand unless there is another message already waiting for downstream demand, in which case the offer result will be completed according to the overflow strategy.
The materialized SourceQueue may only be used from a single producer.
- Parameters:
bufferSize - size of buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
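A single-producer use of this overload could look like the sketch below, assuming Akka 2.6.x on the classpath. Chaining the offers with flatMap is one way (chosen for this example) to make sure only one offer is pending at a time.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SourceQueueExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("source-queue")
    import system.dispatcher
    try {
      val (queue, result) = Source
        .queue[Int](2, OverflowStrategy.backpressure)
        .toMat(Sink.seq)(Keep.both)
        .run()

      // Offer the next element only after the previous offer has completed.
      val seed = Future.successful[QueueOfferResult](QueueOfferResult.Enqueued)
      val offered = List(1, 2, 3).foldLeft(seed) { (previous, elem) =>
        previous.flatMap(_ => queue.offer(elem))
      }
      Await.result(offered, 3.seconds)

      queue.complete()
      // watchCompletion completes once the queue operator has completed.
      Await.result(queue.watchCompletion(), 3.seconds)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```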
-
queue
public static <T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy, int maxConcurrentOffers)
Creates a Source that is materialized as a SourceQueueWithComplete. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded if downstream is terminated.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
Acknowledgement mechanism is available. akka.stream.scaladsl.SourceQueueWithComplete.offer returns Future[QueueOfferResult], which completes with QueueOfferResult.Enqueued if the element was added to the buffer or sent downstream. It completes with QueueOfferResult.Dropped if the element was dropped. It can also complete with QueueOfferResult.Failure when the stream failed, or QueueOfferResult.QueueClosed when downstream is completed.
With the strategy akka.stream.OverflowStrategy.backpressure, up to maxConcurrentOffers pending offer() calls are left with their Future uncompleted while the buffer is full.
Instead of using the strategy akka.stream.OverflowStrategy.dropNew, it is recommended to use Source.queue(bufferSize), which returns a QueueOfferResult synchronously.
You can watch for termination of the stream with akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion. It returns a future that completes with success when the operator is completed, or fails when the stream is failed.
The buffer can be disabled by using a bufferSize of 0. A received message will then wait for downstream demand unless there is another message already waiting for downstream demand, in which case the offer result will be completed according to the overflow strategy.
The materialized SourceQueue may be used by up to maxConcurrentOffers concurrent producers.
- Parameters:
bufferSize - size of buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
maxConcurrentOffers - maximum number of pending offers when buffer is full, should be greater than 0, not applicable when OverflowStrategy.dropNew is used
-
unfoldResource
public static <T,S> Source<T,NotUsed> unfoldResource(scala.Function0<S> create, scala.Function1<S,scala.Option<T>> read, scala.Function1<S,scala.runtime.BoxedUnit> close)
Start a new Source from some resource which can be opened, read and closed. Interaction with resource happens in a blocking way.
Example:
  Source.unfoldResource(
    () => new BufferedReader(new FileReader("...")),
    reader => Option(reader.readLine()),
    reader => reader.close())
You can use the supervision strategy to handle exceptions for the read function. All exceptions thrown by create or close will fail the stream.
The Restart supervision strategy will close and create blocking IO again. The default strategy is Stop, which means that the stream will be terminated on error in the read function by default.
You can configure the default dispatcher for this Source by changing the akka.stream.materializer.blocking-io-dispatcher or set it for a given Source by using ActorAttributes.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
- Parameters:
create - function that is called on stream start and creates/opens resource.
read - function that reads data from opened resource. It is called each time backpressure signal is received. Stream calls close and completes when read returns None.
close - function that closes resource
-
unfoldResourceAsync
public static <T,S> Source<T,NotUsed> unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create, scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read, scala.Function1<S,scala.concurrent.Future<Done>> close)
Start a new Source from some resource which can be opened, read and closed. It's similar to unfoldResource but takes functions that return Futures instead of plain values.
You can use the supervision strategy to handle exceptions for the read function or failures of produced Futures. All exceptions thrown by create or close, as well as failures of the returned futures, will fail the stream.
The Restart supervision strategy will close and create the resource again. The default strategy is Stop, which means that the stream will be terminated on error in the read function (or future) by default.
You can configure the default dispatcher for this Source by changing the akka.stream.materializer.blocking-io-dispatcher or set it for a given Source by using ActorAttributes.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
- Parameters:
create - function that is called on stream start and creates/opens resource.
read - function that reads data from opened resource. It is called each time backpressure signal is received. Stream calls close and completes when the Future from the read function returns None.
close - function that closes resource
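A minimal sketch of unfoldResourceAsync, assuming Akka 2.6.x on the classpath. An in-memory Iterator stands in for a real asynchronous resource here; that stand-in is an assumption made purely to keep the example self-contained.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnfoldResourceAsyncExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("unfold-resource-async")
    try {
      val source = Source.unfoldResourceAsync[Int, Iterator[Int]](
        // create: "opens" the resource
        () => Future.successful(Iterator(1, 2, 3)),
        // read: Some(element) while data is available, None to close and complete
        it => Future.successful(if (it.hasNext) Some(it.next()) else None),
        // close: releases the resource
        _ => Future.successful(Done))

      Await.result(source.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```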
-
mergePrioritizedN
public static <T> Source<T,NotUsed> mergePrioritizedN(scala.collection.immutable.Seq<scala.Tuple2<Source<T,?>,java.lang.Object>> sourcesAndPriorities, boolean eagerComplete)
Merge multiple Sources. Prefer the sources depending on the 'priority' parameters. The provided sources and priorities must have the same size and order.
'''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if several have elements available
'''backpressures''' when downstream backpressures
'''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting eagerComplete=true.)
'''Cancels when''' downstream cancels
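The following sketch merges two sources with priorities 9 and 1. It assumes an Akka 2.6.x version that includes mergePrioritizedN; the priorities and values are arbitrary. The interleaving of the output is not fixed, so the example only relies on every element arriving when eagerComplete is false.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergePrioritizedNExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("merge-prioritized-n")
    try {
      val merged = Source.mergePrioritizedN[Int](
        List(
          (Source(1 to 3), 9),     // preferred when both inputs have elements
          (Source(101 to 103), 1)),
        eagerComplete = false)     // wait for all inputs to complete

      Await.result(merged.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().sorted)
}
```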
-
traversalBuilder
public akka.stream.impl.LinearTraversalBuilder traversalBuilder()
Description copied from interface: Graph
INTERNAL API.
Every materializable element must be backed by a stream layout module
- Specified by:
traversalBuilder in interface Graph<Out,Mat>
-
shape
public SourceShape<Out> shape()
Description copied from interface: Graph
The shape of a graph is all that is externally visible: its inlets and outlets.
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
viaMat
public <T,Mat2,Mat3> Source<T,Mat3> viaMat(Graph<FlowShape<Out,T>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
Description copied from interface: FlowOpsMat
Transform this Flow by appending the given processing steps.

     +---------------------------------+
     | Resulting Flow[In, T, M2]       |
     |                                 |
     |  +------+            +------+   |
     |  |      |            |      |   |
In ~~>  | this |  ~~Out~~>  | flow |  ~~> T
     |  |   Mat|            |     M|   |
     |  +------+            +------+   |
     +---------------------------------+

The combine function is used to compose the materialized values of this flow and that flow into the materialized value of the resulting Flow.
It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
- Specified by:
viaMat in interface FlowOpsMat<Out,Mat>
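The role of the combine function can be sketched as below, assuming Akka 2.6.x on the classpath. The flow's materialized value is a termination Future obtained through watchTermination; that choice is only for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ViaMatExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("via-mat")
    try {
      // A flow whose materialized value is a Future completed on termination.
      val doubling: Flow[Int, Int, Future[Done]] =
        Flow[Int].map(_ * 2).watchTermination()(Keep.right)

      // Keep.right keeps the flow's materialized value and drops the source's NotUsed.
      val (terminated, result) = Source(1 to 3)
        .viaMat(doubling)(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()

      Await.result(terminated, 3.seconds)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```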
-
to
public <Mat2> RunnableGraph<Mat> to(Graph<SinkShape<Out>,Mat2> sink)
-
toMat
public <Mat2,Mat3> RunnableGraph<Mat3> toMat(Graph<SinkShape<Out>,Mat2> sink, scala.Function2<Mat,Mat2,Mat3> combine)
- Specified by:
toMat in interface FlowOpsMat<Out,Mat>
-
mapMaterializedValue
public <Mat2> Source<Out,Mat2> mapMaterializedValue(scala.Function1<Mat,Mat2> f)
Transform only the materialized value of this Source, leaving all other properties as they were.
- Specified by:
mapMaterializedValue in interface FlowOpsMat<Out,Mat>
-
preMaterialize
public scala.Tuple2<Mat,Source<Out,NotUsed>> preMaterialize(Materializer materializer)
Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source that can be used to consume elements from the newly materialized Source.
Note that preMaterialize is implemented through a reactive streams Publisher, which means that a buffer is introduced and that errors are not propagated upstream but are turned into cancellations without error details.
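A short sketch of preMaterialize, assuming Akka 2.6.x on the classpath. Source.maybe is used here only because its materialized Promise makes it easy to show the materialized value being used before the returned Source is run.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PreMaterializeExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("pre-materialize")
    try {
      // The Promise is available right away; `source` consumes from the
      // already materialized stream.
      val (promise, source) = Source.maybe[Int].preMaterialize()

      promise.success(Some(42))
      Await.result(source.runWith(Sink.head), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```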
-
run
public scala.concurrent.Future<Done> run(Materializer materializer)
Connect this Source to the Sink.ignore and run it. Elements from the stream will be consumed and discarded.
Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
-
runWith
public <Mat2> Mat2 runWith(Graph<SinkShape<Out>,Mat2> sink, Materializer materializer)
Connect this Source to a Sink and run it. The returned value is the materialized value of the Sink, e.g. the Publisher of an akka.stream.scaladsl.Sink#publisher.
Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
-
runFold
public <U> scala.concurrent.Future<U> runFold(U zero, scala.Function2<U,Out,U> f, Materializer materializer)
Shortcut for running this Source with a fold function. The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input. The returned Future will be completed with the value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
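For instance, summing a finite source with runFold might look like the sketch below (assuming Akka 2.6.x on the classpath; the implicit ActorSystem provides the materializer as noted above).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

object RunFoldExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("run-fold")
    try {
      // Starts from zero = 0 and adds each element to the running total.
      Await.result(Source(1 to 5).runFold(0)(_ + _), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```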
-
runFoldAsync
public <U> scala.concurrent.Future<U> runFoldAsync(U zero, scala.Function2<U,Out,scala.concurrent.Future<U>> f, Materializer materializer)
Shortcut for running this Source with a foldAsync function. The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input. The returned Future will be completed with the value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
-
runReduce
public <U> scala.concurrent.Future<U> runReduce(scala.Function2<U,U,U> f, Materializer materializer)
Shortcut for running this Source with a reduce function. The given function is invoked for every received element, giving it its previous output (from the second element) and the element as input. The returned Future will be completed with the value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
If the stream is empty (i.e. completes before signalling any elements), the reduce operator will fail its downstream with a NoSuchElementException, which is semantically in line with what Scala's standard library collections do in such situations.
Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
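Both cases, a non-empty and an empty stream, can be sketched as follows, assuming Akka 2.6.x on the classpath. The object and method names are made up for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

object RunReduceExample {
  // Returns the reduced sum and the failure produced by reducing an empty stream.
  def run(): (Int, Throwable) = {
    implicit val system: ActorSystem = ActorSystem("run-reduce")
    try {
      val sum = Await.result(Source(1 to 5).runReduce[Int](_ + _), 3.seconds)

      // `failed` turns the failed Future into a successful one holding the exception.
      val emptyFailure =
        Await.result(Source.empty[Int].runReduce[Int](_ + _).failed, 3.seconds)

      (sum, emptyFailure)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```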
-
runForeach
public scala.concurrent.Future<Done> runForeach(scala.Function1<Out,scala.runtime.BoxedUnit> f, Materializer materializer)
Shortcut for running this Source with a foreach procedure. The given procedure is invoked for each received element. The returned Future will be completed with Success when reaching the normal end of the stream, or completed with Failure if there is a failure signaled in the stream.
Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
-
withAttributes
public Source<Out,Mat> withAttributes(Attributes attr)
Replace the attributes of this Source with the given ones. If this Source is a composite of multiple graphs, new attributes on the composite will be less specific than attributes set directly on the individual graphs of the composite.
- Specified by:
withAttributes in interface FlowOps<Out,Mat>
- Specified by:
withAttributes in interface Graph<Out,Mat>
-
addAttributes
public Source<Out,Mat> addAttributes(Attributes attr)
Add the given attributes to this Source. If the specific attribute was already on this source it will replace the previous value. If this Source is a composite of multiple graphs, the added attributes will be on the composite and therefore less specific than attributes set directly on the individual graphs of the composite.
- Specified by:
addAttributes in interface FlowOps<Out,Mat>
- Specified by:
addAttributes in interface Graph<Out,Mat>
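The difference between adding and replacing attributes can be sketched as below. This assumes Akka 2.6.x on the classpath; the attribute names "doubler" and "renamed" are arbitrary, and the lookups go through getAttributes and Attributes.get as an illustration rather than a recommended inspection technique.

```scala
import akka.stream.Attributes
import akka.stream.scaladsl.Source

object AttributesExample {
  private val base = Source(1 to 3).map(_ * 2)

  // addAttributes keeps the attributes already present and adds the new ones.
  val added = base
    .addAttributes(Attributes.name("doubler"))
    .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

  // withAttributes replaces the attributes set on the composite.
  val replaced = added.withAttributes(Attributes.name("renamed"))

  def main(args: Array[String]): Unit = {
    println(added.getAttributes.get[Attributes.Name])
    println(added.getAttributes.get[Attributes.InputBuffer])
    println(replaced.getAttributes.get[Attributes.Name])
    println(replaced.getAttributes.get[Attributes.InputBuffer])
  }
}
```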
-
async
public Source<Out,Mat> async(java.lang.String dispatcher)
Put an asynchronous boundary around this Graph
-
async
public Source<Out,Mat> async(java.lang.String dispatcher, int inputBufferSize)
Put an asynchronous boundary around this Graph
-
asJava
public Source<Out,Mat> asJava()
Converts this Scala DSL element to its Java DSL counterpart.
-
asSourceWithContext
public <Ctx> SourceWithContext<Out,Ctx,Mat> asSourceWithContext(scala.Function1<Out,Ctx> f)
Transform this source, whose elements are e, into a source producing tuples (e, f(e)).
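As an illustration, the sketch below derives a context (the string length) from each element, transforms only the data part, and then converts back to a plain source of tuples. It assumes Akka 2.6.x on the classpath; the sample values are arbitrary.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source, SourceWithContext}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsSourceWithContextExample {
  def run(): Seq[(String, Int)] = {
    implicit val system: ActorSystem = ActorSystem("source-with-context")
    try {
      // The context of each element is computed once, from the original element.
      val withContext: SourceWithContext[String, Int, NotUsed] =
        Source(List("a", "bb", "ccc")).asSourceWithContext(_.length)

      // map only touches the element; the context travels along unchanged.
      val tuples = withContext.map(_.toUpperCase).asSource

      Await.result(tuples.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```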
-
getAttributes
public Attributes getAttributes()
- Specified by:
getAttributes in interface Graph<Out,Mat>
-
-