Aggregator Pattern
Loading

Aggregator Pattern

The aggregator pattern supports writing actors that aggregate data from multiple other actors and updates its state based on those responses. It is even harder to optionally aggregate more data based on the runtime state of the actor or take certain actions (sending another message and get another response) based on two or more previous responses.

A common thought is to use the ask pattern to request information from other actors. However, ask creates another actor specifically for the ask. We cannot use a callback from the future to update the state as the thread executing the callback is not defined. This will likely close-over the current actor.

The aggregator pattern solves such scenarios. It makes sure we're acting from the same actor in the scope of the actor receive.

Introduction

The aggregator pattern allows match patterns to be dynamically added to and removed from an actor from inside the message handling logic. All match patterns are called from the receive loop and run in the thread handling the incoming message. These dynamically added patterns and logic can safely read and/or modify this actor's mutable state without risking integrity or concurrency issues.

Usage

To use the aggregator pattern, you need to extend the Aggregator trait. The trait takes care of receive and actors extending this trait should not override receive. The trait provides the expect, expectOnce, and unexpect calls. The expect and expectOnce calls return a handle that can be used for later de-registration by passing the handle to unexpect.

expect is often used for standing matches such as catching error messages or timeouts.

expect {
  case TimedOut  collectBalances(force = true)
}

expectOnce is used for matching the initial message as well as other requested messages

expectOnce {
  case GetCustomerAccountBalances(id, types) 
    new AccountAggregator(sender(), id, types)
  case _ 
    sender() ! CantUnderstand
    context.stop(self)
}
def fetchCheckingAccountsBalance() {
  context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
  expectOnce {
    case CheckingAccountBalances(balances) 
      results += (Checking -> balances)
      collectBalances()
  }
}

unexpect can be used for expecting multiple responses until a timeout or when the logic dictates such an expect no longer applies.

val handle = expect {
  case Response(name, value) 
    values += value
    if (values.size > 3) processList()
  case TimedOut  processList()
}

def processList() {
  unexpect(handle)

  if (values.size > 0) {
    context.actorSelection("/user/evaluator") ! values.toList
    expectOnce {
      case EvaluationResults(name, eval)  processFinal(eval)
    }
  } else processFinal(List.empty[Int])
}

As the name eludes, expect keeps the partial function matching any received messages until unexpect is called or the actor terminates, whichever comes first. On the other hand, expectOnce removes the partial function once a match has been established.

It is a common pattern to register the initial expectOnce from the construction of the actor to accept the initial message. Once that message is received, the actor starts doing all aggregations and sends the response back to the original requester. The aggregator should terminate after the response is sent (or timed out). A different original request should use a different actor instance.

As you can see, aggregator actors are generally stateful, short lived actors.

Sample Use Case - AccountBalanceRetriever

This example below shows a typical and intended use of the aggregator pattern.

import scala.collection._
import scala.concurrent.duration._
import scala.math.BigDecimal.int2bigDecimal

import akka.actor._
/**
 * Sample and test code for the aggregator patter.
 * This is based on Jamie Allen's tutorial at
 * http://jaxenter.com/tutorial-asynchronous-programming-with-akka-actors-46220.html
 */

sealed trait AccountType
case object Checking extends AccountType
case object Savings extends AccountType
case object MoneyMarket extends AccountType

case class GetCustomerAccountBalances(id: Long, accountTypes: Set[AccountType])
case class GetAccountBalances(id: Long)

case class AccountBalances(accountType: AccountType,
                           balance: Option[List[(Long, BigDecimal)]])

case class CheckingAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]])

case object TimedOut
case object CantUnderstand

class SavingsAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) 
      sender() ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
  }
}
class CheckingAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) 
      sender() ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}
class MoneyMarketAccountProxy extends Actor {
  def receive = {
    case GetAccountBalances(id: Long) 
      sender() ! MoneyMarketAccountBalances(None)
  }
}

class AccountBalanceRetriever extends Actor with Aggregator {

  import context._

  expectOnce {
    case GetCustomerAccountBalances(id, types) 
      new AccountAggregator(sender(), id, types)
    case _ 
      sender() ! CantUnderstand
      context.stop(self)
  }

