Custom stream processing

While the processing vocabulary of Akka Streams is quite rich (see the Streams Cookbook for examples) it is sometimes necessary to define new transformation stages either because some functionality is missing from the stock operations, or for performance reasons. In this part we show how to build custom processing stages and graph junctions of various kinds.

Custom linear processing stages

To extend the available transformations on a Flow or Source one can use the transform() method, which takes a factory function returning a Stage. Stages come in different flavors, which we will introduce on this page.

Using PushPullStage

The most elementary transformation stage is the PushPullStage, which can express a large class of algorithms working on streams. A PushPullStage can be illustrated as a box with two "input" and two "output" ports, as seen in the illustration below.

[Figure: stage_conceptual1.png, the four conceptual ports of a PushPullStage]

The "input ports" are implemented as event handlers onPush(elem, ctx) and onPull(ctx), while "output ports" correspond to methods on the Context object that is handed as a parameter to the event handlers. By calling exactly one "output port" method we wire up these four ports in various ways, which we demonstrate shortly.

Warning

There is one very important rule to remember when working with a Stage: exactly one method must be called on the currently passed Context, as the last statement of the handler, and the return type of the called method must match the expected return type of the handler. Any violation of this rule will almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). The only exceptions to this rule are the query methods isHolding() and isFinishing().

To illustrate these concepts we create a small PushPullStage that implements the map transformation.

[Figure: stage_map1.png, conceptual wiring of the Map stage]

Map calls ctx.push() from the onPush() handler and ctx.pull() from the onPull() handler, resulting in the conceptual wiring above, fully expressed in the code below:

class Map[A, B](f: A => B) extends PushPullStage[A, B] {
  override def onPush(elem: A, ctx: Context[B]): Directive =
    ctx.push(f(elem))

  override def onPull(ctx: Context[B]): Directive =
    ctx.pull()
}

Map is a typical example of a one-to-one transformation of a stream. To demonstrate a many-to-one stage we will implement filter. The conceptual wiring of Filter looks like this:

[Figure: stage_filter1.png, conceptual wiring of the Filter stage]

As we see above, if the given predicate matches the current element we propagate it downwards; otherwise we return the "ball" to our upstream so that we get a new element. This is achieved by modifying the map example: a conditional in the onPush handler decides between a ctx.push() or ctx.pull() call (and of course there is no mapping function f).

class Filter[A](p: A => Boolean) extends PushPullStage[A, A] {
  override def onPush(elem: A, ctx: Context[A]): Directive =
    if (p(elem)) ctx.push(elem)
    else ctx.pull()

  override def onPull(ctx: Context[A]): Directive =
    ctx.pull()
}
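To make the push/pull "ball" concrete outside of Akka, the behaviour of Map and Filter can be modelled with a small single-threaded loop. This is only a sketch: runPushPull and its Option-based handler are illustrative names, not part of the Akka API, and the model ignores asynchrony and backpressure.

```scala
// A minimal, single-threaded model of the push/pull protocol.
// Some(b) stands for ctx.push(b); None stands for ctx.pull(),
// i.e. returning the "ball" upstream to ask for the next element.
def runPushPull[A, B](input: Iterator[A])(onPush: A => Option[B]): List[B] = {
  val out = scala.collection.mutable.ListBuffer.empty[B]
  while (input.hasNext) {
    onPush(input.next()) match {
      case Some(b) => out += b // push: the element travels downstream
      case None    => ()       // pull: ask upstream for another element
    }
  }
  out.toList
}

// Map always pushes; Filter either pushes or pulls back:
val mapped   = runPushPull(Iterator(1, 2, 3))(x => Some(x * 2))
val filtered = runPushPull(Iterator(1, 2, 3, 4))(x => if (x % 2 == 0) Some(x) else None)
// mapped == List(2, 4, 6); filtered == List(2, 4)
```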

To complete the picture we define a one-to-many transformation as the next step. We choose a straightforward example: a stage that emits every upstream element twice downstream. The conceptual wiring of this stage looks like this:

[Figure: stage_doubler1.png, conceptual wiring of the Duplicator stage]

This is a stage that has state: the last element it has seen, and a flag oneLeft that indicates whether we have already emitted the duplicate of that last element. Looking at the code below, the reader might notice that our onPull method is more complex than the figure above suggests. The reason for this is completion handling, which we will explain a little later. For now it is enough to look at the if (!ctx.isFinishing) block, which corresponds to the logic we expect from the conceptual picture.

class Duplicator[A]() extends PushPullStage[A, A] {
  private var lastElem: A = _
  private var oneLeft = false

  override def onPush(elem: A, ctx: Context[A]): Directive = {
    lastElem = elem
    oneLeft = true
    ctx.push(elem)
  }

  override def onPull(ctx: Context[A]): Directive =
    if (!ctx.isFinishing) {
      // the main pulling logic is below as it is demonstrated on the illustration
      if (oneLeft) {
        oneLeft = false
        ctx.push(lastElem)
      } else
        ctx.pull()
    } else {
      // If we need to emit a final element after the upstream
      // finished
      if (oneLeft) ctx.pushAndFinish(lastElem)
      else ctx.finish()
    }

  override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
    ctx.absorbTermination()

}

Finally, to demonstrate all of the stages above, we put them together into a processing chain, which conceptually would correspond to the following structure:

[Figure: stage_chain1.png, the processing chain of Filter, Duplicator and Map]

In code this is only a few lines, using the transform method to inject our custom processing into a stream:

val runnable: RunnableFlow = Source(1 to 10)
  .transform(() => new Filter[Int](_ % 2 == 0))
  .transform(() => new Duplicator[Int]())
  .transform(() => new Map[Int, Int](_ / 2))
  .to(sink)
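To see what this chain computes, the same transformation can be written against a plain Scala collection. This is only a semantic model, not Akka code, and it ignores asynchrony and backpressure:

```scala
// Filter the even numbers, duplicate each, then halve:
val result = (1 to 10).toList
  .filter(_ % 2 == 0)       // Filter stage
  .flatMap(x => List(x, x)) // Duplicator stage
  .map(_ / 2)               // Map stage
// result == List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5)
```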

Completion handling

Completion handling usually (but not exclusively) comes into the picture when processing stages need to emit a few more elements after their upstream source has completed. We have seen an example of this in our Duplicator class, where the last element needs to be doubled even after the upstream neighbor stage has completed. Since the onUpstreamFinish() handler expects a TerminationDirective as its return type, we are only allowed to call ctx.finish(), ctx.fail() or ctx.absorbTermination(). The first two of these immediately terminate, so our only option is absorbTermination(). It is also clear from the return type of onUpstreamFinish that we cannot call ctx.push(), but we need to emit elements somehow! The trick is that after calling absorbTermination() the onPull() handler will eventually be called, and at the same time ctx.isFinishing will return true, indicating that ctx.pull() cannot be called anymore. Now we are free to emit additional elements and eventually call ctx.finish() or ctx.pushAndFinish() to finish processing.

Note

The reason for this slightly complex termination sequence is that the underlying onComplete signal of Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that our push/pull structure, as illustrated in the figure of our custom processing chain, does not apply to termination. Our neat model, analogous to a ball bouncing back and forth in a pipe (bouncing back on Filter and Duplicator, for example), cannot describe the termination signals. By calling absorbTermination() the execution environment checks whether the conceptual token was above the current stage at that time (which means it will never come back, so the environment immediately calls onPull) or below it (which means it will come back eventually, so the environment does not need to call anything yet).

Using PushStage

Many one-to-one and many-to-one transformations do not need to override the onPull() handler at all since all they do is just propagate the pull upwards. For such transformations it is better to extend PushStage directly. For example our Map and Filter would look like this:

class Map[A, B](f: A => B) extends PushStage[A, B] {
  override def onPush(elem: A, ctx: Context[B]): Directive =
    ctx.push(f(elem))
}

class Filter[A](p: A => Boolean) extends PushStage[A, A] {
  override def onPush(elem: A, ctx: Context[A]): Directive =
    if (p(elem)) ctx.push(elem)
    else ctx.pull()
}

The reason to use PushStage is not just cosmetic: internal optimizations rely on the fact that the onPull method only calls ctx.pull(), and allow the environment to process elements faster than without this knowledge. By extending PushStage the environment can be sure that onPull() was not overridden, since it is final on PushStage.

Using StatefulStage

On top of PushPullStage, which is the most elementary and low-level abstraction, and PushStage, a convenience class that also informs the environment about possible optimizations, StatefulStage is a tool that builds directly on PushPullStage, adding various convenience methods. It is possible to maintain state-machine-like states explicitly using its become() method, which encapsulates states as objects. There is also a handy emit() method that simplifies emitting multiple values given as an iterator. To demonstrate these features we reimplement Duplicator in terms of a StatefulStage:

class Duplicator[A]() extends StatefulStage[A, A] {
  override def initial: StageState[A, A] = new StageState[A, A] {
    override def onPush(elem: A, ctx: Context[A]): Directive =
      emit(List(elem, elem).iterator, ctx)
  }
}
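The become() idea can be illustrated outside of Akka with a tiny state machine. The names DropState, Pass, Drop and run below are illustrative, not Akka API: a stage that drops every other element by alternating between two states, switching state after every element just as become() would.

```scala
// Each state maps an element to an optional emitted element plus the
// next state, mirroring what become() does inside a StatefulStage.
trait DropState { def onPush(elem: Int): (Option[Int], DropState) }

object Pass extends DropState {
  def onPush(elem: Int) = (Some(elem), Drop) // emit, then "become" Drop
}
object Drop extends DropState {
  def onPush(elem: Int) = (None, Pass)       // swallow, then "become" Pass
}

def run(input: List[Int]): List[Int] = {
  var state: DropState = Pass
  input.flatMap { e =>
    val (out, next) = state.onPush(e)
    state = next
    out.toList
  }
}
// run(List(1, 2, 3, 4, 5)) == List(1, 3, 5)
```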

Using DetachedStage

TODO

Custom graph processing junctions

To extend the available fan-in and fan-out structures (graph stages), Akka Streams includes FlexiMerge and FlexiRoute, which provide an intuitive DSL for describing from which upstream ports elements should be pulled and to which downstream ports they should be emitted.

Using FlexiMerge

FlexiMerge can be used to describe a fan-in element which contains some logic about from which upstream stage the merge should consume elements. It is recommended to create your custom fan-in stage as a separate class, name it appropriately to the behavior it exposes, and reuse it this way, similarly to how you would use built-in fan-in stages.

The first flexi merge example we are going to implement is a so-called "preferring merge", in which one of the input ports is preferred: if the merge could pull from either the preferred or a secondary input port, it pulls from the preferred one, and only pulls from the secondary ports when the preferred one has no elements available.

Implementing a custom merge stage is done by extending the FlexiMerge trait, exposing its input ports, and finally defining the logic that decides how this merge should behave. First we need to create the input ports which are used to wire up the fan-in element in a FlowGraph. These input ports must be properly typed and their names should indicate what kind of port each one is:

class PreferringMerge extends FlexiMerge[Int] {
  import akka.stream.scaladsl.FlexiMerge._

  val preferred = createInputPort[Int]()
  val secondary1 = createInputPort[Int]()
  val secondary2 = createInputPort[Int]()

  def createMergeLogic = new MergeLogic[Int] {
    override def inputHandles(inputCount: Int) = {
      require(inputCount == 3, s"PreferringMerge must have 3 connected inputs, was $inputCount")
      Vector(preferred, secondary1, secondary2)
    }

    override def initialState =
      State[Int](ReadPreferred(preferred)(secondary1, secondary2)) {
        (ctx, input, element) =>
          ctx.emit(element)
          SameState
      }
  }
}

Next we implement the createMergeLogic method, which will be used as a factory of the merge's MergeLogic. A new MergeLogic object is created for each materialized stream, so it is allowed to be stateful.

The MergeLogic defines the behaviour of our merge stage and may be stateful (for example to buffer some elements internally). The first method we must implement in a merge logic is inputHandles, in which we have the opportunity to validate the number of connected input ports; in our preferring merge we require exactly three connected inputs.

Warning

While a MergeLogic instance may be stateful, the FlexiMerge instance must not hold any mutable state, since it may be shared across several materialized FlowGraph instances.

Next we implement the initialState method, which returns the behaviour of the merge stage. A MergeLogic#State defines the behaviour of the merge by signaling which input ports it is interested in consuming, and how to handle the element once it has been pulled from its upstream. Signalling which input port we are interested in pulling data from is done by using an appropriate read condition. Available read conditions include:

  • Read(input) – reads from only the given input,
  • ReadAny(inputs) – reads from any of the given inputs,
  • ReadPreferred(preferred)(secondaries) – reads from the preferred input if elements are available, otherwise from one of the secondaries,
  • ReadAll(inputs) – reads from all of the given inputs (like Zip), and offers a ReadAllInputs as the element passed into the state function, which allows obtaining the pulled element values in a type-safe way.

In our case we use the ReadPreferred read condition, which has the exact semantics we need to implement our preferring merge: it pulls elements from the preferred input port if any are available, otherwise reverting to pulling from the secondary inputs. The context object passed into the state function allows us to interact with the connected streams, for example by emitting an element which was just pulled from the given input, or by signalling completion or failure to the merge's downstream stage.
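The semantics of ReadPreferred can be sketched with mutable queues standing in for the upstream inputs. This is a model only (drain is an illustrative name, not part of the Akka API) and it ignores backpressure and the element-by-element arrival of data:

```scala
import scala.collection.mutable

// At each step, take from the preferred queue if it has an element
// available, otherwise from the first non-empty secondary queue.
def drain(preferred: mutable.Queue[Int], secondaries: Seq[mutable.Queue[Int]]): List[Int] = {
  val out = mutable.ListBuffer.empty[Int]
  while (preferred.nonEmpty || secondaries.exists(_.nonEmpty)) {
    if (preferred.nonEmpty) out += preferred.dequeue()
    else out += secondaries.find(_.nonEmpty).get.dequeue()
  }
  out.toList
}

val merged = drain(mutable.Queue(1, 2), Seq(mutable.Queue(10), mutable.Queue(20)))
// merged == List(1, 2, 10, 20): the preferred input is drained first
```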

The state function must always return the next behaviour to be used when an element should be pulled from its upstreams; here we use the special SameState object, which signals FlexiMerge that no state transition is needed.

Note

As response to an input element it is allowed to emit at most one output element.

Implementing Zip-like merges

More complex fan-in junctions may require not only multiple States but also sharing state between those states. As MergeLogic is allowed to be stateful, it can be easily used to hold the state of the merge junction.

We now implement the equivalent of the built-in Zip junction by using the property that the MergeLogic can be stateful and that each read is followed by a state transition (much like in Akka FSM or Actor#become).

class Zip[A, B] extends FlexiMerge[(A, B)] {
  import FlexiMerge._
  val left = createInputPort[A]()
  val right = createInputPort[B]()

  def createMergeLogic = new MergeLogic[(A, B)] {
    var lastInA: A = _

    override def inputHandles(inputCount: Int) = {
      require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount")
      Vector(left, right)
    }

    val readA: State[A] = State[A](Read(left)) { (ctx, input, element) =>
      lastInA = element
      readB
    }

    val readB: State[B] = State[B](Read(right)) { (ctx, input, element) =>
      ctx.emit((lastInA, element))
      readA
    }

    override def initialState: State[_] = readA

    override def initialCompletionHandling = eagerClose
  }
}
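The readA/readB alternation can be modelled on plain lists (zipAlternating is an illustrative name, not Akka API): remember an element from the left side, pair it with one from the right, then start over. The eagerClose handling corresponds to stopping as soon as either side runs out.

```scala
// Read one element from the left, remember it (like lastInA), read one
// from the right, emit the pair, then transition back to reading left.
def zipAlternating[A, B](lefts: List[A], rights: List[B]): List[(A, B)] =
  (lefts, rights) match {
    case (a :: as, b :: bs) => (a, b) :: zipAlternating(as, bs)
    case _                  => Nil // eagerClose: stop when either side completes
  }

val pairs = zipAlternating(List(1, 2, 3), List("A", "B"))
// pairs == List((1, "A"), (2, "B"))
```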

The above style of implementing complex flexi merges is useful when we need fine-grained control over consuming from certain input ports. Sometimes, however, it is simpler to strictly consume all of a given set of inputs. In the Zip rewrite below we use the ReadAll read condition, which behaves slightly differently from the other read conditions: the element it emits is of type ReadAllInputs instead of directly handing over the pulled elements:

class Zip[A, B] extends FlexiMerge[(A, B)] {
  import FlexiMerge._
  val left = createInputPort[A]()
  val right = createInputPort[B]()

  def createMergeLogic = new MergeLogic[(A, B)] {
    override def inputHandles(inputCount: Int) = {
      require(inputCount == 2, s"Zip must have two connected inputs, was $inputCount")
      Vector(left, right)
    }

    override def initialState: State[_] =
      State[ReadAllInputs](ReadAll(left, right)) { (ctx, _, inputs) =>
        val a: A = inputs(left)
        val b: B = inputs(right)
        ctx.emit((a, b))
        SameState
      }

    override def initialCompletionHandling = eagerClose
  }
}

Thanks to being handed a ReadAllInputs instance instead of the elements directly, it is possible to pick elements in a type-safe way based on their input port.

Connecting your custom junction is as simple as creating an instance and connecting Sources and Sinks to its ports (notice that the merged output port is named out):

val head = Sink.head[(Int, String)]
FlowGraph { implicit b =>
  import FlowGraphImplicits._

  val zip = new Zip[Int, String]

  Source.single(1)   ~> zip.left
  Source.single("A") ~> zip.right
                        zip.out ~> head
}

Completion handling

Completion handling in FlexiMerge is defined by a CompletionHandling object, which can react to completion and failure signals from its upstream input ports. The default strategy is to remain running while at least one of the upstream input ports declared to be consumed in the current state is still running (i.e. has not signalled completion or failure).

Customising completion can be done by overriding the MergeLogic#initialCompletionHandling method, or from within a State by calling ctx.changeCompletionHandling(handling). In addition to the default completion handling (as late as possible), FlexiMerge also provides an eagerClose completion handling which completes (or fails) its downstream as soon as at least one of its upstream inputs completes (or fails).

In the example below we implement an ImportantWithBackups fan-in stage which can only keep operating while the important input and at least one of the replica inputs are active. Therefore, in our custom completion strategy we have to investigate which input has completed or failed and act accordingly. If the important input completes or fails, we propagate this downstream, completing the stream. On the other hand, if one of the replica inputs fails, we log the exception and, instead of failing the downstream, swallow it (as one failed replica is still acceptable). We then change the completion strategy to eagerClose, which will propagate any future completion or failure event right to this stage's downstream, effectively shutting down the stream.

class ImportantWithBackups[A] extends FlexiMerge[A] {
  import FlexiMerge._

  val important = createInputPort[A]()
  val replica1 = createInputPort[A]()
  val replica2 = createInputPort[A]()

  def createMergeLogic = new MergeLogic[A] {
    val inputs = Vector(important, replica1, replica2)

    override def inputHandles(inputCount: Int) = {
      require(inputCount == 3, s"Must connect 3 inputs, connected only $inputCount")
      inputs
    }

    override def initialCompletionHandling =
      CompletionHandling(
        onUpstreamFinish = (ctx, input) => input match {
          case `important` =>
            log.info("Important input completed, shutting down.")
            ctx.finish()
            SameState

          case replica =>
            log.info("Replica {} completed, " +
              "no more replicas available, " +
              "applying eagerClose completion handling.", replica)

            ctx.changeCompletionHandling(eagerClose)
            SameState
        },
        onUpstreamFailure = (ctx, input, cause) => input match {
          case `important` =>
            ctx.fail(cause)
            SameState

          case replica =>
            log.error(cause, "Replica {} failed, " +
              "no more replicas available, " +
              "applying eagerClose completion handling.", replica)

            ctx.changeCompletionHandling(eagerClose)
            SameState
        })

    override def initialState = State[A](ReadAny(inputs)) {
      (ctx, input, element) =>
        ctx.emit(element)
        SameState
    }
  }
}

In case you want to change back to the default completion handling, it is available as MergeLogic#defaultCompletionHandling.

It is not possible to emit elements from the completion handling, since completion handlers may be invoked at any time (without regard to downstream demand being available).

Using FlexiRoute

Similarly to FlexiMerge, implementing custom fan-out stages requires extending the FlexiRoute class and providing a RouteLogic object which determines how the route should behave.

The first flexi route stage that we are going to implement is Unzip, which consumes a stream of pairs and splits it into two streams of the first and second elements of each tuple.

A FlexiRoute has exactly one input port (in our example, type-parameterized as (A, B)) and may have multiple output ports, all of which must be created beforehand (they cannot be added dynamically); however, not all output ports must be connected. You can validate the number of connected output ports in the RouteLogic#outputHandles method, which receives the number of connected output ports for a given instance of the flexi route in a given materialization. The Vector returned from outputHandles must include all output ports which are to be used by this junction:

class Unzip[A, B] extends FlexiRoute[(A, B)] {
  import FlexiRoute._
  val outA = createOutputPort[A]()
  val outB = createOutputPort[B]()

  override def createRouteLogic() = new RouteLogic[(A, B)] {

    override def outputHandles(outputCount: Int) = {
      require(outputCount == 2, s"Unzip must have two connected outputs, was $outputCount")
      Vector(outA, outB)
    }

    override def initialState = State[Any](DemandFromAll(outA, outB)) {
      (ctx, _, element) =>
        val (a, b) = element
        ctx.emit(outA, a)
        ctx.emit(outB, b)
        SameState
    }

    override def initialCompletionHandling = eagerClose
  }
}

Next we implement RouteLogic#initialState by providing a State that uses the DemandFromAll demand condition, signalling to the flexi route that elements can only be emitted from this stage when demand is available from all of the given downstream output ports. Other available demand conditions are:

  • DemandFrom(output) - triggers when the given output port has pending demand,
  • DemandFromAny(outputs) - triggers when any of the given output ports has pending demand,
  • DemandFromAll(outputs) - triggers when all of the given output ports have pending demand.

Since the Unzip junction we're implementing signals both downstream stages at the same time, we use DemandFromAll, unpack the incoming tuple in the state function, and signal its first element to the left stream and its second element to the right stream. Notice that since we are emitting values of different types (A and B), the type parameter of this State[_] must be set to Any. This type can be utilised more efficiently when a junction emits the same type of element to all of its downstreams, e.g. in all strictly routing stages.
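Ignoring demand and backpressure, the element-level effect of this Unzip stage matches the standard unzip on collections:

```scala
val inputPairs = List((1, "A"), (2, "B"), (3, "C"))
// The state function sends the first tuple element to outA and the
// second to outB; on collections this is exactly what unzip does:
val (as, bs) = inputPairs.unzip
// as == List(1, 2, 3); bs == List("A", "B", "C")
```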

The state function must always return the next behaviour to be used when an element should be emitted; here we use the special SameState object, which signals FlexiRoute that no state transition is needed.

Warning

While a RouteLogic instance may be stateful, the FlexiRoute instance must not hold any mutable state, since it may be shared across several materialized FlowGraph instances.

Note

It is only allowed to emit at most one element to each output in response to onInput; otherwise an IllegalStateException is thrown.

Completion handling

Completion handling in FlexiRoute is handled similarly to FlexiMerge (explained in depth in Completion handling); however, in addition to reacting to its upstream's completion or failure, it can also react to its downstream stages cancelling their subscriptions. The default completion handling for FlexiRoute (defined in RouteLogic#defaultCompletionHandling) is to continue running until all of its downstreams have cancelled their subscriptions, or the upstream has completed or failed.

In order to customise completion handling we can override the RouteLogic#initialCompletionHandling method, or call ctx.changeCompletionHandling(handling) from within a State. In addition to the default completion handling (as late as possible), FlexiRoute also provides an eagerClose completion handling which completes all of its downstream streams and cancels its upstream as soon as any of its downstream stages cancels its subscription.

In the example below we implement a custom completion handler which completes the entire stream eagerly if the important downstream cancels; otherwise (if any other downstream cancels its subscription) the ImportantRoute keeps running.

class ImportantRoute[A] extends FlexiRoute[A] {
  import FlexiRoute._
  val important = createOutputPort[A]()
  val additional1 = createOutputPort[A]()
  val additional2 = createOutputPort[A]()

  override def createRouteLogic() = new RouteLogic[A] {
    val outputs = Vector(important, additional1, additional2)

    override def outputHandles(outputCount: Int) = {
      require(outputCount == 3, s"Must have three connected outputs, was $outputCount")
      outputs
    }

    override def initialCompletionHandling =
      CompletionHandling(
        // upstream:
        onUpstreamFinish = (ctx) => (),
        onUpstreamFailure = (ctx, thr) => (),
        // downstream:
        onDownstreamFinish = (ctx, output) => output match {
          case `important` =>
            // finish all downstreams, and cancel the upstream
            ctx.finish()
            SameState
          case _ =>
            SameState
        })

    override def initialState = State[A](DemandFromAny(outputs)) {
      (ctx, output, element) =>
        ctx.emit(output, element)
        SameState
    }
  }
}

Notice that State changes are only allowed in reaction to downstream cancellations, not in the upstream completion/failure cases. This is because there is only one upstream, so there is nothing else to do than possibly flush buffered elements and continue shutting down the entire stream.

It is not possible to emit elements from the completion handling, since completion handlers may be invoked at any time (without regard to downstream demand being available).
