Mailboxes
You are viewing the documentation for the new actor APIs. To view the Akka Classic documentation, see Classic Mailboxes.
Dependency
Mailboxes are part of core Akka, which means that they are part of the akka-actor dependency. This page describes how to use mailboxes with akka-actor-typed.
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependency as below.
- sbt
val AkkaVersion = "2.10.0+31-e778606c-SNAPSHOT"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.10.0+31-e778606c-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.10.0+31-e778606c-SNAPSHOT")

  implementation "com.typesafe.akka:akka-actor-typed_${versions.ScalaBinary}"
}
Introduction
Each actor in Akka has a Mailbox. This is where messages are enqueued before being processed by the actor.
By default an unbounded mailbox is used, which means any number of messages can be enqueued into the mailbox.
The unbounded mailbox is a convenient default, but if messages are added to the mailbox faster than the actor can process them, the application can run out of memory. For this reason a bounded mailbox can be specified instead. When a bounded mailbox is full, it passes new messages to deadletters.
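The overflow behavior described above can be illustrated with a minimal, library-free sketch. It is not Akka's implementation: the class `BoundedMailboxSketch`, its `String` messages and its in-memory dead-letter list are hypothetical names chosen for illustration, and the sketch is single-threaded for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration of a bounded queue with a dead-letter sink.
// This is NOT Akka's mailbox implementation.
public class BoundedMailboxSketch {
    private final ArrayBlockingQueue<String> queue;
    private final List<String> deadLetters = new ArrayList<>();

    public BoundedMailboxSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // offer() returns false instead of blocking when the queue is full,
    // so overflowing messages are redirected to the dead-letter list.
    public void enqueue(String message) {
        if (!queue.offer(message)) {
            deadLetters.add(message);
        }
    }

    // Returns the oldest queued message, or null if the queue is empty.
    public String dequeue() {
        return queue.poll();
    }

    public int size() {
        return queue.size();
    }

    public List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        BoundedMailboxSketch mailbox = new BoundedMailboxSketch(2);
        mailbox.enqueue("a");
        mailbox.enqueue("b");
        mailbox.enqueue("c"); // capacity reached, goes to dead letters
        System.out.println("queued=" + mailbox.size()
            + " deadLetters=" + mailbox.deadLetters());
        // prints: queued=2 deadLetters=[c]
    }
}
```

The sender is never blocked in this sketch; the cost is that messages arriving while the queue is full are not delivered to the actor.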
For advanced use cases it is also possible to defer mailbox selection to config by pointing to a config path.
Selecting what mailbox is used
Selecting a Mailbox Type for an Actor
To select a specific mailbox for an actor, use MailboxSelector to create a Props instance for spawning your actor:
- Scala
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100))

val props = MailboxSelector.fromConfig("my-app.my-special-mailbox")
context.spawn(childBehavior, "from-config-mailbox-child", props)
- Java
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100));

context.spawn(
    childBehavior,
    "from-config-mailbox-child",
    MailboxSelector.fromConfig("my-app.my-special-mailbox"));
fromConfig takes an absolute config path to a block defining the mailbox in the config file:
my-app {
  my-special-mailbox {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  }
}
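A block for a bounded mailbox could be laid out the same way. The fragment below is a sketch under assumptions: the path `my-app.my-bounded-mailbox` is hypothetical, and the values shown for `mailbox-capacity` and `mailbox-push-timeout-time` are illustrative only; check the reference configuration of your Akka version for the actual defaults.

```hocon
my-app {
  my-bounded-mailbox {
    mailbox-type = "akka.dispatch.BoundedMailbox"
    # Assumed values, chosen for illustration only
    mailbox-capacity = 100
    mailbox-push-timeout-time = 0s
  }
}
```

Such a block would then be selected with `MailboxSelector.fromConfig("my-app.my-bounded-mailbox")`.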
Default Mailbox
The default mailbox is used when no mailbox is specified. It is the SingleConsumerOnlyUnboundedMailbox.
Which Configuration is passed to the Mailbox Type
Each mailbox type is implemented by a class which extends MailboxType and takes two constructor arguments: an ActorSystem.Settings object and a Config section. The latter is computed by obtaining the named configuration section from the ActorSystem configuration, overriding its id key with the configuration path of the mailbox type and adding a fall-back to the default mailbox configuration section.
Mailbox Implementations
Akka ships with a number of mailbox implementations:
- SingleConsumerOnlyUnboundedMailbox (default)
  - This is the default
  - Backed by a Multiple-Producer Single-Consumer queue, cannot be used with BalancingDispatcher
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
- UnboundedMailbox
  - Backed by a java.util.concurrent.ConcurrentLinkedQueue
  - Blocking: No
  - Bounded: No
  - Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox"
- NonBlockingBoundedMailbox
  - Backed by a very efficient Multiple-Producer Single-Consumer queue
  - Blocking: No (discards overflowing messages into deadLetters)
  - Bounded: Yes
  - Configuration name: "akka.dispatch.NonBlockingBoundedMailbox"
- UnboundedControlAwareMailbox
  - Delivers messages that extend akka.dispatch.ControlMessage with higher priority
  - Backed by two java.util.concurrent.ConcurrentLinkedQueue
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.UnboundedControlAwareMailbox"
- UnboundedPriorityMailbox
  - Backed by a java.util.concurrent.PriorityBlockingQueue
  - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
- UnboundedStablePriorityMailbox
  - Backed by a java.util.concurrent.PriorityBlockingQueue wrapped in an akka.util.PriorityQueueStabilizer
  - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
  - Blocking: No
  - Bounded: No
  - Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
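The difference between the plain and the stable priority mailboxes comes down to how ties are broken. One common way to keep FIFO order among equal priorities is to pair each element with an arrival sequence number and use it as a secondary sort key. The sketch below shows that technique with the Java standard library only. It is not the code of akka.util.PriorityQueueStabilizer: the class `StablePrioritySketch` and its `Entry` wrapper are hypothetical, the sketch is not thread-safe, and it assumes that a lower number means a higher priority.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical illustration of stable priority ordering via sequence numbers.
// This is NOT Akka's PriorityQueueStabilizer and it is not thread-safe.
public class StablePrioritySketch {

    // Pairs a message with its priority and its arrival sequence number.
    private static final class Entry {
        final int priority;
        final long seq;
        final String message;

        Entry(int priority, long seq, String message) {
            this.priority = priority;
            this.seq = seq;
            this.message = message;
        }
    }

    // Order by priority first (assumption: lower value is more important),
    // then by arrival order so that equal priorities stay FIFO.
    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
        Comparator.comparingInt((Entry e) -> e.priority)
            .thenComparingLong(e -> e.seq));

    private long nextSeq = 0;

    public void enqueue(int priority, String message) {
        queue.add(new Entry(priority, nextSeq++, message));
    }

    // Returns the next message, or null if the queue is empty.
    public String dequeue() {
        Entry e = queue.poll();
        return e == null ? null : e.message;
    }

    public static void main(String[] args) {
        StablePrioritySketch mailbox = new StablePrioritySketch();
        mailbox.enqueue(1, "low-1");
        mailbox.enqueue(0, "high");
        mailbox.enqueue(1, "low-2");
        mailbox.enqueue(1, "low-3");
        String next;
        while ((next = mailbox.dequeue()) != null) {
            System.out.println(next);
        }
        // prints: high, low-1, low-2, low-3 (one per line)
    }
}
```

Without the sequence number as a tie-breaker, a binary heap gives no guarantee about the relative order of `low-1`, `low-2` and `low-3`.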
Other bounded mailbox implementations will block the sender if the capacity is reached and they are configured with a non-zero mailbox-push-timeout-time.
The following mailboxes should only be used with zero mailbox-push-timeout-time.
- BoundedMailbox
  - Backed by a java.util.concurrent.LinkedBlockingQueue
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
- BoundedPriorityMailbox
  - Backed by a java.util.PriorityQueue wrapped in an akka.util.BoundedBlockingQueue
  - Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "akka.dispatch.BoundedPriorityMailbox"
- BoundedStablePriorityMailbox
  - Backed by a java.util.PriorityQueue wrapped in an akka.util.PriorityQueueStabilizer and an akka.util.BoundedBlockingQueue
  - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
- BoundedControlAwareMailbox
  - Delivers messages that extend akka.dispatch.ControlMessage with higher priority
  - Backed by two java.util.concurrent.ConcurrentLinkedQueue and blocking on enqueue if capacity has been reached
  - Blocking: Yes if used with non-zero mailbox-push-timeout-time, otherwise No
  - Bounded: Yes
  - Configuration name: "akka.dispatch.BoundedControlAwareMailbox"
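The control-aware mailboxes listed above are described as being backed by two queues. A minimal sketch of that idea, using only the Java standard library, routes control messages into their own queue and always drains that queue first. The names `ControlAwareSketch`, `Control` and `Stop` are hypothetical stand-ins (for example, `Control` plays the role of akka.dispatch.ControlMessage); this is an illustration of the technique, not Akka's code, and it leaves out the capacity handling of the bounded variant.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of a two-queue, control-aware message queue.
// This is NOT Akka's implementation.
public class ControlAwareSketch {

    // Hypothetical stand-in for akka.dispatch.ControlMessage
    public interface Control {}

    // Hypothetical control message used in the example
    public static final class Stop implements Control {
        @Override
        public String toString() {
            return "Stop";
        }
    }

    private final ConcurrentLinkedQueue<Object> controlQueue = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue<Object> normalQueue = new ConcurrentLinkedQueue<>();

    // Control messages get their own queue; everything else is queued normally.
    public void enqueue(Object message) {
        if (message instanceof Control) {
            controlQueue.offer(message);
        } else {
            normalQueue.offer(message);
        }
    }

    // Control messages are always handed out before normal messages.
    // Returns null when both queues are empty.
    public Object dequeue() {
        Object control = controlQueue.poll();
        return control != null ? control : normalQueue.poll();
    }

    public static void main(String[] args) {
        ControlAwareSketch mailbox = new ControlAwareSketch();
        mailbox.enqueue("work-1");
        mailbox.enqueue("work-2");
        mailbox.enqueue(new Stop()); // enqueued last, delivered first
        Object next;
        while ((next = mailbox.dequeue()) != null) {
            System.out.println(next);
        }
        // prints: Stop, work-1, work-2 (one per line)
    }
}
```

Within each queue the order remains FIFO, so control messages overtake normal messages but not each other.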
Custom Mailbox type
The best way to show how to create your own Mailbox type is by example:
- Scala
// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics
- Java
// Marker interface used for mailbox requirements mapping
public interface MyUnboundedMessageQueueSemantics {}
- Scala
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.dispatch.ProducesMessageQueue
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import scala.Option

object MyUnboundedMailbox {
  // This is the MessageQueue implementation
  class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {

    private final val queue = new ConcurrentLinkedQueue[Envelope]()

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit =
      queue.offer(handle)
    def dequeue(): Envelope = queue.poll()
    def numberOfMessages: Int = queue.size
    def hasMessages: Boolean = !queue.isEmpty
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
      while (hasMessages) {
        deadLetters.enqueue(owner, dequeue())
      }
    }
  }
}

// This is the Mailbox implementation
class MyUnboundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._

  // This constructor signature must exist, it will be called by Akka
  def this(settings: ActorSystem.Settings, config: Config) = {
    // put your initialization code here
    this()
  }

  // The create method is called to create the MessageQueue
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MyMessageQueue()
}
- Java
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import scala.Option;

public class MyUnboundedMailbox
    implements MailboxType, ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {

  // This is the MessageQueue implementation
  public static class MyMessageQueue implements MessageQueue, MyUnboundedMessageQueueSemantics {
    private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();

    // these must be implemented; queue used as example
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
    }

    public Envelope dequeue() {
      return queue.poll();
    }

    public int numberOfMessages() {
      return queue.size();
    }

    public boolean hasMessages() {
      return !queue.isEmpty();
    }

    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      while (!queue.isEmpty()) {
        deadLetters.enqueue(owner, dequeue());
      }
    }
  }

  // This constructor signature must exist, it will be called by Akka
  public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
    // put your initialization code here
  }

  // The create method is called to create the MessageQueue
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    return new MyMessageQueue();
  }
}
Then specify the fully qualified class name (FQCN) of your MailboxType as the value of mailbox-type in the dispatcher configuration or in the mailbox configuration.
Make sure to include a constructor which takes akka.actor.ActorSystem.Settings and com.typesafe.config.Config arguments, as this constructor is invoked reflectively to construct your mailbox type. The config passed in as second argument is that section from the configuration which describes the dispatcher or mailbox setting using this mailbox type; the mailbox type will be instantiated once for each dispatcher or mailbox setting using it.
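As a sketch of the mailbox configuration variant, a block for the custom mailbox type could look like the fragment below. The path `my-app.my-custom-mailbox` and the package `com.example` are hypothetical; substitute the real fully qualified class name of your MyUnboundedMailbox.

```hocon
my-app {
  my-custom-mailbox {
    # Hypothetical package; use the real FQCN of your MailboxType class
    mailbox-type = "com.example.MyUnboundedMailbox"
  }
}
```

An actor could then be spawned with `MailboxSelector.fromConfig("my-app.my-custom-mailbox")`, in the same way as in the mailbox selection examples earlier on this page.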