Mailboxes
Dependency
To use Mailboxes, you must add the following dependency in your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.32"
- Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
- Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor_2.12:2.5.32"
}
Introduction
An Akka Mailbox holds the messages that are destined for an Actor. Normally each Actor has its own mailbox, but with, for example, a BalancingPool all routees will share a single mailbox instance.
Mailbox Selection
Requiring a Message Queue Type for an Actor
It is possible to require a certain type of message queue for a certain type of actor by having that actor extend (Scala) or implement (Java) the parameterized trait/interface RequiresMessageQueue. Here is an example:
- Scala
import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics

class MyBoundedActor extends MyActor with RequiresMessageQueue[BoundedMessageQueueSemantics]
- Java
import akka.dispatch.BoundedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;

public class MyBoundedActor extends MyActor
    implements RequiresMessageQueue<BoundedMessageQueueSemantics> {}
The type parameter to the RequiresMessageQueue trait (Scala) or interface (Java) needs to be mapped to a mailbox in configuration like this:
bounded-mailbox {
  mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
  mailbox-capacity = 1000
}

akka.actor.mailbox.requirements {
  "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
Now every time you create an actor of type MyBoundedActor it will try to get a bounded mailbox. If the actor has a different mailbox configured in deployment, either directly or via a dispatcher with a specified mailbox type, then that will override this mapping.
The type of the queue in the mailbox created for an actor will be checked against the required type in the trait/interface, and if the queue doesn’t implement the required type then actor creation will fail.
Requiring a Message Queue Type for a Dispatcher
A dispatcher may also have a requirement for the mailbox type used by the actors running on it. An example is the BalancingDispatcher which requires a message queue that is thread-safe for multiple concurrent consumers. Such a requirement is formulated within the dispatcher configuration section like this:
my-dispatcher {
mailbox-requirement = org.example.MyInterface
}
The given requirement names a class or interface which will then be ensured to be a supertype of the message queue’s implementation. In case of a conflict—e.g. if the actor requires a mailbox type which does not satisfy this requirement—then actor creation will fail.
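The supertype check described above can be illustrated with plain reflection. The following is a minimal sketch, not Akka's actual implementation: the class RequirementCheckSketch and its method satisfies are hypothetical names, and JDK queue types stand in for real message queue classes.

```java
// Hypothetical illustration only; Akka performs its own check internally.
final class RequirementCheckSketch {

    // Returns true if the class or interface named by `requirementName`
    // is a supertype of `queueClass`.
    static boolean satisfies(String requirementName, Class<?> queueClass) {
        try {
            Class<?> requirement = Class.forName(requirementName);
            return requirement.isAssignableFrom(queueClass);
        } catch (ClassNotFoundException e) {
            // An unknown requirement can never be satisfied in this sketch.
            return false;
        }
    }

    public static void main(String[] args) {
        Class<?> impl = java.util.concurrent.ConcurrentLinkedQueue.class;
        // ConcurrentLinkedQueue implements java.util.Queue ...
        System.out.println(satisfies("java.util.Queue", impl));
        // ... but not java.util.concurrent.BlockingQueue.
        System.out.println(satisfies("java.util.concurrent.BlockingQueue", impl));
    }
}
```

In this sketch a failed check simply returns false; in Akka the corresponding situation makes actor creation fail, as stated above.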
How the Mailbox Type is Selected
When an actor is created, the ActorRefProvider first determines the dispatcher which will execute it. Then the mailbox is determined as follows:
- If the actor’s deployment configuration section contains a mailbox key, then that names a configuration section describing the mailbox type to be used.
- If the actor’s Props contains a mailbox selection (i.e. withMailbox was called on it), then that names a configuration section describing the mailbox type to be used (note that this needs to be an absolute config path, for example myapp.special-mailbox, and is not nested inside the akka namespace).
- If the dispatcher’s configuration section contains a mailbox-type key, the same section will be used to configure the mailbox type.
- If the actor requires a mailbox type as described above, then the mapping for that requirement will be used to determine the mailbox type to be used; if that fails, then the dispatcher’s requirement (if any) will be tried instead.
- If the dispatcher requires a mailbox type as described above, then the mapping for that requirement will be used to determine the mailbox type to be used.
- The default mailbox akka.actor.default-mailbox will be used.
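The lookup order above amounts to "first match wins". The sketch below models only that precedence and is not Akka code: the class MailboxSelectionSketch, its select method and all parameter names are hypothetical, and each Optional stands for "this source names a mailbox configuration section".

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model of the precedence rules; not part of Akka.
final class MailboxSelectionSketch {

    static final String DEFAULT_MAILBOX = "akka.actor.default-mailbox";

    static String select(
            Optional<String> deploymentMailbox,        // "mailbox" key in the deployment section
            Optional<String> propsMailbox,             // set via Props.withMailbox
            Optional<String> dispatcherMailboxType,    // dispatcher section containing "mailbox-type"
            Optional<String> actorRequirementMapping,  // mapping for the actor's requirement
            Optional<String> dispatcherRequirementMapping) { // mapping for the dispatcher's requirement
        // Candidates are listed in the same order as the rules in the text.
        for (Optional<String> candidate : List.of(
                deploymentMailbox,
                propsMailbox,
                dispatcherMailboxType,
                actorRequirementMapping,
                dispatcherRequirementMapping)) {
            if (candidate.isPresent()) {
                return candidate.get();
            }
        }
        // Nothing matched: fall back to the default mailbox.
        return DEFAULT_MAILBOX;
    }

    public static void main(String[] args) {
        // Props names a mailbox and the dispatcher has a mailbox-type:
        // the Props selection comes first in the list, so it wins.
        System.out.println(select(
                Optional.empty(),
                Optional.of("myapp.special-mailbox"),
                Optional.of("my-dispatcher"),
                Optional.empty(),
                Optional.empty()));
    }
}
```

Because an unresolved actor requirement is modelled as an empty Optional, the loop falls through to the dispatcher's requirement, which mirrors the "if that fails" clause of the fourth rule.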
Default Mailbox
When the mailbox is not specified as described above the default mailbox is used. By default it is an unbounded mailbox, which is backed by a java.util.concurrent.ConcurrentLinkedQueue.
SingleConsumerOnlyUnboundedMailbox is an even more efficient mailbox, and it can be used as the default mailbox, but it cannot be used with a BalancingDispatcher.
Configuration of SingleConsumerOnlyUnboundedMailbox as default mailbox:
akka.actor.default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
Which Configuration is passed to the Mailbox Type
Each mailbox type is implemented by a class which extends MailboxType and takes two constructor arguments: an ActorSystem.Settings object and a Config section. The latter is computed by obtaining the named configuration section from the actor system’s configuration, overriding its id key with the configuration path of the mailbox type and adding a fall-back to the default mailbox configuration section.
Builtin Mailbox Implementations
Akka comes shipped with a number of mailbox implementations:
- UnboundedMailbox (default)
  - The default mailbox
  - Backed by a java.util.concurrent.ConcurrentLinkedQueue
  - Blocking: No
  - Bounded: No
  - Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox"
- SingleConsumerOnlyUnboundedMailbox. This queue may or may not be faster than the default one depending on your use case, so be sure to benchmark properly!
  - Backed by a Multiple-Producer Single-Consumer queue, cannot be used with BalancingDispatcher
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
- NonBlockingBoundedMailbox
  - Backed by a very efficient Multiple-Producer Single-Consumer queue
  - Blocking: No (discards overflowing messages into deadLetters)
  - Bounded: Yes
  - Configuration name: "akka.dispatch.NonBlockingBoundedMailbox"
- UnboundedControlAwareMailbox
  - Delivers messages that extend akka.dispatch.ControlMessage with higher priority
  - Backed by two java.util.concurrent.ConcurrentLinkedQueue
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.UnboundedControlAwareMailbox"
- UnboundedPriorityMailbox
  - Backed by a java.util.concurrent.PriorityBlockingQueue
  - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
- UnboundedStablePriorityMailbox
  - Backed by a java.util.concurrent.PriorityBlockingQueue wrapped in an akka.util.PriorityQueueStabilizer
  - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
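To make the difference between the plain and the stable priority mailbox concrete, here is a minimal sketch of one common way to get FIFO order among equal priorities: tagging every element with an arrival sequence number and using it as a tie-breaker. This is an illustration under that assumption and does not claim to reproduce akka.util.PriorityQueueStabilizer; the class StablePrioritySketch and its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not Akka's implementation.
final class StablePrioritySketch {

    // Each entry remembers when it arrived so that ties can be broken in FIFO order.
    static final class Entry implements Comparable<Entry> {
        final int priority;   // lower value means more important
        final long sequence;  // arrival order
        final String message;

        Entry(int priority, long sequence, String message) {
            this.priority = priority;
            this.sequence = sequence;
            this.message = message;
        }

        @Override
        public int compareTo(Entry other) {
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    private final PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>();
    private final AtomicLong counter = new AtomicLong();

    void enqueue(int priority, String message) {
        queue.offer(new Entry(priority, counter.getAndIncrement(), message));
    }

    // Returns the next message, or null if the queue is empty.
    String dequeue() {
        Entry entry = queue.poll();
        return entry == null ? null : entry.message;
    }

    // Enqueues a few messages and returns them in delivery order.
    static List<String> drainExample() {
        StablePrioritySketch q = new StablePrioritySketch();
        q.enqueue(2, "low-1");
        q.enqueue(1, "normal-1");
        q.enqueue(1, "normal-2");
        q.enqueue(0, "high-1");
        q.enqueue(1, "normal-3");
        List<String> delivered = new ArrayList<>();
        for (String m = q.dequeue(); m != null; m = q.dequeue()) {
            delivered.add(m);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // prints [high-1, normal-1, normal-2, normal-3, low-1]
        System.out.println(drainExample());
    }
}
```

Without the sequence number, a PriorityBlockingQueue gives no guarantee about the relative order of normal-1, normal-2 and normal-3, which is the behaviour described for UnboundedPriorityMailbox.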
Other bounded mailbox implementations will block the sender if the capacity is reached and they are configured with a non-zero mailbox-push-timeout-time.
The following mailboxes should only be used with zero mailbox-push-timeout-time.
- BoundedMailbox
  - Backed by a java.util.concurrent.LinkedBlockingQueue
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
- BoundedPriorityMailbox
  - Backed by a java.util.PriorityQueue wrapped in an akka.util.BoundedBlockingQueue
  - Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "akka.dispatch.BoundedPriorityMailbox"
- BoundedStablePriorityMailbox
  - Backed by a java.util.PriorityQueue wrapped in an akka.util.PriorityQueueStabilizer and an akka.util.BoundedBlockingQueue
  - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
- BoundedControlAwareMailbox
  - Delivers messages that extend akka.dispatch.ControlMessage with higher priority
  - Backed by two java.util.concurrent.ConcurrentLinkedQueue and blocking on enqueue if capacity has been reached
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "akka.dispatch.BoundedControlAwareMailbox"
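As a sketch of where the push timeout is set, the fragment below configures a BoundedMailbox that does not block senders. The section name my-bounded-mailbox and the capacity value are illustrative assumptions; mailbox-capacity and mailbox-push-timeout-time are the mailbox settings discussed in this document.

```
# Hypothetical section name; reference it from deployment configuration or Props.withMailbox
my-bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  # zero timeout: senders are not blocked when the mailbox is full
  mailbox-push-timeout-time = 0s
}
```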
Mailbox configuration examples
PriorityMailbox
How to create a PriorityMailbox:
- Scala
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config

// We inherit, in this case, from UnboundedStablePriorityMailbox
// and seed it with the priority generator
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
    extends UnboundedStablePriorityMailbox(
      // Create a new PriorityGenerator, lower prio means more important
      PriorityGenerator {
        // 'highpriority messages should be treated first if possible
        case 'highpriority => 0

        // 'lowpriority messages should be treated last if possible
        case 'lowpriority => 2

        // PoisonPill when no other left
        case PoisonPill => 3

        // We default to 1, which is in between high and low
        case otherwise => 1
      })
- Java
static class MyPrioMailbox extends UnboundedStablePriorityMailbox {
  // needed for reflective instantiation
  public MyPrioMailbox(ActorSystem.Settings settings, Config config) {
    // Create a new PriorityGenerator, lower prio means more important
    super(
        new PriorityGenerator() {
          @Override
          public int gen(Object message) {
            if (message.equals("highpriority"))
              return 0; // "highpriority" messages should be treated first if possible
            else if (message.equals("lowpriority"))
              return 2; // "lowpriority" messages should be treated last if possible
            else if (message.equals(PoisonPill.getInstance()))
              return 3; // PoisonPill when no other left
            else return 1; // By default they go between high and low prio
          }
        });
  }
}
And then add it to the configuration:
prio-dispatcher {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other dispatcher configuration goes here
}
And then an example on how you would use it:
- Scala
// We create a new Actor that just prints out what it processes
class Logger extends Actor {
  val log: LoggingAdapter = Logging(context.system, this)

  self ! 'lowpriority
  self ! 'lowpriority
  self ! 'highpriority
  self ! 'pigdog
  self ! 'pigdog2
  self ! 'pigdog3
  self ! 'highpriority
  self ! PoisonPill

  def receive = {
    case x => log.info(x.toString)
  }
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher("prio-dispatcher"))

/*
 * Logs:
 * 'highpriority
 * 'highpriority
 * 'pigdog
 * 'pigdog2
 * 'pigdog3
 * 'lowpriority
 * 'lowpriority
 */
- Java
class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  {
    for (Object msg :
        new Object[] {
          "lowpriority",
          "lowpriority",
          "highpriority",
          "pigdog",
          "pigdog2",
          "pigdog3",
          "highpriority",
          PoisonPill.getInstance()
        }) {
      getSelf().tell(msg, getSelf());
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchAny(
            message -> {
              log.info(message.toString());
            })
        .build();
  }
}

// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("prio-dispatcher"));

/*
Logs:
  highpriority
  highpriority
  pigdog
  pigdog2
  pigdog3
  lowpriority
  lowpriority
*/
It is also possible to configure a mailbox type directly like this (this is a top-level configuration entry):
prio-mailbox {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other mailbox configuration goes here
}

akka.actor.deployment {
  /priomailboxactor {
    mailbox = prio-mailbox
  }
}
And then use it either from deployment like this:
- Scala
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "priomailboxactor")
- Java
ActorRef myActor = system.actorOf(Props.create(MyActor.class), "priomailboxactor");
Or code like this:
- Scala
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))
- Java
ActorRef myActor = system.actorOf(Props.create(MyActor.class).withMailbox("prio-mailbox"));
ControlAwareMailbox
A ControlAwareMailbox can be very useful if an actor needs to be able to receive control messages immediately no matter how many other messages are already in its mailbox.
It can be configured like this:
control-aware-dispatcher {
  mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
  //Other dispatcher configuration goes here
}
Control messages need to extend the ControlMessage trait:
- Scala
import akka.dispatch.ControlMessage

case object MyControlMessage extends ControlMessage
- Java
static class MyControlMessage implements ControlMessage {}
And then an example on how you would use it:
- Scala
// We create a new Actor that just prints out what it processes
class Logger extends Actor {
  val log: LoggingAdapter = Logging(context.system, this)

  self ! 'foo
  self ! 'bar
  self ! MyControlMessage
  self ! PoisonPill

  def receive = {
    case x => log.info(x.toString)
  }
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher("control-aware-dispatcher"))

/*
 * Logs:
 * MyControlMessage
 * 'foo
 * 'bar
 */
- Java
class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  {
    for (Object msg :
        new Object[] {"foo", "bar", new MyControlMessage(), PoisonPill.getInstance()}) {
      getSelf().tell(msg, getSelf());
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchAny(
            message -> {
              log.info(message.toString());
            })
        .build();
  }
}

// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("control-aware-dispatcher"));

/*
Logs:
  MyControlMessage (rendered via its default toString)
  foo
  bar
*/
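The ordering in the logs above follows from keeping control messages in a queue of their own that is always drained first. The following is a minimal sketch of that idea using only JDK classes. It is not Akka's implementation, and the names ControlAwareQueueSketch, Control and Stop are hypothetical stand-ins (Control plays the role of akka.dispatch.ControlMessage).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a two-queue, control-aware message queue.
final class ControlAwareQueueSketch {

    // Stand-in marker interface for control messages.
    interface Control {}

    // Example control message with a readable toString.
    static final class Stop implements Control {
        @Override
        public String toString() {
            return "Stop";
        }
    }

    private final Queue<Object> controlQueue = new ConcurrentLinkedQueue<>();
    private final Queue<Object> queue = new ConcurrentLinkedQueue<>();

    void enqueue(Object message) {
        if (message instanceof Control) {
            controlQueue.offer(message);
        } else {
            queue.offer(message);
        }
    }

    // Control messages are always handed out before ordinary ones.
    Object dequeue() {
        Object control = controlQueue.poll();
        return control != null ? control : queue.poll();
    }

    // Enqueues two ordinary messages followed by a control message
    // and returns the order in which they are dequeued.
    static List<String> demo() {
        ControlAwareQueueSketch q = new ControlAwareQueueSketch();
        q.enqueue("foo");
        q.enqueue("bar");
        q.enqueue(new Stop());
        List<String> delivered = new ArrayList<>();
        for (Object m = q.dequeue(); m != null; m = q.dequeue()) {
            delivered.add(m.toString());
        }
        return delivered;
    }

    public static void main(String[] args) {
        // prints [Stop, foo, bar]
        System.out.println(demo());
    }
}
```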
Creating your own Mailbox type
An example is worth a thousand quacks:
- Scala
// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics
- Java
// Marker interface used for mailbox requirements mapping
public interface MyUnboundedMessageQueueSemantics {}
- Scala
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.dispatch.ProducesMessageQueue
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import scala.Option

object MyUnboundedMailbox {
  // This is the MessageQueue implementation
  class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {

    private final val queue = new ConcurrentLinkedQueue[Envelope]()

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit =
      queue.offer(handle)
    def dequeue(): Envelope = queue.poll()
    def numberOfMessages: Int = queue.size
    def hasMessages: Boolean = !queue.isEmpty
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
      while (hasMessages) {
        deadLetters.enqueue(owner, dequeue())
      }
    }
  }
}

// This is the Mailbox implementation
class MyUnboundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._

  // This constructor signature must exist, it will be called by Akka
  def this(settings: ActorSystem.Settings, config: Config) = {
    // put your initialization code here
    this()
  }

  // The create method is called to create the MessageQueue
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MyMessageQueue()
}
- Java
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;
import scala.Option;

public class MyUnboundedMailbox
    implements MailboxType, ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {

  // This is the MessageQueue implementation
  public static class MyMessageQueue implements MessageQueue, MyUnboundedMessageQueueSemantics {
    private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();

    // these must be implemented; queue used as example
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
    }

    public Envelope dequeue() {
      return queue.poll();
    }

    public int numberOfMessages() {
      return queue.size();
    }

    public boolean hasMessages() {
      return !queue.isEmpty();
    }

    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      for (Envelope handle : queue) {
        deadLetters.enqueue(owner, handle);
      }
    }
  }

  // This constructor signature must exist, it will be called by Akka
  public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
    // put your initialization code here
  }

  // The create method is called to create the MessageQueue
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    return new MyMessageQueue();
  }
}
And then you specify the FQCN of your MailboxType as the value of the “mailbox-type” in the dispatcher configuration, or the mailbox configuration.
Make sure to include a constructor which takes akka.actor.ActorSystem.Settings and com.typesafe.config.Config arguments, as this constructor is invoked reflectively to construct your mailbox type. The config passed in as second argument is that section from the configuration which describes the dispatcher or mailbox setting using this mailbox type; the mailbox type will be instantiated once for each dispatcher or mailbox setting using it.
You can also use the mailbox as a requirement on the dispatcher like this:
custom-dispatcher {
  mailbox-requirement = "jdocs.dispatcher.MyUnboundedMessageQueueSemantics"
}

akka.actor.mailbox.requirements {
  "jdocs.dispatcher.MyUnboundedMessageQueueSemantics" = custom-dispatcher-mailbox
}

custom-dispatcher-mailbox {
  mailbox-type = "jdocs.dispatcher.MyUnboundedMailbox"
}
Or by defining the requirement on your actor class like this:
- Scala
class MySpecialActor extends Actor with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
  // ...
}
- Java
static class MySpecialActor extends AbstractActor
    implements RequiresMessageQueue<MyUnboundedMessageQueueSemantics> {
  // ...
}
Special Semantics of system.actorOf
In order to make system.actorOf both synchronous and non-blocking while keeping the return type ActorRef (and the semantics that the returned ref is fully functional), special handling takes place for this case. Behind the scenes, a hollow kind of actor reference is constructed, which is sent to the system’s guardian actor who actually creates the actor and its context and puts those inside the reference. Until that has happened, messages sent to the ActorRef will be queued locally, and only upon swapping the real filling in will they be transferred into the real mailbox. Thus,
- Scala
val props: Props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")
- Java
final Props props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")).tell("bang", sender);
assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));
will probably fail; you will have to allow for some time to pass and retry the check à la TestKit.awaitCond.
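The "allow time to pass and retry" approach can be sketched without any test framework. The helper below is a hypothetical, JDK-only illustration of polling a condition until a deadline. It is not TestKit.awaitCond itself, and the names RetrySketch, retryUntil and demo are assumptions made for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper in the spirit of "retry the check".
final class RetrySketch {

    // Re-evaluates `condition` every `intervalMillis` until it holds or
    // `maxMillis` have elapsed. Returns whether the condition became true.
    static boolean retryUntil(BooleanSupplier condition, long maxMillis, long intervalMillis) {
        final long deadline = System.nanoTime() + maxMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Simulates a message that only reaches its destination after a short delay.
    static boolean demo() {
        AtomicBoolean delivered = new AtomicBoolean(false);
        Thread transfer = new Thread(() -> {
            try {
                Thread.sleep(50); // stands in for the asynchronous hand-over
            } catch (InterruptedException e) {
                return;
            }
            delivered.set(true);
        });
        transfer.setDaemon(true);
        transfer.start();
        // An immediate check would likely fail here; polling gives the transfer time to finish.
        return retryUntil(delivered::get, 2000, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In an actual Akka test the condition would be the check on the custom mailbox shown above, evaluated through the TestKit facilities rather than a hand-written loop.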