Aggregator Pattern - Version 2.4.20

Aggregator Pattern

The aggregator pattern supports writing actors that aggregate data from multiple other actors and updates its state based on those responses. It is even harder to optionally aggregate more data based on the runtime state of the actor or take certain actions (sending another message and get another response) based on two or more previous responses.

A common thought is to use the ask pattern to request information from other actors. However, ask creates another actor specifically for the ask. We cannot use a callback from the future to update the state as the thread executing the callback is not defined. This will likely close-over the current actor.

The aggregator pattern solves such scenarios. It makes sure we're acting from the same actor in the scope of the actor receive.

Introduction

The aggregator pattern allows match patterns to be dynamically added to and removed from an actor from inside the message handling logic. All match patterns are called from the receive loop and run in the thread handling the incoming message. These dynamically added patterns and logic can safely read and/or modify this actor's mutable state without risking integrity or concurrency issues.

Usage

To use the aggregator pattern, you need to extend the Aggregator trait. The trait takes care of receive and actors extending this trait should not override receive. The trait provides the expect, expectOnce, and unexpect calls. The expect and expectOnce calls return a handle that can be used for later de-registration by passing the handle to unexpect.

expect is often used for standing matches such as catching error messages or timeouts.

expect {
  case TimedOut  collectBalances(force = true)
}

expectOnce is used for matching the initial message as well as other requested messages

expectOnce {
  case GetCustomerAccountBalances(id, types) 
    new AccountAggregator(sender(), id, types)
  case _ 
    sender() ! CantUnderstand
    context.stop(self)
}
def fetchCheckingAccountsBalance() {
  context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
  expectOnce {
    case CheckingAccountBalances(balances) 
      results += (Checking  balances)
      collectBalances()
  }
}

unexpect can be used for expecting multiple responses until a timeout or when the logic dictates such an expect no longer applies.

val handle = expect {
  case Response(name, value) 
    values += value
    if (values.size > 3) processList()
  case TimedOut  processList()
}

def processList() {
  unexpect(handle)

  if (values.size > 0) {
    context.actorSelection("/user/evaluator") ! values.toList
    expectOnce {
      case EvaluationResults(name, eval)  processFinal(eval)
    }
  } else processFinal(List.empty[Int])
}

As the name eludes, expect keeps the partial function matching any received messages until unexpect is called or the actor terminates, whichever comes first. On the other hand, expectOnce removes the partial function once a match has been established.

It is a common pattern to register the initial expectOnce from the construction of the actor to accept the initial message. Once that message is received, the actor starts doing all aggregations and sends the response back to the original requester. The aggregator should terminate after the response is sent (or timed out). A different original request should use a different actor instance.

As you can see, aggregator actors are generally stateful, short lived actors.

Sample Use Case - AccountBalanceRetriever

This example below shows a typical and intended use of the aggregator pattern.

final case class InitialRequest(name: String)
final case class Request(name: String)
final case class Response(name: String, value: String)
final case class EvaluationResults(name: String, eval: List[Int])
final case class FinalResponse(qualifiedValues: List[String])

/**
 * An actor sample demonstrating use of unexpect and chaining.
 * This is just an example and not a complete test case.
 */
class ChainingSample extends Actor with Aggregator {

  expectOnce {
    case InitialRequest(name)  new MultipleResponseHandler(sender(), name)
  }

  class MultipleResponseHandler(originalSender: ActorRef, propName: String) {

    import context.dispatcher
    import collection.mutable.ArrayBuffer

    val values = ArrayBuffer.empty[String]

    context.actorSelection("/user/request_proxies") ! Request(propName)
    context.system.scheduler.scheduleOnce(50.milliseconds, self, TimedOut)

    val handle = expect {
      case Response(name, value) 
        values += value
        if (values.size > 3) processList()
      case TimedOut  processList()
    }

