Class Flow<In,Out,Mat>
- java.lang.Object
-
- akka.stream.scaladsl.Flow<In,Out,Mat>
-
- All Implemented Interfaces:
Graph<FlowShape<In,Out>,Mat>, FlowOps<Out,Mat>, FlowOpsMat<Out,Mat>
public final class Flow<In,Out,Mat> extends java.lang.Object implements FlowOpsMat<Out,Mat>, Graph<FlowShape<In,Out>,Mat>
A Flow is a set of stream processing steps that has one open input and one open output.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface akka.stream.Graph
Graph.GraphMapMatVal<S extends Shape,M>
-
-
Method Summary
Modifier and Type, Method, Description
Flow<In,Out,Mat> addAttributes(Attributes attr)
  Add the given attributes to this Flow.
static <T> Flow<T,T,NotUsed> apply()
  Returns a Flow which outputs all its inputs.
<U,CtxU,CtxOut> FlowWithContext<U,CtxU,Out,CtxOut,Mat> asFlowWithContext(scala.Function2<U,CtxU,In> collapseContext, scala.Function1<Out,CtxOut> extractContext)
  Turns a Flow into a FlowWithContext which manages a context per element along a stream.
<JIn extends In> Flow<JIn,Out,Mat> asJava()
  Converts this Scala DSL element to its Java DSL counterpart.
Flow<In,Out,Mat> async()
  Put an asynchronous boundary around this Flow.
Flow<In,Out,Mat> async(java.lang.String dispatcher)
  Put an asynchronous boundary around this Flow.
Flow<In,Out,Mat> async(java.lang.String dispatcher, int inputBufferSize)
  Put an asynchronous boundary around this Flow.
static <A,B> Flow<A,B,NotUsed> fromFunction(scala.Function1<A,B> f)
  Creates a Flow which will use the given function to transform its inputs to outputs.
static <I,O,M> Flow<I,O,M> fromGraph(Graph<FlowShape<I,O>,M> g)
  A graph with the shape of a flow logically is a flow, this method makes it so also in type.
static <T,U,M> Flow<T,U,scala.concurrent.Future<M>> fromMaterializer(scala.Function2<Materializer,Attributes,Flow<T,U,M>> factory)
  Defers the creation of a Flow until materialization.
static <I,O> Flow<I,O,NotUsed> fromProcessor(scala.Function0<org.reactivestreams.Processor<I,O>> processorFactory)
  Creates a Flow from a Reactive Streams Processor.
static <I,O,M> Flow<I,O,M> fromProcessorMat(scala.Function0<scala.Tuple2<org.reactivestreams.Processor<I,O>,M>> processorFactory)
  Creates a Flow from a Reactive Streams Processor and returns a materialized value.
static <I,O> Flow<I,O,NotUsed> fromSinkAndSource(Graph<SinkShape<I>,?> sink, Graph<SourceShape<O>,?> source)
  Creates a Flow from a Sink and a Source where the Flow's input will be sent to the Sink and the Flow's output will come from the Source.
static <I,O> Flow<I,O,NotUsed> fromSinkAndSourceCoupled(Graph<SinkShape<I>,?> sink, Graph<SourceShape<O>,?> source)
  Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
static <I,O,M1,M2,M> Flow<I,O,M> fromSinkAndSourceCoupledMat(Graph<SinkShape<I>,M1> sink, Graph<SourceShape<O>,M2> source, scala.Function2<M1,M2,M> combine)
  Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
static <I,O,M1,M2,M> Flow<I,O,M> fromSinkAndSourceMat(Graph<SinkShape<I>,M1> sink, Graph<SourceShape<O>,M2> source, scala.Function2<M1,M2,M> combine)
  Creates a Flow from a Sink and a Source where the Flow's input will be sent to the Sink and the Flow's output will come from the Source.
static <I,O,M> Flow<I,O,scala.concurrent.Future<M>> futureFlow(scala.concurrent.Future<Flow<I,O,M>> flow)
  Turn a Future[Flow] into a flow that will consume the values of the source when the future completes successfully.
<I2,O1,Mat2> Flow<I2,O1,Mat> join(Graph<BidiShape<Out,O1,I2,In>,Mat2> bidi)
<Mat2> RunnableGraph<Mat> join(Graph<FlowShape<Out,In>,Mat2> flow)
  Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.
<I2,O1,Mat2,M> Flow<I2,O1,M> joinMat(Graph<BidiShape<Out,O1,I2,In>,Mat2> bidi, scala.Function2<Mat,Mat2,M> combine)
<Mat2,Mat3> RunnableGraph<Mat3> joinMat(Graph<FlowShape<Out,In>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
  Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.
static <I,O,M> Flow<I,O,scala.concurrent.Future<M>> lazyFlow(scala.Function0<Flow<I,O,M>> create)
  Defers invoking the create function to create a future flow until there is downstream demand and passing that downstream demand upstream triggers the first element.
static <I,O,M> Flow<I,O,scala.concurrent.Future<M>> lazyFutureFlow(scala.Function0<scala.concurrent.Future<Flow<I,O,M>>> create)
  Defers invoking the create function to create a future flow until downstream demand has caused upstream to send a first element.
static <I,O,M> Flow<I,O,M> lazyInit(scala.Function1<I,scala.concurrent.Future<Flow<I,O,M>>> flowFactory, scala.Function0<M> fallback)
  Deprecated. Use 'Flow.futureFlow' in combination with prefixAndTail(1) instead, see `futureFlow` operator docs for details.
static <I,O,M> Flow<I,O,scala.concurrent.Future<scala.Option<M>>> lazyInitAsync(scala.Function0<scala.concurrent.Future<Flow<I,O,M>>> flowFactory)
  Deprecated. Use 'Flow.lazyFutureFlow' instead.
<Mat2> Flow<In,Out,Mat2> mapMaterializedValue(scala.Function1<Mat,Mat2> f)
  Transform the materialized value of this Flow, leaving all other properties as they were.
Flow<In,Out,Mat> named(java.lang.String name)
  Add a name attribute to this Flow.
scala.Tuple2<Mat,Flow<In,Out,NotUsed>> preMaterialize(Materializer materializer)
<Mat1,Mat2> scala.Tuple2<Mat1,Mat2> runWith(Graph<SourceShape<In>,Mat1> source, Graph<SinkShape<Out>,Mat2> sink, Materializer materializer)
  Connect the Source to this Flow and then connect it to the Sink and run it.
static <T,U,M> Flow<T,U,scala.concurrent.Future<M>> setup(scala.Function2<ActorMaterializer,Attributes,Flow<T,U,M>> factory)
  Deprecated. Use 'fromMaterializer' instead.
FlowShape<In,Out> shape()
  The shape of a graph is all that is externally visible: its inlets and outlets.
<Mat2> Sink<In,Mat> to(Graph<SinkShape<Out>,Mat2> sink)
<Mat2,Mat3> Sink<In,Mat3> toMat(Graph<SinkShape<Out>,Mat2> sink, scala.Function2<Mat,Mat2,Mat3> combine)
RunnableGraph<org.reactivestreams.Processor<In,Out>> toProcessor()
  Converts this Flow to a RunnableGraph that materializes to a Reactive Streams Processor which implements the operations encapsulated by this Flow.
java.lang.String toString()
akka.stream.impl.LinearTraversalBuilder traversalBuilder()
  INTERNAL API.
<T,Mat2> Flow<In,T,Mat> via(Graph<FlowShape<Out,T>,Mat2> flow)
<T,Mat2,Mat3> Flow<In,T,Mat3> viaMat(Graph<FlowShape<Out,T>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
  Transform this Flow by appending the given processing steps.
Flow<In,Out,Mat> withAttributes(Attributes attr)
  Replace the attributes of this Flow with the given ones.
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface akka.stream.scaladsl.FlowOps
$plus$plus, aggregateWithBoundary, alsoTo, alsoToAll, alsoToGraph, ask, ask, backpressureTimeout, batch, batchWeighted, buffer, collect, collectType, completionTimeout, concat, concatAllLazy, concatGraph, concatLazy, conflate, conflateWithSeed, delay, delay$default$2, delayWith, detach, divertTo, divertToGraph, drop, dropWhile, dropWithin, expand, extrapolate, extrapolate$default$2, filter, filterNot, flatMapConcat, flatMapMerge, flatMapPrefix, fold, foldAsync, groupBy, groupBy, grouped, groupedWeighted, groupedWeightedWithin, groupedWeightedWithin, groupedWithin, idleTimeout, initialDelay, initialTimeout, interleave, interleave, interleaveAll, interleaveGraph, interleaveGraph$default$3, internalConcat, internalConcatAll, intersperse, intersperse, keepAlive, limit, limitWeighted, log, log$default$2, log$default$3, logWithMarker, logWithMarker$default$3, logWithMarker$default$4, map, mapAsync, mapAsyncPartitioned, mapAsyncUnordered, mapConcat, mapError, mapWithResource, merge, merge$default$2, mergeAll, mergeGraph, mergeLatest, mergeLatest$default$2, mergeLatestGraph, mergePreferred, mergePreferred$default$3, mergePreferredGraph, mergePrioritized, mergePrioritized$default$4, mergePrioritizedGraph, mergeSorted, mergeSortedGraph, orElse, orElseGraph, prefixAndTail, prepend, prependGraph, prependLazy, recover, recoverWith, recoverWithRetries, reduce, scan, scanAsync, sliding, sliding$default$2, splitAfter, splitAfter, splitWhen, splitWhen, statefulMap, statefulMapConcat, take, takeWhile, takeWhile, takeWithin, throttle, throttle, throttle, throttle, watch, wireTap, wireTap, wireTapGraph, zip, zipAll, zipAllFlow, zipGraph, zipLatest, zipLatestGraph, zipLatestWith, zipLatestWith, zipLatestWithGraph, zipLatestWithGraph, zipWith, zipWithGraph, zipWithIndex
-
Methods inherited from interface akka.stream.scaladsl.FlowOpsMat
alsoToMat, concatLazyMat, concatMat, divertToMat, flatMapPrefixMat, interleaveMat, interleaveMat, mergeLatestMat, mergeMat, mergeMat$default$2, mergePreferredMat, mergePrioritizedMat, mergeSortedMat, monitor, monitorMat, orElseMat, prependLazyMat, prependMat, watchTermination, wireTapMat, zipAllMat, zipLatestMat, zipLatestWithMat, zipLatestWithMat, zipMat, zipWithMat
-
Methods inherited from interface akka.stream.Graph
getAttributes
-
-
-
-
Method Detail
-
fromProcessor
public static <I,O> Flow<I,O,NotUsed> fromProcessor(scala.Function0<org.reactivestreams.Processor<I,O>> processorFactory)
Creates a Flow from a Reactive Streams Processor.
-
fromProcessorMat
public static <I,O,M> Flow<I,O,M> fromProcessorMat(scala.Function0<scala.Tuple2<org.reactivestreams.Processor<I,O>,M>> processorFactory)
Creates a Flow from a Reactive Streams Processor and returns a materialized value.
-
fromFunction
public static <A,B> Flow<A,B,NotUsed> fromFunction(scala.Function1<A,B> f)
Creates a Flow which will use the given function to transform its inputs to outputs. It is equivalent to Flow[T].map(f).
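The equivalence with Flow[T].map(f) can be checked with a small sketch. It assumes Akka 2.6 or later on the classpath, where an implicit ActorSystem supplies the materializer; the object name, the `twice` function and the timeout are illustrative choices, not part of the API.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FromFunctionExample {
  // Returns the output of Flow.fromFunction(f) and of Flow[Int].map(f) for the same input.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("from-function-example")
    try {
      val twice: Int => Int = _ * 2 // hypothetical function used for illustration
      val viaFromFunction =
        Source(List(1, 2, 3)).via(Flow.fromFunction(twice)).runWith(Sink.seq[Int])
      val viaMap =
        Source(List(1, 2, 3)).via(Flow[Int].map(twice)).runWith(Sink.seq[Int])
      (Await.result(viaFromFunction, 5.seconds), Await.result(viaMap, 5.seconds))
    } finally system.terminate()
  }
}
```

Both elements of the returned tuple should hold the same sequence.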
-
fromGraph
public static <I,O,M> Flow<I,O,M> fromGraph(Graph<FlowShape<I,O>,M> g)
A graph with the shape of a flow is logically a flow; this method makes it one in type as well.
-
fromMaterializer
public static <T,U,M> Flow<T,U,scala.concurrent.Future<M>> fromMaterializer(scala.Function2<Materializer,Attributes,Flow<T,U,M>> factory)
-
setup
public static <T,U,M> Flow<T,U,scala.concurrent.Future<M>> setup(scala.Function2<ActorMaterializer,Attributes,Flow<T,U,M>> factory)
Deprecated. Use 'fromMaterializer' instead. Since 2.6.0.
-
fromSinkAndSource
public static <I,O> Flow<I,O,NotUsed> fromSinkAndSource(Graph<SinkShape<I>,?> sink, Graph<SourceShape<O>,?> source)
Creates a Flow from a Sink and a Source where the Flow's input will be sent to the Sink and the Flow's output will come from the Source.
The resulting flow can be visualized as:
     +----------------------------------------------+
     | Resulting Flow[I, O, NotUsed]                |
     |                                              |
     |  +---------+                  +-----------+  |
     |  |         |                  |           |  |
 I  ~~> | Sink[I] | [no-connection!] | Source[O] | ~~> O
     |  |         |                  |           |  |
     |  +---------+                  +-----------+  |
     +----------------------------------------------+
The Sink and Source sides of a Flow constructed using this method complete independently. So if the Sink receives a completion signal, the Source side will remain unaware of that. If you are looking to couple the termination signals of the two sides use Flow.fromSinkAndSourceCoupled instead.
See also fromSinkAndSourceMat(Graph<SinkShape<I>,M1>, Graph<SourceShape<O>,M2>, scala.Function2<M1,M2,M>) when access to materialized values of the parameters is needed.
-
fromSinkAndSourceMat
public static <I,O,M1,M2,M> Flow<I,O,M> fromSinkAndSourceMat(Graph<SinkShape<I>,M1> sink, Graph<SourceShape<O>,M2> source, scala.Function2<M1,M2,M> combine)
Creates a Flow from a Sink and a Source where the Flow's input will be sent to the Sink and the Flow's output will come from the Source.
The resulting flow can be visualized as:
     +------------------------------------------------------+
     | Resulting Flow[I, O, M]                              |
     |                                                      |
     |  +-------------+                  +---------------+  |
     |  |             |                  |               |  |
 I  ~~> | Sink[I, M1] | [no-connection!] | Source[O, M2] | ~~> O
     |  |             |                  |               |  |
     |  +-------------+                  +---------------+  |
     +------------------------------------------------------+
The Sink and Source sides of a Flow constructed using this method complete independently. So if the Sink receives a completion signal, the Source side will remain unaware of that. If you are looking to couple the termination signals of the two sides use Flow.fromSinkAndSourceCoupledMat instead.
The combine function is used to compose the materialized values of the sink and source into the materialized value of the resulting Flow.
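A minimal sketch of the two unconnected sides, assuming Akka 2.6 or later and the Scala DSL, where combine is passed in a second parameter list. The object name and the sample elements are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FromSinkAndSourceExample {
  // Returns (elements received by the inner Sink, elements emitted by the inner Source).
  def run(): (Seq[Int], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("from-sink-and-source-example")
    try {
      // Keep.left keeps the inner Sink's materialized value: a Future of what it received.
      val flow =
        Flow.fromSinkAndSourceMat(Sink.seq[Int], Source(List("a", "b")))(Keep.left)

      val (received, emitted) =
        Source(List(1, 2, 3))
          .viaMat(flow)(Keep.right)
          .toMat(Sink.seq[String])(Keep.both)
          .run()

      (Await.result(received, 5.seconds), Await.result(emitted, 5.seconds))
    } finally system.terminate()
  }
}
```

The integers pushed into the flow end up in the inner Sink, while downstream only sees the strings produced by the inner Source.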
-
fromSinkAndSourceCoupled
public static <I,O> Flow<I,O,NotUsed> fromSinkAndSourceCoupled(Graph<SinkShape<I>,?> sink, Graph<SourceShape<O>,?> source)
Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. Similar to Flow.fromSinkAndSource however couples the termination of these two operators.
The resulting flow can be visualized as:
     +---------------------------------------------+
     | Resulting Flow[I, O, NotUsed]               |
     |                                             |
     |  +---------+                 +-----------+  |
     |  |         |                 |           |  |
 I  ~~> | Sink[I] | ~~~(coupled)~~~ | Source[O] | ~~> O
     |  |         |                 |           |  |
     |  +---------+                 +-----------+  |
     +---------------------------------------------+
E.g. if the emitted Flow gets a cancellation, the Source of course is cancelled, however the Sink will also be completed. The table below illustrates the effects in detail:
Returned Flow                                   | Sink (in)                   | Source (out)
------------------------------------------------+-----------------------------+-------------------------------
cause: upstream (sink-side) receives completion | effect: receives completion | effect: receives cancel
cause: upstream (sink-side) receives error      | effect: receives error      | effect: receives cancel
cause: downstream (source-side) receives cancel | effect: completes           | effect: receives cancel
effect: cancels upstream, completes downstream  | effect: completes           | cause: signals complete
effect: cancels upstream, errors downstream     | effect: receives error      | cause: signals error or throws
effect: cancels upstream, completes downstream  | cause: cancels              | effect: receives cancel
See also fromSinkAndSourceCoupledMat(Graph<SinkShape<I>,M1>, Graph<SourceShape<O>,M2>, scala.Function2<M1,M2,M>) when access to materialized values of the parameters is needed.
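The first row of the table (upstream completion) can be observed with a sketch like the following. It assumes Akka 2.6 or later; the object name is hypothetical. The inner Source never emits or completes by itself, so the stream can only finish because the coupling propagates the upstream completion.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object CoupledTerminationExample {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("coupled-termination-example")
    try {
      // Source.maybe stays silent here because its promise is never completed.
      val coupled =
        Flow.fromSinkAndSourceCoupled[Int, String](Sink.ignore, Source.maybe[String])

      // Upstream completes after three elements; the coupling cancels the inner
      // Source and completes downstream, so the Future below completes.
      val result = Source(List(1, 2, 3)).via(coupled).runWith(Sink.seq[String])
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

With Flow.fromSinkAndSource in place of the coupled variant, the same stream would not be expected to complete, since the Source side would remain unaware of the upstream completion.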
-
fromSinkAndSourceCoupledMat
public static <I,O,M1,M2,M> Flow<I,O,M> fromSinkAndSourceCoupledMat(Graph<SinkShape<I>,M1> sink, Graph<SourceShape<O>,M2> source, scala.Function2<M1,M2,M> combine)
Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. Similar to Flow.fromSinkAndSource however couples the termination of these two operators.
The resulting flow can be visualized as:
     +-----------------------------------------------------+
     | Resulting Flow[I, O, M]                             |
     |                                                     |
     |  +-------------+                 +---------------+  |
     |  |             |                 |               |  |
 I  ~~> | Sink[I, M1] | ~~~(coupled)~~~ | Source[O, M2] | ~~> O
     |  |             |                 |               |  |
     |  +-------------+                 +---------------+  |
     +-----------------------------------------------------+
E.g. if the emitted Flow gets a cancellation, the Source of course is cancelled, however the Sink will also be completed. The table on Flow.fromSinkAndSourceCoupled illustrates the effects in detail.
The combine function is used to compose the materialized values of the sink and source into the materialized value of the resulting Flow.
-
lazyInit
public static <I,O,M> Flow<I,O,M> lazyInit(scala.Function1<I,scala.concurrent.Future<Flow<I,O,M>>> flowFactory, scala.Function0<M> fallback)
Deprecated. Use 'Flow.futureFlow' in combination with prefixAndTail(1) instead, see `futureFlow` operator docs for details. Since 2.6.0.
Creates a real Flow upon receiving the first element. The internal Flow will not be created if there are no elements, because of completion, cancellation, or error.
The materialized value of the Flow is the value that is created by the fallback function.
Emits when the internal flow is successfully created and it emits
Backpressures when the internal flow is successfully created and it backpressures
Completes when upstream completes and all elements have been emitted from the internal flow
Cancels when downstream cancels (see below)
The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behaviour can be controlled by setting the akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, which delays downstream cancellation until the nested flow has been materialized; the nested flow is then immediately cancelled (with the original cancellation cause).
-
lazyInitAsync
public static <I,O,M> Flow<I,O,scala.concurrent.Future<scala.Option<M>>> lazyInitAsync(scala.Function0<scala.concurrent.Future<Flow<I,O,M>>> flowFactory)
Deprecated. Use 'Flow.lazyFutureFlow' instead. Since 2.6.0.
Creates a real Flow upon receiving the first element. The internal Flow will not be created if there are no elements, because of completion, cancellation, or error.
The materialized value of the Flow is a Future[Option[M]] that is completed with Some(mat) when the internal flow gets materialized or with None when there were no elements. If the flow materialization (including the call of the flowFactory) fails then the future is completed with a failure.
Emits when the internal flow is successfully created and it emits
Backpressures when the internal flow is successfully created and it backpressures
Completes when upstream completes and all elements have been emitted from the internal flow
Cancels when downstream cancels (see below)
The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behaviour can be controlled by setting the akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, which delays downstream cancellation until the nested flow has been materialized; the nested flow is then immediately cancelled (with the original cancellation cause).
-
futureFlow
public static <I,O,M> Flow<I,O,scala.concurrent.Future<M>> futureFlow(scala.concurrent.Future<Flow<I,O,M>> flow)
Turn a Future[Flow] into a flow that will consume the values of the source when the future completes successfully. If the Future is completed with a failure the stream is failed.
The materialized future value is completed with the materialized value of the future flow or failed with a NeverMaterializedException if upstream fails or downstream cancels before the future has completed.
The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behaviour can be controlled by setting the akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, which delays downstream cancellation until the nested flow has been materialized; the nested flow is then immediately cancelled (with the original cancellation cause).
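As a rough illustration, the sketch below starts a stream before the Future[Flow] is available and completes it afterwards. It assumes Akka 2.6 or later; the object name and the Promise-based setup are illustrative, standing in for any asynchronous source of a Flow.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object FutureFlowExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("future-flow-example")
    try {
      val promisedFlow = Promise[Flow[Int, Int, NotUsed]]()

      // The stream is materialized while the Future is still incomplete.
      val result =
        Source(1 to 3).via(Flow.futureFlow(promisedFlow.future)).runWith(Sink.seq[Int])

      // Elements are processed only once the Future completes successfully.
      promisedFlow.success(Flow[Int].map(_ + 1))
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```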
-
lazyFlow
public static <I,O,M> Flow<I,O,scala.concurrent.Future<M>> lazyFlow(scala.Function0<Flow<I,O,M>> create)
Defers invoking the create function to create a future flow until there is downstream demand and passing that downstream demand upstream triggers the first element.
The materialized future value is completed with the materialized value of the created flow when that has successfully been materialized.
If the create function throws or returns a future that fails, the stream is failed; in this case the materialized future value is failed with a NeverMaterializedException.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and can trigger the factory earlier than expected.
Emits when the internal flow is successfully created and it emits
Backpressures when the internal flow is successfully created and it backpressures or downstream backpressures
Completes when upstream completes and all elements have been emitted from the internal flow
Cancels when downstream cancels (see below)
The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behaviour can be controlled by setting the akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, which delays downstream cancellation until the nested flow has been materialized; the nested flow is then immediately cancelled (with the original cancellation cause).
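The deferred invocation of create can be made visible with a flag, as in this sketch. It assumes Akka 2.6 or later; the object name and the AtomicBoolean flag are hypothetical helpers for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.duration._

object LazyFlowExample {
  // Returns (created before running?, created after running?, stream output).
  def run(): (Boolean, Boolean, Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("lazy-flow-example")
    try {
      val created = new AtomicBoolean(false)
      val flow = Flow.lazyFlow[Int, Int, NotUsed] { () =>
        created.set(true) // records that the factory was invoked
        Flow[Int].map(_ * 10)
      }

      // Defining the flow does not call `create`.
      val createdBeforeRun = created.get()
      val out = Await.result(Source(1 to 3).via(flow).runWith(Sink.seq[Int]), 5.seconds)
      (createdBeforeRun, created.get(), out)
    } finally system.terminate()
  }
}
```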
-
lazyFutureFlow
public static <I,O,M> Flow<I,O,scala.concurrent.Future<M>> lazyFutureFlow(scala.Function0<scala.concurrent.Future<Flow<I,O,M>>> create)
Defers invoking the create function to create a future flow until downstream demand has caused upstream to send a first element.
The materialized future value is completed with the materialized value of the created flow when that has successfully been materialized.
If the create function throws or returns a future that fails, the stream is failed; in this case the materialized future value is failed with a NeverMaterializedException.
Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counteracts the laziness and can trigger the factory earlier than expected.
Emits when the internal flow is successfully created and it emits
Backpressures when the internal flow is successfully created and it backpressures or downstream backpressures
Completes when upstream completes and all elements have been emitted from the internal flow
Cancels when downstream cancels (see below)
The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. This behaviour can be controlled by setting the akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested attribute, which delays downstream cancellation until the nested flow has been materialized; the nested flow is then immediately cancelled (with the original cancellation cause).
-
traversalBuilder
public akka.stream.impl.LinearTraversalBuilder traversalBuilder()
Description copied from interface: Graph
INTERNAL API.
Every materializable element must be backed by a stream layout module.
- Specified by:
traversalBuilder in interface Graph<In,Out>
-
shape
public FlowShape<In,Out> shape()
Description copied from interface: Graph
The shape of a graph is all that is externally visible: its inlets and outlets.
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
viaMat
public <T,Mat2,Mat3> Flow<In,T,Mat3> viaMat(Graph<FlowShape<Out,T>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
Description copied from interface: FlowOpsMat
Transform this Flow by appending the given processing steps.
    +------------------------------+
    | Resulting Flow[In, T, M2]    |
    |                              |
    |  +------+          +------+  |
    |  |      |          |      |  |
In ~~> | this | ~~Out~~> | flow | ~~> T
    |  |   Mat|          |     M|  |
    |  +------+          +------+  |
    +------------------------------+
The combine function is used to compose the materialized values of this flow and that flow into the materialized value of the resulting Flow.
It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
- Specified by:
viaMat in interface FlowOpsMat<In,Out>
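The effect of the Keep combiners on the resulting materialized type can be sketched as follows, assuming Akka 2.6 or later. The object and value names are illustrative; watchTermination is used only to give the second flow a materialized value worth keeping.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ViaMatExample {
  val first: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 1)
  // watchTermination gives this flow a Future[Done] as its materialized value.
  val second: Flow[Int, Int, Future[Done]] =
    Flow[Int].map(_ * 2).watchTermination()(Keep.right)

  // The combiner decides which materialized value the composed flow exposes.
  val keepsFirst: Flow[Int, Int, NotUsed] = first.viaMat(second)(Keep.left)
  val keepsSecond: Flow[Int, Int, Future[Done]] = first.viaMat(second)(Keep.right)
  val keepsBoth: Flow[Int, Int, (NotUsed, Future[Done])] = first.viaMat(second)(Keep.both)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("via-mat-example")
    try {
      val (done, out) =
        Source(1 to 3).viaMat(keepsSecond)(Keep.right).toMat(Sink.seq[Int])(Keep.both).run()
      Await.result(done, 5.seconds) // completes once the stream has terminated
      Await.result(out, 5.seconds)
    } finally system.terminate()
  }
}
```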
-
to
public <Mat2> Sink<In,Mat> to(Graph<SinkShape<Out>,Mat2> sink)
Connect this Flow to a Sink, concatenating the processing steps of both.
    +------------------------------+
    | Resulting Sink[In, Mat]      |
    |                              |
    |  +------+          +------+  |
    |  |      |          |      |  |
In ~~> | flow | ~~Out~~> | sink |  |
    |  |   Mat|          |     M|  |
    |  +------+          +------+  |
    +------------------------------+
The materialized value of the combined Sink will be the materialized value of the current flow (ignoring the given Sink’s value), use toMat if a different strategy is needed.
See also toMat(Graph<SinkShape<Out>,Mat2>, scala.Function2<Mat,Mat2,Mat3>) when access to materialized values of the parameter is needed.
-
toMat
public <Mat2,Mat3> Sink<In,Mat3> toMat(Graph<SinkShape<Out>,Mat2> sink, scala.Function2<Mat,Mat2,Mat3> combine)
Connect this Flow to a Sink, concatenating the processing steps of both.
    +----------------------------+
    | Resulting Sink[In, M2]     |
    |                            |
    |  +------+        +------+  |
    |  |      |        |      |  |
In ~~> | flow | ~Out~> | sink |  |
    |  |   Mat|        |     M|  |
    |  +------+        +------+  |
    +----------------------------+
The combine function is used to compose the materialized values of this flow and that Sink into the materialized value of the resulting Sink.
It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
- Specified by:
toMat in interface FlowOpsMat<In,Out>
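The difference between to and toMat shows up in the type of the resulting Sink. The sketch below assumes Akka 2.6 or later; the object and value names are illustrative.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ToMatExample {
  val doubled: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)

  // `to` keeps the flow's materialized value, so the collected result is not exposed.
  val discardsResult: Sink[Int, NotUsed] = doubled.to(Sink.seq[Int])

  // `toMat` with Keep.right keeps the Sink's Future instead.
  val exposesResult: Sink[Int, Future[Seq[Int]]] = doubled.toMat(Sink.seq[Int])(Keep.right)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("to-mat-example")
    try Await.result(Source(1 to 3).runWith(exposesResult), 5.seconds)
    finally system.terminate()
  }
}
```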
-
mapMaterializedValue
public <Mat2> Flow<In,Out,Mat2> mapMaterializedValue(scala.Function1<Mat,Mat2> f)
Transform the materialized value of this Flow, leaving all other properties as they were.
- Specified by:
mapMaterializedValue in interface FlowOpsMat<In,Out>
-
preMaterialize
public scala.Tuple2<Mat,Flow<In,Out,NotUsed>> preMaterialize(Materializer materializer)
-
join
public <Mat2> RunnableGraph<Mat> join(Graph<FlowShape<Out,In>,Mat2> flow)
Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.
+------+        +-------+
|      | ~Out~> |       |
| this |        | other |
|      | <~In~  |       |
+------+        +-------+
The materialized value of the combined Flow will be the materialized value of the current flow (ignoring the other Flow’s value), use joinMat if a different strategy is needed.
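A small sketch of join, assuming Akka 2.6 or later. To keep the cross connection free of a real feedback cycle, the joined flow here is built with fromSinkAndSourceMat so that its input and output sides are not connected to each other; that construction and all names are illustrative choices.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object JoinExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("join-example")
    try {
      // `outer` emits 1, 2, 3 on its output and collects whatever arrives on its input.
      val outer = Flow.fromSinkAndSourceMat(Sink.seq[Int], Source(1 to 3))(Keep.left)

      // join feeds outer's output into the doubling flow and that flow's output
      // back into outer's input, yielding a RunnableGraph with outer's materialized value.
      val graph = outer.join(Flow[Int].map(_ * 2))
      Await.result(graph.run(), 5.seconds)
    } finally system.terminate()
  }
}
```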
-
joinMat
public <Mat2,Mat3> RunnableGraph<Mat3> joinMat(Graph<FlowShape<Out,In>,Mat2> flow, scala.Function2<Mat,Mat2,Mat3> combine)
Join this Flow to another Flow, by cross connecting the inputs and outputs, creating a RunnableGraph.
+------+        +-------+
|      | ~Out~> |       |
| this |        | other |
|      | <~In~  |       |
+------+        +-------+
The combine function is used to compose the materialized values of this flow and that Flow into the materialized value of the resulting Flow.
It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
-
join
public <I2,O1,Mat2> Flow<I2,O1,Mat> join(Graph<BidiShape<Out,O1,I2,In>,Mat2> bidi)
Join this Flow to a BidiFlow to close off the “top” of the protocol stack:
+---------------------------+
| Resulting Flow            |
|                           |
| +------+        +------+  |
| |      | ~Out~> |      | ~~> O1
| | flow |        | bidi |  |
| |      | <~In~  |      | <~~ I2
| +------+        +------+  |
+---------------------------+
The materialized value of the combined Flow will be the materialized value of the current flow (ignoring the BidiFlow’s value), use joinMat if a different strategy is needed.
-
joinMat
public <I2,O1,Mat2,M> Flow<I2,O1,M> joinMat(Graph<BidiShape<Out,O1,I2,In>,Mat2> bidi, scala.Function2<Mat,Mat2,M> combine)
Join this Flow to a BidiFlow to close off the “top” of the protocol stack:
+---------------------------+
| Resulting Flow            |
|                           |
| +------+        +------+  |
| |      | ~Out~> |      | ~~> O1
| | flow |        | bidi |  |
| |      | <~In~  |      | <~~ I2
| +------+        +------+  |
+---------------------------+
The combine function is used to compose the materialized values of this flow and that BidiFlow into the materialized value of the resulting Flow.
It is recommended to use the internally optimized Keep.left and Keep.right combiners where appropriate instead of manually writing functions that pass through one of the values.
-
withAttributes
public Flow<In,Out,Mat> withAttributes(Attributes attr)
Replace the attributes of this Flow with the given ones. If this Flow is a composite of multiple graphs, new attributes on the composite will be less specific than attributes set directly on the individual graphs of the composite.
Note that this operation has no effect on an empty Flow (because the attributes apply only to the contained processing operators).
- Specified by:
withAttributes in interface FlowOps<In,Out>
- Specified by:
withAttributes in interface Graph<In,Out>
-
addAttributes
public Flow<In,Out,Mat> addAttributes(Attributes attr)
Add the given attributes to this Flow. If the specific attribute was already present on this graph this means the added attribute will be more specific than the existing one. If this Flow is a composite of multiple graphs, new attributes on the composite will be less specific than attributes set directly on the individual graphs of the composite.
- Specified by:
addAttributes in interface FlowOps<In,Out>
- Specified by:
addAttributes in interface Graph<In,Out>
-
async
public Flow<In,Out,Mat> async(java.lang.String dispatcher)
Put an asynchronous boundary around this Flow.
-
async
public Flow<In,Out,Mat> async(java.lang.String dispatcher, int inputBufferSize)
Put an asynchronous boundary around this Flow.
-
runWith
public <Mat1,Mat2> scala.Tuple2<Mat1,Mat2> runWith(Graph<SourceShape<In>,Mat1> source, Graph<SinkShape<Out>,Mat2> sink, Materializer materializer)
Connect the Source to this Flow and then connect it to the Sink and run it. The returned tuple contains the materialized values of the Source and Sink, e.g. the Subscriber of a Source#subscriber and the Publisher of a Sink#publisher.
Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.
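A minimal sketch of runWith with an implicit ActorSystem acting as the materializer, assuming Akka 2.6 or later. The object name and the filter step are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RunWithExample {
  def run(): Seq[Int] = {
    // The implicit ActorSystem is picked up as the materializer parameter.
    implicit val system: ActorSystem = ActorSystem("run-with-example")
    try {
      // The tuple holds the Source's and the Sink's materialized values.
      val (_, result) =
        Flow[Int].filter(_ % 2 == 0).runWith(Source(1 to 6), Sink.seq[Int])
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```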
-
toProcessor
public RunnableGraph<org.reactivestreams.Processor<In,Out>> toProcessor()
Converts this Flow to a RunnableGraph that materializes to a Reactive Streams Processor which implements the operations encapsulated by this Flow. Every materialization results in a new Processor instance, i.e. the returned RunnableGraph is reusable.
- Returns:
A RunnableGraph that materializes to a Processor when run() is called on it.
-
asFlowWithContext
public <U,CtxU,CtxOut> FlowWithContext<U,CtxU,Out,CtxOut,Mat> asFlowWithContext(scala.Function2<U,CtxU,In> collapseContext, scala.Function1<Out,CtxOut> extractContext)
Turns a Flow into a FlowWithContext which manages a context per element along a stream.
- Parameters:
collapseContext - turn each incoming pair of element and context value into an element of this Flow
extractContext - turn each outgoing element of this Flow into an outgoing context value
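The roles of the two functions can be sketched as below, assuming Akka 2.6 or later and the Scala DSL, where extractContext is passed in a second parameter list. The Message type, the offset used as context, and all other names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsFlowWithContextExample {
  // Hypothetical element type that carries its own context (an offset).
  final case class Message(data: String, offset: Long)

  val upper: Flow[Message, Message, NotUsed] =
    Flow[Message].map(m => m.copy(data = m.data.toUpperCase))

  // collapseContext folds (data, offset) into a Message; extractContext reads the offset back.
  val withContext =
    upper.asFlowWithContext[String, Long, Long]((data, offset) => Message(data, offset))(_.offset)

  def run(): Seq[(Message, Long)] = {
    implicit val system: ActorSystem = ActorSystem("as-flow-with-context-example")
    try {
      val result = Source(List("a" -> 1L, "b" -> 2L))
        .via(withContext.asFlow)
        .runWith(Sink.seq[(Message, Long)])
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```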
-
-