Durable Mailboxes
Loading

Durable Mailboxes

Overview

A durable mailbox is a mailbox which stores the messages on durable storage. What this means in practice is that if there are pending messages in the actor's mailbox when the node of the actor resides on crashes, then when you restart the node, the actor will be able to continue processing as if nothing had happened; with all pending messages still in its mailbox.

You configure durable mailboxes through the dispatcher or the actor deployment (see Mailboxes). The actor is oblivious to which type of mailbox it is using.

This gives you an excellent way of creating bulkheads in your application, where groups of actors sharing the same dispatcher also share the same backing storage. Read more about that in the Dispatchers documentation.

One basic file based durable mailbox is provided by Akka out-of-the-box. Other implementations can easily be added. Some are available as separate community Open Source projects, such as:

A durable mailbox is like any other mailbox not likely to be transactional. It's possible if the actor crashes after receiving a message, but before completing processing of it, that the message could be lost.

Warning

A durable mailbox typically doesn't work with blocking message send, i.e. the message send operations that are relying on futures; ? or ask. If the node has crashed and then restarted, the thread that was blocked waiting for the reply is gone and there is no way we can deliver the message.

File-based durable mailbox

This mailbox is backed by a journaling transaction log on the local file system. It is the simplest to use since it does not require an extra infrastructure piece to administer, but it is usually sufficient and just what you need.

In the configuration of the dispatcher you specify the fully qualified class name of the mailbox:

my-dispatcher {
  mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType
}

Here is an example of how to create an actor with a durable dispatcher:

import akka.actor.Props

    val myActor = system.actorOf(Props[MyActor].
      withDispatcher("my-dispatcher"), name = "myactor")

You can also configure and tune the file-based durable mailbox. This is done in the akka.actor.mailbox.file-based section in the Configuration.

#############################################
# Akka File Mailboxes Reference Config File #
#############################################

# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
#
# For more information see <https://github.com/robey/kestrel/>

akka {
  actor {
    mailbox {
      file-based {
        # directory below which this queue resides
        directory-path = "./_mb"

        # attempting to add an item after the queue reaches this size (in items)
        # will fail.
        max-items = 2147483647

        # attempting to add an item after the queue reaches this size (in bytes)
        # will fail.
        max-size = 2147483647 bytes

        # attempting to add an item larger than this size (in bytes) will fail.
        max-item-size = 2147483647 bytes

        # maximum expiration time for this queue (seconds).
        max-age = 0s

        # maximum journal size before the journal should be rotated.
        max-journal-size = 16 MiB

        # maximum size of a queue before it drops into read-behind mode.
        max-memory-size = 128 MiB

        # maximum overflow (multiplier) of a journal file before we re-create it.
        max-journal-overflow = 10

        # absolute maximum size of a journal file until we rebuild it,
        # no matter what.
        max-journal-size-absolute = 9223372036854775807 bytes

        # whether to drop older items (instead of newer) when the queue is full
        discard-old-when-full = on

        # whether to keep a journal file at all
        keep-journal = on

        # whether to sync the journal after each transaction
        sync-journal = off

        # circuit breaker configuration
        circuit-breaker {
          # maximum number of failures before opening breaker
          max-failures = 3

          # duration of time beyond which a call is assumed to be timed out and
          # considered a failure
          call-timeout = 3 seconds

          # duration of time to wait until attempting to reset the breaker during
          # which all calls fail-fast
          reset-timeout = 30 seconds
        }
      }
    }
  }
}

How to implement a durable mailbox

Here is an example of how to implement a custom durable mailbox. Essentially it consists of a configurator (MailboxType) and a queue implementation (DurableMessageQueue).

The envelope contains the message sent to the actor, and information about sender. It is the envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization to serialize and deserialize the envelope using the ordinary Serialization mechanism. This optional and you may store the envelope data in any way you like. Durable mailboxes are an excellent fit for usage of circuit breakers. These are described in the Circuit Breaker documentation.

import com.typesafe.config.Config
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.actor.mailbox.DurableMessageQueue
import akka.actor.mailbox.DurableMessageSerialization
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._

class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config)
  extends MailboxType {

  override def create(owner: Option[ActorRef],
                      system: Option[ActorSystem]): MessageQueue =
    (owner zip system) headOption match {
      case Some((o, s: ExtendedActorSystem))  new MyMessageQueue(o, s)
      case _ 
        throw new IllegalArgumentException("requires an owner " +
          "(i.e. does not work with BalancingDispatcher)")
    }
}

class MyMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem)
  extends DurableMessageQueue(_owner, _system) with DurableMessageSerialization {

  val storage = new QueueStorage
  // A real-world implementation would use configuration to set the last 
  // three parameters below
  val breaker = CircuitBreaker(system.scheduler, 5, 30.seconds, 1.minute)

  def enqueue(receiver: ActorRef, envelope: Envelope): Unit =
    breaker.withSyncCircuitBreaker {
      val data: Array[Byte] = serialize(envelope)
      storage.push(data)
    }

  def dequeue(): Envelope = breaker.withSyncCircuitBreaker {
    val data: Option[Array[Byte]] = storage.pull()
    data.map(deserialize).orNull
  }

  def hasMessages: Boolean = breaker.withSyncCircuitBreaker { !storage.isEmpty }

  def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size }

  /**
   * Called when the mailbox is disposed.
   * An ordinary mailbox would send remaining messages to deadLetters,
   * but the purpose of a durable mailbox is to continue
   * with the same message queue when the actor is started again.
   */
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = ()

}

To facilitate testing of a durable mailbox you may use DurableMailboxSpec as base class. To use DurableMailboxDocSpec add this dependency:

"com.typesafe.akka" %% "akka-mailboxes-common" %
  "2.2.5" classifier "test"

It implements a few basic tests and helps you setup the a fixture. More tests can be added in concrete subclass like this:

import akka.actor.mailbox.DurableMailboxSpec

object MyMailboxSpec {
  val config = """
    MyStorage-dispatcher {
      mailbox-type = docs.actor.mailbox.MyMailboxType
    }
    """
}

class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config) {
  override def atStartup() {
  }

  override def afterTermination() {
  }

  "MyMailbox" must {
    "deliver a message" in {
      val actor = createMailboxTestActor()
      implicit val sender = testActor
      actor ! "hello"
      expectMsg("hello")
    }

    // add more tests
  }
}

For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk, and ZooKeeper, which can be found in Akka git repository tag v2.0.1.

Contents