Class GraphInterpreter
java.lang.Object
    akka.stream.impl.fusing.GraphInterpreter
public final class GraphInterpreter extends java.lang.Object
INTERNAL API

From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a GraphInterpreter#GraphAssembly object and provides facilities to execute and interact with this assembly. The lifecycle of the Interpreter is roughly the following:
- init(akka.stream.Materializer) is called
- execute(int) is called whenever there is need for execution, providing an upper limit on the processed events
- finish() is called before the interpreter is disposed, preferably after isCompleted() returned true, although in abort cases this is not strictly necessary

The execute(int) method of the interpreter accepts an upper bound on the events it will process. After this limit is reached or there are no more pending events to be processed, the call returns. It is possible to inspect if there are unprocessed events left via the isSuspended() method. isCompleted() returns true once all operators reported completion inside the interpreter.

The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations on the hot paths.
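The lifecycle described above can be sketched as a small driver loop. This is only an illustration under stated assumptions, not the Akka implementation: ToyInterpreter is a hypothetical stand-in that borrows the method names init, execute, finish, isSuspended and isCompleted, its init takes no Materializer, and both the pending-event counter and the meaning of the value returned by execute (the unused part of the limit) are assumptions of this sketch.

```java
// Hypothetical stand-in for illustration only. The real GraphInterpreter is internal API;
// its constructor, init(Materializer) and event model are far richer than this.
final class ToyInterpreter {
    private int pendingEvents; // assumed: number of events waiting to be processed
    private boolean finished;

    ToyInterpreter(int initialEvents) {
        this.pendingEvents = initialEvents;
    }

    void init() {
        // The real method calls preStart() on every operator logic.
    }

    // Processes at most eventLimit events. Returning the unused budget is an
    // assumption of this sketch.
    int execute(int eventLimit) {
        int remaining = eventLimit;
        while (remaining > 0 && pendingEvents > 0) {
            pendingEvents--;
            remaining--;
        }
        return remaining;
    }

    boolean isSuspended() { return pendingEvents > 0; }

    // Simplification: the toy counts as completed once nothing is pending.
    boolean isCompleted() { return pendingEvents == 0; }

    void finish() { finished = true; }

    boolean isFinished() { return finished; }
}

final class LifecycleSketch {
    // Runs init, then bounded execute slices until nothing is pending, then finish.
    // Returns the number of execute calls that were needed.
    static int drive(ToyInterpreter interpreter, int eventLimit) {
        interpreter.init();
        int slices = 0;
        do {
            interpreter.execute(eventLimit);
            slices++;
        } while (interpreter.isSuspended());
        interpreter.finish();
        return slices;
    }

    public static void main(String[] args) {
        ToyInterpreter interpreter = new ToyInterpreter(10);
        System.out.println("execute calls: " + drive(interpreter, 4));
    }
}
```

In this toy, ten pending events with a limit of four need three execute calls. A real driver would typically yield between slices rather than loop immediately; the tight loop is used here only to keep the sketch short.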
One of the basic abstractions inside the interpreter is the GraphInterpreter.Connection. A connection represents an output-input port pair (an analogue for a connected Reactive Streams Publisher-Subscriber pair). The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion or errors across the Connection. In particular:
- portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this connection. This bitfield is used to decode the event that is in-flight.
- connectionSlot contains a potential element or exception that accompanies the event encoded in the portStates bitfield
- inHandler contains the InHandler instance that handles the events corresponding to the input port of the connection
- outHandler contains the OutHandler instance that handles the events corresponding to the output port of the connection

On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue contains the Connections that have pending events to be processed. The pending event itself is encoded in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given Connection, which is true in almost all cases, except a complete-after-push or fail-after-push which has to be decoded accordingly.
The layout of the portState bitfield is the following:
            |- state machn.-|  Only one bit is hot among these bits
 64  32  16 | 8   4   2   1 |
+---+---+---|---+---+---+---|
  |   |   |   |   |   |   |
  |   |   |   |   |   |   |  From the following flags only one is active at any given time. These bits encode
  |   |   |   |   |   |   |  state machine states, and they are "moved" around using XOR masks to keep other bits
  |   |   |   |   |   |   |  intact.
  |   |   |   |   |   |   |
  |   |   |   |   |   |   +- InReady:  The input port is ready to be pulled
  |   |   |   |   |   +----- Pulling:  A pull is active, but has not arrived yet (queued)
  |   |   |   |   +--------- Pushing:  A push is active, but has not arrived yet (queued)
  |   |   |   +------------- OutReady: The output port is ready to be pushed
  |   |   |
  |   |   +----------------- InClosed:  The input port is closed and will not receive any events.
  |   |                                 A push might be still in flight which will be then processed first.
  |   +--------------------- OutClosed: The output port is closed and will not receive any events.
  +------------------------- InFailed:  Always set in conjunction with InClosed. Indicates that the close event
                                        is a failure
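The way XOR masks "move" the single hot state bit while leaving the close flags untouched can be sketched as follows. The flag values are the ones shown in the layout above. The four flip masks are an assumption: this page lists PullStartFlip, PullEndFlip, PushStartFlip and PushEndFlip without giving their values, so the sketch derives each mask as the OR of the state being left and the state being entered. The class and method names are hypothetical.

```java
final class PortStateSketch {
    // Bit values as given in the layout above.
    static final int IN_READY = 1;
    static final int PULLING = 2;
    static final int PUSHING = 4;
    static final int OUT_READY = 8;
    static final int IN_CLOSED = 16;
    static final int OUT_CLOSED = 32;
    static final int IN_FAILED = 64;

    // Assumed XOR masks: each clears the current state bit and sets the next one.
    static final int PULL_START_FLIP = IN_READY | PULLING;  // InReady  -> Pulling
    static final int PULL_END_FLIP = PULLING | OUT_READY;   // Pulling  -> OutReady
    static final int PUSH_START_FLIP = OUT_READY | PUSHING; // OutReady -> Pushing
    static final int PUSH_END_FLIP = PUSHING | IN_READY;    // Pushing  -> InReady

    // Renders the set flags, lowest bit first, separated by '|'.
    static String describe(int portState) {
        String[] names = {
            "InReady", "Pulling", "Pushing", "OutReady", "InClosed", "OutClosed", "InFailed"
        };
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < names.length; i++) {
            if ((portState & (1 << i)) != 0) {
                if (sb.length() > 0) {
                    sb.append('|');
                }
                sb.append(names[i]);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int state = IN_READY;
        state ^= PULL_START_FLIP; // a pull is requested
        System.out.println(describe(state));
        state ^= PULL_END_FLIP;   // the pull is delivered
        System.out.println(describe(state));
        state ^= PUSH_START_FLIP; // a push is requested
        state |= IN_CLOSED;       // completion arrives while the push is in flight
        System.out.println(describe(state));
        state ^= PUSH_END_FLIP;   // the push is delivered, InClosed stays set
        System.out.println(describe(state));
    }
}
```

Because the masks only touch the four state-machine bits, the InClosed flag set during the in-flight push survives the final flip, which is the complete-after-push situation the description mentions.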
Sending an event is usually the following sequence:
- An action is requested by an operator logic (push, pull, complete, etc.)
- the state machine in portStates is transitioned from a ready state to a pending event
- the affected Connection is enqueued

Receiving an event is usually the following sequence:
- the Connection to be processed is dequeued
- the type of the event is determined from the bits set on portStates
- the state machine in portStates is transitioned to a ready state
- using the inHandlers/outHandlers table the corresponding callback is called on the operator logic.
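The two sequences above can be sketched with a toy connection and a small circular buffer. Everything here is hypothetical and simplified: Conn, SendReceiveSketch, the callback fields and the fixed queue capacity are inventions of this example, the flip mask values are assumed (they are not given on this page), and close and failure events are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SendReceiveSketch {
    static final int IN_READY = 1, PULLING = 2, PUSHING = 4, OUT_READY = 8;
    // Assumed XOR masks that move the single hot state bit.
    static final int PULL_START_FLIP = 3, PULL_END_FLIP = 10;
    static final int PUSH_START_FLIP = 12, PUSH_END_FLIP = 5;

    // Hypothetical, simplified connection: one state word, one slot, two callbacks.
    static final class Conn {
        int portState = IN_READY;
        Object slot;
        Runnable onPull = () -> {};         // stands in for the OutHandler callback
        Consumer<Object> onPush = e -> {};  // stands in for the InHandler callback
    }

    // Circular buffer; the capacity is a power of two so indices can be masked.
    // There is no overflow check, which is acceptable only for this sketch.
    private final Conn[] queue = new Conn[8];
    private final int mask = queue.length - 1;
    private int head;
    private int tail;

    void enqueue(Conn c) {
        queue[tail & mask] = c;
        tail++;
    }

    boolean hasPending() { return head != tail; }

    // Sending: move from a ready state to a pending event, then enqueue.
    void pull(Conn c) {
        if ((c.portState & IN_READY) == 0) throw new IllegalStateException("input port not ready");
        c.portState ^= PULL_START_FLIP;
        enqueue(c);
    }

    void push(Conn c, Object elem) {
        if ((c.portState & OUT_READY) == 0) throw new IllegalStateException("output port not ready");
        c.portState ^= PUSH_START_FLIP;
        c.slot = elem;
        enqueue(c);
    }

    // Receiving: dequeue, decode the event from the bits, move to a ready state, call back.
    void processOne() {
        Conn c = queue[head & mask];
        queue[head & mask] = null;
        head++;
        if ((c.portState & PULLING) != 0) {
            c.portState ^= PULL_END_FLIP;
            c.onPull.run();
        } else if ((c.portState & PUSHING) != 0) {
            c.portState ^= PUSH_END_FLIP;
            Object elem = c.slot;
            c.slot = null;
            c.onPush.accept(elem);
        }
    }

    // Downstream pulls once; upstream answers the pull with a single push.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SendReceiveSketch interpreter = new SendReceiveSketch();
        Conn c = new Conn();
        c.onPull = () -> { log.add("onPull"); interpreter.push(c, "a"); };
        c.onPush = e -> log.add("onPush(" + e + ")");
        interpreter.pull(c);
        while (interpreter.hasPending()) {
            interpreter.processOne();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the event type is never stored in the queue itself: the queue holds only the connection, and the pending event is recovered from the connection's state bits when it is dequeued.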
Because of the FIFO construction of the queue the interpreter is fair, i.e. a pending event is always executed after a bounded number of other events. This property, together with suspendability means that even infinite cycles can be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example the non-cycle edge of a balance is pulled, dissolving the original cycle).
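A toy model can illustrate how FIFO ordering and a bounded execute call interact with a cycle. The sketch below is not how the interpreter is implemented: it uses a java.util.ArrayDeque of Runnable events instead of a circular buffer of Connections, and cycleEvent and externalEvent are hypothetical stand-ins for an element circulating in a cycle and for an externally injected event that takes it out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class FairnessSketch {
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
    final List<String> trace = new ArrayList<>();
    private boolean cycleBroken;

    // Processes at most eventLimit events in FIFO order and returns the unused budget
    // (the return value is an assumption of this sketch).
    int execute(int eventLimit) {
        int remaining = eventLimit;
        while (remaining > 0 && !queue.isEmpty()) {
            queue.poll().run();
            remaining--;
        }
        return remaining;
    }

    boolean isSuspended() { return !queue.isEmpty(); }

    // Keeps itself alive by re-enqueueing itself, like an element going around a cycle.
    void cycleEvent() {
        if (cycleBroken) {
            return; // the element was taken out of the cycle
        }
        trace.add("cycle");
        queue.add(this::cycleEvent);
    }

    // Injected from outside while the interpreter is suspended; ends the cycle.
    void externalEvent() {
        trace.add("external");
        cycleBroken = true;
    }

    static FairnessSketch demo() {
        FairnessSketch s = new FairnessSketch();
        s.queue.add(s::cycleEvent);
        s.execute(3);                  // returns although the cycle alone never ends
        s.queue.add(s::externalEvent); // queued behind exactly one pending cycle event
        s.execute(100);
        return s;
    }

    public static void main(String[] args) {
        FairnessSketch s = demo();
        System.out.println(s.trace + " suspended=" + s.isSuspended());
    }
}
```

The first execute call returns after three events even though the cycle would run forever, and the injected event runs after only the one cycle event that was already queued ahead of it.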
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | GraphInterpreter.Cancelled | Marker class that indicates that a port was cancelled with a given cause
static class | GraphInterpreter.Cancelled$ |
static class | GraphInterpreter.Connection | INTERNAL API
static class | GraphInterpreter.DownstreamBoundaryStageLogic<T> |
static class | GraphInterpreter.Empty$ | Marker object that indicates that a port holds no element since it was already grabbed.
static class | GraphInterpreter.Failed | Marker class that indicates that a port was failed with a given cause and a potential outstanding element
static class | GraphInterpreter.Failed$ |
static class | GraphInterpreter.UpstreamBoundaryStageLogic<T> |
-
Constructor Summary
Constructors
Constructor | Description
GraphInterpreter(Materializer materializer, LoggingAdapter log, GraphStageLogic[] logics, GraphInterpreter.Connection[] connections, scala.Function4<GraphStageLogic,java.lang.Object,scala.concurrent.Promise<Done>,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> onAsyncInput, boolean fuzzingMode, ActorRef context) |
-
Method Summary
Modifier and Type | Method | Description
GraphStageLogic | activeStage() | INTERNAL API
void | activeStage_$eq(GraphStageLogic x$1) |
void | afterStageHasRun(GraphStageLogic logic) |
void | cancel(GraphInterpreter.Connection connection, java.lang.Throwable cause) |
void | chasePull(GraphInterpreter.Connection connection) |
void | chasePush(GraphInterpreter.Connection connection) |
void | complete(GraphInterpreter.Connection connection) |
GraphInterpreter.Connection[] | connections() |
ActorRef | context() |
static GraphInterpreter | currentInterpreter() | INTERNAL API
static GraphInterpreter | currentInterpreterOrNull() | INTERNAL API
static boolean | Debug() | Compile time constant, enable it for debug logging to the console.
void | enqueue(GraphInterpreter.Connection connection) |
int | execute(int eventLimit) | Executes pending events until the given limit is met.
void | fail(GraphInterpreter.Connection connection, java.lang.Throwable ex) |
void | finalizeStage(GraphStageLogic logic) |
void | finish() | Finalizes the state of all operators by calling postStop() (if necessary).
boolean | fuzzingMode() |
static int | InClosed() |
static int | InFailed() |
void | init(Materializer subMat) | Initializes the states of all the operator logics by calling preStart().
static int | InReady() |
boolean | isCompleted() | Returns true if there are no more running operators and pending events.
boolean | isStageCompleted(GraphStageLogic stage) |
boolean | isSuspended() | Returns true if there are pending unprocessed events in the event queue.
static int | KeepGoingFlag() |
static int | KeepGoingMask() |
LoggingAdapter | log() |
GraphStageLogic[] | logics() |
Materializer | materializer() |
java.lang.String | Name() |
static scala.runtime.Null$ | NoEvent() |
GraphInterpreter | nonNull() | INTERNAL API
scala.Function4<GraphStageLogic,java.lang.Object,scala.concurrent.Promise<Done>,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> | onAsyncInput() |
static int | OutClosed() |
static int | OutReady() |
static int | PullEndFlip() |
static int | Pulling() |
static int | PullStartFlip() |
static int | PushEndFlip() |
static int | Pushing() |
static int | PushStartFlip() |
void | runAsyncInput(GraphStageLogic logic, java.lang.Object evt, scala.concurrent.Promise<Done> promise, scala.Function1<java.lang.Object,scala.runtime.BoxedUnit> handler) |
void | setHandler(GraphInterpreter.Connection connection, InHandler handler) | Dynamic handler changes are communicated from a GraphStageLogic by this method.
void | setHandler(GraphInterpreter.Connection connection, OutHandler handler) | Dynamic handler changes are communicated from a GraphStageLogic by this method.
void | setKeepGoing(GraphStageLogic logic, boolean enabled) |
static Attributes[] | singleNoAttribute() |
Materializer | subFusingMaterializer() |
RunningInterpreter | toSnapshot() | Debug utility to dump the "waits-on" relationships in an AST format for rendering in some suitable format for analysis of deadlocks.
-
-
-
Constructor Detail
-
GraphInterpreter
public GraphInterpreter(Materializer materializer, LoggingAdapter log, GraphStageLogic[] logics, GraphInterpreter.Connection[] connections, scala.Function4<GraphStageLogic,java.lang.Object,scala.concurrent.Promise<Done>,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> onAsyncInput, boolean fuzzingMode, ActorRef context)
-
-
Method Detail
-
Debug
public static final boolean Debug()
Compile time constant, enable it for debug logging to the console.
-
NoEvent
public static final scala.runtime.Null$ NoEvent()
-
InReady
public static final int InReady()
-
Pulling
public static final int Pulling()
-
Pushing
public static final int Pushing()
-
OutReady
public static final int OutReady()
-
InClosed
public static final int InClosed()
-
OutClosed
public static final int OutClosed()
-
InFailed
public static final int InFailed()
-
PullStartFlip
public static final int PullStartFlip()
-
PullEndFlip
public static final int PullEndFlip()
-
PushStartFlip
public static final int PushStartFlip()
-
PushEndFlip
public static final int PushEndFlip()
-
KeepGoingFlag
public static final int KeepGoingFlag()
-
KeepGoingMask
public static final int KeepGoingMask()
-
singleNoAttribute
public static Attributes[] singleNoAttribute()
-
currentInterpreter
public static GraphInterpreter currentInterpreter()
INTERNAL API
-
currentInterpreterOrNull
public static GraphInterpreter currentInterpreterOrNull()
INTERNAL API
-
materializer
public Materializer materializer()
-
log
public LoggingAdapter log()
-
logics
public GraphStageLogic[] logics()
-
connections
public GraphInterpreter.Connection[] connections()
-
onAsyncInput
public scala.Function4<GraphStageLogic,java.lang.Object,scala.concurrent.Promise<Done>,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> onAsyncInput()
-
fuzzingMode
public boolean fuzzingMode()
-
context
public ActorRef context()
-
activeStage
public GraphStageLogic activeStage()
INTERNAL API
-
activeStage_$eq
public void activeStage_$eq(GraphStageLogic x$1)
-
subFusingMaterializer
public Materializer subFusingMaterializer()
-
Name
public java.lang.String Name()
-
nonNull
public GraphInterpreter nonNull()
INTERNAL API
-
setHandler
public void setHandler(GraphInterpreter.Connection connection, InHandler handler)
Dynamic handler changes are communicated from a GraphStageLogic by this method.
-
setHandler
public void setHandler(GraphInterpreter.Connection connection, OutHandler handler)
Dynamic handler changes are communicated from a GraphStageLogic by this method.
-
isSuspended
public boolean isSuspended()
Returns true if there are pending unprocessed events in the event queue.
-
isCompleted
public boolean isCompleted()
Returns true if there are no more running operators and pending events.
-
init
public void init(Materializer subMat)
Initializes the states of all the operator logics by calling preStart(). The passed-in materializer is intended to be a SubFusingActorMaterializer that avoids creating new Actors when operators materialize sub-flows. If no such materializer is available, passing in null will reuse the normal materializer for the GraphInterpreter; fusing is only an optimization.
-
finish
public void finish()
Finalizes the state of all operators by calling postStop() (if necessary).
-
execute
public int execute(int eventLimit)
Executes pending events until the given limit is met. If there were remaining events, isSuspended will return true.
-
runAsyncInput
public void runAsyncInput(GraphStageLogic logic, java.lang.Object evt, scala.concurrent.Promise<Done> promise, scala.Function1<java.lang.Object,scala.runtime.BoxedUnit> handler)
-
enqueue
public void enqueue(GraphInterpreter.Connection connection)
-
afterStageHasRun
public void afterStageHasRun(GraphStageLogic logic)
-
isStageCompleted
public boolean isStageCompleted(GraphStageLogic stage)
-
setKeepGoing
public void setKeepGoing(GraphStageLogic logic, boolean enabled)
-
finalizeStage
public void finalizeStage(GraphStageLogic logic)
-
chasePush
public void chasePush(GraphInterpreter.Connection connection)
-
chasePull
public void chasePull(GraphInterpreter.Connection connection)
-
complete
public void complete(GraphInterpreter.Connection connection)
-
fail
public void fail(GraphInterpreter.Connection connection, java.lang.Throwable ex)
-
cancel
public void cancel(GraphInterpreter.Connection connection, java.lang.Throwable cause)
-
toSnapshot
public RunningInterpreter toSnapshot()
Debug utility to dump the "waits-on" relationships in an AST format for rendering in some suitable format for analysis of deadlocks.

Only invoke this after the interpreter has completely settled, otherwise the results might be off. This is a very simplistic tool; make sure you understand what you are doing and it will serve you well.
-
-