Class GraphInterpreter


  • public final class GraphInterpreter
    extends java.lang.Object
    INTERNAL API

    From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a GraphInterpreter#GraphAssembly object and provides facilities to execute and interact with this assembly. The lifecycle of the Interpreter is roughly the following:
    - init() is called
    - execute() is called whenever there is need for execution, providing an upper limit on the processed events
    - finish() is called before the interpreter is disposed, preferably after isCompleted() returned true, although in abort cases this is not strictly necessary

    The execute() method of the interpreter accepts an upper bound on the number of events it will process. The call returns once this limit is reached or no pending events remain. Whether unprocessed events are left can be checked via the isSuspended() method. isCompleted() returns true once all operators inside the interpreter have reported completion.
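
    The lifecycle described above can be sketched as a small driver loop. This is a toy model, not the real class: the Interpreter interface and CountingInterpreter below are hypothetical stand-ins, init() omits the Materializer argument, and returning the number of processed events from execute() is an assumption of this sketch.

```java
final class LifecycleSketch {
    /** Hypothetical interface; only the method names come from the description above. */
    interface Interpreter {
        void init();
        int execute(int eventLimit); // assumption: the toy returns the number of events processed
        void finish();
        boolean isSuspended();
        boolean isCompleted();
    }

    /** Toy implementation: a counter of pending events stands in for the real event queue. */
    static final class CountingInterpreter implements Interpreter {
        private int pending;
        private boolean started;

        CountingInterpreter(int pending) { this.pending = pending; }

        public void init() { started = true; }

        public int execute(int eventLimit) {
            int processed = Math.min(eventLimit, pending);
            pending -= processed;
            return processed;
        }

        public void finish() { started = false; }

        public boolean isSuspended() { return pending > 0; }

        public boolean isCompleted() { return started && pending == 0; }
    }

    /** Runs init, then execute until nothing is pending, then finish; returns the number of execute() calls. */
    static int runToCompletion(Interpreter interpreter, int eventLimit) {
        if (eventLimit <= 0) throw new IllegalArgumentException("eventLimit must be positive");
        interpreter.init();
        int rounds = 0;
        do {
            interpreter.execute(eventLimit);
            rounds++;
        } while (interpreter.isSuspended()); // events were left over: schedule another round
        interpreter.finish();
        return rounds;
    }

    public static void main(String[] args) {
        // 10 pending events with a limit of 4 per call need 3 calls (4 + 4 + 2).
        System.out.println(runToCompletion(new CountingInterpreter(10), 4));
    }
}
```

    In a real setting the rounds would typically be triggered by an outer scheduler rather than a tight loop, so that other work can run between execute() calls.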

    The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations on the hot paths.

    One of the basic abstractions inside the interpreter is the GraphInterpreter.Connection. A connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair). The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion or errors across the Connection. In particular:
    - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this connection. This bitfield is used to decode the event that is in-flight.
    - connectionSlot contains a potential element or exception that accompanies the event encoded in the portStates bitfield
    - inHandler contains the InHandler instance that handles the events corresponding to the input port of the connection
    - outHandler contains the OutHandler instance that handles the events corresponding to the output port of the connection

    On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue contains the Connections that have pending events to be processed. The pending event itself is encoded in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given Connection. That holds in almost all cases; the exceptions are a complete-after-push or a fail-after-push, which have to be decoded accordingly.
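
    To illustrate the idea of an event queue kept as a circular buffer of Connections, here is a minimal sketch. The class, its fields, and the power-of-two capacity are assumptions made for the example; the actual layout inside the interpreter is not described on this page.

```java
final class ConnectionRingSketch {
    /** Hypothetical minimal connection: an id plus the bitfield that encodes the pending event. */
    static final class Connection {
        final int id;
        int portState;

        Connection(int id) { this.id = id; }
    }

    private final Connection[] buffer;
    private final int mask; // capacity - 1; valid because capacity is a power of two
    private int head;       // position of the next connection to dequeue
    private int tail;       // position of the next free slot

    ConnectionRingSketch(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        buffer = new Connection[capacity];
        mask = capacity - 1;
    }

    boolean isEmpty() { return head == tail; }

    /** Appends a connection that has a pending event; no allocation happens here. */
    void enqueue(Connection connection) {
        if (tail - head == buffer.length) throw new IllegalStateException("event queue is full");
        buffer[tail & mask] = connection; // the mask wraps the index around the array
        tail++;
    }

    /** Removes the oldest connection, or returns null when nothing is pending. */
    Connection dequeue() {
        if (isEmpty()) return null;
        int index = head & mask;
        Connection connection = buffer[index];
        buffer[index] = null; // clear the slot so the array does not retain the reference
        head++;
        return connection;
    }
}
```

    Because the array is allocated once and only indices move, enqueueing and dequeueing stay allocation-free, which matches the stated goal of reducing allocations on the hot paths.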

    The layout of the portState bitfield is the following:

                    |- state machn.-|  Only one bit is hot among these bits
     64  32  16 | 8   4   2   1 |
    +---+---+---|---+---+---+---|
      |   |   |   |   |   |   |
      |   |   |   |   |   |   |  From the following flags only one is active at any given time. These bits encode
      |   |   |   |   |   |   |  state machine states, and they are "moved" around using XOR masks to keep other bits
      |   |   |   |   |   |   |  intact.
      |   |   |   |   |   |   |
      |   |   |   |   |   |   +- InReady:   The input port is ready to be pulled
      |   |   |   |   |   +----- Pulling:   A pull is active, but has not arrived yet (queued)
      |   |   |   |   +--------- Pushing:   A push is active, but has not arrived yet (queued)
      |   |   |   +------------- OutReady:  The output port is ready to be pushed
      |   |   |
      |   |   +----------------- InClosed:  The input port is closed and will not receive any events.
      |   |                                 A push might still be in flight, which will then be processed first.
      |   +--------------------- OutClosed: The output port is closed and will not receive any events.
      +------------------------- InFailed:  Always set in conjunction with InClosed. Indicates that the close event
                                            is a failure
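
    The way XOR masks "move" the single hot state bit while leaving the other bits intact can be shown with a few constants. The bit values follow the layout above. The flip mask values are an assumption: they are derived in this sketch from the state transitions and are not stated on this page, which only lists the names PullStartFlip, PullEndFlip, PushStartFlip and PushEndFlip.

```java
final class PortStateSketch {
    // Bit values taken from the layout above.
    static final int IN_READY = 1;
    static final int PULLING = 2;
    static final int PUSHING = 4;
    static final int OUT_READY = 8;
    static final int IN_CLOSED = 16;
    static final int OUT_CLOSED = 32;
    static final int IN_FAILED = 64;

    // Assumed flip masks: each one clears the current state bit and sets the next one.
    static final int PULL_START_FLIP = IN_READY | PULLING;  // 0011: InReady  -> Pulling
    static final int PULL_END_FLIP = PULLING | OUT_READY;   // 1010: Pulling  -> OutReady
    static final int PUSH_START_FLIP = OUT_READY | PUSHING; // 1100: OutReady -> Pushing
    static final int PUSH_END_FLIP = PUSHING | IN_READY;    // 0101: Pushing  -> InReady

    /** Applies a flip mask; bits outside the mask (for example the closed flags) are untouched. */
    static int flip(int portState, int mask) {
        return portState ^ mask;
    }

    public static void main(String[] args) {
        int state = IN_READY;
        state = flip(state, PULL_START_FLIP); // Pulling
        state = flip(state, PULL_END_FLIP);   // OutReady
        state = flip(state, PUSH_START_FLIP); // Pushing
        state = flip(state, PUSH_END_FLIP);   // back to InReady
        System.out.println(Integer.toBinaryString(state));
    }
}
```

    XOR only does the right thing when the state bit being cleared is actually set, so a real implementation would check the current state before flipping.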

    Sending an event is usually the following sequence:
    - An action is requested by an operator logic (push, pull, complete, etc.)
    - the state machine in portStates is transitioned from a ready state to a pending event
    - the affected Connection is enqueued

    Receiving an event is usually the following sequence:
    - the Connection to be processed is dequeued
    - the type of the event is determined from the bits set on portStates
    - the state machine in portStates is transitioned to a ready state
    - using the inHandlers/outHandlers table the corresponding callback is called on the operator logic.
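
    The send and receive sequences can be put together in a small model covering only pull and push. Everything here is a simplified stand-in: the nested InHandler, OutHandler and Connection types are hypothetical, java.util.ArrayDeque replaces the circular buffer, and closed or failed ports are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class SendReceiveSketch {
    static final int IN_READY = 1, PULLING = 2, PUSHING = 4, OUT_READY = 8;

    interface InHandler { void onPush(); }   // hypothetical, input side callback
    interface OutHandler { void onPull(); }  // hypothetical, output side callback

    static final class Connection {
        int portState = IN_READY;
        Object slot;
        InHandler inHandler;
        OutHandler outHandler;
    }

    private final ArrayDeque<Connection> eventQueue = new ArrayDeque<>();

    /** Send: check the ready state, flip to the pending state, enqueue the connection. */
    void pull(Connection c) {
        if ((c.portState & IN_READY) == 0) throw new IllegalStateException("input port is not ready");
        c.portState ^= (IN_READY | PULLING);
        eventQueue.addLast(c);
    }

    void push(Connection c, Object element) {
        if ((c.portState & OUT_READY) == 0) throw new IllegalStateException("output port is not ready");
        c.slot = element;
        c.portState ^= (OUT_READY | PUSHING);
        eventQueue.addLast(c);
    }

    /** Receive: dequeue, decode the event from the bits, flip to a ready state, call the handler. */
    boolean processOne() {
        Connection c = eventQueue.pollFirst();
        if (c == null) return false;
        if ((c.portState & PULLING) != 0) {
            c.portState ^= (PULLING | OUT_READY);
            c.outHandler.onPull();
        } else if ((c.portState & PUSHING) != 0) {
            c.portState ^= (PUSHING | IN_READY);
            c.inHandler.onPush();
        }
        return true;
    }

    /** Wires one connection: the output side answers every pull with a push of "element". */
    static List<Object> demo() {
        SendReceiveSketch interpreter = new SendReceiveSketch();
        Connection c = new Connection();
        List<Object> received = new ArrayList<>();
        c.outHandler = () -> interpreter.push(c, "element");
        c.inHandler = () -> { received.add(c.slot); c.slot = null; };
        interpreter.pull(c);
        while (interpreter.processOne()) { /* drain the queue */ }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    In demo(), the pull is processed first and triggers the push from inside onPull(), so the same connection passes through the queue twice before the element reaches the input handler.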

    Because of the FIFO construction of the queue, the interpreter is fair, i.e. a pending event is always executed after a bounded number of other events. This property, together with suspendability, means that even infinite cycles can be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example, the non-cycle edge of a balance is pulled, dissolving the original cycle).
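
    The fairness argument can be made concrete with a toy queue in which some events re-enqueue themselves forever. The class below and its string-named events are purely illustrative assumptions; the point is only that an event injected from outside is reached after at most as many other events as were queued ahead of it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class FairnessSketch {
    private final ArrayDeque<String> eventQueue = new ArrayDeque<>();
    final List<String> trace = new ArrayList<>();

    void enqueue(String event) { eventQueue.addLast(event); }

    /** Processes at most eventLimit events in FIFO order and returns how many were processed. */
    int execute(int eventLimit) {
        int processed = 0;
        while (processed < eventLimit && !eventQueue.isEmpty()) {
            String event = eventQueue.pollFirst();
            trace.add(event);
            if (event.startsWith("cycle")) {
                enqueue(event); // models an infinite cycle: the event schedules itself again
            }
            processed++;
        }
        return processed;
    }

    boolean isSuspended() { return !eventQueue.isEmpty(); }

    public static void main(String[] args) {
        FairnessSketch interpreter = new FairnessSketch();
        interpreter.enqueue("cycleA");
        interpreter.enqueue("cycleB");
        interpreter.execute(3);          // the cycle never ends, but the call returns after 3 events
        interpreter.enqueue("external"); // injected while the interpreter is suspended
        interpreter.execute(3);          // "external" runs after the 2 events queued ahead of it
        System.out.println(interpreter.trace);
    }
}
```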

    • Method Detail

      • Debug

        public static final boolean Debug()
        Compile-time constant; enable it for debug logging to the console.
        Returns:
        (undocumented)
      • NoEvent

        public static final scala.runtime.Null$ NoEvent()
      • InReady

        public static final int InReady()
      • Pulling

        public static final int Pulling()
      • Pushing

        public static final int Pushing()
      • OutReady

        public static final int OutReady()
      • InClosed

        public static final int InClosed()
      • OutClosed

        public static final int OutClosed()
      • InFailed

        public static final int InFailed()
      • PullStartFlip

        public static final int PullStartFlip()
      • PullEndFlip

        public static final int PullEndFlip()
      • PushStartFlip

        public static final int PushStartFlip()
      • PushEndFlip

        public static final int PushEndFlip()
      • KeepGoingFlag

        public static final int KeepGoingFlag()
      • KeepGoingMask

        public static final int KeepGoingMask()
      • singleNoAttribute

        public static Attributes[] singleNoAttribute()
      • currentInterpreter

        public static GraphInterpreter currentInterpreter()
        INTERNAL API
        Returns:
        (undocumented)
      • currentInterpreterOrNull

        public static GraphInterpreter currentInterpreterOrNull()
        INTERNAL API
        Returns:
        (undocumented)
      • onAsyncInput

        public scala.Function4<GraphStageLogic,​java.lang.Object,​scala.concurrent.Promise<Done>,​scala.Function1<java.lang.Object,​scala.runtime.BoxedUnit>,​scala.runtime.BoxedUnit> onAsyncInput()
      • fuzzingMode

        public boolean fuzzingMode()
      • activeStage

        public GraphStageLogic activeStage()
        INTERNAL API
        Returns:
        (undocumented)
      • subFusingMaterializer

        public Materializer subFusingMaterializer()
      • Name

        public java.lang.String Name()
      • nonNull

        public GraphInterpreter nonNull()
        INTERNAL API
        Returns:
        (undocumented)
      • setHandler

        public void setHandler​(GraphInterpreter.Connection connection,
                               InHandler handler)
        Dynamic handler changes are communicated from a GraphStageLogic by this method.
        Parameters:
        connection - (undocumented)
        handler - (undocumented)
      • setHandler

        public void setHandler​(GraphInterpreter.Connection connection,
                               OutHandler handler)
        Dynamic handler changes are communicated from a GraphStageLogic by this method.
        Parameters:
        connection - (undocumented)
        handler - (undocumented)
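
        As an illustration of why a dynamic handler change has to reach the interpreter, the following toy model stores the handler on the connection, so that a swap takes effect for the next dispatched event. The types and the static setHandler and deliver helpers are hypothetical and only mirror the role of the method documented above.

```java
import java.util.ArrayList;
import java.util.List;

final class HandlerSwapSketch {
    interface InHandler { void onPush(); } // hypothetical stand-in

    static final class Connection {
        InHandler inHandler;
        Object slot;
    }

    /** Toy counterpart of setHandler: the connection keeps the handler used for dispatch. */
    static void setHandler(Connection connection, InHandler handler) {
        connection.inHandler = handler;
    }

    /** Toy dispatch: stores the element and calls whatever handler is currently installed. */
    static void deliver(Connection connection, Object element) {
        connection.slot = element;
        connection.inHandler.onPush();
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Connection c = new Connection();
        InHandler bodyHandler = () -> log.add("body:" + c.slot);
        // The first handler treats one element as a header, then installs the body handler.
        setHandler(c, () -> {
            log.add("header:" + c.slot);
            setHandler(c, bodyHandler);
        });
        deliver(c, "h");
        deliver(c, "b1");
        deliver(c, "b2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```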
      • isSuspended

        public boolean isSuspended()
        Returns true if there are pending unprocessed events in the event queue.
        Returns:
        true if the event queue contains pending unprocessed events
      • isCompleted

        public boolean isCompleted()
        Returns true if there are no more running operators and no pending events.
        Returns:
        (undocumented)
      • init

        public void init​(Materializer subMat)
        Initializes the states of all the operator logics by calling preStart(). The passed-in materializer is intended to be a SubFusingActorMaterializer that avoids creating new Actors when operators materialize sub-flows. If no such materializer is available, passing in null will reuse the normal materializer for the GraphInterpreter; fusing is only an optimization.
        Parameters:
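        subMat - the materializer used when operators materialize sub-flows, intended to be a SubFusingActorMaterializer; may be null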
      • finish

        public void finish()
        Finalizes the state of all operators by calling postStop() (if necessary).
      • execute

        public int execute​(int eventLimit)
        Executes pending events until the given limit is reached or no pending events remain. If events are left over when the call returns, isSuspended() returns true.
        Parameters:
        eventLimit - upper bound on the number of events to process in this call
        Returns:
        (undocumented)
      • runAsyncInput

        public void runAsyncInput​(GraphStageLogic logic,
                                  java.lang.Object evt,
                                  scala.concurrent.Promise<Done> promise,
                                  scala.Function1<java.lang.Object,​scala.runtime.BoxedUnit> handler)
      • afterStageHasRun

        public void afterStageHasRun​(GraphStageLogic logic)
      • isStageCompleted

        public boolean isStageCompleted​(GraphStageLogic stage)
      • setKeepGoing

        public void setKeepGoing​(GraphStageLogic logic,
                                 boolean enabled)
      • toSnapshot

        public RunningInterpreter toSnapshot()
        Debug utility that dumps the "waits-on" relationships in an AST format, which can then be rendered in a suitable format for deadlock analysis.

        Only invoke this after the interpreter has completely settled; otherwise the results might be off. This is a very simplistic tool, so make sure you understand what you are doing, and it will serve you well.

        Returns:
        (undocumented)