Receive Pipeline Pattern - Version 2.4.20

Receive Pipeline Pattern

The Receive Pipeline Pattern lets you define general interceptors for your messages and plug any number of them into your Actors. It's useful for defining cross-cutting concerns that apply to all or many of your Actors.

Some Possible Use Cases

  • Measure the time spent for processing the messages
  • Audit messages with an associated author
  • Log all important messages
  • Secure restricted messages
  • Text internationalization

Interceptors

Multiple interceptors can be added to actors that mix in the ReceivePipeline trait. These interceptors internally define layers of decorators around the actor's behavior. The first interceptor defines an outer decorator which delegates to a decorator corresponding to the second interceptor, and so on, until the last interceptor, which defines a decorator for the actor's Receive.

The first or outermost interceptor receives messages sent to the actor.

For each message received by an interceptor, the interceptor will typically perform some processing based on the message and decide whether or not to pass the received message (or some other message) to the next interceptor.

An Interceptor is a type alias for PartialFunction[Any, Delegation]. The Any input is the message it receives from the previous interceptor (or, in the case of the first interceptor, the message that was sent to the actor). The Delegation return type is used to control what (if any) message is passed on to the next interceptor.
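
The layering described above can be sketched in plain Scala without Akka. This is a simplified model for illustration, not the akka-contrib implementation: PipelineModel, PipelineModelDemo, run and delivered are hypothetical names, Delegation is reduced to the two cases this document names (Inner and HandledCompletely), and an interceptor that is not defined for a message is assumed to hand it on unchanged.

```scala
// Simplified plain-Scala model of an interceptor chain (not Akka's code).
object PipelineModel {
  sealed trait Delegation
  final case class Inner(msg: Any) extends Delegation // pass msg inwards
  case object HandledCompletely extends Delegation    // stop the chain

  type Interceptor = PartialFunction[Any, Delegation]

  // Feeds msg through the interceptors, outermost first, and gives whatever
  // reaches the end of the list to the actor's behavior.
  def run(interceptors: List[Interceptor], behavior: Any => Unit)(msg: Any): Unit =
    interceptors match {
      case Nil => behavior(msg)
      case first :: rest =>
        // Assumption: an interceptor undefined for msg passes it on unchanged.
        first.applyOrElse(msg, (m: Any) => Inner(m): Delegation) match {
          case Inner(next)       => run(rest, behavior)(next)
          case HandledCompletely => ()
        }
    }
}

object PipelineModelDemo {
  import PipelineModel._

  val increment: Interceptor = { case i: Int => Inner(i + 1) }
  val blockSecret: Interceptor = { case "secret" => HandledCompletely }

  // Collects what the behavior finally receives for a given message.
  def delivered(msg: Any): List[Any] = {
    val seen = List.newBuilder[Any]
    run(List(increment, blockSecret), m => seen += m)(msg)
    seen.result()
  }
}
```

In this model, PipelineModelDemo.delivered(5) yields List(6), delivered("hello") yields List("hello") because neither interceptor is defined for it, and delivered("secret") yields an empty list because the chain stops at blockSecret.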

A simple example

To pass a transformed message to the actor (or next inner interceptor) an interceptor can return Inner(newMsg) where newMsg is the transformed message.

The following interceptor shows this. It intercepts Int messages, adds one to them and passes on the incremented value to the next interceptor.

import akka.contrib.pattern.ReceivePipeline.{ Inner, Interceptor }

val incrementInterceptor: Interceptor = {
  case i: Int ⇒ Inner(i + 1)
}

Building the Pipeline

To give your Actor the ability to pipeline received messages in this way, you'll need to mix in the ReceivePipeline trait. It has two methods for controlling the pipeline, pipelineOuter and pipelineInner, each taking an Interceptor. The first one adds the interceptor at the beginning of the pipeline and the second one adds it at the end, just before the current Actor's behavior.

In this example we mix the ReceivePipeline trait into our Actor and add Increment and Double interceptors with pipelineInner. They are applied in that order.

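import akka.actor.Actor
import akka.contrib.pattern.ReceivePipeline
import akka.contrib.pattern.ReceivePipeline.Inner

class PipelinedActor extends Actor with ReceivePipeline {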

  // Increment
  pipelineInner { case i: Int ⇒ Inner(i + 1) }
  // Double
  pipelineInner { case i: Int ⇒ Inner(i * 2) }

  def receive: Receive = { case any ⇒ println(any) }
}

actor ! 5 // prints 12 = (5 + 1) * 2

If we add Double with pipelineOuter instead, it is applied before Increment, so the output is 11:

// Increment
pipelineInner { case i: Int ⇒ Inner(i + 1) }
// Double
pipelineOuter { case i: Int ⇒ Inner(i * 2) }

// prints 11 = (5 * 2) + 1
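
The snippets above send to an actor reference without showing how it is created. The following is a minimal sketch of that wiring, assuming akka-actor and akka-contrib 2.4.x are on the classpath. EchoingPipelinedActor, PipelineOrderDemo and the system and actor names are hypothetical, and the actor replies to the sender instead of printing so that the result can be observed with the ask pattern.

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.contrib.pattern.ReceivePipeline
import akka.contrib.pattern.ReceivePipeline.Inner
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor: same interceptors as above, but it replies to the
// sender with whatever message reaches its behavior.
class EchoingPipelinedActor extends Actor with ReceivePipeline {
  // Increment, added at the inner end of the pipeline
  pipelineInner { case i: Int => Inner(i + 1) }
  // Double, added at the outer end, so it sees the message first
  pipelineOuter { case i: Int => Inner(i * 2) }

  def receive: Receive = { case any => sender() ! any }
}

object PipelineOrderDemo {
  // Sends 5 through the pipeline and returns what the behavior received.
  def run(): Any = {
    val system = ActorSystem("pipeline-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val actor = system.actorOf(Props[EchoingPipelinedActor], "echo")
      Await.result(actor ? 5, timeout.duration)
    } finally system.terminate()
  }
}
```

Following the ordering rule described above, PipelineOrderDemo.run() should return 11, that is (5 * 2) + 1.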

Interceptors Mixin

Defining the whole pipeline inside the Actor implementation is good for demonstrating the pattern, but it isn't very practical. The real flexibility of this pattern comes when you define each interceptor in its own trait and then mix any of them into your Actors.

Let's see it in an example. We have the following model:

val texts = Map(
  "that.rug_EN" -> "That rug really tied the room together.",
  "your.opinion_EN" -> "Yeah, well, you know, that's just, like, your opinion, man.",
  "that.rug_ES" -> "Esa alfombra realmente completaba la sala.",
  "your.opinion_ES" -> "Sí, bueno, ya sabes, eso es solo, como, tu opinion, amigo.")

case class I18nText(locale: String, key: String)
case class Message(author: Option[String], text: Any)

and these two interceptors defined, each one in its own trait:

trait I18nInterceptor {
  this: ReceivePipeline ⇒

  pipelineInner {
    case m @ Message(_, I18nText(loc, key)) ⇒
      Inner(m.copy(text = texts(s"${key}_$loc")))
  }
}

trait AuditInterceptor {
  this: ReceivePipeline ⇒

  pipelineOuter {
    case m @ Message(Some(author), text) ⇒
      println(s"$author is about to say: $text")
      Inner(m)
  }
}

The first one intercepts any messages having an internationalized text and replaces it with the resolved text before resuming with the chain. The second one intercepts any message with an author defined and prints it before resuming the chain with the message unchanged. But since I18n adds the interceptor with pipelineInner and Audit adds it with pipelineOuter, the audit will happen before the internationalization.

So if we mix both interceptors into our Actor, we will see the following output for these example messages:

class PrinterActor extends Actor with ReceivePipeline
  with I18nInterceptor with AuditInterceptor {

  override def receive: Receive = {
    case Message(author, text) ⇒
      println(s"${author.getOrElse("Unknown")} says '$text'")
  }
}

printerActor ! Message(Some("The Dude"), I18nText("EN", "that.rug"))
// The Dude is about to say: I18nText(EN,that.rug)
// The Dude says 'That rug really tied the room together.'

printerActor ! Message(Some("The Dude"), I18nText("EN", "your.opinion"))
// The Dude is about to say: I18nText(EN,your.opinion)
// The Dude says 'Yeah, well, you know, that's just, like, your opinion, man.'

Unhandled Messages

With all this chaining of behaviors, what happens to unhandled messages? Let me explain it with a simple rule.

Note

Every message not handled by an interceptor will be passed to the next one in the chain. If none of the interceptors handles a message, the current Actor's behavior will receive it, and if the behavior doesn't handle it either, it will be treated as usual with the unhandled method.

Sometimes, though, an interceptor needs to break the chain. It can do so by returning HandledCompletely, which explicitly indicates that the interceptor has handled the message completely.

case class PrivateMessage(userId: Option[Long], msg: Any)

trait PrivateInterceptor {
  this: ReceivePipeline ⇒

  pipelineInner {
    // isGranted is not shown here; it is assumed to be defined elsewhere
    case PrivateMessage(Some(userId), msg) ⇒
      if (isGranted(userId))
        Inner(msg)
      else
        HandledCompletely
  }
}

Processing after delegation

But what if you want to perform some action after the actor has processed the message (for example to measure the message processing time)?

In order to support such use cases, the Inner return type has a method andAfter which accepts a code block that can perform some action after the message has been processed by subsequent inner interceptors.

The following is a simple interceptor that times message processing:

trait TimerInterceptor extends ActorLogging {
  this: ReceivePipeline ⇒

  def logTimeTaken(time: Long) = log.debug(s"Time taken: $time ns")

  pipelineOuter {
    case e ⇒
      val start = System.nanoTime
      Inner(e).andAfter {
        val end = System.nanoTime
        logTimeTaken(end - start)
      }
  }
}

Note

The andAfter code blocks are run on return from handling the message with the next inner handler and on the same thread. It is therefore safe for the andAfter logic to close over the interceptor's state.
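
The ordering this note describes can be illustrated without Akka. The sketch below is a plain-Scala model under the assumption that each interceptor wraps the next handler like a nested function call. AndAfterModel, wrap and trace are hypothetical names and are not part of the ReceivePipeline API.

```scala
// Plain-Scala model of "before" and "after" work around an inner handler.
object AndAfterModel {
  type Handler = Any => Unit

  // Models an interceptor that does some work before delegating and some
  // more after the inner handler has returned, on the same call stack.
  def wrap(label: String, log: String => Unit)(next: Handler): Handler = { msg =>
    log(s"$label: before")
    next(msg)
    log(s"$label: after")
  }

  // Returns the order of events for one message through two nested wrappers.
  def trace(msg: Any): Vector[String] = {
    var events = Vector.empty[String]
    val log: String => Unit = e => { events = events :+ e }
    val behavior: Handler = m => log(s"actor handled $m")
    val handler = wrap("outer", log)(wrap("inner", log)(behavior))
    handler(msg)
    events
  }
}
```

In this model, AndAfterModel.trace("x") records "outer: before", "inner: before", "actor handled x", "inner: after", "outer: after", in that order: the "after" steps unwind from the innermost wrapper outwards.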

Using Receive Pipelines with Persistence

When using ReceivePipeline together with PersistentActor, make sure to mix in the traits in the following order so that they cooperate properly:

class ExampleActor extends PersistentActor with ReceivePipeline {
  /* ... */
}

The order matters because both traits use internal "around" methods to implement their features; mixed in the other way around, they would not work as expected. To learn more about how exactly this works, read up on Scala's type linearization mechanism.
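
Linearization can be seen in a small standalone example. The sketch below uses neither Akka nor PersistentActor; Base, Timing, Auditing and the around method are hypothetical stand-ins, chosen only to show that the trait mixed in last becomes the outermost wrapper.

```scala
// Stand-alone illustration of trait linearization with stacked overrides.
object LinearizationDemo {
  trait Base {
    def around(msg: String): String = s"receive($msg)"
  }

  // Each trait wraps whatever comes next in the linearization via super.
  trait Timing extends Base {
    override def around(msg: String): String = s"Timing(${super.around(msg)})"
  }

  trait Auditing extends Base {
    override def around(msg: String): String = s"Auditing(${super.around(msg)})"
  }

  // Same traits, different mixin order.
  class TimingThenAuditing extends Base with Timing with Auditing
  class AuditingThenTiming extends Base with Auditing with Timing
}
```

Here new LinearizationDemo.TimingThenAuditing().around("m") returns "Auditing(Timing(receive(m)))", while new LinearizationDemo.AuditingThenTiming().around("m") returns "Timing(Auditing(receive(m)))". Swapping the mixin order swaps which override runs first.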