    def processList() {
      unexpect(handle)

      if (values.size > 0) {
        context.actorSelection("/user/evaluator") ! values.toList
        expectOnce {
          case EvaluationResults(name, eval)  processFinal(eval)
        }
      } else processFinal(List.empty[Int])
    }

    def processFinal(eval: List[Int]) {
      // Select only the entries coming back from eval
      originalSender ! FinalResponse(eval map values)
      context.stop(self)
    }
  }
}

class AggregatorSpec extends TestKit(ActorSystem("AggregatorSpec")) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {

  override def afterAll(): Unit = {
    shutdown()
  }

  test("Test request 1 account type") {
    system.actorOf(Props[AccountBalanceRetriever]) ! GetCustomerAccountBalances(1, Set(Savings))
    receiveOne(10.seconds) match {
      case result: List[_] 
        result should have size 1
      case result 
        assert(false, s"Expect List, got ${result.getClass}")
    }
  }

  test("Test request 3 account types") {
    system.actorOf(Props[AccountBalanceRetriever]) !
      GetCustomerAccountBalances(1, Set(Checking, Savings, MoneyMarket))
    receiveOne(10.seconds) match {
      case result: List[_] 
        result should have size 3
      case result 
        assert(false, s"Expect List, got ${result.getClass}")
    }
  }
}

final case class TestEntry(id: Int)

class WorkListSpec extends FunSuiteLike {

  val workList = WorkList.empty[TestEntry]
  var entry2: TestEntry = null
  var entry4: TestEntry = null

  test("Processing empty WorkList") {
    // ProcessAndRemove something in the middle
    val processed = workList process {
      case TestEntry(9)  true
      case _             false
    }
    assert(!processed)
  }

  test("Insert temp entries") {
    assert(workList.head === workList.tail)

    val entry0 = TestEntry(0)
    workList.add(entry0, permanent = false)

    assert(workList.head.next != null)
    assert(workList.tail === workList.head.next)
    assert(workList.tail.ref.get === entry0)

    val entry1 = TestEntry(1)
    workList.add(entry1, permanent = false)

    assert(workList.head.next != workList.tail)
    assert(workList.head.next.ref.get === entry0)
    assert(workList.tail.ref.get === entry1)

    entry2 = TestEntry(2)
    workList.add(entry2, permanent = false)

    assert(workList.tail.ref.get === entry2)

    val entry3 = TestEntry(3)
    workList.add(entry3, permanent = false)

    assert(workList.tail.ref.get === entry3)
  }

  test("Process temp entries") {

    // ProcessAndRemove something in the middle
    assert(workList process {
      case TestEntry(2)  true
      case _             false
    })

    // ProcessAndRemove the head
    assert(workList process {
      case TestEntry(0)  true
      case _             false
    })

    // ProcessAndRemove the tail
    assert(workList process {
      case TestEntry(3)  true
      case _             false
    })
  }

  test("Re-insert permanent entry") {
    entry4 = TestEntry(4)
    workList.add(entry4, permanent = true)

    assert(workList.tail.ref.get === entry4)
  }

  test("Process permanent entry") {
    assert(workList process {
      case TestEntry(4)  true
      case _             false
    })
  }

  test("Remove permanent entry") {
    val removed = workList remove entry4
    assert(removed)
  }

  test("Remove temp entry already processed") {
    val removed = workList remove entry2
    assert(!removed)
  }

  test("Process non-matching entries") {

    val processed =
      workList process {
        case TestEntry(2)  true
        case _             false
      }

    assert(!processed)

    val processed2 =
      workList process {
        case TestEntry(5)  true
        case _             false
      }

    assert(!processed2)

  }

  test("Append two lists") {
    workList.removeAll()
    0 to 4 foreach { id  workList.add(TestEntry(id), permanent = false) }

    val l2 = new WorkList[TestEntry]
    5 to 9 foreach { id  l2.add(TestEntry(id), permanent = true) }

    workList addAll l2

    @tailrec
    def checkEntries(id: Int, entry: WorkList.Entry[TestEntry]): Int = {
      if (entry == null) id
      else {
        assert(entry.ref.get.id === id)
        checkEntries(id + 1, entry.next)
      }
    }

    assert(checkEntries(0, workList.head.next) === 10)
  }

