Class Source$
- java.lang.Object
-
- akka.stream.scaladsl.Source$
-
public class Source$ extends java.lang.Object
-
-
Constructor Summary
Constructors
Constructor    Description
Source$()
-
Method Summary
Modifier and Type, Method, Description

<T> Source<T,ActorRef> actorRef(int bufferSize, OverflowStrategy overflowStrategy)
    Deprecated. Use variant accepting completion and failure matchers instead.
<T> Source<T,ActorRef> actorRef(scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher, int bufferSize, OverflowStrategy overflowStrategy)
    Creates a Source that is materialized as an ActorRef.
<T> Source<T,ActorRef> actorRefWithAck(java.lang.Object ackMessage)
    Deprecated. Use actorRefWithBackpressure accepting completion and failure matchers instead.
<T> Source<T,ActorRef> actorRefWithBackpressure(java.lang.Object ackMessage, scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher)
    Creates a Source that is materialized as an ActorRef.
<T> Source<T,NotUsed> apply(scala.collection.immutable.Iterable<T> iterable)
    Helper to create Source from Iterable.
<T> Source<T,org.reactivestreams.Subscriber<T>> asSubscriber()
    Creates a Source that is materialized as a Subscriber
<T,U> Source<U,NotUsed> combine(Source<T,?> first, Source<T,?> second, scala.collection.immutable.Seq<Source<T,?>> rest, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)
<T,U,M> Source<U,scala.collection.immutable.Seq<M>> combine(scala.collection.immutable.Seq<Graph<SourceShape<T>,M>> sources, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)
<T,U,M1,M2,M> Source<U,M> combineMat(Source<T,M1> first, Source<T,M2> second, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy, scala.Function2<M1,M2,M> matF)
<T> Source<T,NotUsed> completionStage(java.util.concurrent.CompletionStage<T> completionStage)
    Emits a single value when the given CompletionStage is successfully completed and then completes the stream.
<T> Source<T,NotUsed> cycle(scala.Function0<scala.collection.Iterator<T>> f)
    Creates a Source that will continually produce elements in the order they are provided.
<T> Source<T,NotUsed> empty()
    A Source with no elements, i.e.
<T> Source<T,NotUsed> failed(java.lang.Throwable cause)
    Create a Source that immediately ends the stream with the cause error to every connected Sink.
<T> Source<T,NotUsed> fromCompletionStage(java.util.concurrent.CompletionStage<T> future)
    Deprecated. Use 'Source.completionStage' instead.
<T> Source<T,NotUsed> fromFuture(scala.concurrent.Future<T> future)
    Deprecated. Use 'Source.future' instead.
<T,M> Source<T,scala.concurrent.Future<M>> fromFutureSource(scala.concurrent.Future<Graph<SourceShape<T>,M>> future)
    Deprecated. Use 'Source.futureSource' (potentially together with `Source.fromGraph`) instead.
<T,M> Source<T,M> fromGraph(Graph<SourceShape<T>,M> g)
    A graph with the shape of a source logically is a source, this method makes it so also in type.
<T> Source<T,NotUsed> fromIterator(scala.Function0<scala.collection.Iterator<T>> f)
    Helper to create a Source from an Iterator.
<T,S extends java.util.stream.BaseStream<T,S>> Source<T,NotUsed> fromJavaStream(scala.Function0<java.util.stream.BaseStream<T,S>> stream)
    Creates a source that wraps a Java 8 Stream.
<T,M> Source<T,scala.concurrent.Future<M>> fromMaterializer(scala.Function2<Materializer,Attributes,Source<T,M>> factory)
    Defers the creation of a Source until materialization.
<T> Source<T,NotUsed> fromPublisher(org.reactivestreams.Publisher<T> publisher)
    Helper to create Source from Publisher.
<T,M> Source<T,java.util.concurrent.CompletionStage<M>> fromSourceCompletionStage(java.util.concurrent.CompletionStage<? extends Graph<SourceShape<T>,M>> completion)
    Deprecated. Use scala-compat CompletionStage to future converter and 'Source.futureSource' instead.
<T> Source<T,NotUsed> future(scala.concurrent.Future<T> futureElement)
    Emits a single value when the given Future is successfully completed and then completes the stream.
<T,M> Source<T,scala.concurrent.Future<M>> futureSource(scala.concurrent.Future<Source<T,M>> futureSource)
    Turn a Future[Source] into a source that will emit the values of the source when the future completes successfully.
<T,M> Source<T,scala.concurrent.Future<M>> lazily(scala.Function0<Source<T,M>> create)
    Deprecated. Use 'Source.lazySource' instead.
<T> Source<T,scala.concurrent.Future<NotUsed>> lazilyAsync(scala.Function0<scala.concurrent.Future<T>> create)
    Deprecated. Use 'Source.lazyFuture' instead.
<T> Source<T,NotUsed> lazyFuture(scala.Function0<scala.concurrent.Future<T>> create)
    Defers invoking the create function to create a future element until there is downstream demand.
<T,M> Source<T,scala.concurrent.Future<M>> lazyFutureSource(scala.Function0<scala.concurrent.Future<Source<T,M>>> create)
    Defers invoking the create function to create a future source until there is downstream demand.
<T> Source<T,NotUsed> lazySingle(scala.Function0<T> create)
    Defers invoking the create function to create a single element until there is downstream demand.
<T,M> Source<T,scala.concurrent.Future<M>> lazySource(scala.Function0<Source<T,M>> create)
    Defers invoking the create function to create a future source until there is downstream demand.
<T> Source<T,scala.concurrent.Promise<scala.Option<T>>> maybe()
    Create a Source which materializes a Promise which controls what element will be emitted by the Source.
<T> Source<T,NotUsed> mergePrioritizedN(scala.collection.immutable.Seq<scala.Tuple2<Source<T,?>,java.lang.Object>> sourcesAndPriorities, boolean eagerComplete)
    Merge multiple Sources.
<T> Source<T,NotUsed> never()
    Never emits any elements, never completes and never fails.
<T> Source<T,BoundedSourceQueue<T>> queue(int bufferSize)
    Creates a Source that is materialized as a BoundedSourceQueue.
<T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy)
    Creates a Source that is materialized as a SourceQueueWithComplete.
<T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy, int maxConcurrentOffers)
    Creates a Source that is materialized as a SourceQueueWithComplete.
<T> Source<T,NotUsed> repeat(T element)
    Create a Source that will continually emit the given element.
<T,M> Source<T,scala.concurrent.Future<M>> setup(scala.Function2<ActorMaterializer,Attributes,Source<T,M>> factory)
    Deprecated. Use 'fromMaterializer' instead.
<T> SourceShape<T> shape(java.lang.String name)
    INTERNAL API
<T> Source<T,NotUsed> single(T element)
    Create a Source with one element.
<T> Source<T,Cancellable> tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, T tick)
    Elements are emitted periodically with the specified interval.
<S,E> Source<E,NotUsed> unfold(S s, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f)
    Create a Source that will unfold a value of type S into a pair of the next state S and output elements of type E.
<S,E> Source<E,NotUsed> unfoldAsync(S s, scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f)
    Same as unfold, but uses an async function to generate the next state-element tuple.
<T,S> Source<T,NotUsed> unfoldResource(scala.Function0<S> create, scala.Function1<S,scala.Option<T>> read, scala.Function1<S,scala.runtime.BoxedUnit> close)
    Start a new Source from some resource which can be opened, read and closed.
<T,S> Source<T,NotUsed> unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create, scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read, scala.Function1<S,scala.concurrent.Future<Done>> close)
    Start a new Source from some resource which can be opened, read and closed.
<T> Source<scala.collection.immutable.Seq<T>,NotUsed> zipN(scala.collection.immutable.Seq<Source<T,?>> sources)
    Combine the elements of multiple streams into a stream of sequences.
<T,O> Source<O,NotUsed> zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper, scala.collection.immutable.Seq<Source<T,?>> sources)
-
-
-
Field Detail
-
MODULE$
public static final Source$ MODULE$
Static reference to the singleton instance of this Scala object.
-
-
Method Detail
-
shape
public <T> SourceShape<T> shape(java.lang.String name)
INTERNAL API
-
fromPublisher
public <T> Source<T,NotUsed> fromPublisher(org.reactivestreams.Publisher<T> publisher)
Helper to create Source from Publisher.
Construct a transformation starting with the given publisher. The transformation steps are executed by a series of Processor instances that mediate the flow of elements downstream and the propagation of back-pressure upstream.
-
fromIterator
public <T> Source<T,NotUsed> fromIterator(scala.Function0<scala.collection.Iterator<T>> f)
Helper to create a Source from an Iterator. Example usage:
    Source.fromIterator(() => Iterator.from(0))
Start a new Source from the given function that produces an Iterator. The produced stream of elements will continue until the iterator runs empty or fails during evaluation of the next() method. Elements are pulled out of the iterator in accordance with the demand coming from the downstream transformation steps.
-
fromJavaStream
public <T,S extends java.util.stream.BaseStream<T,S>> Source<T,NotUsed> fromJavaStream(scala.Function0<java.util.stream.BaseStream<T,S>> stream)
Creates a source that wraps a Java 8 Stream. Source uses a stream iterator to get all its elements and send them downstream on demand.
You can use Source.async to create asynchronous boundaries between the synchronous Java Stream and the rest of the flow.
-
cycle
public <T> Source<T,NotUsed> cycle(scala.Function0<scala.collection.Iterator<T>> f)
Creates a Source that will continually produce elements in the order they are provided.
The following example produces a Source that repeatedly cycles through the integers from 0 to 9:
    Source.cycle(() => Iterator.range(0, 10))
The function f is invoked to obtain an Iterator and elements are emitted into the stream as provided by that iterator. If the iterator is finite, the function f is invoked again, as necessary, when the elements from the previous iteration are exhausted. If every call to the function f returns an iterator that produces the same elements in the same order, then the Source can be described as cyclic. However, f is not required to behave that way, in which case the Source will not be cyclic.
The Source fails if f returns an empty iterator.
-
fromGraph
public <T,M> Source<T,M> fromGraph(Graph<SourceShape<T>,M> g)
A graph with the shape of a source logically is a source, this method makes it so also in type.
-
fromMaterializer
public <T,M> Source<T,scala.concurrent.Future<M>> fromMaterializer(scala.Function2<Materializer,Attributes,Source<T,M>> factory)
Defers the creation of a Source until materialization.
-
setup
public <T,M> Source<T,scala.concurrent.Future<M>> setup(scala.Function2<ActorMaterializer,Attributes,Source<T,M>> factory)
Deprecated. Use 'fromMaterializer' instead. Since 2.6.0.
-
apply
public <T> Source<T,NotUsed> apply(scala.collection.immutable.Iterable<T> iterable)
Helper to create Source from Iterable. Example usage:
    Source(Seq(1,2,3))
Starts a new Source from the given Iterable. This is like starting from an Iterator, but every Subscriber directly attached to the Publisher of this stream will see an individual flow of elements (always starting from the beginning) regardless of when they subscribed.
-
fromFuture
public <T> Source<T,NotUsed> fromFuture(scala.concurrent.Future<T> future)
Deprecated. Use 'Source.future' instead. Since 2.6.0.
Starts a new Source from the given Future. The stream will consist of one element when the Future is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the Future is completed with a failure.
-
fromCompletionStage
public <T> Source<T,NotUsed> fromCompletionStage(java.util.concurrent.CompletionStage<T> future)
Deprecated. Use 'Source.completionStage' instead. Since 2.6.0.
Starts a new Source from the given CompletionStage. The stream will consist of one element when the CompletionStage is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the CompletionStage is completed with a failure.
-
fromFutureSource
public <T,M> Source<T,scala.concurrent.Future<M>> fromFutureSource(scala.concurrent.Future<Graph<SourceShape<T>,M>> future)
Deprecated. Use 'Source.futureSource' (potentially together with `Source.fromGraph`) instead. Since 2.6.0.
Streams the elements of the given future source once it successfully completes. If the Future fails the stream is failed with the exception from the future. If downstream cancels before the stream completes the materialized Future will be failed with a StreamDetachedException.
-
fromSourceCompletionStage
public <T,M> Source<T,java.util.concurrent.CompletionStage<M>> fromSourceCompletionStage(java.util.concurrent.CompletionStage<? extends Graph<SourceShape<T>,M>> completion)
Deprecated. Use scala-compat CompletionStage to future converter and 'Source.futureSource' instead. Since 2.6.0.
Streams the elements of an asynchronous source once its given completion operator completes. If the CompletionStage fails the stream is failed with the exception from the CompletionStage. If downstream cancels before the stream completes the materialized CompletionStage will be failed with a StreamDetachedException.
-
tick
public <T> Source<T,Cancellable> tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, T tick)
Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that have requested any elements. If a consumer has not requested any elements at the point in time when the tick element is produced, it will not receive that tick element later. It will receive new tick elements as soon as it has requested more elements.
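As a rough illustration of the periodic emission described above, the following sketch collects the first three ticks. It assumes an Akka 2.6-style classic ActorSystem is available; the object name TickExample, the method firstTicks and the 50 ms interval are hypothetical choices made only for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object TickExample {
  def firstTicks(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("tick-example")
    try {
      val ticks = Source
        .tick(initialDelay = 0.millis, interval = 50.millis, tick = "tick")
        .take(3) // a tick source never completes on its own, so bound it
        .runWith(Sink.seq)
      Await.result(ticks, 5.seconds)
    } finally system.terminate()
  }
}
```

The materialized Cancellable is discarded here because take(3) already ends the stream; keeping it would allow the ticks to be stopped from outside.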
-
single
public <T> Source<T,NotUsed> single(T element)
Create a Source with one element. Every connected Sink of this stream will see an individual stream consisting of one element.
-
repeat
public <T> Source<T,NotUsed> repeat(T element)
Create a Source that will continually emit the given element.
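Because a repeating source never completes, it is normally bounded by a later operator. A minimal sketch, assuming an Akka 2.6-style classic ActorSystem; RepeatExample and firstThree are hypothetical names used only for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object RepeatExample {
  def firstThree(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("repeat-example")
    try {
      // Without take(3) the stream would keep emitting "ping" forever.
      Await.result(Source.repeat("ping").take(3).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```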
-
unfold
public <S,E> Source<E,NotUsed> unfold(S s, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f)
Create a Source that will unfold a value of type S into a pair of the next state S and output elements of type E.
For example, all the Fibonacci numbers under 10M:
    Source.unfold(0 -> 1) {
      case (a, _) if a > 10000000 => None
      case (a, b) => Some((b -> (a + b)) -> a)
    }
-
unfoldAsync
public <S,E> Source<E,NotUsed> unfoldAsync(S s, scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f)
Same as unfold, but uses an async function to generate the next state-element tuple.
Async Fibonacci example:
    Source.unfoldAsync(0 -> 1) {
      case (a, _) if a > 10000000 => Future.successful(None)
      case (a, b) => Future {
        Thread.sleep(1000)
        Some((b -> (a + b)) -> a)
      }
    }
-
empty
public <T> Source<T,NotUsed> empty()
A Source with no elements, i.e. an empty stream that is completed immediately for every connected Sink.
-
maybe
public <T> Source<T,scala.concurrent.Promise<scala.Option<T>>> maybe()
Create a Source which materializes a Promise which controls what element will be emitted by the Source. If the materialized promise is completed with a Some, that value will be produced downstream, followed by completion. If the materialized promise is completed with a None, no value will be produced downstream and completion will be signalled immediately. If the materialized promise is completed with a failure, then the source will fail with that error. If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed with None.
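The Some case described above can be sketched as follows, assuming an Akka 2.6-style classic ActorSystem. MaybeExample and emitOnce are hypothetical names; the value 42 is arbitrary.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object MaybeExample {
  def emitOnce(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("maybe-example")
    try {
      // Keep both the Promise (from the source) and the Future (from the sink).
      val (promise, result) = Source.maybe[Int].toMat(Sink.seq)(Keep.both).run()
      // Completing the promise with Some emits the value and then completes the stream.
      promise.success(Some(42))
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Completing the promise with None instead would complete the stream without emitting anything.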
-
failed
public <T> Source<T,NotUsed> failed(java.lang.Throwable cause)
Create a Source that immediately ends the stream with the cause error to every connected Sink.
-
lazily
public <T,M> Source<T,scala.concurrent.Future<M>> lazily(scala.Function0<Source<T,M>> create)
Deprecated. Use 'Source.lazySource' instead. Since 2.6.0.
Creates a Source that is not materialized until there is downstream demand. When the source gets materialized the materialized future is completed with its value. If downstream cancels or fails without any demand the create factory is never called and the materialized Future is failed.
-
lazilyAsync
public <T> Source<T,scala.concurrent.Future<NotUsed>> lazilyAsync(scala.Function0<scala.concurrent.Future<T>> create)
Deprecated. Use 'Source.lazyFuture' instead. Since 2.6.0.
Creates a Source from the supplied future factory that is not called until downstream demand. When the source gets materialized the materialized future is completed with the value from the factory. If downstream cancels or fails without any demand the create factory is never called and the materialized Future is failed.
See Also:
Source.lazily
-
future
public <T> Source<T,NotUsed> future(scala.concurrent.Future<T> futureElement)
Emits a single value when the given Future is successfully completed and then completes the stream. The stream fails if the Future is completed with a failure.
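Both outcomes described above can be observed with a small sketch, assuming an Akka 2.6-style classic ActorSystem. FutureSourceExample, run and the "boom" exception are hypothetical and exist only for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object FutureSourceExample {
  def run(): (Int, Throwable) = {
    implicit val system: ActorSystem = ActorSystem("future-example")
    try {
      // A successful future yields exactly one element.
      val ok = Source.future(Future.successful(7)).runWith(Sink.head)
      // A failed future fails the stream, which in turn fails the sink's future.
      val failing = Source
        .future(Future.failed[Int](new IllegalStateException("boom")))
        .runWith(Sink.head)
      (Await.result(ok, 5.seconds), Await.result(failing.failed, 5.seconds))
    } finally system.terminate()
  }
}
```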
-
never
public <T> Source<T,NotUsed> never()
Never emits any elements, never completes and never fails. This stream could be useful in tests.
-
completionStage
public <T> Source<T,NotUsed> completionStage(java.util.concurrent.CompletionStage<T> completionStage)
Emits a single value when the given CompletionStage is successfully completed and then completes the stream. If the CompletionStage is completed with a failure the stream is failed.
Here for Java interoperability, the normal use from Scala should be Source.future.
-
futureSource
public <T,M> Source<T,scala.concurrent.Future<M>> futureSource(scala.concurrent.Future<Source<T,M>> futureSource)
Turn a Future[Source] into a source that will emit the values of the source when the future completes successfully. If the Future is completed with a failure the stream is failed.
-
lazySingle
public <T> Source<T,NotUsed> lazySingle(scala.Function0<T> create)
Defers invoking the create function to create a single element until there is downstream demand.
If the create function fails when invoked the stream is failed.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and will trigger the factory immediately.
-
lazyFuture
public <T> Source<T,NotUsed> lazyFuture(scala.Function0<scala.concurrent.Future<T>> create)
Defers invoking the create function to create a future element until there is downstream demand.
The returned future element will be emitted downstream when it completes, or fail the stream if the future is failed or the create function itself fails.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and will trigger the factory immediately.
-
lazySource
public <T,M> Source<T,scala.concurrent.Future<M>> lazySource(scala.Function0<Source<T,M>> create)
Defers invoking the create function to create a future source until there is downstream demand.
The returned source will emit downstream and behave just like it was the outer source. Downstream completes when the created source completes and fails when the created source fails.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and will trigger the factory immediately.
The materialized future value is completed with the materialized value of the created source when it has been materialized. If the function throws or the source materialization fails the future materialized value is failed with the thrown exception.
If downstream cancels or fails before the function is invoked the materialized value is failed with a NeverMaterializedException.
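A minimal sketch of the deferred creation, assuming an Akka 2.6-style classic ActorSystem. LazySourceExample, run and the AtomicBoolean flag are hypothetical and only serve to make the invocation of create observable.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object LazySourceExample {
  def run(): (Boolean, Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("lazy-source-example")
    try {
      val created = new AtomicBoolean(false)
      val lazySrc = Source.lazySource { () =>
        created.set(true) // records that the factory was invoked
        Source(1 to 3)
      }
      // Nothing has been materialized yet, so the factory has not run.
      val createdBeforeRun = created.get()
      // Sink.seq signals demand, which triggers the factory.
      val elements = Await.result(lazySrc.runWith(Sink.seq), 5.seconds)
      (createdBeforeRun, elements)
    } finally system.terminate()
  }
}
```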
-
lazyFutureSource
public <T,M> Source<T,scala.concurrent.Future<M>> lazyFutureSource(scala.Function0<scala.concurrent.Future<Source<T,M>>> create)
Defers invoking the create function to create a future source until there is downstream demand.
The returned future source will emit downstream and behave just like it was the outer source when the future completes successfully. Downstream completes when the created source completes and fails when the created source fails. If the future or the create function fails the stream is failed.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and triggers the factory immediately.
The materialized future value is completed with the materialized value of the created source when it has been materialized. If the function throws or the source materialization fails the future materialized value is failed with the thrown exception.
If downstream cancels or fails before the function is invoked the materialized value is failed with a NeverMaterializedException.
-
asSubscriber
public <T> Source<T,org.reactivestreams.Subscriber<T>> asSubscriber()
Creates a Source that is materialized as a Subscriber.
-
actorRef
public <T> Source<T,ActorRef> actorRef(scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher, int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure is not supported, and an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize of 0 and then received messages are dropped if there is no demand from downstream. When bufferSize is 0 the overflowStrategy does not matter.
The stream can be completed successfully by sending the actor reference a message that is matched by completionMatcher in which case already buffered elements will be signaled before signaling completion.
The stream can be completed with failure by sending a message that is matched by failureMatcher. The extracted Throwable will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received a message matched by completionMatcher) before signaling completion and it receives a message matched by failureMatcher, the failure will be signaled downstream immediately (instead of the completion signal).
Note that terminating the actor without first completing it, either with a success or a failure, will prevent the actor triggering downstream completion and the stream will continue to run even though the source actor is dead. Therefore you should **not** attempt to manually terminate the actor such as with a PoisonPill.
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.scaladsl.Source.queue.
Parameters:
completionMatcher - catches the completion message to end the stream
failureMatcher - catches the failure message to fail the stream
bufferSize - The size of the buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
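The completion and failure matchers described above can be sketched roughly as follows, assuming an Akka 2.6-style classic ActorSystem. The messages StreamDone and StreamFail, the object ActorRefExample, the buffer size of 16 and the dropHead strategy are all hypothetical choices for this illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper and protocol messages, not part of Akka.
object ActorRefExample {
  case object StreamDone
  final case class StreamFail(cause: Throwable)

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-example")
    try {
      val (ref, result) = Source
        .actorRef[String](
          // Drain buffered elements before completing.
          completionMatcher = { case StreamDone => CompletionStrategy.draining },
          // Extract the Throwable used to fail the stream.
          failureMatcher = { case StreamFail(cause) => cause },
          bufferSize = 16,
          overflowStrategy = OverflowStrategy.dropHead)
        .toMat(Sink.seq)(Keep.both)
        .run()

      ref ! "a"
      ref ! "b"
      ref ! StreamDone // complete via the matcher instead of stopping the actor
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Sending StreamFail(cause) instead of StreamDone would fail the stream with cause.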
-
actorRef
public <T> Source<T,ActorRef> actorRef(int bufferSize, OverflowStrategy overflowStrategy)
Deprecated. Use variant accepting completion and failure matchers instead. Since 2.6.0.
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure is not supported, and an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize of 0 and then received messages are dropped if there is no demand from downstream. When bufferSize is 0 the overflowStrategy does not matter.
The stream can be completed successfully by sending the actor reference a Status.Success. If the content is akka.stream.CompletionStrategy.immediately the completion will be signaled immediately. Otherwise, if the content is akka.stream.CompletionStrategy.draining (or anything else) already buffered elements will be sent out before signaling completion. Using PoisonPill or akka.actor.ActorSystem.stop to stop the actor and complete the stream is *not supported*.
The stream can be completed with failure by sending a Status.Failure to the actor reference. In case the Actor is still draining its internal buffer (after having received a Status.Success) before signaling completion and it receives a Status.Failure, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.scaladsl.Source.queue.
Parameters:
bufferSize - The size of the buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
-
actorRefWithBackpressure
public <T> Source<T,ActorRef> actorRefWithBackpressure(java.lang.Object ackMessage, scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher)
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, and a new message will only be accepted after the previous message has been consumed and acknowledged back. The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
The stream can be completed with failure by sending a message that is matched by failureMatcher. The extracted Throwable will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received a message matched by completionMatcher) before signaling completion and it receives a message matched by failureMatcher, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
-
actorRefWithAck
public <T> Source<T,ActorRef> actorRefWithAck(java.lang.Object ackMessage)
Deprecated. Use actorRefWithBackpressure accepting completion and failure matchers instead. Since 2.6.0.
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, and a new message will only be accepted after the previous message has been consumed and acknowledged back. The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
The stream can be completed successfully by sending the actor reference a Status.Success. If the content is akka.stream.CompletionStrategy.immediately the completion will be signaled immediately, otherwise if the content is akka.stream.CompletionStrategy.draining (or anything else) already buffered elements will be signaled before signaling completion.
The stream can be completed with failure by sending a Status.Failure to the actor reference. In case the Actor is still draining its internal buffer (after having received a Status.Success) before signaling completion and it receives a Status.Failure, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
-
combine
public <T,U> Source<U,NotUsed> combine(Source<T,?> first, Source<T,?> second, scala.collection.immutable.Seq<Source<T,?>> rest, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)
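No description is given for combine here. As a hedged sketch of how the fanInStrategy parameter can be used from Scala, the example below passes a function that receives the number of inputs and returns a Concat fan-in graph, so the sources are emitted one after another. It assumes an Akka 2.6-style classic ActorSystem; CombineExample and run are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object CombineExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("combine-example")
    try {
      // The strategy function is called with the number of input ports (2 here).
      val combined = Source.combine(Source(1 to 2), Source(3 to 4))(Concat[Int](_))
      Await.result(combined.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```

Concat is chosen only because its output order is deterministic; another fan-in operator such as Merge could be passed the same way, with an interleaving that depends on timing.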
-
combine
public <T,U,M> Source<U,scala.collection.immutable.Seq<M>> combine(scala.collection.immutable.Seq<Graph<SourceShape<T>,M>> sources, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy)
-
combineMat
public <T,U,M1,M2,M> Source<U,M> combineMat(Source<T,M1> first, Source<T,M2> second, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> fanInStrategy, scala.Function2<M1,M2,M> matF)
-
zipN
public <T> Source<scala.collection.immutable.Seq<T>,NotUsed> zipN(scala.collection.immutable.Seq<Source<T,?>> sources)
Combine the elements of multiple streams into a stream of sequences.
-
zipWithN
public <T,O> Source<O,NotUsed> zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper, scala.collection.immutable.Seq<Source<T,?>> sources)
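The following sketch contrasts zipN with zipWithN, assuming an Akka 2.6-style classic ActorSystem and the curried Scala call form zipWithN(zipper)(sources). ZipNExample and run are hypothetical names, and summing is just one possible zipper function.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object ZipNExample {
  def run(): (Seq[Seq[Int]], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("zipn-example")
    try {
      val sources = List(Source(1 to 3), Source(10 to 30 by 10))
      // zipN emits one sequence per "row", holding one element from each source.
      val zipped = Source.zipN(sources).runWith(Sink.seq)
      // zipWithN applies the zipper function to each such row instead.
      val summed = Source.zipWithN[Int, Int](_.sum)(sources).runWith(Sink.seq)
      (Await.result(zipped, 5.seconds), Await.result(summed, 5.seconds))
    } finally system.terminate()
  }
}
```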
-
queue
public <T> Source<T,BoundedSourceQueue<T>> queue(int bufferSize)
Creates a Source that is materialized as a BoundedSourceQueue. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. The buffer size is passed in as a parameter. Elements in the buffer will be discarded if downstream is terminated.
Pushed elements may be dropped if there is no space available in the buffer. Elements will also be dropped if the queue is failed through the materialized BoundedSourceQueue or the Source is cancelled by the downstream. An element that was reported to be enqueued is not guaranteed to be processed by the rest of the stream. If the queue is failed by calling BoundedSourceQueue.fail or the downstream cancels the stream, elements in the buffer are discarded.
Acknowledgement of pushed elements is immediate. akka.stream.BoundedSourceQueue.offer returns QueueOfferResult which is implemented as:
QueueOfferResult.Enqueued - element was added to buffer, but may still be discarded later when the queue is failed or cancelled
QueueOfferResult.Dropped - element was dropped
QueueOfferResult.QueueClosed - the queue was completed with akka.stream.BoundedSourceQueue.complete
QueueOfferResult.Failure - the queue was failed with akka.stream.BoundedSourceQueue.fail or if the stream failed
Parameters:
bufferSize - size of the buffer in number of elements
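A small sketch of the synchronous offer results, assuming an Akka version that provides BoundedSourceQueue and a classic ActorSystem. BoundedQueueExample, run and the buffer size of 8 are hypothetical. The elements collected downstream are returned as well, but, as noted above, an enqueued element is not guaranteed to be processed, so only the offer results should be relied on.

```scala
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka.
object BoundedQueueExample {
  def run(): (Seq[QueueOfferResult], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("bounded-queue-example")
    try {
      val (queue, done) =
        Source.queue[Int](bufferSize = 8).toMat(Sink.seq)(Keep.both).run()
      // offer answers synchronously; three elements fit in a buffer of eight.
      val offers = (1 to 3).map(queue.offer)
      queue.complete()
      (offers, Await.result(done, 5.seconds))
    } finally system.terminate()
  }
}
```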
-
queue
public <T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as a SourceQueueWithComplete. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream; otherwise they will be buffered until demand is received. Elements in the buffer will be discarded if downstream is terminated.
Depending on the defined OverflowStrategy, it might drop elements if there is no space available in the buffer.
An acknowledgement mechanism is available. akka.stream.scaladsl.SourceQueueWithComplete.offer returns a Future[QueueOfferResult] which completes with QueueOfferResult.Enqueued if the element was added to the buffer or sent downstream. It completes with QueueOfferResult.Dropped if the element was dropped. It can also complete with QueueOfferResult.Failure when the stream failed, or with QueueOfferResult.QueueClosed when downstream is completed.
With the strategy akka.stream.OverflowStrategy.backpressure, the Future returned by the last offer() call will not complete when the buffer is full.
Instead of using the strategy akka.stream.OverflowStrategy.dropNew, it's recommended to use Source.queue(bufferSize), which returns a QueueOfferResult synchronously.
You can watch for termination of the stream with akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion. It returns a future that completes with success when the operator is completed, or fails when the stream is failed.
The buffer can be disabled by using a bufferSize of 0. A received message will then wait for downstream demand, unless another message is already waiting for downstream demand, in which case the offer result will be completed according to the overflow strategy.
The materialized SourceQueue may only be used from a single producer.
- Parameters:
bufferSize - size of the buffer in element count
overflowStrategy - strategy that is used when incoming elements cannot fit inside the buffer
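The acknowledgement mechanism and the single-producer rule can be illustrated with a short sketch. It assumes Akka 2.6.x; the object name SourceQueueSketch, the helper offerAll and the buffer size of 4 are hypothetical. Each offer is started only after the Future of the previous one has completed, so at most one offer is ever pending.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SourceQueueSketch {
  // Hypothetical helper: offer elements one at a time, waiting for each
  // acknowledgement before issuing the next offer.
  def offerAll(queue: SourceQueueWithComplete[Int], elems: List[Int])(
      implicit ec: ExecutionContext): Future[Unit] =
    elems match {
      case Nil => Future.unit
      case head :: tail =>
        queue.offer(head).flatMap {
          case QueueOfferResult.Enqueued => offerAll(queue, tail)
          case other =>
            Future.failed(new IllegalStateException(s"offer($head) returned $other"))
        }
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("source-queue-sketch")
    import system.dispatcher

    val (queue, received) = Source
      .queue[Int](4, OverflowStrategy.backpressure)
      .toMat(Sink.seq)(Keep.both)
      .run()

    // Complete the queue once every element has been acknowledged.
    offerAll(queue, List(1, 2, 3)).foreach(_ => queue.complete())

    println(Await.result(received, 3.seconds))
    Await.result(system.terminate(), 3.seconds)
  }
}
```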
-
queue
public <T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy, int maxConcurrentOffers)
Creates a Source that is materialized as a SourceQueueWithComplete. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream; otherwise they will be buffered until demand is received. Elements in the buffer will be discarded if downstream is terminated.
Depending on the defined OverflowStrategy, it might drop elements if there is no space available in the buffer.
An acknowledgement mechanism is available. akka.stream.scaladsl.SourceQueueWithComplete.offer returns a Future[QueueOfferResult] which completes with QueueOfferResult.Enqueued if the element was added to the buffer or sent downstream. It completes with QueueOfferResult.Dropped if the element was dropped. It can also complete with QueueOfferResult.Failure when the stream failed, or with QueueOfferResult.QueueClosed when downstream is completed.
With the strategy akka.stream.OverflowStrategy.backpressure, the Futures of up to maxConcurrentOffers offer() calls will not complete when the buffer is full.
Instead of using the strategy akka.stream.OverflowStrategy.dropNew, it's recommended to use Source.queue(bufferSize), which returns a QueueOfferResult synchronously.
You can watch for termination of the stream with akka.stream.scaladsl.SourceQueueWithComplete.watchCompletion. It returns a future that completes with success when the operator is completed, or fails when the stream is failed.
The buffer can be disabled by using a bufferSize of 0. A received message will then wait for downstream demand, unless another message is already waiting for downstream demand, in which case the offer result will be completed according to the overflow strategy.
The materialized SourceQueue may be used by up to maxConcurrentOffers concurrent producers.
- Parameters:
bufferSize - size of the buffer in element count
overflowStrategy - strategy that is used when incoming elements cannot fit inside the buffer
maxConcurrentOffers - maximum number of pending offers when the buffer is full; should be greater than 0; not applicable when OverflowStrategy.dropNew is used
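As a rough sketch of the multi-producer variant, the following issues several offers without waiting for one another. It assumes Akka 2.6.x; the object name ConcurrentOffersSketch, the helper offerConcurrently and the values 2 and 4 for bufferSize and maxConcurrentOffers are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConcurrentOffersSketch {
  // Hypothetical helper: start all offers up front and collect their results.
  // The caller must keep elems.size within what the buffer plus
  // maxConcurrentOffers pending offers can hold.
  def offerConcurrently(queue: SourceQueueWithComplete[Int], elems: List[Int])(
      implicit ec: ExecutionContext): Future[List[QueueOfferResult]] =
    Future.traverse(elems)(elem => queue.offer(elem))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("concurrent-offers-sketch")
    import system.dispatcher

    val (queue, received) = Source
      .queue[Int](2, OverflowStrategy.backpressure, 4)
      .toMat(Sink.seq)(Keep.both)
      .run()

    // Four offers are in flight at once, which stays within maxConcurrentOffers = 4.
    offerConcurrently(queue, List(1, 2, 3, 4)).foreach { results =>
      println(s"offer results: $results")
      queue.complete()
    }

    println(Await.result(received, 3.seconds))
    Await.result(system.terminate(), 3.seconds)
  }
}
```

The single-producer overload would not tolerate this pattern, since only one offer may be pending there when the buffer is full.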
-
unfoldResource
public <T,S> Source<T,NotUsed> unfoldResource(scala.Function0<S> create, scala.Function1<S,scala.Option<T>> read, scala.Function1<S,scala.runtime.BoxedUnit> close)
Starts a new Source from some resource which can be opened, read and closed. Interaction with the resource happens in a blocking way.
Example:
    Source.unfoldResource(
      () => new BufferedReader(new FileReader("...")),
      reader => Option(reader.readLine()),
      reader => reader.close())
You can use the supervision strategy to handle exceptions thrown by the read function. All exceptions thrown by create or close will fail the stream.
The Restart supervision strategy will close and create the blocking IO again. The default strategy is Stop, which means that the stream will be terminated on an error in the read function.
You can configure the default dispatcher for this Source by changing akka.stream.materializer.blocking-io-dispatcher, or set it for a given Source by using ActorAttributes.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
- Parameters:
create - function that is called on stream start and creates/opens the resource.
read - function that reads data from the opened resource. It is called each time a backpressure signal is received. The stream calls close and completes when read returns None.
close - function that closes the resource
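A self-contained variant of the example, with the supervision strategy set explicitly, might look like the sketch below. It assumes Akka 2.6.x; the object name UnfoldResourceSketch and the in-memory text (used in place of a file so the sketch runs without external input) are hypothetical.

```scala
import java.io.{BufferedReader, StringReader}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object UnfoldResourceSketch {
  // Hypothetical stand-in for file contents.
  val text = "first\nsecond\nthird"

  val lines: Source[String, NotUsed] = Source
    .unfoldResource[String, BufferedReader](
      () => new BufferedReader(new StringReader(text)), // create: open the resource
      reader => Option(reader.readLine()),              // read: None (from null) ends the stream
      reader => reader.close())                         // close: release the resource
    // With Restart, an exception in read closes the reader and creates a new one.
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("unfold-resource-sketch")
    println(Await.result(lines.runWith(Sink.seq), 3.seconds))
    Await.result(system.terminate(), 3.seconds)
  }
}
```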
-
unfoldResourceAsync
public <T,S> Source<T,NotUsed> unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create, scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read, scala.Function1<S,scala.concurrent.Future<Done>> close)
Starts a new Source from some resource which can be opened, read and closed. It's similar to unfoldResource but takes functions that return Futures instead of plain values.
You can use the supervision strategy to handle exceptions thrown by the read function or failures of the produced Futures. All exceptions thrown by create or close, as well as failures of the returned futures, will fail the stream.
The Restart supervision strategy will close and create the resource again. The default strategy is Stop, which means that the stream will be terminated on an error in the read function (or future).
You can configure the default dispatcher for this Source by changing akka.stream.materializer.blocking-io-dispatcher, or set it for a given Source by using ActorAttributes.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
- Parameters:
create - function that is called on stream start and creates/opens the resource.
read - function that reads data from the opened resource. It is called each time a backpressure signal is received. The stream calls close and completes when the Future from the read function returns None.
close - function that closes the resource
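The Future-based counterpart can be sketched in the same shape. This assumes Akka 2.6.x; the object name UnfoldResourceAsyncSketch and the in-memory text are hypothetical, and each step is simply wrapped in a Future to stand in for a genuinely asynchronous resource.

```scala
import java.io.{BufferedReader, StringReader}

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnfoldResourceAsyncSketch {
  // Hypothetical stand-in for the contents of an asynchronous resource.
  val text = "first\nsecond\nthird"

  def lines(implicit ec: ExecutionContext): Source[String, NotUsed] =
    Source.unfoldResourceAsync[String, BufferedReader](
      () => Future(new BufferedReader(new StringReader(text))), // create
      reader => Future(Option(reader.readLine())),              // read: Future of None ends the stream
      reader => Future { reader.close(); Done })                // close

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("unfold-resource-async-sketch")
    import system.dispatcher
    println(Await.result(lines.runWith(Sink.seq), 3.seconds))
    Await.result(system.terminate(), 3.seconds)
  }
}
```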
-
mergePrioritizedN
public <T> Source<T,NotUsed> mergePrioritizedN(scala.collection.immutable.Seq<scala.Tuple2<Source<T,?>,java.lang.Object>> sourcesAndPriorities, boolean eagerComplete)
Merges multiple Sources, preferring sources according to the 'priority' parameters. The provided sources and priorities must have the same size and order.
'''emits''' when one of the inputs has an element available, preferring inputs based on the 'priority' parameters if several have elements available
'''backpressures''' when downstream backpressures
'''completes''' when all upstreams complete (This behavior is changeable to completing when any upstream completes by setting eagerComplete=true.)
'''cancels''' when downstream cancels
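A small sketch of a prioritized merge follows. It assumes an Akka 2.6.x release that includes mergePrioritizedN; the object name MergePrioritizedNSketch, the element ranges and the priorities 3 and 1 are hypothetical. The interleaving of the two inputs is not deterministic, so the sketch sorts the result before printing.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MergePrioritizedNSketch {
  // Each source is paired with its priority; a higher priority is preferred
  // when several inputs have an element available.
  val merged: Source[Int, NotUsed] = Source.mergePrioritizedN[Int](
    List((Source(1 to 5), 3), (Source(101 to 105), 1)),
    eagerComplete = false) // complete only when all upstreams have completed

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("merge-prioritized-sketch")
    println(Await.result(merged.runWith(Sink.seq), 3.seconds).sorted)
    Await.result(system.terminate(), 3.seconds)
  }
}
```

With eagerComplete = true the merged stream could complete as soon as one input finishes, so elements of the other input might never be emitted.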
-
-