  class AccountAggregator(originalSender: ActorRef,
                          id: Long, types: Set[AccountType]) {

    val results =
      mutable.ArrayBuffer.empty[(AccountType, Option[List[(Long, BigDecimal)]])]

    if (types.size > 0)
      types foreach {
        case Checking     fetchCheckingAccountsBalance()
        case Savings      fetchSavingsAccountsBalance()
        case MoneyMarket  fetchMoneyMarketAccountsBalance()
      }
    else collectBalances() // Empty type list yields empty response

    context.system.scheduler.scheduleOnce(1.second, self, TimedOut)
    expect {
      case TimedOut  collectBalances(force = true)
    }

    def fetchCheckingAccountsBalance() {
      context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case CheckingAccountBalances(balances) 
          results += (Checking -> balances)
          collectBalances()
      }
    }

    def fetchSavingsAccountsBalance() {
      context.actorOf(Props[SavingsAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case SavingsAccountBalances(balances) 
          results += (Savings -> balances)
          collectBalances()
      }
    }

    def fetchMoneyMarketAccountsBalance() {
      context.actorOf(Props[MoneyMarketAccountProxy]) ! GetAccountBalances(id)
      expectOnce {
        case MoneyMarketAccountBalances(balances) 
          results += (MoneyMarket -> balances)
          collectBalances()
      }
    }

    def collectBalances(force: Boolean = false) {
      if (results.size == types.size || force) {
        originalSender ! results.toList // Make sure it becomes immutable
        context.stop(self)
      }
    }
  }
}

Sample Use Case - Multiple Response Aggregation and Chaining

A shorter example showing aggregating responses and chaining further requests.

case class InitialRequest(name: String)
case class Request(name: String)
case class Response(name: String, value: String)
case class EvaluationResults(name: String, eval: List[Int])
case class FinalResponse(qualifiedValues: List[String])

/**
 * An actor sample demonstrating use of unexpect and chaining.
 * This is just an example and not a complete test case.
 */
class ChainingSample extends Actor with Aggregator {

  expectOnce {
    case InitialRequest(name)  new MultipleResponseHandler(sender(), name)
  }

  class MultipleResponseHandler(originalSender: ActorRef, propName: String) {

    import context.dispatcher
    import collection.mutable.ArrayBuffer

    val values = ArrayBuffer.empty[String]

    context.actorSelection("/user/request_proxies") ! Request(propName)
    context.system.scheduler.scheduleOnce(50.milliseconds, self, TimedOut)

    val handle = expect {
      case Response(name, value) 
        values += value
        if (values.size > 3) processList()
      case TimedOut  processList()
    }

    def processList() {
      unexpect(handle)

      if (values.size > 0) {
        context.actorSelection("/user/evaluator") ! values.toList
        expectOnce {
          case EvaluationResults(name, eval)  processFinal(eval)
        }
      } else processFinal(List.empty[Int])
    }

    def processFinal(eval: List[Int]) {
      // Select only the entries coming back from eval
      originalSender ! FinalResponse(eval map values)
      context.stop(self)
    }
  }
}

Pitfalls

  • The current implementation does not match the sender of the message. This is designed to work with ActorSelection as well as ActorRef. Without the sender(), there is a chance a received message can be matched by more than one partial function. The partial function that was registered via expect or expectOnce first (chronologically) and is not yet de-registered by unexpect takes precedence in this case. Developers should make sure the messages can be uniquely matched or the wrong logic can be executed for a certain message.

  • The sender referenced in any expect or expectOnce logic refers to the sender() of that particular message and not the sender() of the original message. The original sender() still needs to be saved so a final response can be sent back.

  • context.become is not supported when extending the Aggregator trait.

  • We strongly recommend against overriding receive. If your use case really dictates, you may do so with extreme caution. Always provide a pattern match handling aggregator messages among your receive pattern matches, as follows:

    case msg if handleMessage(msg)  // noop
    // side effects of handleMessage does the actual match
    

Sorry, there is not yet a Java implementation of the aggregator pattern available.

Contents