public abstract class PushPullStage<In,Out> extends AbstractStage<In,Out,SyncDirective,SyncDirective,Context<Out>,LifecycleContext>
PushPullStage
implementations participate in 1-bounded regions. For every external non-completion signal these
stages produce *exactly one* push or pull signal.
AbstractStage.onPush(In, Ctx)
is called when an element from upstream is available and there is demand from downstream, i.e.
in onPush
you are allowed to call Context.push(Out)
to emit one element downstream, or you can absorb the
element by calling Context.pull()
. Note that you can only emit zero or one element downstream from onPull
.
To emit more than one element you have to push the remaining elements from AbstractStage.onPull(Ctx)
, one-by-one.
onPush
is not called again until onPull
has requested more elements with Context.pull()
.
StatefulStage
has support for making it easy to emit more than one element from onPush
.
AbstractStage.onPull(Ctx)
is called when there is demand from downstream, i.e. you are allowed to push one element
downstream with Context.push(Out)
, or request elements from upstreams with Context.pull()
. If you
always perform transitive pull by calling ctx.pull
from onPull
you can use PushStage
instead of
PushPullStage
.
Stages are allowed to do early completion of downstream and cancel of upstream. This is done with Context.finish()
,
which is a combination of cancel/complete.
Since onComplete is not a backpressured signal it is sometimes preferable to push a final element and then
immediately finish. This combination is exposed as Context.pushAndFinish(Out)
which enables stages to
propagate completion events without waiting for an extra round of pull.
Another peculiarity is how to convert termination events (complete/failure) into elements. The problem
here is that the termination events are not backpressured while elements are. This means that simply calling
Context.push(Out)
as a response to AbstractStage.onUpstreamFinish(Ctx)
or AbstractStage.onUpstreamFailure(java.lang.Throwable, Ctx)
will very likely break boundedness
and result in a buffer overflow somewhere. Therefore the only allowed command in this case is
Context.absorbTermination()
which stops the propagation of the termination signal, and puts the stage in a
Context.isFinishing()
state. Depending on whether the stage has a pending pull signal it
has not yet "consumed" by a push its AbstractStage.onPull(Ctx)
handler might be called immediately or later. From
AbstractStage.onPull(Ctx)
final elements can be pushed before completing downstream with Context.finish()
or
Context.pushAndFinish(Out)
.
StatefulStage
has support for making it easy to emit final elements.
All these rules are enforced by types and runtime checks where needed. Always return the Directive
from the call to the Context
method, and do only call Context
commands once per callback.
DetachedStage
,
StatefulStage
,
PushStage
AbstractStage.PushPullGraphStage<In,Out,Ext>, AbstractStage.PushPullGraphStageWithMaterializedValue<In,Out,Ext,Mat>
Constructor and Description |
---|
PushPullStage()
Deprecated.
|
decide, isDetached, onDownstreamFinish, onPull, onPush, onUpstreamFailure, onUpstreamFinish, postStop, preStart, restart