Class Source<Out,Mat>
- java.lang.Object
  - akka.stream.scaladsl.Source<Out,Mat>
- All Implemented Interfaces:
Graph<SourceShape<Out>,Mat>, FlowOps<Out,Mat>, FlowOpsMat<Out,Mat>
public final class Source<Out,Mat> extends java.lang.Object implements FlowOpsMat<Out,Mat>, Graph<SourceShape<Out>,Mat>
-
-
Constructor Summary
Constructor | Description
Source(LinearTraversalBuilder traversalBuilder, SourceShape<Out> shape) |
-
Method Summary
Modifier and Type | Method | Description
static <T> Source<T,ActorRef> | actorPublisher(Props props) | Deprecated. Use akka.stream.stage.GraphStage and fromGraph instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
static <T> Source<T,ActorRef> | actorRef(int bufferSize, OverflowStrategy overflowStrategy) | Creates a Source that is materialized as an ActorRef.
static <T> Source<T,ActorRef> | actorRef(scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher, int bufferSize, OverflowStrategy overflowStrategy) | INTERNAL API
static <T> Source<T,ActorRef> | actorRefWithAck(java.lang.Object ackMessage) | Creates a Source that is materialized as an ActorRef.
static <T> Source<T,ActorRef> | actorRefWithAck(scala.Option<ActorRef> ackTo, java.lang.Object ackMessage, scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher) | INTERNAL API
Source<Out,Mat> | addAttributes(Attributes attr) | Add the given attributes to this Source.
static <T> Source<T,NotUsed> | apply(scala.collection.immutable.Iterable<T> iterable) | Helper to create Source from Iterable.
Source<Out,Mat> | asJava() | Converts this Scala DSL element to its Java DSL counterpart.
<Ctx> SourceWithContext<Out,Ctx,Mat> | asSourceWithContext(scala.Function1<Out,Ctx> f) | API MAY CHANGE
static <T> Source<T,org.reactivestreams.Subscriber<T>> | asSubscriber() | Creates a Source that is materialized as a Subscriber
Source<Out,Mat> | async() | Put an asynchronous boundary around this Source
Source<Out,Mat> | async(java.lang.String dispatcher) | Put an asynchronous boundary around this Graph
Source<Out,Mat> | async(java.lang.String dispatcher, int inputBufferSize) | Put an asynchronous boundary around this Graph
<T,U> Source<U,NotUsed> | combine(Source<T,?> first, Source<T,?> second, scala.collection.Seq<Source<T,?>> rest, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> strategy) | Deprecated. Use `Source.combine` on companion object instead.
static <T,U,M1,M2,M> Source<U,M> | combineMat(Source<T,M1> first, Source<T,M2> second, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> strategy, scala.Function2<M1,M2,M> matF) | Combines two sources with fan-in strategy like Merge or Concat and returns Source with a materialized value.
static <T> Source<T,NotUsed> | cycle(scala.Function0<scala.collection.Iterator<T>> f) | Creates Source that will continually produce given elements in specified order.
static <T> Source<T,NotUsed> | empty() | A Source with no elements, i.e. an empty stream that is completed immediately for every connected Sink.
static <T> Source<T,NotUsed> | failed(java.lang.Throwable cause) | Create a Source that immediately ends the stream with the cause error to every connected Sink.
static <T> Source<T,NotUsed> | fromCompletionStage(java.util.concurrent.CompletionStage<T> future) | Starts a new Source from the given CompletionStage.
static <T> Source<T,NotUsed> | fromFuture(scala.concurrent.Future<T> future) | Starts a new Source from the given Future.
static <T,M> Source<T,scala.concurrent.Future<M>> | fromFutureSource(scala.concurrent.Future<Graph<SourceShape<T>,M>> future) | Streams the elements of the given future source once it successfully completes.
static <T,M> Source<T,M> | fromGraph(Graph<SourceShape<T>,M> g) | A graph with the shape of a source logically is a source, this method makes it so also in type.
static <T> Source<T,NotUsed> | fromIterator(scala.Function0<scala.collection.Iterator<T>> f) | Helper to create Source from Iterator.
static <T> Source<T,NotUsed> | fromPublisher(org.reactivestreams.Publisher<T> publisher) | Helper to create Source from Publisher.
static <T,M> Source<T,java.util.concurrent.CompletionStage<M>> | fromSourceCompletionStage(java.util.concurrent.CompletionStage<? extends Graph<SourceShape<T>,M>> completion) | Streams the elements of an asynchronous source once its given completion operator completes.
static <T,M> Source<T,scala.concurrent.Future<M>> | lazily(scala.Function0<Source<T,M>> create) | Creates a Source that is not materialized until there is downstream demand, when the source gets materialized the materialized future is completed with its value, if downstream cancels or fails without any demand the create factory is never called and the materialized Future is failed.
static <T> Source<T,scala.concurrent.Future<NotUsed>> | lazilyAsync(scala.Function0<scala.concurrent.Future<T>> create) | Creates a Source from supplied future factory that is not called until downstream demand.
<Mat2> Source<Out,Mat2> | mapMaterializedValue(scala.Function1<Mat,Mat2> f) | Transform only the materialized value of this Source, leaving all other properties as they were.
static <T> Source<T,scala.concurrent.Promise<scala.Option<T>>> | maybe() | Create a Source which materializes a Promise which controls what element will be emitted by the Source.
Source<Out,Mat> | named(java.lang.String name) | Add a name attribute to this Source.
scala.Tuple2<Mat,Source<Out,NotUsed>> | preMaterialize(Materializer materializer) | Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source that can be used to consume elements from the newly materialized Source.
static <T> Source<T,SourceQueueWithComplete<T>> | queue(int bufferSize, OverflowStrategy overflowStrategy) | Creates a Source that is materialized as a SourceQueue.
static <T> Source<T,NotUsed> | repeat(T element) | Create a Source that will continually emit the given element.
<U> scala.concurrent.Future<U> | runFold(U zero, scala.Function2<U,Out,U> f, Materializer materializer) | Shortcut for running this Source with a fold function.
<U> scala.concurrent.Future<U> | runFoldAsync(U zero, scala.Function2<U,Out,scala.concurrent.Future<U>> f, Materializer materializer) | Shortcut for running this Source with a foldAsync function.
scala.concurrent.Future<Done> | runForeach(scala.Function1<Out,scala.runtime.BoxedUnit> f, Materializer materializer) | Shortcut for running this Source with a foreach procedure.
<U> scala.concurrent.Future<U> | runReduce(scala.Function2<U,U,U> f, Materializer materializer) | Shortcut for running this Source with a reduce function.
<Mat2> Mat2 | runWith(Graph<SinkShape<Out>,Mat2> sink, Materializer materializer) | Connect this Source to a Sink and run it.
static <T,M> Source<T,scala.concurrent.Future<M>> | setup(scala.Function2<ActorMaterializer,Attributes,Source<T,M>> factory) | Defers the creation of a Source until materialization.
SourceShape<Out> | shape() | The shape of a graph is all that is externally visible: its inlets and outlets.
static <T> Source<T,NotUsed> | single(T element) | Create a Source with one element.
static <T> Source<T,Cancellable> | tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, T tick) | Elements are emitted periodically with the specified interval.
<Mat2> RunnableGraph<Mat> | to(Graph<SinkShape<Out>,Mat2> sink) |
<Mat2,Mat3> RunnableGraph<Mat3> | toMat(Graph<SinkShape<Out>,Mat2> sink, scala.Function2<Mat,Mat2,Mat3> combine) |
java.lang.String | toString() |
LinearTraversalBuilder | traversalBuilder() | INTERNAL API.
static <S,E> Source<E,NotUsed> | unfold(S s, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f) | Create a Source that will unfold a value of type S into a pair of the next state S and output elements of type E.
static <S,E> Source<E,NotUsed> | unfoldAsync(S s, scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f) | Same as unfold(S, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>>), but uses an async function to generate the next state-element tuple.
static <T,S> Source<T,NotUsed> | unfoldResource(scala.Function0<S> create, scala.Function1<S,scala.Option<T>> read, scala.Function1<S,scala.runtime.BoxedUnit> close) | Start a new Source from some resource which can be opened, read and closed.
static <T,S> Source<T,NotUsed> | unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create, scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read, scala.Function1<S,scala.concurrent.Future<Done>> close) | Start a new Source from some resource which can be opened, read and closed.
<T,Mat2> Source<T,Mat> | via(Graph<FlowShape<Out,T>,Mat2> flow) |
<T,Mat2,Mat3> Source<T,Mat3> | viaMat(Graph<FlowShape<Out,T>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine) | Transform this Flow by appending the given processing steps.
Source<Out,Mat> | withAttributes(Attributes attr) | Replace the attributes of this Source with the given ones.
static <T> Source<scala.collection.immutable.Seq<T>,NotUsed> | zipN(scala.collection.immutable.Seq<Source<T,?>> sources) | Combine the elements of multiple streams into a stream of sequences.
static <T,O> Source<O,NotUsed> | zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper, scala.collection.immutable.Seq<Source<T,?>> sources) |
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface akka.stream.scaladsl.FlowOps
$plus$plus, alsoTo, alsoToGraph, ask, ask, backpressureTimeout, batch, batchWeighted, buffer, collect, collectType, completionTimeout, concat, concatGraph, conflate, conflateWithSeed, delay, delay$default$2, detach, divertTo, divertToGraph, drop, dropWhile, dropWithin, expand, extrapolate, extrapolate$default$2, filter, filterNot, flatMapConcat, flatMapMerge, fold, foldAsync, groupBy, groupBy, grouped, groupedWeightedWithin, groupedWithin, idleTimeout, initialDelay, initialTimeout, interleave, interleave, interleaveGraph, interleaveGraph$default$3, intersperse, intersperse, keepAlive, limit, limitWeighted, log, log$default$2, log$default$3, map, mapAsync, mapAsyncUnordered, mapConcat, mapError, merge, merge$default$2, mergeGraph, mergeSorted, mergeSortedGraph, orElse, orElseGraph, prefixAndTail, prepend, prependGraph, recover, recoverWith, recoverWithRetries, reduce, scan, scanAsync, sliding, sliding$default$2, splitAfter, splitAfter, splitWhen, splitWhen, statefulMapConcat, take, takeWhile, takeWhile, takeWithin, throttle, throttle, throttle, throttle, throttleEven, throttleEven, watch, wireTap, wireTap, wireTapGraph, zip, zipGraph, zipLatest, zipLatestGraph, zipLatestWith, zipLatestWithGraph, zipWith, zipWithGraph, zipWithIndex
-
Methods inherited from interface akka.stream.scaladsl.FlowOpsMat
alsoToMat, concatMat, divertToMat, interleaveMat, interleaveMat, mergeMat, mergeMat$default$2, mergeSortedMat, monitor, monitor, monitorMat, orElseMat, prependMat, watchTermination, wireTapMat, zipLatestMat, zipLatestWithMat, zipMat, zipWithMat
-
Constructor Detail
-
Source
public Source(LinearTraversalBuilder traversalBuilder, SourceShape<Out> shape)
-
-
Method Detail
-
fromPublisher
public static <T> Source<T,NotUsed> fromPublisher(org.reactivestreams.Publisher<T> publisher)
Helper to create Source from Publisher.
Construct a transformation starting with given publisher. The transformation steps are executed by a series of Processor instances that mediate the flow of elements downstream and the propagation of back-pressure upstream.
- Parameters:
publisher - (undocumented)
- Returns:
- (undocumented)
-
fromIterator
public static <T> Source<T,NotUsed> fromIterator(scala.Function0<scala.collection.Iterator<T>> f)
Helper to create Source from Iterator. Example usage: Source.fromIterator(() => Iterator.from(0))
Start a new Source from the given function that produces an Iterator. The produced stream of elements will continue until the iterator runs empty or fails during evaluation of the next() method. Elements are pulled out of the iterator in accordance with the demand coming from the downstream transformation steps.
- Parameters:
f - (undocumented)
- Returns:
- (undocumented)
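The demand-driven behaviour described above means an infinite iterator is safe as long as something downstream bounds the stream. A minimal sketch, assuming Akka 2.5.x (where an `ActorMaterializer` is required); the object name `FromIteratorExample` and the 3-second timeout are illustrative choices, not part of the API:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object FromIteratorExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("from-iterator-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Iterator.from(0) never ends; take(5) cancels upstream after five elements.
      val result = Source.fromIterator(() => Iterator.from(0)).take(5).runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```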
-
cycle
public static <T> Source<T,NotUsed> cycle(scala.Function0<scala.collection.Iterator<T>> f)
Creates a Source that will continually produce the given elements in the specified order.
Starts a new 'cycled' Source from the given elements. The produced stream of elements will continue infinitely by repeating the sequence of elements provided by the function parameter.
- Parameters:
f - (undocumented)
- Returns:
- (undocumented)
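As a rough illustration of the repetition, the sketch below cycles over three elements and takes seven of them. It assumes Akka 2.5.x; `CycleExample` is a hypothetical name:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object CycleExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("cycle-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The function is asked for a fresh iterator each time the previous one is exhausted.
      val result = Source.cycle(() => List(1, 2, 3).iterator).take(7).runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```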
-
fromGraph
public static <T,M> Source<T,M> fromGraph(Graph<SourceShape<T>,M> g)
A graph with the shape of a source is logically a source; this method makes it one in type as well.
- Parameters:
g - (undocumented)
- Returns:
- (undocumented)
-
setup
public static <T,M> Source<T,scala.concurrent.Future<M>> setup(scala.Function2<ActorMaterializer,Attributes,Source<T,M>> factory)
Defers the creation of a Source until materialization. The factory function exposes ActorMaterializer which is going to be used during materialization and Attributes of the Source returned by this method.
- Parameters:
factory - (undocumented)
- Returns:
- (undocumented)
-
apply
public static <T> Source<T,NotUsed> apply(scala.collection.immutable.Iterable<T> iterable)
Helper to create Source from Iterable. Example usage: Source(Seq(1,2,3))
Starts a new Source from the given Iterable. This is like starting from an Iterator, but every Subscriber directly attached to the Publisher of this stream will see an individual flow of elements (always starting from the beginning) regardless of when they subscribed.
- Parameters:
iterable - (undocumented)
- Returns:
- (undocumented)
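The "individual flow of elements" property can be seen by materializing the same Source twice. This is only a sketch under the assumption of Akka 2.5.x; `ApplyExample` is a hypothetical name:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object ApplyExample {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("apply-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val numbers = Source(List(1, 2, 3))
      // Each materialization iterates the collection from the beginning.
      val first = Await.result(numbers.runWith(Sink.seq), 3.seconds)
      val second = Await.result(numbers.runWith(Sink.seq), 3.seconds)
      (first, second)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```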
-
fromFuture
public static <T> Source<T,NotUsed> fromFuture(scala.concurrent.Future<T> future)
Starts a new Source from the given Future. The stream will consist of one element when the Future is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the Future is completed with a failure.
- Parameters:
future - (undocumented)
- Returns:
- (undocumented)
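Both outcomes described above can be sketched as follows: a successful Future yields one element, a failed Future fails the stream. The sketch assumes Akka 2.5.x; `FromFutureExample` and the "boom" exception are illustrative:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical example object; not part of Akka.
object FromFutureExample {
  def run(): (Int, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("from-future-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // A successful Future becomes a one-element stream.
      val ok = Await.result(Source.fromFuture(Future.successful(42)).runWith(Sink.head), 3.seconds)
      // A failed Future fails the stream, so the Sink's Future fails too.
      val failing = Source.fromFuture(Future.failed[Int](new RuntimeException("boom"))).runWith(Sink.head)
      val streamFailed = Try(Await.result(failing, 3.seconds)).isFailure
      (ok, streamFailed)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```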
-
fromCompletionStage
public static <T> Source<T,NotUsed> fromCompletionStage(java.util.concurrent.CompletionStage<T> future)
Starts a new Source from the given CompletionStage. The stream will consist of one element when the CompletionStage is completed with a successful value, which may happen before or after materializing the Flow. The stream terminates with a failure if the CompletionStage is completed with a failure.
- Parameters:
future - (undocumented)
- Returns:
- (undocumented)
-
fromFutureSource
public static <T,M> Source<T,scala.concurrent.Future<M>> fromFutureSource(scala.concurrent.Future<Graph<SourceShape<T>,M>> future)
Streams the elements of the given future source once it successfully completes. If the Future fails the stream is failed with the exception from the future. If downstream cancels before the stream completes the materialized Future will be failed with a StreamDetachedException.
- Parameters:
future - (undocumented)
- Returns:
- (undocumented)
-
fromSourceCompletionStage
public static <T,M> Source<T,java.util.concurrent.CompletionStage<M>> fromSourceCompletionStage(java.util.concurrent.CompletionStage<? extends Graph<SourceShape<T>,M>> completion)
Streams the elements of an asynchronous source once its given completion operator completes. If the CompletionStage fails the stream is failed with the exception from the CompletionStage. If downstream cancels before the stream completes the materialized CompletionStage will be failed with a StreamDetachedException.
- Parameters:
completion - (undocumented)
- Returns:
- (undocumented)
-
tick
public static <T> Source<T,Cancellable> tick(scala.concurrent.duration.FiniteDuration initialDelay, scala.concurrent.duration.FiniteDuration interval, T tick)
Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that have requested any elements. If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later. It will receive new tick elements as soon as it has requested more elements.
- Parameters:
initialDelay - (undocumented)
interval - (undocumented)
tick - (undocumented)
- Returns:
- (undocumented)
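A small sketch of periodic emission, assuming Akka 2.5.x. The 50 ms interval, the `"tick"` payload and the name `TickExample` are arbitrary choices for illustration; `take(3)` is used so the otherwise endless stream completes:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object TickExample {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("tick-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Emit the same element every 50 ms, starting immediately; stop after three.
      val result = Source.tick(0.millis, 50.millis, "tick").take(3).runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The materialized `Cancellable` is discarded here because `runWith` keeps only the Sink's value; `toMat(...)(Keep.left)` would expose it instead.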
-
single
public static <T> Source<T,NotUsed> single(T element)
Create a Source with one element. Every connected Sink of this stream will see an individual stream consisting of one element.
- Parameters:
element - (undocumented)
- Returns:
- (undocumented)
-
repeat
public static <T> Source<T,NotUsed> repeat(T element)
Create a Source that will continually emit the given element.
- Parameters:
element - (undocumented)
- Returns:
- (undocumented)
-
unfold
public static <S,E> Source<E,NotUsed> unfold(S s, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>> f)
Create a Source that will unfold a value of type S into a pair of the next state S and output elements of type E.
For example, all the Fibonacci numbers under 10M:
  Source.unfold(0 -> 1) {
    case (a, _) if a > 10000000 => None
    case (a, b) => Some((b -> (a + b)) -> a)
  }
- Parameters:
s - (undocumented)
f - (undocumented)
- Returns:
- (undocumented)
-
unfoldAsync
public static <S,E> Source<E,NotUsed> unfoldAsync(S s, scala.Function1<S,scala.concurrent.Future<scala.Option<scala.Tuple2<S,E>>>> f)
Same as unfold(S, scala.Function1<S,scala.Option<scala.Tuple2<S,E>>>), but uses an async function to generate the next state-element tuple.
Async Fibonacci example:
  Source.unfoldAsync(0 -> 1) {
    case (a, _) if a > 10000000 => Future.successful(None)
    case (a, b) => Future {
      Thread.sleep(1000)
      Some((b -> (a + b)) -> a)
    }
  }
- Parameters:
s - (undocumented)
f - (undocumented)
- Returns:
- (undocumented)
-
empty
public static <T> Source<T,NotUsed> empty()
A Source with no elements, i.e. an empty stream that is completed immediately for every connected Sink.
- Returns:
- (undocumented)
-
maybe
public static <T> Source<T,scala.concurrent.Promise<scala.Option<T>>> maybe()
Create a Source which materializes a Promise which controls what element will be emitted by the Source.
- If the materialized promise is completed with a Some, that value will be produced downstream, followed by completion.
- If the materialized promise is completed with a None, no value will be produced downstream and completion will be signalled immediately.
- If the materialized promise is completed with a failure, then the returned source will terminate with that error.
- If the downstream of this source cancels before the promise has been completed, then the promise will be completed with None.
- Returns:
- (undocumented)
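The "completed with a Some" case can be sketched like this, assuming Akka 2.5.x; `MaybeExample` is a hypothetical name. `Keep.both` is used so that both the Promise and the Sink's Future are available:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object MaybeExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("maybe-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (promise, result) = Source.maybe[Int].toMat(Sink.seq)(Keep.both).run()
      // Completing the promise with Some(x) emits x and then completes the stream.
      promise.success(Some(7))
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```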
-
failed
public static <T> Source<T,NotUsed> failed(java.lang.Throwable cause)
Create a Source that immediately ends the stream with the cause error to every connected Sink.
- Parameters:
cause - (undocumented)
- Returns:
- (undocumented)
-
lazily
public static <T,M> Source<T,scala.concurrent.Future<M>> lazily(scala.Function0<Source<T,M>> create)
Creates a Source that is not materialized until there is downstream demand. When the source gets materialized, the materialized future is completed with its value. If downstream cancels or fails without any demand, the create factory is never called and the materialized Future is failed.
- Parameters:
create - (undocumented)
- Returns:
- (undocumented)
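To make the deferral visible, the sketch below records whether the factory has been invoked before and after the stream is run. It assumes Akka 2.5.x; `LazilyExample` and the `AtomicBoolean` flag are illustrative helpers, not part of the API:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object LazilyExample {
  def run(): (Boolean, Boolean, Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("lazily-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val created = new AtomicBoolean(false)
      val lazySource = Source.lazily { () =>
        created.set(true) // runs only once downstream signals demand
        Source.single("hello")
      }
      val createdBeforeRun = created.get() // nothing has been run yet
      val elements = Await.result(lazySource.runWith(Sink.seq), 3.seconds)
      (createdBeforeRun, created.get(), elements)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```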
-
lazilyAsync
public static <T> Source<T,scala.concurrent.Future<NotUsed>> lazilyAsync(scala.Function0<scala.concurrent.Future<T>> create)
Creates a Source from supplied future factory that is not called until downstream demand. When source gets materialized the materialized future is completed with the value from the factory. If downstream cancels or fails without any demand the create factory is never called and the materialized Future is failed.
- Parameters:
create - (undocumented)
- Returns:
- (undocumented)
- See Also:
Source.lazily
-
asSubscriber
public static <T> Source<T,org.reactivestreams.Subscriber<T>> asSubscriber()
Creates a Source that is materialized as a Subscriber.
- Returns:
- (undocumented)
-
actorPublisher
public static <T> Source<T,ActorRef> actorPublisher(Props props)
Deprecated. Use akka.stream.stage.GraphStage and fromGraph instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
Creates a Source that is materialized to an ActorRef which points to an Actor created according to the passed in Props. Actor created by the props must be ActorPublisher.
- Parameters:
props - (undocumented)
- Returns:
- (undocumented)
-
actorRef
public static <T> Source<T,ActorRef> actorRef(scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher, int bufferSize, OverflowStrategy overflowStrategy)
INTERNAL API
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure is not supported, and an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize of 0 and then received messages are dropped if there is no demand from downstream. When bufferSize is 0 the overflowStrategy does not matter. An async boundary is added after this Source; as such, it is never safe to assume the downstream will always generate demand.
The stream can be completed successfully by sending the actor reference a message that is matched by completionMatcher in which case already buffered elements will be signaled before signaling completion.
The stream can be completed with failure by sending a message that is matched by failureMatcher. The extracted Throwable will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received a message matched by completionMatcher) before signaling completion and it receives a message matched by failureMatcher, the failure will be signaled downstream immediately (instead of the completion signal).
Note that terminating the actor without first completing it, either with a success or a failure, will prevent the actor triggering downstream completion and the stream will continue to run even though the source actor is dead. Therefore you should not attempt to manually terminate the actor such as with a PoisonPill.
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.scaladsl.Source.queue.
- Parameters:
bufferSize - The size of the buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
completionMatcher - (undocumented)
failureMatcher - (undocumented)
- Returns:
- (undocumented)
-
actorRef
public static <T> Source<T,ActorRef> actorRef(int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
The strategy akka.stream.OverflowStrategy.backpressure is not supported, and an IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument.
The buffer can be disabled by using bufferSize of 0 and then received messages are dropped if there is no demand from downstream. When bufferSize is 0 the overflowStrategy does not matter. An async boundary is added after this Source; as such, it is never safe to assume the downstream will always generate demand.
The stream can be completed successfully by sending the actor reference a Status.Success. If the content is akka.stream.CompletionStrategy.immediately the completion will be signaled immediately, otherwise if the content is akka.stream.CompletionStrategy.draining (or anything else) already buffered elements will be signaled before signaling completion. Sending PoisonPill will signal completion immediately but this behavior is deprecated and scheduled to be removed.
The stream can be completed with failure by sending a Status.Failure to the actor reference. In case the Actor is still draining its internal buffer (after having received a Status.Success) before signaling completion and it receives a Status.Failure, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
See also akka.stream.scaladsl.Source.queue.
- Parameters:
bufferSize - The size of the buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
- Returns:
- (undocumented)
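The message protocol described above (elements, then a `Status.Success` to complete) might be exercised as in the following sketch. It assumes an Akka 2.5.x release that already ships `akka.stream.CompletionStrategy`; the buffer size of 16, the `dropHead` strategy and the name `ActorRefExample` are arbitrary illustrative choices:

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object ActorRefExample {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (ref, result) =
        Source.actorRef[String](bufferSize = 16, overflowStrategy = OverflowStrategy.dropHead)
          .toMat(Sink.seq)(Keep.both)
          .run()
      ref ! "a"
      ref ! "b"
      // Draining completion: buffered elements are emitted before the stream completes.
      ref ! Status.Success(CompletionStrategy.draining)
      Await.result(result, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```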
-
actorRefWithAck
public static <T> Source<T,ActorRef> actorRefWithAck(scala.Option<ActorRef> ackTo, java.lang.Object ackMessage, scala.PartialFunction<java.lang.Object,CompletionStrategy> completionMatcher, scala.PartialFunction<java.lang.Object,java.lang.Throwable> failureMatcher)
INTERNAL API
- Parameters:
ackTo - (undocumented)
ackMessage - (undocumented)
completionMatcher - (undocumented)
failureMatcher - (undocumented)
- Returns:
- (undocumented)
-
actorRefWithAck
public static <T> Source<T,ActorRef> actorRefWithAck(java.lang.Object ackMessage)
Creates a Source that is materialized as an ActorRef. Messages sent to this actor will be emitted to the stream if there is demand from downstream, and a new message will only be accepted after the previous message has been consumed and acknowledged back. The stream will complete with failure if a message is sent before the acknowledgement has been replied back.
The stream can be completed successfully by sending the actor reference a Status.Success. If the content is akka.stream.CompletionStrategy.immediately the completion will be signaled immediately, otherwise if the content is akka.stream.CompletionStrategy.draining (or anything else) an already buffered element will be signaled before signaling completion.
The stream can be completed with failure by sending a Status.Failure to the actor reference. In case the Actor is still draining its internal buffer (after having received a Status.Success) before signaling completion and it receives a Status.Failure, the failure will be signaled downstream immediately (instead of the completion signal).
The actor will be stopped when the stream is completed, failed or canceled from downstream, i.e. you can watch it to get notified when that happens.
- Parameters:
ackMessage - (undocumented)
- Returns:
- (undocumented)
-
combineMat
public static <T,U,M1,M2,M> Source<U,M> combineMat(Source<T,M1> first, Source<T,M2> second, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> strategy, scala.Function2<M1,M2,M> matF)
Combines two sources with fan-in strategy like Merge or Concat and returns Source with a materialized value.
- Parameters:
first - (undocumented)
second - (undocumented)
strategy - (undocumented)
matF - (undocumented)
- Returns:
- (undocumented)
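A possible use with the `Concat` strategy is sketched below, assuming Akka 2.5.x and the curried Scala form `combineMat(first, second)(strategy)(matF)`. `CombineMatExample` is a hypothetical name, and `Keep.left` is just one choice of materialized-value combiner:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object CombineMatExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("combine-mat-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val first = Source(List(1, 2))
      val second = Source(List(3, 4))
      // Concat emits everything from `first`, then everything from `second`.
      val combined = Source.combineMat(first, second)(Concat[Int](_))(Keep.left)
      Await.result(combined.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With `Merge` instead of `Concat` the element order would depend on timing, which is why `Concat` is used for a deterministic result.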
-
zipN
public static <T> Source<scala.collection.immutable.Seq<T>,NotUsed> zipN(scala.collection.immutable.Seq<Source<T,?>> sources)
Combine the elements of multiple streams into a stream of sequences.
- Parameters:
sources - (undocumented)
- Returns:
- (undocumented)
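For illustration, zipping two three-element sources gives three two-element sequences. A minimal sketch assuming Akka 2.5.x; `ZipNExample` is a hypothetical name:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object ZipNExample {
  def run(): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("zip-n-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val sources = List(Source(List(1, 2, 3)), Source(List(10, 20, 30)))
      // Each output element holds the n-th element of every input, in input order.
      val zipped = Source.zipN(sources)
      Await.result(zipped.runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```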
-
zipWithN
public static <T,O> Source<O,NotUsed> zipWithN(scala.Function1<scala.collection.immutable.Seq<T>,O> zipper, scala.collection.immutable.Seq<Source<T,?>> sources)
-
queue
public static <T> Source<T,SourceQueueWithComplete<T>> queue(int bufferSize, OverflowStrategy overflowStrategy)
Creates a Source that is materialized as a SourceQueue. You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded if downstream is terminated.
Depending on the defined OverflowStrategy it might drop elements if there is no space available in the buffer.
An acknowledgement mechanism is available. akka.stream.scaladsl.SourceQueue.offer returns Future[QueueOfferResult] which completes with QueueOfferResult.Enqueued if the element was added to the buffer or sent downstream. It completes with QueueOfferResult.Dropped if the element was dropped. It can also complete with QueueOfferResult.Failure when the stream failed, or QueueOfferResult.QueueClosed when downstream is completed.
The strategy akka.stream.OverflowStrategy.backpressure will not complete the last offer():Future call when the buffer is full.
You can watch accessibility of the stream with akka.stream.scaladsl.SourceQueue.watchCompletion. It returns a future that completes with success when the operator is completed or fails when the stream is failed.
The buffer can be disabled by using bufferSize of 0 and then a received message will wait for downstream demand unless there is another message waiting for downstream demand, in that case the offer result will be completed according to the overflow strategy.
- Parameters:
bufferSize - size of buffer in element count
overflowStrategy - Strategy that is used when incoming elements cannot fit inside the buffer
- Returns:
- (undocumented)
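The offer/acknowledge cycle can be sketched as follows, assuming Akka 2.5.x. The buffer size of 8 and the name `QueueExample` are illustrative; each `offer` is awaited before the next one, which is one simple way to respect the backpressure strategy:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object QueueExample {
  def run(): (QueueOfferResult, Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("queue-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (queue, result) =
        Source.queue[Int](bufferSize = 8, overflowStrategy = OverflowStrategy.backpressure)
          .toMat(Sink.seq)(Keep.both)
          .run()
      // offer returns a Future that reports what happened to the element.
      val firstOffer = Await.result(queue.offer(1), 3.seconds)
      Await.result(queue.offer(2), 3.seconds)
      queue.complete() // completes the stream normally
      (firstOffer, Await.result(result, 3.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```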
-
unfoldResource
public static <T,S> Source<T,NotUsed> unfoldResource(scala.Function0<S> create, scala.Function1<S,scala.Option<T>> read, scala.Function1<S,scala.runtime.BoxedUnit> close)
Start a new Source from some resource which can be opened, read and closed. Interaction with resource happens in a blocking way.
Example:
  Source.unfoldResource(
    () => new BufferedReader(new FileReader("...")),
    reader => Option(reader.readLine()),
    reader => reader.close())
You can use the supervision strategy to handle exceptions for read function. All exceptions thrown by create or close will fail the stream.
Restart supervision strategy will close and create blocking IO again. Default strategy is Stop which means that stream will be terminated on error in read function by default.
You can configure the default dispatcher for this Source by changing the akka.stream.materializer.blocking-io-dispatcher or set it for a given Source by using ActorAttributes.
Adheres to the ActorAttributes.SupervisionStrategy attribute.
- Parameters:
create - function that is called on stream start and creates/opens resource.
read - function that reads data from opened resource. It is called each time backpressure signal is received. Stream calls close and completes when read returns None.
close - function that closes resource
- Returns:
- (undocumented)
-
unfoldResourceAsync
public static <T,S> Source<T,NotUsed> unfoldResourceAsync(scala.Function0<scala.concurrent.Future<S>> create, scala.Function1<S,scala.concurrent.Future<scala.Option<T>>> read, scala.Function1<S,scala.concurrent.Future<Done>> close)
Start a newSourcefrom some resource which can be opened, read and closed. It's similar tounfoldResourcebut takes functions that returnFuturesinstead of plain values.You can use the supervision strategy to handle exceptions for
readfunction or failures of producedFutures. All exceptions thrown bycreateorcloseas well as fails of returned futures will fail the stream.Restartsupervision strategy will close and create resource. Default strategy isStopwhich means that stream will be terminated on error inreadfunction (or future) by default.You can configure the default dispatcher for this Source by changing the
akka.stream.materializer.blocking-io-dispatcheror set it for a given Source by usingActorAttributes.Adheres to the
ActorAttributes.SupervisionStrategyattribute.- Parameters:
create- - function that is called on stream start and creates/opens resource.read- - function that reads data from opened resource. It is called each time backpressure signal is received. Stream calls close and completes whenFuturefrom read function returns None.close- - function that closes resource- Returns:
- (undocumented)
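The create/read/close contract described above can be sketched as follows. This is a minimal illustration, assuming Akka 2.5.x on the classpath and an ActorMaterializer; the object name and the in-memory Iterator standing in for a real resource are hypothetical.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnfoldResourceAsyncExample {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("unfold-resource-async")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val lines = Source.unfoldResourceAsync[String, Iterator[String]](
        // create: called once on stream start (hypothetical in-memory resource)
        () => Future.successful(Iterator("a", "b", "c")),
        // read: Some(element) is emitted, None closes the resource and completes the stream
        it => Future.successful(if (it.hasNext) Some(it.next()) else None),
        // close: releases the resource
        _ => Future.successful(Done)
      )
      Await.result(lines.runWith(Sink.seq), 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

A real resource would typically wrap an asynchronous client or file channel, with close returning the Future of its shutdown call.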
-
traversalBuilder
public LinearTraversalBuilder traversalBuilder()
Description copied from interface: Graph
INTERNAL API. Every materializable element must be backed by a stream layout module.
- Specified by:
traversalBuilder in interface Graph<Out,Mat>
- Returns:
- (undocumented)
-
shape
public SourceShape<Out> shape()
Description copied from interface: Graph
The shape of a graph is all that is externally visible: its inlets and outlets.
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
viaMat
public <T,Mat2,Mat3> Source<T,Mat3> viaMat(Graph<FlowShape<Out,T>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
Description copied from interface: FlowOpsMat
Transform this Flow by appending the given processing steps.
     +---------------------------------+
     | Resulting Flow[In, T, M2]       |
     |                                 |
     |  +------+             +------+  |
     |  |      |             |      |  |
In ~~>  | this |  ~~Out~~>   | flow |  ~~> T
     |  |   Mat|             |     M|  |
     |  +------+             +------+  |
     +---------------------------------+
The combine function is used to compose the materialized values of this flow and that flow into the materialized value of the resulting Flow.
It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
- Specified by:
viaMat in interface FlowOpsMat<Out,Mat>
- Parameters:
flow - (undocumented)
combine - (undocumented)
- Returns:
- (undocumented)
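How the combine function selects materialized values can be sketched as below, using the Scala DSL form viaMat(flow)(combine). This assumes Akka 2.5.x with an ActorMaterializer; the object and value names are illustrative only.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ViaMatExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("via-mat")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val source: Source[Int, NotUsed] = Source(1 to 3)
      // A flow whose materialized value completes when the flow terminates.
      val doubling: Flow[Int, Int, Future[Done]] =
        Flow[Int].map(_ * 2).watchTermination()(Keep.right)
      // Keep.right drops the source's NotUsed and keeps the flow's Future[Done].
      val viaRight: Source[Int, Future[Done]] = source.viaMat(doubling)(Keep.right)
      // Keep.both pairs that value with the sink's Future[Int].
      val graph: RunnableGraph[(Future[Done], Future[Int])] =
        viaRight.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both)
      val (terminated, sum) = graph.run()
      Await.result(terminated, 5.seconds)
      Await.result(sum, 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```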
-
to
public <Mat2> RunnableGraph<Mat> to(Graph<SinkShape<Out>,Mat2> sink)
-
toMat
public <Mat2,Mat3> RunnableGraph<Mat3> toMat(Graph<SinkShape<Out>,Mat2> sink, scala.Function2<Mat,Mat2,Mat3> combine)
- Specified by:
toMat in interface FlowOpsMat<Out,Mat>
- Parameters:
sink - (undocumented)
combine - (undocumented)
- Returns:
- (undocumented)
-
mapMaterializedValue
public <Mat2> Source<Out,Mat2> mapMaterializedValue(scala.Function1<Mat,Mat2> f)
Transform only the materialized value of this Source, leaving all other properties as they were.
- Specified by:
mapMaterializedValue in interface FlowOpsMat<Out,Mat>
- Parameters:
f - (undocumented)
- Returns:
- (undocumented)
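A small sketch of mapMaterializedValue, assuming Akka 2.5.x with an ActorMaterializer; the object name and the "numbers" label are made up for illustration. The emitted elements are unchanged, only the materialized value is replaced.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapMaterializedValueExample {
  def run(): (String, Int) = {
    implicit val system: ActorSystem = ActorSystem("map-mat")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Source(1 to 3) materializes NotUsed; swap it for a label.
      val labelled: Source[Int, String] =
        Source(1 to 3).mapMaterializedValue(_ => "numbers")
      val (label, sum) =
        labelled.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.both).run()
      (label, Await.result(sum, 5.seconds))
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```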
-
preMaterialize
public scala.Tuple2<Mat,Source<Out,NotUsed>> preMaterialize(Materializer materializer)
Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source that can be used to consume elements from the newly materialized Source.
- Parameters:
materializer - (undocumented)
- Returns:
- (undocumented)
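One way to use the returned pair is sketched below, assuming Akka 2.5.x with an ActorMaterializer and the Scala DSL form preMaterialize() taking an implicit materializer. Source.maybe is chosen here only because its materialized Promise makes the two halves easy to see; the object name is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PreMaterializeExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("pre-materialize")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // (1) the materialized Promise, (2) a Source backed by the already running stream
      val (promise, consumer) = Source.maybe[Int].preMaterialize()
      // Feed the running source through its materialized value.
      promise.success(Some(42))
      // The returned Source can be handed to other code and run later.
      Await.result(consumer.runWith(Sink.head), 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```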
-
runWith
public <Mat2> Mat2 runWith(Graph<SinkShape<Out>,Mat2> sink, Materializer materializer)
Connect this Source to a Sink and run it. The returned value is the materialized value of the Sink, e.g. the Publisher of an akka.stream.scaladsl.Sink#publisher.
- Parameters:
sink - (undocumented)
materializer - (undocumented)
- Returns:
- (undocumented)
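A minimal sketch of runWith, assuming Akka 2.5.x with an ActorMaterializer; the object name is illustrative. Here the Sink's materialized value is the Future produced by Sink.seq.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RunWithExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("run-with")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // runWith returns the Sink's materialized value, not the Source's.
      val collected: Future[immutable.Seq[Int]] = Source(1 to 3).runWith(Sink.seq)
      Await.result(collected, 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```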
-
runFold
public <U> scala.concurrent.Future<U> runFold(U zero, scala.Function2<U,Out,U> f, Materializer materializer)
Shortcut for running this Source with a fold function. The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input. The returned Future will be completed with value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
- Parameters:
zero - (undocumented)
f - (undocumented)
materializer - (undocumented)
- Returns:
- (undocumented)
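The fold described above can be sketched as follows, assuming Akka 2.5.x with an ActorMaterializer and the curried Scala DSL form runFold(zero)(f); the object name is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

object RunFoldExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("run-fold")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Starts from zero = 0 and adds each element to the previous output.
      val sum = Source(1 to 5).runFold(0)(_ + _)
      Await.result(sum, 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```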
-
runFoldAsync
public <U> scala.concurrent.Future<U> runFoldAsync(U zero, scala.Function2<U,Out,scala.concurrent.Future<U>> f, Materializer materializer)
Shortcut for running this Source with a foldAsync function. The given function is invoked for every received element, giving it its previous output (or the given zero value) and the element as input. The returned Future will be completed with value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
- Parameters:
zero - (undocumented)
f - (undocumented)
materializer - (undocumented)
- Returns:
- (undocumented)
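The asynchronous variant differs only in that the step function returns a Future. A minimal sketch, assuming Akka 2.5.x with an ActorMaterializer; the object name is illustrative and Future.successful stands in for a real asynchronous call.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RunFoldAsyncExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("run-fold-async")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Each step returns a Future; the next element is folded once it completes.
      val sum = Source(1 to 5).runFoldAsync(0)((acc, n) => Future.successful(acc + n))
      Await.result(sum, 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```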
-
runReduce
public <U> scala.concurrent.Future<U> runReduce(scala.Function2<U,U,U> f, Materializer materializer)
Shortcut for running this Source with a reduce function. The given function is invoked for every received element, giving it its previous output (from the second element) and the element as input. The returned Future will be completed with value of the final function evaluation when the input stream ends, or completed with Failure if there is a failure signaled in the stream.
If the stream is empty (i.e. completes before signalling any elements), the reduce operator will fail its downstream with a NoSuchElementException, which is semantically in line with what Scala's standard library collections do in such situations.
- Parameters:
f - (undocumented)
materializer - (undocumented)
- Returns:
- (undocumented)
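Both the normal case and the empty-stream case can be sketched as below, assuming Akka 2.5.x with an ActorMaterializer; the object name is hypothetical. The failed projection of the Future is used to inspect the exception without throwing.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

object RunReduceExample {
  def run(): (Int, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("run-reduce")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Non-empty stream: 1 + 2 + 3 + 4
      val sum = Await.result(Source(1 to 4).runReduce[Int](_ + _), 5.seconds)
      // Empty stream: the returned Future fails instead of producing a value.
      val failure =
        Await.result(Source.empty[Int].runReduce[Int](_ + _).failed, 5.seconds)
      (sum, failure.isInstanceOf[NoSuchElementException])
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

When an empty stream is a valid input, runFold with an explicit zero value avoids the failure.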
-
runForeach
public scala.concurrent.Future<Done> runForeach(scala.Function1<Out,scala.runtime.BoxedUnit> f, Materializer materializer)
Shortcut for running this Source with a foreach procedure. The given procedure is invoked for each received element. The returned Future will be completed with Success when reaching the normal end of the stream, or completed with Failure if there is a failure signaled in the stream.
- Parameters:
f - (undocumented)
materializer - (undocumented)
- Returns:
- (undocumented)
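A minimal sketch of runForeach, assuming Akka 2.5.x with an ActorMaterializer; the object name is illustrative. An AtomicInteger is used for the side effect because the procedure runs on a stream thread rather than the caller's.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

object RunForeachExample {
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("run-foreach")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val total = new AtomicInteger(0)
      // The procedure is invoked once per element; the Future completes with Done at the end.
      val done = Source(1 to 3).runForeach { n => total.addAndGet(n); () }
      Await.result(done, 5.seconds)
      total.get()
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```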
-
withAttributes
public Source<Out,Mat> withAttributes(Attributes attr)
Replace the attributes of this Source with the given ones. If this Source is a composite of multiple graphs, new attributes on the composite will be less specific than attributes set directly on the individual graphs of the composite.
- Specified by:
withAttributes in interface FlowOps<Out,Mat>
- Specified by:
withAttributes in interface Graph<Out,Mat>
- Parameters:
attr - (undocumented)
- Returns:
- (undocumented)
-
addAttributes
public Source<Out,Mat> addAttributes(Attributes attr)
Add the given attributes to this Source. If the specific attribute was already on this source it will replace the previous value. If this Source is a composite of multiple graphs, the added attributes will be on the composite and therefore less specific than attributes set directly on the individual graphs of the composite.
- Specified by:
addAttributes in interface FlowOps<Out,Mat>
- Specified by:
addAttributes in interface Graph<Out,Mat>
- Parameters:
attr - (undocumented)
- Returns:
- (undocumented)
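As an illustration of attributes set on a composite, the sketch below adds a supervision strategy to a Source followed by a map step. It assumes Akka 2.5.x with an ActorMaterializer; the object name and the division example are hypothetical. Because the map step has no supervision attribute of its own, the one added on the composite applies to it.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AddAttributesExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("add-attributes")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val safeDivide = Source(List(1, 0, 2))
        .map(10 / _) // throws ArithmeticException for 0
        // Resume drops the failing element instead of stopping the stream.
        .addAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
      Await.result(safeDivide.runWith(Sink.seq), 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Using withAttributes in the same position would replace the attributes already present on the composite rather than adding to them.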
-
async
public Source<Out,Mat> async(java.lang.String dispatcher)
Put an asynchronous boundary around this Graph
-
async
public Source<Out,Mat> async(java.lang.String dispatcher, int inputBufferSize)
Put an asynchronous boundary around this Graph
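A sketch of placing an asynchronous boundary with an explicit dispatcher, assuming Akka 2.5.x with an ActorMaterializer. The dispatcher id "akka.actor.default-dispatcher" is used only because it exists in the default configuration; a dedicated dispatcher would normally be configured for this purpose. The object name is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("async-boundary")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val stream = Source(1 to 3)
        .map(_ + 1)
        // Everything up to here runs in its own island on the named dispatcher.
        .async("akka.actor.default-dispatcher")
        .map(_ * 2)
      Await.result(stream.runWith(Sink.seq), 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The boundary changes where the steps execute, not the order or values of the elements.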
-
asJava
public Source<Out,Mat> asJava()
Converts this Scala DSL element to its Java DSL counterpart.
- Returns:
- (undocumented)
-
combine
public <T,U> Source<U,NotUsed> combine(Source<T,?> first, Source<T,?> second, scala.collection.Seq<Source<T,?>> rest, scala.Function1<java.lang.Object,Graph<UniformFanInShape<T,U>,NotUsed>> strategy)
Deprecated. Use `Source.combine` on companion object instead. Since 2.5.5.
Combines several sources with fan-in strategy like Merge or Concat and returns Source.
- Parameters:
first - (undocumented)
second - (undocumented)
rest - (undocumented)
strategy - (undocumented)
- Returns:
- (undocumented)
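The recommended replacement on the companion object can be sketched as follows, assuming Akka 2.5.x with an ActorMaterializer; the object name is illustrative. Concat is used as the fan-in strategy so that the output order is deterministic.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object CombineExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("combine")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // The strategy function receives the number of inputs and builds the fan-in stage.
      val combined = Source.combine(Source(1 to 2), Source(3 to 4))(Concat[Int](_))
      Await.result(combined.runWith(Sink.seq), 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With Merge in place of Concat the elements of the two sources may interleave.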
-
asSourceWithContext
public <Ctx> SourceWithContext<Out,Ctx,Mat> asSourceWithContext(scala.Function1<Out,Ctx> f)
API MAY CHANGE
- Parameters:
f - (undocumented)
- Returns:
- (undocumented)
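A tentative sketch of asSourceWithContext, assuming an Akka 2.5.x release in which SourceWithContext offers asSource; since this API is marked as subject to change, the exact names may differ between versions. The object name is hypothetical. The function f extracts a context from each element, and later operators transform only the element while the context travels alongside.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SourceWithContextExample {
  def run(): Seq[(String, Int)] = {
    implicit val system: ActorSystem = ActorSystem("source-with-context")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val withLength = Source(List("a", "bb"))
        .asSourceWithContext(_.length) // context: the original string length
        .map(_.toUpperCase)            // transforms the element, not the context
      // asSource turns it back into a Source of (element, context) pairs.
      Await.result(withLength.asSource.runWith(Sink.seq), 5.seconds)
    } finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```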
-
-