  test("Clear list") {
    workList.removeAll()
    assert(workList.head.next === null)
    assert(workList.tail === workList.head)
  }

  val workList2 = WorkList.empty[PartialFunction[Any, Unit]]

  val fn1: PartialFunction[Any, Unit] = {
    case s: String 
      val result1 = workList2 remove fn1
      assert(result1 === true, "First remove must return true")
      val result2 = workList2 remove fn1
      assert(result2 === false, "Second remove must return false")
  }

  val fn2: PartialFunction[Any, Unit] = {
    case s: String 
      workList2.add(fn1, permanent = true)
  }

  test("Reentrant insert") {
    workList2.add(fn2, permanent = false)
    assert(workList2.head.next != null)
    assert(workList2.tail == workList2.head.next)

    // Processing inserted fn1, reentrant adding fn2
    workList2 process { fn 
      var processed = true
      fn.applyOrElse("Foo", (_: Any)  processed = false)
      processed
    }
  }

  test("Reentrant delete") {
    // Processing inserted fn2, should delete itself
    workList2 process { fn 
      var processed = true
      fn.applyOrElse("Foo", (_: Any)  processed = false)
      processed
    }
  }
}

Sample Use Case - Multiple Response Aggregation and Chaining

A shorter example showing aggregating responses and chaining further requests.

final case class InitialRequest(name: String)
final case class Request(name: String)
final case class Response(name: String, value: String)
final case class EvaluationResults(name: String, eval: List[Int])
final case class FinalResponse(qualifiedValues: List[String])

/**
 * An actor sample demonstrating use of unexpect and chaining.
 * This is just an example and not a complete test case.
 */
class ChainingSample extends Actor with Aggregator {

  expectOnce {
    case InitialRequest(name)  new MultipleResponseHandler(sender(), name)
  }

  class MultipleResponseHandler(originalSender: ActorRef, propName: String) {

    import context.dispatcher
    import collection.mutable.ArrayBuffer

    val values = ArrayBuffer.empty[String]

    context.actorSelection("/user/request_proxies") ! Request(propName)
    context.system.scheduler.scheduleOnce(50.milliseconds, self, TimedOut)

    val handle = expect {
      case Response(name, value) 
        values += value
        if (values.size > 3) processList()
      case TimedOut  processList()
    }

    def processList() {
      unexpect(handle)

      if (values.size > 0) {
        context.actorSelection("/user/evaluator") ! values.toList
        expectOnce {
          case EvaluationResults(name, eval)  processFinal(eval)
        }
      } else processFinal(List.empty[Int])
    }

    def processFinal(eval: List[Int]) {
      // Select only the entries coming back from eval
      originalSender ! FinalResponse(eval map values)
      context.stop(self)
    }
  }
}

Pitfalls

  • The current implementation does not match the sender of the message. This is designed to work with ActorSelection as well as ActorRef. Without the sender(), there is a chance a received message can be matched by more than one partial function. The partial function that was registered via expect or expectOnce first (chronologically) and is not yet de-registered by unexpect takes precedence in this case. Developers should make sure the messages can be uniquely matched or the wrong logic can be executed for a certain message.

  • The sender referenced in any expect or expectOnce logic refers to the sender() of that particular message and not the sender() of the original message. The original sender() still needs to be saved so a final response can be sent back.

  • context.become is not supported when extending the Aggregator trait.

  • We strongly recommend against overriding receive. If your use case really dictates, you may do so with extreme caution. Always provide a pattern match handling aggregator messages among your receive pattern matches, as follows:

    case msg if handleMessage(msg)  // noop
    // side effects of handleMessage does the actual match
    

Sorry, there is not yet a Java implementation of the aggregator pattern available.

Contents