public final class GraphInterpreter
extends java.lang.Object
From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a
GraphInterpreter#GraphAssembly
object and provides facilities to execute and interact with this assembly.
The lifecycle of the Interpreter is roughly the following:
- Boundary logics are attached via attachDownstreamBoundary() and attachUpstreamBoundary()
- init() is called
- execute() is called whenever there is need for execution, providing an upper limit on the processed events
- finish() is called before the interpreter is disposed, preferably after isCompleted returned true, although in abort cases this is not strictly necessary
The execute() method of the interpreter accepts an upper bound on the events it will process. After this limit is reached or there are no more pending events to be processed, the call returns. It is possible to inspect if there are unprocessed events left via the isSuspended method. isCompleted returns true once all stages reported completion inside the interpreter.
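The lifecycle described above can be sketched with a toy stand-in. Everything in this block (`LifecycleSketch`, `ToyInterpreter`, `schedule`, `run`) is hypothetical and only mimics the init / execute(eventLimit) / isSuspended / finish contract; boundary attachment and isCompleted are left out, and the return value of `execute` is an assumption of the sketch, since the text above does not say what the real method returns.

```java
import java.util.ArrayDeque;

final class LifecycleSketch {
    // Hypothetical stand-in that only mimics the documented lifecycle contract;
    // it is NOT the Akka GraphInterpreter.
    static final class ToyInterpreter {
        private final ArrayDeque<Runnable> pending = new ArrayDeque<>();
        private boolean initialized;
        private boolean finished;

        void init() { initialized = true; }

        // Queues an event; in the real interpreter this is triggered by stage logic actions.
        void schedule(Runnable event) { pending.addLast(event); }

        // Processes at most eventLimit events. Returning the unused part of the
        // limit is an assumption of this sketch.
        int execute(int eventLimit) {
            if (!initialized || finished) throw new IllegalStateException("not running");
            if (eventLimit <= 0) throw new IllegalArgumentException("eventLimit must be positive");
            int remaining = eventLimit;
            while (remaining > 0 && !pending.isEmpty()) {
                pending.pollFirst().run();
                remaining--;
            }
            return remaining;
        }

        // True while unprocessed events are left in the queue.
        boolean isSuspended() { return !pending.isEmpty(); }

        void finish() { finished = true; }
    }

    // Drives the toy interpreter in bounded slices and returns the number of execute() calls.
    static int run(int totalEvents, int eventLimit) {
        ToyInterpreter interpreter = new ToyInterpreter();
        interpreter.init();
        for (int i = 0; i < totalEvents; i++) {
            interpreter.schedule(() -> { });
        }
        int rounds = 0;
        do {
            interpreter.execute(eventLimit);
            rounds++;
        } while (interpreter.isSuspended());
        interpreter.finish();
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println("execute() calls: " + run(10, 4));
    }
}
```

With 10 queued events and a limit of 4, the driver loop needs three execute() calls before isSuspended reports that nothing is left.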
The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations on the hot paths.
One of the basic abstractions inside the interpreter is the GraphInterpreter.Connection.
A connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair).
The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion
or errors across the Connection.
In particular:
- portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this connection. This bitfield is used to decode the event that is in-flight.
- connectionSlot contains a potential element or exception that accompanies the event encoded in the portStates bitfield
- inHandler contains the InHandler instance that handles the events corresponding to the input port of the connection
- outHandler contains the OutHandler instance that handles the events corresponding to the output port of the connection
On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue contains the Connections that have pending events to be processed. The pending event itself is encoded in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given Connection, which is true in almost all cases, except a complete-after-push or fail-after-push which has to be decoded accordingly.
The layout of the portState bitfield is the following:
Bit value | Flag | Meaning |
---|---|---|
1 | InReady | The input port is ready to be pulled |
2 | Pulling | A pull is active, but has not arrived yet (queued) |
4 | Pushing | A push is active, but has not arrived yet (queued) |
8 | OutReady | The output port is ready to be pushed |
16 | InClosed | The input port is closed and will not receive any events. A push might still be in flight, which will then be processed first. |
32 | OutClosed | The output port is closed and will not receive any events. |
64 | InFailed | Always set in conjunction with InClosed. Indicates that the close event is a failure. |

Bits 1, 2, 4 and 8 form the state machine: only one of them is set at any given time. They encode the state machine states and are "moved" around using XOR masks, which keeps the other bits intact.
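As a minimal sketch of how such XOR "moves" can work: the bit values below follow the layout above, while the flip mask values are derived here as the XOR of the state being left and the state being entered. That derivation, the helper names (`startPull`, `endPull`, `startPush`, `endPush`) and the class `PortStateSketch` are assumptions of this example, not quoted from the library.

```java
final class PortStateSketch {
    // Bit values taken from the layout above.
    static final int IN_READY = 1;
    static final int PULLING = 2;
    static final int PUSHING = 4;
    static final int OUT_READY = 8;
    static final int IN_CLOSED = 16;
    static final int OUT_CLOSED = 32;
    static final int IN_FAILED = 64;

    // Each flip mask is the XOR of the state being left and the state being entered.
    // The concrete values are derived for this sketch (an assumption).
    static final int PULL_START_FLIP = IN_READY ^ PULLING;
    static final int PULL_END_FLIP = PULLING ^ OUT_READY;
    static final int PUSH_START_FLIP = OUT_READY ^ PUSHING;
    static final int PUSH_END_FLIP = PUSHING ^ IN_READY;

    static int startPull(int state) { return flip(state, IN_READY, PULL_START_FLIP); }
    static int endPull(int state) { return flip(state, PULLING, PULL_END_FLIP); }
    static int startPush(int state) { return flip(state, OUT_READY, PUSH_START_FLIP); }
    static int endPush(int state) { return flip(state, PUSHING, PUSH_END_FLIP); }

    // Applies the mask only if the expected state machine bit is currently set.
    private static int flip(int state, int requiredBit, int mask) {
        if ((state & requiredBit) == 0) {
            throw new IllegalStateException("unexpected port state: " + state);
        }
        return state ^ mask;
    }

    public static void main(String[] args) {
        int state = IN_READY;
        state = startPull(state);   // InReady  -> Pulling
        state = endPull(state);     // Pulling  -> OutReady
        state = startPush(state);   // OutReady -> Pushing
        state |= IN_CLOSED;         // completion arrives while the push is still queued
        state = endPush(state);     // Pushing  -> InReady, InClosed stays set
        System.out.println(state == (IN_READY | IN_CLOSED));
    }
}
```

Because XOR only toggles the two state machine bits named in the mask, the InClosed bit set in the middle of the walk-through survives the final transition untouched.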
Sending an event is usually the following sequence:
- An action is requested by a stage logic (push, pull, complete, etc.)
- the state machine in portStates is transitioned from a ready state to a pending event
- the affected Connection is enqueued
Receiving an event is usually the following sequence:
- the Connection to be processed is dequeued
- the type of the event is determined from the bits set on portStates
- the state machine in portStates is transitioned to a ready state
- using the inHandlers/outHandlers table the corresponding callback is called on the stage logic.
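The sending and receiving sequences above can be illustrated with a small self-contained model. `EventQueueSketch`, `Conn`, `processOne` and the string trace that stands in for the handler callbacks are all hypothetical; only push and pull are modeled, and the fixed power-of-two capacity is a simplification chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class EventQueueSketch {
    static final int IN_READY = 1, PULLING = 2, PUSHING = 4, OUT_READY = 8;

    // Hypothetical, simplified connection: state bits plus a slot for the element.
    static final class Conn {
        final String name;
        int portState = IN_READY;
        Object slot;
        Conn(String name) { this.name = name; }
    }

    // Circular buffer; the capacity is a power of two so indices can be masked.
    private final Conn[] queue = new Conn[8];
    private final int mask = queue.length - 1;
    private int head, tail;
    final List<String> trace = new ArrayList<>();

    void enqueue(Conn c) {
        if (tail - head == queue.length) throw new IllegalStateException("queue full");
        queue[tail++ & mask] = c;
    }

    boolean isSuspended() { return head != tail; }

    // Sending: transition from a ready state to a pending event, then enqueue.
    void pull(Conn c) {
        if ((c.portState & IN_READY) == 0) throw new IllegalStateException("cannot pull " + c.name);
        c.portState ^= (IN_READY | PULLING);
        enqueue(c);
    }

    void push(Conn c, Object element) {
        if ((c.portState & OUT_READY) == 0) throw new IllegalStateException("cannot push " + c.name);
        c.portState ^= (OUT_READY | PUSHING);
        c.slot = element;
        enqueue(c);
    }

    // Receiving: dequeue, decode the event from the bits, move to a ready state,
    // then "call" the handler (recorded in the trace instead of a real callback).
    void processOne() {
        if (head == tail) throw new IllegalStateException("no pending event");
        Conn c = queue[head & mask];
        queue[head++ & mask] = null;
        if ((c.portState & PULLING) != 0) {
            c.portState ^= (PULLING | OUT_READY);
            trace.add("onPull(" + c.name + ")");
        } else if ((c.portState & PUSHING) != 0) {
            c.portState ^= (PUSHING | IN_READY);
            trace.add("onPush(" + c.name + ", " + c.slot + ")");
        }
    }

    static List<String> demo() {
        EventQueueSketch interpreter = new EventQueueSketch();
        Conn a = new Conn("a");
        interpreter.pull(a);
        interpreter.processOne();
        interpreter.push(a, "x");
        interpreter.processOne();
        return interpreter.trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since the pending event lives in the connection's own state bits, the queue only has to store references to connections, which is why a plain array can serve as the buffer.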
Because of the FIFO construction of the queue the interpreter is fair, i.e. a pending event is always executed after a bounded number of other events. This property, together with suspendability means that even infinite cycles can be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example the non-cycle edge of a balance is pulled, dissolving the original cycle).
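The fairness argument can be illustrated with a toy FIFO queue. In this sketch two hypothetical events, `cycle-A` and `cycle-B`, keep re-enqueueing each other forever, and an `external` event is injected while the queue is suspended between two bounded runs. The class and event names are illustrative only and do not model real stages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class FairnessSketch {
    // Runs two bounded slices with an external event injected in between
    // and returns the order in which events were processed.
    static List<String> run(int eventLimit) {
        ArrayDeque<String> queue = new ArrayDeque<>();
        List<String> processed = new ArrayList<>();
        queue.addLast("cycle-A");               // start of an infinite A -> B -> A cycle
        execute(queue, processed, eventLimit);  // returns because of the limit, not because work ran out
        queue.addLast("external");              // injected while the interpreter is suspended
        execute(queue, processed, eventLimit);
        return processed;
    }

    // Processes at most eventLimit events in FIFO order.
    static void execute(ArrayDeque<String> queue, List<String> processed, int eventLimit) {
        for (int i = 0; i < eventLimit && !queue.isEmpty(); i++) {
            String event = queue.pollFirst();
            processed.add(event);
            if (event.equals("cycle-A")) {
                queue.addLast("cycle-B");
            } else if (event.equals("cycle-B")) {
                queue.addLast("cycle-A");
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Even though the cycle never terminates, the external event is processed after only one further cycle event, because new events always join the back of the queue.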
Modifier and Type | Class and Description |
---|---|
static class | GraphInterpreter.Connection: INTERNAL API |
static class | GraphInterpreter.DownstreamBoundaryStageLogic<T> |
static class | GraphInterpreter.Empty$: Marker object that indicates that a port holds no element since it was already grabbed. |
static class | GraphInterpreter.Failed |
static class | GraphInterpreter.Failed$ |
static class | GraphInterpreter.GraphAssembly: INTERNAL API |
static class | GraphInterpreter.GraphAssembly$ |
static class | GraphInterpreter.UpstreamBoundaryStageLogic<T> |
Constructor and Description |
---|
GraphInterpreter(GraphInterpreter.GraphAssembly assembly, Materializer materializer, LoggingAdapter log, GraphStageLogic[] logics, GraphInterpreter.Connection[] connections, scala.Function3<GraphStageLogic,java.lang.Object,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> onAsyncInput, boolean fuzzingMode, ActorRef context) |
Modifier and Type | Method and Description |
---|---|
GraphStageLogic | activeStage(): INTERNAL API |
void | afterStageHasRun(GraphStageLogic logic) |
void | attachDownstreamBoundary(GraphInterpreter.Connection connection, GraphInterpreter.DownstreamBoundaryStageLogic<?> logic): Assign the boundary logic to a given connection. |
void | attachDownstreamBoundary(int connection, GraphInterpreter.DownstreamBoundaryStageLogic<?> logic) |
void | attachUpstreamBoundary(GraphInterpreter.Connection connection, GraphInterpreter.UpstreamBoundaryStageLogic<?> logic): Assign the boundary logic to a given connection. |
void | attachUpstreamBoundary(int connection, GraphInterpreter.UpstreamBoundaryStageLogic<?> logic) |
static int | Boundary() |
void | cancel(GraphInterpreter.Connection connection) |
void | chasePull(GraphInterpreter.Connection connection) |
void | chasePush(GraphInterpreter.Connection connection) |
void | complete(GraphInterpreter.Connection connection) |
GraphInterpreter.Connection[] | connections() |
ActorRef | context() |
static GraphInterpreter | currentInterpreter(): INTERNAL API |
static GraphInterpreter | currentInterpreterOrNull(): INTERNAL API |
static boolean | Debug(): Compile time constant, enable it for debug logging to the console. |
void | dumpWaits(): Debug utility to dump the "waits-on" relationships in DOT format to the console for analysis of deadlocks. |
void | enqueue(GraphInterpreter.Connection connection) |
int | execute(int eventLimit): Executes pending events until the given limit is met. |
void | fail(GraphInterpreter.Connection connection, java.lang.Throwable ex) |
void | finalizeStage(GraphStageLogic logic) |
void | finish(): Finalizes the state of all stages by calling postStop() (if necessary). |
boolean | fuzzingMode() |
static int | InClosed() |
static int | InFailed() |
void | init(Materializer subMat): Initializes the states of all the stage logics by calling preStart(). |
static int | InReady() |
boolean | isCompleted(): Returns true if there are no more running stages and pending events. |
boolean | isStageCompleted(GraphStageLogic stage) |
boolean | isSuspended(): Returns true if there are pending unprocessed events in the event queue. |
static int | KeepGoingFlag() |
static int | KeepGoingMask() |
LoggingAdapter | log() |
GraphStageLogic[] | logics() |
Materializer | materializer() |
java.lang.String | Name() |
static scala.runtime.Null$ | NoEvent() |
GraphInterpreter | nonNull(): INTERNAL API |
scala.Function3<GraphStageLogic,java.lang.Object,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> | onAsyncInput() |
static int | OutClosed() |
static int | OutReady() |
static int | PullEndFlip() |
static int | Pulling() |
static int | PullStartFlip() |
static int | PushEndFlip() |
static int | Pushing() |
static int | PushStartFlip() |
void | runAsyncInput(GraphStageLogic logic, java.lang.Object evt, scala.Function1<java.lang.Object,scala.runtime.BoxedUnit> handler) |
void | setHandler(GraphInterpreter.Connection connection, InHandler handler): Dynamic handler changes are communicated from a GraphStageLogic by this method. |
void | setHandler(GraphInterpreter.Connection connection, OutHandler handler): Dynamic handler changes are communicated from a GraphStageLogic by this method. |
void | setKeepGoing(GraphStageLogic logic, boolean enabled) |
static Attributes[] | singleNoAttribute() |
Materializer | subFusingMaterializer() |
java.lang.String | toString() |
public GraphInterpreter(GraphInterpreter.GraphAssembly assembly, Materializer materializer, LoggingAdapter log, GraphStageLogic[] logics, GraphInterpreter.Connection[] connections, scala.Function3<GraphStageLogic,java.lang.Object,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> onAsyncInput, boolean fuzzingMode, ActorRef context)
public static final boolean Debug()
public static final scala.runtime.Null$ NoEvent()
public static final int Boundary()
public static final int InReady()
public static final int Pulling()
public static final int Pushing()
public static final int OutReady()
public static final int InClosed()
public static final int OutClosed()
public static final int InFailed()
public static final int PullStartFlip()
public static final int PullEndFlip()
public static final int PushStartFlip()
public static final int PushEndFlip()
public static final int KeepGoingFlag()
public static final int KeepGoingMask()
public static Attributes[] singleNoAttribute()
public static GraphInterpreter currentInterpreter()
public static GraphInterpreter currentInterpreterOrNull()
public Materializer materializer()
public LoggingAdapter log()
public GraphStageLogic[] logics()
public GraphInterpreter.Connection[] connections()
public scala.Function3<GraphStageLogic,java.lang.Object,scala.Function1<java.lang.Object,scala.runtime.BoxedUnit>,scala.runtime.BoxedUnit> onAsyncInput()
public boolean fuzzingMode()
public ActorRef context()
public GraphStageLogic activeStage()
public Materializer subFusingMaterializer()
public java.lang.String Name()
public GraphInterpreter nonNull()
public void attachUpstreamBoundary(GraphInterpreter.Connection connection, GraphInterpreter.UpstreamBoundaryStageLogic<?> logic)
connection - (undocumented)
logic - (undocumented)
public void attachUpstreamBoundary(int connection, GraphInterpreter.UpstreamBoundaryStageLogic<?> logic)
public void attachDownstreamBoundary(GraphInterpreter.Connection connection, GraphInterpreter.DownstreamBoundaryStageLogic<?> logic)
connection - (undocumented)
logic - (undocumented)
public void attachDownstreamBoundary(int connection, GraphInterpreter.DownstreamBoundaryStageLogic<?> logic)
public void setHandler(GraphInterpreter.Connection connection, InHandler handler)
connection - (undocumented)
handler - (undocumented)
public void setHandler(GraphInterpreter.Connection connection, OutHandler handler)
connection - (undocumented)
handler - (undocumented)
public boolean isSuspended()
public boolean isCompleted()
public void init(Materializer subMat)
Passing null as subMat will reuse the normal materializer for the GraphInterpreter; fusing is only an optimization.
subMat - (undocumented)
public void finish()
public int execute(int eventLimit)
eventLimit - (undocumented)
public void runAsyncInput(GraphStageLogic logic, java.lang.Object evt, scala.Function1<java.lang.Object,scala.runtime.BoxedUnit> handler)
public void enqueue(GraphInterpreter.Connection connection)
public void afterStageHasRun(GraphStageLogic logic)
public boolean isStageCompleted(GraphStageLogic stage)
public void setKeepGoing(GraphStageLogic logic, boolean enabled)
public void finalizeStage(GraphStageLogic logic)
public void chasePush(GraphInterpreter.Connection connection)
public void chasePull(GraphInterpreter.Connection connection)
public void complete(GraphInterpreter.Connection connection)
public void fail(GraphInterpreter.Connection connection, java.lang.Throwable ex)
public void cancel(GraphInterpreter.Connection connection)
public void dumpWaits()
Only invoke this after the interpreter has completely settled, otherwise the results might be off. This is a very simplistic tool; make sure you understand what you are doing and it will serve you well.
public java.lang.String toString()
Overrides: toString in class java.lang.Object