Durable Mailboxes (Java)

Overview

A durable mailbox is a mailbox that stores its messages on durable storage. In practice this means that if there are pending messages in the actor's mailbox when the node the actor resides on crashes, then after you restart the node the actor can continue processing as if nothing had happened, with all pending messages still in its mailbox.

You configure durable mailboxes through the dispatcher. The actor is oblivious to which type of mailbox it is using.

This gives you an excellent way of creating bulkheads in your application, where groups of actors sharing the same dispatcher also share the same backing storage. Read more about that in the Dispatchers (Scala) documentation.

Akka provides one basic file-based durable mailbox out of the box. Other implementations can easily be added, and some are available as separate community Open Source projects.

Like any other mailbox, a durable mailbox is not likely to be transactional. If the actor crashes after receiving a message but before it has finished processing it, the message may be lost.

Warning

A durable mailbox typically doesn't work with blocking message sends, i.e. the message send operations that rely on futures, such as ask. If the node has crashed and then restarted, the thread that was blocked waiting for the reply is gone and there is no way to deliver the reply to it.

File-based durable mailbox

This mailbox is backed by a journaling transaction log on the local file system. It is the simplest to use since it does not require an extra piece of infrastructure to administer, and it is usually sufficient.
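To make the idea of a journal-backed queue more concrete, here is a minimal sketch that uses only the Java standard library. It is not the Akka implementation: the class name `JournaledQueue`, its methods, and the record format (an ADD record carrying a length-prefixed payload, and a REMOVE record) are assumptions made for illustration. On startup the journal is replayed in order, so the messages that were pushed but not yet pulled are restored.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;

// Hypothetical illustration only; not the Akka file-based mailbox.
class JournaledQueue implements AutoCloseable {
  private static final byte ADD = 1;
  private static final byte REMOVE = 2;

  private final ArrayDeque<byte[]> pending = new ArrayDeque<byte[]>();
  private final DataOutputStream journal;

  JournaledQueue(File file) throws IOException {
    if (file.exists()) replay(file);
    // Open in append mode so that earlier records are preserved.
    journal = new DataOutputStream(new FileOutputStream(file, true));
  }

  // Rebuild the in-memory queue by re-applying every journal record in order.
  private void replay(File file) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
      while (true) {
        int op = in.read();
        if (op == -1) break; // clean end of journal
        if (op == ADD) {
          byte[] data = new byte[in.readInt()];
          in.readFully(data);
          pending.addLast(data);
        } else if (op == REMOVE) {
          pending.pollFirst();
        } else {
          throw new IOException("corrupt journal record: " + op);
        }
      }
    } catch (EOFException e) {
      // A truncated final record (crash in mid-write) is skipped here.
      // A real implementation would also truncate the file at that point.
    }
  }

  synchronized void push(byte[] data) throws IOException {
    journal.writeByte(ADD);
    journal.writeInt(data.length);
    journal.write(data);
    journal.flush(); // hands the bytes to the OS; does not force them to disk
    pending.addLast(data);
  }

  synchronized byte[] pull() throws IOException {
    byte[] head = pending.pollFirst();
    if (head != null) {
      journal.writeByte(REMOVE);
      journal.flush();
    }
    return head; // null when the queue is empty
  }

  synchronized int size() { return pending.size(); }

  @Override
  public synchronized void close() throws IOException { journal.close(); }
}
```

Pushing three items, pulling one, and then constructing a new `JournaledQueue` on the same file leaves two items pending in their original order. The sketch never rotates or compacts its journal, so the file grows without bound.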

In the configuration of the dispatcher you specify the fully qualified class name of the mailbox:

my-dispatcher {
  mailbox-type = akka.actor.mailbox.filebased.FileBasedMailboxType
}

Here is an example of how to create an actor with a durable dispatcher:

import akka.actor.Props;
import akka.actor.ActorRef;

    ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).
        withDispatcher("my-dispatcher"), "myactor");

You can also configure and tune the file-based durable mailbox. This is done in the akka.actor.mailbox.file-based section in the Configuration.

#############################################
# Akka File Mailboxes Reference Config File #
#############################################

# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
#
# For more information see <https://github.com/robey/kestrel/>

akka {
  actor {
    mailbox {
      file-based {
        # directory below which this queue resides
        directory-path = "./_mb"

        # attempting to add an item after the queue reaches this size (in items)
        # will fail.
        max-items = 2147483647

        # attempting to add an item after the queue reaches this size (in bytes)
        # will fail.
        max-size = 2147483647 bytes

        # attempting to add an item larger than this size (in bytes) will fail.
        max-item-size = 2147483647 bytes

        # maximum expiration time for this queue (seconds).
        max-age = 0s

        # maximum journal size before the journal should be rotated.
        max-journal-size = 16 MiB

        # maximum size of a queue before it drops into read-behind mode.
        max-memory-size = 128 MiB

        # maximum overflow (multiplier) of a journal file before we re-create it.
        max-journal-overflow = 10

        # absolute maximum size of a journal file until we rebuild it,
        # no matter what.
        max-journal-size-absolute = 9223372036854775807 bytes

        # whether to drop older items (instead of newer) when the queue is full
        discard-old-when-full = on

        # whether to keep a journal file at all
        keep-journal = on

        # whether to sync the journal after each transaction
        sync-journal = off

        # circuit breaker configuration
        circuit-breaker {
          # maximum number of failures before opening breaker
          max-failures = 3

          # duration of time beyond which a call is assumed to be timed out and
          # considered a failure
          call-timeout = 3 seconds

          # duration of time to wait until attempting to reset the breaker during
          # which all calls fail-fast
          reset-timeout = 30 seconds
        }
      }
    }
  }
}
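The circuit-breaker settings in the reference configuration above interact roughly as follows: after max-failures consecutive failures the breaker opens, calls then fail fast until reset-timeout has elapsed, and after that a trial call is allowed through. The sketch below shows that state machine in plain Java. It is not `akka.pattern.CircuitBreaker`: the class `SimpleBreaker` and its methods are hypothetical, call-timeout handling is left out, and the current time is passed in explicitly to keep the behaviour deterministic.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration only; not akka.pattern.CircuitBreaker.
class SimpleBreaker {
  private final int maxFailures;
  private final long resetTimeoutMillis;
  private int failures = 0;
  private boolean open = false;
  private long openedAt = 0L;

  SimpleBreaker(int maxFailures, long resetTimeoutMillis) {
    this.maxFailures = maxFailures;
    this.resetTimeoutMillis = resetTimeoutMillis;
  }

  synchronized <T> T call(Callable<T> body, long nowMillis) throws Exception {
    // While open and within the reset timeout, fail fast without running body.
    if (open && nowMillis - openedAt < resetTimeoutMillis) {
      throw new IllegalStateException("circuit breaker is open");
    }
    try {
      T result = body.call();
      // Success (including a successful trial call) closes the breaker.
      failures = 0;
      open = false;
      return result;
    } catch (Exception e) {
      failures++;
      // A failed trial call, or reaching the limit, (re)opens the breaker.
      if (open || failures >= maxFailures) {
        open = true;
        openedAt = nowMillis;
      }
      throw e;
    }
  }

  synchronized boolean isOpen() { return open; }
}
```

With `new SimpleBreaker(3, 30000L)`, three failing calls open the breaker, a call made one second later is rejected without touching the storage, and a successful call made 30 seconds after opening closes the breaker again.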

How to implement a durable mailbox

Here is an example of how to implement a custom durable mailbox. Essentially it consists of a configurator (MailboxType) and a queue implementation (DurableMessageQueue).

The envelope contains the message sent to the actor and information about the sender. It is the envelope that needs to be stored. As a helper you can extend DurableMessageQueueWithSerialization to serialize and deserialize the envelope using the ordinary Serialization (Scala) mechanism. This is optional, and you may store the envelope data in any way you like.

Durable mailboxes are an excellent fit for circuit breakers, which are described in the Circuit Breaker documentation.

import scala.Option;
import com.typesafe.config.Config;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;

public class MyDurableMailboxType implements MailboxType {

  public MyDurableMailboxType(ActorSystem.Settings settings, Config config) {
  }

  @Override 
  public MessageQueue create(Option<ActorRef> owner,
      Option<ActorSystem> system) {
    if (owner.isEmpty())
      throw new IllegalArgumentException("requires an owner " +
          "(i.e. does not work with BalancingDispatcher)");
    return new MyDurableMessageQueue(owner.get(), 
      (ExtendedActorSystem) system.get());
  }
}

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Callable;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ExtendedActorSystem;
import akka.actor.mailbox.DurableMessageQueueWithSerialization;
import akka.dispatch.Envelope;
import akka.dispatch.MessageQueue;
import akka.pattern.CircuitBreaker;

public class MyDurableMessageQueue extends DurableMessageQueueWithSerialization {

  public MyDurableMessageQueue(ActorRef owner, ExtendedActorSystem system) {
    super(owner, system);
  }

  private final QueueStorage storage = new QueueStorage();
  // A real-world implementation would use configuration to set the last 
  // three parameters below
  private final CircuitBreaker breaker = CircuitBreaker.create(system().scheduler(), 5, 
    Duration.create(30, "seconds"), Duration.create(1, "minute"));

  @Override
  public void enqueue(ActorRef receiver, final Envelope envelope) {
    breaker.callWithSyncCircuitBreaker(new Callable<Object>() {
      @Override
      public Object call() {
        byte[] data = serialize(envelope);
        storage.push(data);
        return null;
      }
    });
  }

  @Override
  public Envelope dequeue() {
    return breaker.callWithSyncCircuitBreaker(new Callable<Envelope>() {
      @Override
      public Envelope call() {
        byte[] data = storage.pull();
        if (data == null)
          return null;
        else
          return deserialize(data);
      }
    });
  }

  @Override
  public boolean hasMessages() {
    return breaker.callWithSyncCircuitBreaker(new Callable<Boolean>() {
      @Override
      public Boolean call() {
        return !storage.isEmpty();
      }
    });
  }

  @Override
  public int numberOfMessages() {
    return breaker.callWithSyncCircuitBreaker(new Callable<Integer>() {
      @Override
      public Integer call() {
        return storage.size();
      }
    });
  }

  /**
   * Called when the mailbox is disposed.
   * An ordinary mailbox would send remaining messages to deadLetters,
   * but the purpose of a durable mailbox is to continue
   * with the same message queue when the actor is started again.
   */
  @Override
  public void cleanUp(ActorRef owner, MessageQueue deadLetters) {}

  // dummy queue storage ...
}
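The listing above leaves QueueStorage undefined. One possible in-memory stand-in, assumed here purely for illustration, wraps the ConcurrentLinkedQueue that the listing imports. It provides the four methods the message queue calls (push, pull, isEmpty, size), but it is not durable: a real implementation would write to a file, database, or other persistent store.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical in-memory stand-in: messages are lost when the JVM exits.
class QueueStorage {
  private final ConcurrentLinkedQueue<byte[]> queue =
      new ConcurrentLinkedQueue<byte[]>();

  // Append serialized envelope data to the tail of the queue.
  public void push(byte[] data) {
    queue.offer(data);
  }

  // Remove and return the head of the queue, or null when it is empty.
  public byte[] pull() {
    return queue.poll();
  }

  public boolean isEmpty() {
    return queue.isEmpty();
  }

  // Note: ConcurrentLinkedQueue.size() traverses the queue, so it is O(n).
  public int size() {
    return queue.size();
  }
}
```

Returning null from pull on an empty queue matches the null check in the dequeue method of the listing.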

For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk, and ZooKeeper, which can be found in the Akka git repository at tag v2.0.1.
