Durable Mailboxes

Overview

A durable mailbox is a mailbox that stores its messages on durable storage. In practice this means that if there are pending messages in an actor's mailbox when the node the actor resides on crashes, then after you restart the node the actor can continue processing as if nothing had happened, with all pending messages still in its mailbox.

You configure durable mailboxes through the dispatcher or the actor deployment (see Mailboxes). The actor is oblivious to which type of mailbox it is using.

This gives you an excellent way of creating bulkheads in your application, where groups of actors sharing the same dispatcher also share the same backing storage. Read more about that in the Dispatchers documentation.

One basic file-based durable mailbox is provided by Akka out of the box. Other implementations can easily be added, and some are available as separate community Open Source projects.

Like any other mailbox, a durable mailbox is not likely to be transactional. If the actor crashes after receiving a message but before it has finished processing it, the message may be lost.

Warning

A durable mailbox typically does not work with message sends that wait for a reply, i.e. send operations that rely on futures, such as ask. If the node crashes and is then restarted, the thread that was blocked waiting for the reply is gone and there is no way to deliver the reply.

File-based durable mailbox

This mailbox is backed by a journaling transaction log on the local file system. It is the simplest durable mailbox to use, since it does not require any extra infrastructure to administer, and it is usually sufficient.

In the configuration of the dispatcher you specify the fully qualified class name of the mailbox:

    my-dispatcher {
      mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType
    }

Here is an example of how to create an actor that uses this dispatcher, and therefore a durable mailbox:

    import akka.actor.Props;
    import akka.actor.ActorRef;

    ActorRef myActor = system.actorOf(
        Props.create(MyUntypedActor.class).withDispatcher("my-dispatcher"),
        "myactor");
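
The Overview mentions that the mailbox can also be selected through the actor deployment. As a rough sketch, assuming the actor is created at the top-level path /myactor as in the example above, the dispatcher could be assigned in configuration instead of in code:

```
akka.actor.deployment {
  # "/myactor" matches the name passed to system.actorOf above
  /myactor {
    dispatcher = my-dispatcher
  }
}
```

With a deployment entry like this, the withDispatcher call would no longer be needed on the Props, and the actor code itself stays unchanged.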

You can also configure and tune the file-based durable mailbox. This is done in the akka.actor.mailbox.file-based section in the Configuration.

    #############################################
    # Akka File Mailboxes Reference Config File #
    #############################################

    # This is the reference config file that contains all the default settings.
    # Make your edits/overrides in your application.conf.
    #
    # For more information see <https://github.com/robey/kestrel/>

    akka {
      actor {
        mailbox {
          file-based {
            # directory below which this queue resides
            directory-path = "./_mb"

            # attempting to add an item after the queue reaches this size (in items)
            # will fail.
            max-items = 2147483647

            # attempting to add an item after the queue reaches this size (in bytes)
            # will fail.
            max-size = 2147483647 bytes

            # attempting to add an item larger than this size (in bytes) will fail.
            max-item-size = 2147483647 bytes

            # maximum expiration time for this queue (seconds).
            max-age = 0s

            # maximum journal size before the journal should be rotated.
            max-journal-size = 16 MiB

            # maximum size of a queue before it drops into read-behind mode.
            max-memory-size = 128 MiB

            # maximum overflow (multiplier) of a journal file before we re-create it.
            max-journal-overflow = 10

            # absolute maximum size of a journal file until we rebuild it,
            # no matter what.
            max-journal-size-absolute = 9223372036854775807 bytes

            # whether to drop older items (instead of newer) when the queue is full
            discard-old-when-full = on

            # whether to keep a journal file at all
            keep-journal = on

            # whether to sync the journal after each transaction
            sync-journal = off

            # circuit breaker configuration
            circuit-breaker {
              # maximum number of failures before opening breaker
              max-failures = 3

              # duration of time beyond which a call is assumed to be timed out and
              # considered a failure
              call-timeout = 3 seconds

              # duration of time to wait until attempting to reset the breaker during
              # which all calls fail-fast
              reset-timeout = 30 seconds
            }
          }
        }
      }
    }
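
As an illustration of overriding these defaults, an application.conf might contain something like the fragment below. The keys come from the reference configuration above. The directory path and the chosen values are hypothetical and only meant to show the shape of an override, not recommended settings.

```
akka.actor.mailbox.file-based {
  # hypothetical location; pick a directory that survives restarts
  directory-path = "/var/lib/myapp/mailboxes"

  # sync the journal after each transaction (the default is off)
  sync-journal = on

  # fail on enqueue beyond this many items instead of growing without bound
  max-items = 100000

  # when the queue is full, keep the older items (the default is on)
  discard-old-when-full = off
}
```

Settings that are not listed keep the values from the reference configuration.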

How to implement a durable mailbox

Here is an example of how to implement a custom durable mailbox. Essentially it consists of a configurator (MailboxType) and a queue implementation (DurableMessageQueue).

The envelope contains the message sent to the actor, together with information about the sender. It is the envelope that needs to be stored. As a helper, you can extend DurableMessageQueueWithSerialization to serialize and deserialize the envelope using the ordinary Serialization mechanism. This is optional, and you may store the envelope data in any way you like.

Durable mailboxes are an excellent fit for circuit breakers, which are described in the Circuit Breaker documentation.

    import scala.Option;
    import com.typesafe.config.Config;
    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.ExtendedActorSystem;
    import akka.dispatch.MailboxType;
    import akka.dispatch.MessageQueue;

    public class MyDurableMailboxType implements MailboxType {

      public MyDurableMailboxType(ActorSystem.Settings settings, Config config) {
      }

      @Override
      public MessageQueue create(Option<ActorRef> owner,
          Option<ActorSystem> system) {
        if (owner.isEmpty())
          throw new IllegalArgumentException("requires an owner " +
              "(i.e. does not work with BalancingDispatcher)");
        return new MyDurableMessageQueue(owner.get(),
            (ExtendedActorSystem) system.get());
      }
    }

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Callable;
    import scala.concurrent.duration.Duration;
    import akka.actor.ActorRef;
    import akka.actor.ExtendedActorSystem;
    import akka.actor.mailbox.DurableMessageQueueWithSerialization;
    import akka.dispatch.Envelope;
    import akka.dispatch.MessageQueue;
    import akka.pattern.CircuitBreaker;

    public class MyDurableMessageQueue extends DurableMessageQueueWithSerialization {

      public MyDurableMessageQueue(ActorRef owner, ExtendedActorSystem system) {
        super(owner, system);
      }

      private final QueueStorage storage = new QueueStorage();
      // A real-world implementation would use configuration to set the last
      // three parameters below
      private final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(),
          5, Duration.create(30, "seconds"), Duration.create(1, "minute"));

      @Override
      public void enqueue(ActorRef receiver, final Envelope envelope) {
        breaker.callWithSyncCircuitBreaker(new Callable<Object>() {
          @Override
          public Object call() {
            byte[] data = serialize(envelope);
            storage.push(data);
            return null;
          }
        });
      }

      @Override
      public Envelope dequeue() {
        return breaker.callWithSyncCircuitBreaker(new Callable<Envelope>() {
          @Override
          public Envelope call() {
            byte[] data = storage.pull();
            if (data == null)
              return null;
            else
              return deserialize(data);
          }
        });
      }

      @Override
      public boolean hasMessages() {
        return breaker.callWithSyncCircuitBreaker(new Callable<Boolean>() {
          @Override
          public Boolean call() {
            return !storage.isEmpty();
          }
        });
      }

      @Override
      public int numberOfMessages() {
        return breaker.callWithSyncCircuitBreaker(new Callable<Integer>() {
          @Override
          public Integer call() {
            return storage.size();
          }
        });
      }

      /**
       * Called when the mailbox is disposed.
       * An ordinary mailbox would send remaining messages to deadLetters,
       * but the purpose of a durable mailbox is to continue
       * with the same message queue when the actor is started again.
       */
      @Override
      public void cleanUp(ActorRef owner, MessageQueue deadLetters) {}

      // dummy queue storage ...
    }
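
The listing above leaves the dummy queue storage out. For experimenting with the example, one possible stand-in is sketched below. The class name QueueStorage and the methods push, pull, isEmpty and size are taken from how the listing uses them. The body is an assumption, not the original implementation, and it keeps everything in memory, so it is not durable.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical in-memory stand-in for the storage used by MyDurableMessageQueue.
// It is NOT durable: a real implementation would write to disk or a database.
final class QueueStorage {

  private final ConcurrentLinkedQueue<byte[]> queue =
      new ConcurrentLinkedQueue<byte[]>();

  // Appends a serialized envelope at the tail of the queue.
  public void push(byte[] data) {
    queue.offer(data);
  }

  // Removes and returns the oldest entry, or null when the queue is empty.
  public byte[] pull() {
    return queue.poll();
  }

  public boolean isEmpty() {
    return queue.isEmpty();
  }

  // Note: size() on ConcurrentLinkedQueue traverses the queue, so it is O(n).
  public int size() {
    return queue.size();
  }
}
```

Returning null from pull when nothing is stored matches the null check in dequeue above. A durable variant would keep the same four operations and replace the in-memory queue with persistent storage.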

For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk, and ZooKeeper, which can be found in the Akka git repository at tag v2.0.1.