final class GraphInterpreter extends AnyRef

INTERNAL API

From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a GraphInterpreter#GraphAssembly object and provides facilities to execute and interact with this assembly. The lifecycle of the Interpreter is roughly the following:

  • Boundary logics are attached via attachDownstreamBoundary() and attachUpstreamBoundary()
  • init() is called
  • execute() is called whenever there is need for execution, providing an upper limit on the processed events
  • finish() is called before the interpreter is disposed, preferably after isCompleted has returned true; in abort cases this is not strictly necessary

The execute() method of the interpreter accepts an upper bound on the number of events it will process. The call returns once this limit is reached or there are no more pending events to process. Whether unprocessed events remain can be checked via the isSuspended method. isCompleted returns true once all stages inside the interpreter have reported completion.
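The lifecycle and the bounded execute() call can be illustrated with a small stand-in. This is a minimal sketch, not the real class: ToyInterpreter and drive are hypothetical names, init() omits the Materializer argument, and treating the result of execute() as the unused part of the limit is an assumption made for the example.

```scala
object LifecycleSketch {
  // Hypothetical stand-in that only counts pending events.
  final class ToyInterpreter(private var pending: Int) {
    var initialized = false
    var finished = false

    def init(): Unit = initialized = true

    // Processes at most eventLimit pending events.
    // Returns the unused part of the limit (an assumption of this sketch).
    def execute(eventLimit: Int): Int = {
      val processed = math.min(eventLimit, pending)
      pending -= processed
      eventLimit - processed
    }

    // True while events are left over from the last execute() call.
    def isSuspended: Boolean = pending > 0

    def finish(): Unit = finished = true
  }

  // Drives the toy through its lifecycle and returns the number of
  // execute() calls that were needed to drain all pending events.
  def drive(interpreter: ToyInterpreter, eventLimit: Int): Int = {
    interpreter.init()
    interpreter.execute(eventLimit)
    var calls = 1
    while (interpreter.isSuspended) {
      interpreter.execute(eventLimit)
      calls += 1
    }
    interpreter.finish()
    calls
  }
}
```

A real caller would normally return control between execute() calls instead of looping tightly; the loop only keeps the sketch short.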

The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations on the hot paths.

One of the basic abstractions inside the interpreter is the akka.stream.impl.fusing.GraphInterpreter.Connection. A connection represents an output-input port pair (analogous to a connected Reactive Streams Publisher-Subscriber pair). The Connection object contains all the data the interpreter needs to pass elements, demand, completion or errors across the Connection. In particular:

  • portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this connection. This bitfield is used to decode the event that is in-flight.
  • connectionSlot contains a potential element or exception that accompanies the event encoded in the portStates bitfield
  • inHandler contains the InHandler instance that handles the events corresponding to the input port of the connection
  • outHandler contains the OutHandler instance that handles the events corresponding to the output port of the connection

On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue contains the Connections that have pending events to be processed. The pending event itself is encoded in the portStates bitfield of the Connection. This implies that there can be only one event in flight for a given Connection. That holds in almost all cases; the exceptions are complete-after-push and fail-after-push, which have to be decoded accordingly.

The layout of the portStates bitfield is the following:

            |- state machn.-|   Only one bit is hot among these bits
 64  32  16 | 8   4   2   1 |
+---+---+---|---+---+---+---|
  |   |   |   |   |   |   |
  |   |   |   |   |   |   |  From the following flags only one is active at any given time. These bits encode
  |   |   |   |   |   |   |  state machine states, and they are "moved" around using XOR masks to keep other bits
  |   |   |   |   |   |   |  intact.
  |   |   |   |   |   |   |
  |   |   |   |   |   |   +- InReady:   The input port is ready to be pulled
  |   |   |   |   |   +----- Pulling:   A pull is active, but has not arrived yet (queued)
  |   |   |   |   +--------- Pushing:   A push is active, but has not arrived yet (queued)
  |   |   |   +------------- OutReady:  The output port is ready to be pushed
  |   |   |
  |   |   +----------------- InClosed:  The input port is closed and will not receive any events.
  |   |                                 A push might still be in flight, which will then be processed first.
  |   +--------------------- OutClosed: The output port is closed and will not receive any events.
  +------------------------- InFailed:  Always set in conjunction with InClosed. Indicates that the close event
                                        is a failure
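The XOR-based state transitions mentioned in the layout can be sketched as follows. The bit values are the ones listed in the layout; the mask names (PullStartFlip and so on), StateMachineMask and isOneHot are illustrative assumptions, not necessarily the identifiers used in GraphInterpreter.scala.

```scala
object PortStateSketch {
  // Bit values taken from the layout above.
  final val InReady   = 1
  final val Pulling   = 2
  final val Pushing   = 4
  final val OutReady  = 8
  final val InClosed  = 16
  final val OutClosed = 32
  final val InFailed  = 64

  // Illustrative XOR masks (names are assumptions). Each mask clears the bit of
  // the state being left and sets the bit of the state being entered, so all
  // other bits of the field stay untouched.
  final val PullStartFlip = InReady | Pulling   // InReady  -> Pulling
  final val PullEndFlip   = Pulling | OutReady  // Pulling  -> OutReady
  final val PushStartFlip = OutReady | Pushing  // OutReady -> Pushing
  final val PushEndFlip   = Pushing | InReady   // Pushing  -> InReady

  // Selects the four state-machine bits (1, 2, 4, 8).
  final val StateMachineMask = InReady | Pulling | Pushing | OutReady

  // True if exactly one of the state-machine bits is set.
  def isOneHot(portState: Int): Boolean = {
    val sm = portState & StateMachineMask
    sm != 0 && (sm & (sm - 1)) == 0
  }

  def main(args: Array[String]): Unit = {
    var state = InReady
    state ^= PullStartFlip  // a pull is requested and queued
    state ^= PullEndFlip    // the pull arrives; the output may now push
    state ^= PushStartFlip  // a push is requested and queued
    state |= OutClosed      // complete-after-push: closed while the push is in flight
    state ^= PushEndFlip    // the push arrives; OutClosed is still set
    println(state == (InReady | OutClosed))
  }
}
```

Because each mask contains exactly the bit being left and the bit being entered, a transition applied to a valid state keeps the state machine one-hot and leaves the close and failure flags as they were.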

Sending an event is usually the following sequence:

  • An action is requested by a stage logic (push, pull, complete, etc.)
  • the state machine in portStates is transitioned from a ready state to a pending event
  • the affected Connection is enqueued

Receiving an event is usually the following sequence:

  • the Connection to be processed is dequeued
  • the type of the event is determined from the bits set on portStates
  • the state machine in portStates is transitioned to a ready state
  • using the inHandler/outHandler stored on the Connection, the corresponding callback is called on the stage logic.
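The send and receive sequences above can be sketched with a toy connection and a circular-buffer queue. This is a simplified model under stated assumptions: Conn, Queue, pull, push and processOne are hypothetical names, handlers are plain functions instead of InHandler/OutHandler instances, only the four state-machine bits are modeled, and the queue has no overflow check (its capacity is assumed to be a power of two at least as large as the number of connections).

```scala
object EventQueueSketch {
  final val InReady  = 1
  final val Pulling  = 2
  final val Pushing  = 4
  final val OutReady = 8

  // Toy connection: a state bitfield, a slot for the element, two callbacks.
  final class Conn(val onPull: Conn => Unit, val onPush: (Conn, Any) => Unit) {
    var portState: Int = InReady
    var slot: Any = null
  }

  // Circular buffer of connections with pending events.
  final class Queue(capacity: Int) {
    private val buf  = new Array[Conn](capacity)
    private val mask = capacity - 1
    private var head = 0
    private var tail = 0
    def isEmpty: Boolean = head == tail
    def enqueue(c: Conn): Unit = { buf(tail & mask) = c; tail += 1 }
    def dequeue(): Conn = {
      val idx = head & mask
      val c = buf(idx)
      buf(idx) = null
      head += 1
      c
    }
  }

  // Sending: move from a ready state to a pending event, then enqueue.
  def pull(q: Queue, c: Conn): Unit = {
    require((c.portState & InReady) != 0, "input port is not ready")
    c.portState ^= (InReady | Pulling)
    q.enqueue(c)
  }

  def push(q: Queue, c: Conn, elem: Any): Unit = {
    require((c.portState & OutReady) != 0, "output port is not ready")
    c.portState ^= (OutReady | Pushing)
    c.slot = elem
    q.enqueue(c)
  }

  // Receiving: dequeue, decode the event from the bits, move to a ready
  // state, then invoke the matching callback.
  def processOne(q: Queue): Unit = {
    val c = q.dequeue()
    if ((c.portState & Pushing) != 0) {
      c.portState ^= (Pushing | InReady)
      val elem = c.slot
      c.slot = null
      c.onPush(c, elem)
    } else if ((c.portState & Pulling) != 0) {
      c.portState ^= (Pulling | OutReady)
      c.onPull(c)
    }
  }

  // A source that emits 1, 2, 3 on demand and a sink that pulls again after
  // every element. Returns the elements seen by the sink.
  def demo(): List[Int] = {
    val q = new Queue(4)
    val received = List.newBuilder[Int]
    var next = 1
    val conn = new Conn(
      onPull = c => { if (next <= 3) { push(q, c, next); next += 1 } },
      onPush = (c, elem) => { received += elem.asInstanceOf[Int]; pull(q, c) }
    )
    pull(q, conn)
    while (!q.isEmpty) processOne(q)
    received.result()
  }
}
```

The state is moved to the ready state before the callback runs, so a handler can immediately request the next action on the same connection, as the sink does here.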

Because of the FIFO construction of the queue, the interpreter is fair, i.e. a pending event is always executed after a bounded number of other events. This property, together with suspendability, means that even infinite cycles can be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example, the non-cycle edge of a balance is pulled, dissolving the original cycle).
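The combination of FIFO ordering and a bounded execute() can be illustrated with a toy event loop. This is a sketch only: events are plain strings, the "cycle" event simply re-enqueues itself to model an infinite cycle, and FairnessSketch.run is a hypothetical name. It shows that an externally injected event is processed after a bounded number of cycle events; it does not model the dissolving of a cycle.

```scala
import scala.collection.mutable

object FairnessSketch {
  // Runs the toy loop and returns the names of the processed events in order.
  def run(): List[String] = {
    val queue = mutable.Queue.empty[String]
    val log   = mutable.ListBuffer.empty[String]

    // Processes at most eventLimit events in FIFO order.
    def execute(eventLimit: Int): Unit = {
      var remaining = eventLimit
      while (remaining > 0 && queue.nonEmpty) {
        val event = queue.dequeue()
        log += event
        if (event == "cycle") queue.enqueue("cycle") // the cycle feeds itself
        remaining -= 1
      }
    }

    queue.enqueue("cycle")
    execute(3)                 // returns after 3 events; the cycle is still pending
    queue.enqueue("external")  // injected while the loop is suspended
    execute(3)                 // "external" runs right after the one queued cycle event
    log.toList
  }
}
```

Without the event limit the first execute() call would never return, and without FIFO ordering the external event could be starved by the cycle.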

Source
GraphInterpreter.scala

Instance Constructors

  1. new GraphInterpreter(assembly: GraphAssembly, materializer: Materializer, log: LoggingAdapter, logics: Array[GraphStageLogic], connections: Array[Connection], onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, fuzzingMode: Boolean, context: ActorRef)

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. def +(other: String): String
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to any2stringadd[GraphInterpreter] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (GraphInterpreter, B)
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to ArrowAssoc[GraphInterpreter] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. def Name: String
  7. def afterStageHasRun(logic: GraphStageLogic): Unit
  8. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  9. def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit
  10. def attachDownstreamBoundary(connection: Connection, logic: DownstreamBoundaryStageLogic[_]): Unit

    Assign the boundary logic to a given connection. This will serve as the interface to the external world (outside the interpreter) to process and inject events.

  11. def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit
  12. def attachUpstreamBoundary(connection: Connection, logic: UpstreamBoundaryStageLogic[_]): Unit

    Assign the boundary logic to a given connection. This will serve as the interface to the external world (outside the interpreter) to process and inject events.

  13. def clone(): AnyRef
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  14. val connections: Array[Connection]
  15. val context: ActorRef
  16. def dumpWaits(): Unit

    Debug utility to dump the "waits-on" relationships in DOT format to the console for analysis of deadlocks.

    Only invoke this after the interpreter has completely settled; otherwise the results might be off. This is a very simplistic tool: make sure you understand what you are doing, and it will serve you well.

  17. def enqueue(connection: Connection): Unit
  18. def ensuring(cond: (GraphInterpreter) ⇒ Boolean, msg: ⇒ Any): GraphInterpreter
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to Ensuring[GraphInterpreter] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  19. def ensuring(cond: (GraphInterpreter) ⇒ Boolean): GraphInterpreter
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to Ensuring[GraphInterpreter] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  20. def ensuring(cond: Boolean, msg: ⇒ Any): GraphInterpreter
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to Ensuring[GraphInterpreter] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  21. def ensuring(cond: Boolean): GraphInterpreter
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to Ensuring[GraphInterpreter] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  22. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  23. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  24. def execute(eventLimit: Int): Int

    Executes pending events until the given limit is met. If there were remaining events, isSuspended will return true.

  25. def finalize(): Unit
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  26. def finish(): Unit

    Finalizes the state of all stages by calling postStop() (if necessary).

  27. def formatted(fmtstr: String): String
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to StringFormat[GraphInterpreter] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @inline()
  28. val fuzzingMode: Boolean
  29. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
  30. def hashCode(): Int
    Definition Classes
    AnyRef → Any
  31. def init(subMat: Materializer): Unit

    Initializes the states of all the stage logics by calling preStart(). The passed-in materializer is intended to be a SubFusingActorMaterializer that avoids creating new Actors when stages materialize sub-flows. If no such materializer is available, passing in null will reuse the normal materializer for the GraphInterpreter; fusing is only an optimization.

  32. def isCompleted: Boolean

    Returns true if there are no more running stages and pending events.

  33. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  34. def isStageCompleted(stage: GraphStageLogic): Boolean
  35. def isSuspended: Boolean

    Returns true if there are pending unprocessed events in the event queue.

  36. val log: LoggingAdapter
  37. val logics: Array[GraphStageLogic]
  38. val materializer: Materializer
  39. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  40. final def notify(): Unit
    Definition Classes
    AnyRef
  41. final def notifyAll(): Unit
    Definition Classes
    AnyRef
  42. val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit
  43. def runAsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) ⇒ Unit): Unit
  44. def setHandler(connection: Connection, handler: OutHandler): Unit

    Dynamic handler changes are communicated from a GraphStageLogic by this method.

  45. def setHandler(connection: Connection, handler: InHandler): Unit

    Dynamic handler changes are communicated from a GraphStageLogic by this method.

  46. def subFusingMaterializer: Materializer
  47. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  48. def toString(): String
    Definition Classes
    GraphInterpreter → AnyRef → Any
  49. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  50. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  51. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  52. def →[B](y: B): (GraphInterpreter, B)
    Implicit
    This member is added by an implicit conversion from GraphInterpreter to ArrowAssoc[GraphInterpreter] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
