Mailboxes
Loading

Mailboxes

An Akka Mailbox holds the messages that are destined for an Actor. Normally each Actor has its own mailbox, but with for example a BalancingPool all routees will share a single mailbox instance.

§Mailbox Selection

§Requiring a Message Queue Type for an Actor

It is possible to require a certain type of message queue for a certain type of actor by having that actor implement the parameterized interface RequiresMessageQueue. Here is an example:

  1. import akka.dispatch.BoundedMessageQueueSemantics;
  2. import akka.dispatch.RequiresMessageQueue;
  3.  
  4. public class MyBoundedUntypedActor extends MyUntypedActor
  5. implements RequiresMessageQueue<BoundedMessageQueueSemantics> {
  6. }

The type parameter to the RequiresMessageQueue interface needs to be mapped to a mailbox in configuration like this:

  1. bounded-mailbox {
  2. mailbox-type = "akka.dispatch.BoundedMailbox"
  3. mailbox-capacity = 1000
  4. mailbox-push-timeout-time = 10s
  5. }
  6.  
  7. akka.actor.mailbox.requirements {
  8. "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
  9. }

Now every time you create an actor of type MyBoundedUntypedActor it will try to get a bounded mailbox. If the actor has a different mailbox configured in deployment, either directly or via a dispatcher with a specified mailbox type, then that will override this mapping.

Note

The type of the queue in the mailbox created for an actor will be checked against the required type in the interface and if the queue doesn't implement the required type then actor creation will fail.

§Requiring a Message Queue Type for a Dispatcher

A dispatcher may also have a requirement for the mailbox type used by the actors running on it. An example is the BalancingDispatcher which requires a message queue that is thread-safe for multiple concurrent consumers. Such a requirement is formulated within the dispatcher configuration section like this:

  1. my-dispatcher {
  2. mailbox-requirement = org.example.MyInterface
  3. }

The given requirement names a class or interface which will then be ensured to be a supertype of the message queue’s implementation. In case of a conflict—e.g. if the actor requires a mailbox type which does not satisfy this requirement—then actor creation will fail.

§How the Mailbox Type is Selected

When an actor is created, the ActorRefProvider first determines the dispatcher which will execute it. Then the mailbox is determined as follows:

  1. If the actor’s deployment configuration section contains a mailbox key then that names a configuration section describing the mailbox type to be used.
  2. If the actor’s Props contains a mailbox selection—i.e. withMailbox was called on it—then that names a configuration section describing the mailbox type to be used.
  3. If the dispatcher’s configuration section contains a mailbox-type key the same section will be used to configure the mailbox type.
  4. If the actor requires a mailbox type as described above then the mapping for that requirement will be used to determine the mailbox type to be used; if that fails then the dispatcher’s requirement—if any—will be tried instead.
  5. If the dispatcher requires a mailbox type as described above then the mapping for that requirement will be used to determine the mailbox type to be used.
  6. The default mailbox akka.actor.default-mailbox will be used.

§Default Mailbox

When the mailbox is not specified as described above the default mailbox is used. By default it is an unbounded mailbox, which is backed by a java.util.concurrent.ConcurrentLinkedQueue.

SingleConsumerOnlyUnboundedMailbox is an even more efficient mailbox, and it can be used as the default mailbox, but it cannot be used with a BalancingDispatcher.

Configuration of SingleConsumerOnlyUnboundedMailbox as default mailbox:

  1. akka.actor.default-mailbox {
  2. mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  3. }

§Which Configuration is passed to the Mailbox Type

Each mailbox type is implemented by a class which extends MailboxType and takes two constructor arguments: a ActorSystem.Settings object and a Config section. The latter is computed by obtaining the named configuration section from the actor system’s configuration, overriding its id key with the configuration path of the mailbox type and adding a fall-back to the default mailbox configuration section.

§Builtin Mailbox Implementations

Akka comes shipped with a number of mailbox implementations:

  • UnboundedMailbox - The default mailbox

    • Backed by a java.util.concurrent.ConcurrentLinkedQueue
    • Blocking: No
    • Bounded: No
    • Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox"
  • SingleConsumerOnlyUnboundedMailbox

    This queue may or may not be faster than the default one depending on your use-case—be sure to benchmark properly!

    • Backed by a Multiple-Producer Single-Consumer queue, cannot be used with BalancingDispatcher
    • Blocking: No
    • Bounded: No
    • Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  • BoundedMailbox

    • Backed by a java.util.concurrent.LinkedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
    • Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
  • UnboundedPriorityMailbox

    • Backed by a java.util.concurrent.PriorityBlockingQueue
    • Blocking: Yes
    • Bounded: No
    • Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
  • BoundedPriorityMailbox

    • Backed by a java.util.PriorityBlockingQueue wrapped in an akka.util.BoundedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
    • Configuration name: "akka.dispatch.BoundedPriorityMailbox"

§Mailbox configuration examples

How to create a PriorityMailbox:

  1. public class MyPrioMailbox extends UnboundedPriorityMailbox {
  2. // needed for reflective instantiation
  3. public MyPrioMailbox(ActorSystem.Settings settings, Config config) {
  4. // Create a new PriorityGenerator, lower prio means more important
  5. super(new PriorityGenerator() {
  6. @Override
  7. public int gen(Object message) {
  8. if (message.equals("highpriority"))
  9. return 0; // 'highpriority messages should be treated first if possible
  10. else if (message.equals("lowpriority"))
  11. return 2; // 'lowpriority messages should be treated last if possible
  12. else if (message.equals(PoisonPill.getInstance()))
  13. return 3; // PoisonPill when no other left
  14. else
  15. return 1; // By default they go between high and low prio
  16. }
  17. });
  18. }
  19. }

And then add it to the configuration:

  1. prio-dispatcher {
  2. mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  3. //Other dispatcher configuration goes here
  4. }

And then an example on how you would use it:

  1. class Demo extends UntypedActor {
  2. LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  3. {
  4. for (Object msg : new Object[] { "lowpriority", "lowpriority",
  5. "highpriority", "pigdog", "pigdog2", "pigdog3", "highpriority",
  6. PoisonPill.getInstance() }) {
  7. getSelf().tell(msg, getSelf());
  8. }
  9. }
  10.  
  11. public void onReceive(Object message) {
  12. log.info(message.toString());
  13. }
  14. }
  15.  
  16. // We create a new Actor that just prints out what it processes
  17. ActorRef myActor = system.actorOf(Props.create(Demo.class, this)
  18. .withDispatcher("prio-dispatcher"));
  19.  
  20. /*
  21. Logs:
  22. 'highpriority
  23. 'highpriority
  24. 'pigdog
  25. 'pigdog2
  26. 'pigdog3
  27. 'lowpriority
  28. 'lowpriority
  29. */

It is also possible to configure a mailbox type directly like this:

  1. prio-mailbox {
  2. mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  3. //Other mailbox configuration goes here
  4. }
  5.  
  6. akka.actor.deployment {
  7. /priomailboxactor {
  8. mailbox = prio-mailbox
  9. }
  10. }

And then use it either from deployment like this:

  1. ActorRef myActor =
  2. system.actorOf(Props.create(MyUntypedActor.class),
  3. "priomailboxactor");

Or code like this:

  1. ActorRef myActor =
  2. system.actorOf(Props.create(MyUntypedActor.class)
  3. .withMailbox("prio-mailbox"));

§Creating your own Mailbox type

An example is worth a thousand quacks:

  1. import akka.actor.ActorRef;
  2. import akka.actor.ActorSystem;
  3. import akka.dispatch.Envelope;
  4. import akka.dispatch.MailboxType;
  5. import akka.dispatch.MessageQueue;
  6. import akka.dispatch.ProducesMessageQueue;
  7. import com.typesafe.config.Config;
  8. import java.util.concurrent.ConcurrentLinkedQueue;
  9. import java.util.Queue;
  10. import scala.Option;
  11.  
  12. public class MyUnboundedJMailbox implements MailboxType,
  13. ProducesMessageQueue<MyUnboundedJMailbox.MyMessageQueue> {
  14.  
  15. // This is the MessageQueue implementation
  16. public static class MyMessageQueue implements MessageQueue,
  17. MyUnboundedJMessageQueueSemantics {
  18. private final Queue<Envelope> queue =
  19. new ConcurrentLinkedQueue<Envelope>();
  20.  
  21. // these must be implemented; queue used as example
  22. public void enqueue(ActorRef receiver, Envelope handle) {
  23. queue.offer(handle);
  24. }
  25. public Envelope dequeue() { return queue.poll(); }
  26. public int numberOfMessages() { return queue.size(); }
  27. public boolean hasMessages() { return !queue.isEmpty(); }
  28. public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
  29. for (Envelope handle: queue) {
  30. deadLetters.enqueue(owner, handle);
  31. }
  32. }
  33. }
  34.  
  35. // This constructor signature must exist, it will be called by Akka
  36. public MyUnboundedJMailbox(ActorSystem.Settings settings, Config config) {
  37. // put your initialization code here
  38. }
  39.  
  40. // The create method is called to create the MessageQueue
  41. public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
  42. return new MyMessageQueue();
  43. }
  44. }
  1. // Marker interface used for mailbox requirements mapping
  2. public interface MyUnboundedJMessageQueueSemantics {
  3. }

And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration, or the mailbox configuration.

Note

Make sure to include a constructor which takes akka.actor.ActorSystem.Settings and com.typesafe.config.Config arguments, as this constructor is invoked reflectively to construct your mailbox type. The config passed in as second argument is that section from the configuration which describes the dispatcher or mailbox setting using this mailbox type; the mailbox type will be instantiated once for each dispatcher or mailbox setting using it.

You can also use the mailbox as a requirement on the dispatcher like this:

  1. custom-dispatcher {
  2. mailbox-requirement =
  3. "docs.dispatcher.MyUnboundedJMessageQueueSemantics"
  4. }
  5.  
  6. akka.actor.mailbox.requirements {
  7. "docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
  8. custom-dispatcher-mailbox
  9. }
  10.  
  11. custom-dispatcher-mailbox {
  12. mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
  13. }

Or by defining the requirement on your actor class like this:

  1. public class MySpecialActor extends UntypedActor implements
  2. RequiresMessageQueue<MyUnboundedJMessageQueueSemantics> {
  3. // ...
  4. }

§Special Semantics of system.actorOf

In order to make system.actorOf both synchronous and non-blocking while keeping the return type ActorRef (and the semantics that the returned ref is fully functional), special handling takes place for this case. Behind the scenes, a hollow kind of actor reference is constructed, which is sent to the system’s guardian actor who actually creates the actor and its context and puts those inside the reference. Until that has happened, messages sent to the ActorRef will be queued locally, and only upon swapping the real filling in will they be transferred into the real mailbox. Thus,

  1. final Props props = ...
  2. // this actor uses MyCustomMailbox, which is assumed to be a singleton
  3. system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang", sender);
  4. assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));

will probably fail; you will have to allow for some time to pass and retry the check à la TestKit.awaitCond.