Dispatchers (Scala)

Dispatchers (Scala)

An Akka MessageDispatcher is what makes Akka Actors "tick", it is the engine of the machine so to speak. All MessageDispatcher implementations are also an ExecutionContext, which means that they can be used to execute arbitrary code, for instance Futures (Scala).

Default dispatcher

Every ActorSystem will have a default dispatcher that will be used in case nothing else is configured for an Actor. The default dispatcher can be configured, and is by default a Dispatcher with a "fork-join-executor", which gives excellent performance in most cases.

Setting the dispatcher for an Actor

So in case you want to give your Actor a different dispatcher than the default, you need to do two things, of which the first is:

  1. import akka.actor.Props
  2. val myActor =
  3. context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")

Note

The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration. So in this example it's a top-level section, but you could for instance put it as a sub-section, where you'd use periods to denote sub-sections, like this: "foo.bar.my-dispatcher"

And then you just need to configure that dispatcher in your configuration:

  1. my-dispatcher {
  2. # Dispatcher is the name of the event-based dispatcher
  3. type = Dispatcher
  4. # What kind of ExecutionService to use
  5. executor = "fork-join-executor"
  6. # Configuration for the fork join pool
  7. fork-join-executor {
  8. # Min number of threads to cap factor-based parallelism number to
  9. parallelism-min = 2
  10. # Parallelism (threads) ... ceil(available processors * factor)
  11. parallelism-factor = 2.0
  12. # Max number of threads to cap factor-based parallelism number to
  13. parallelism-max = 10
  14. }
  15. # Throughput defines the maximum number of messages to be
  16. # processed per actor before the thread jumps to the next actor.
  17. # Set to 1 for as fair as possible.
  18. throughput = 100
  19. }

And here's another example that uses the "thread-pool-executor":

  1. my-thread-pool-dispatcher {
  2. # Dispatcher is the name of the event-based dispatcher
  3. type = Dispatcher
  4. # What kind of ExecutionService to use
  5. executor = "thread-pool-executor"
  6. # Configuration for the thread pool
  7. thread-pool-executor {
  8. # minimum number of threads to cap factor-based core number to
  9. core-pool-size-min = 2
  10. # No of core threads ... ceil(available processors * factor)
  11. core-pool-size-factor = 2.0
  12. # maximum number of threads to cap factor-based number to
  13. core-pool-size-max = 10
  14. }
  15. # Throughput defines the maximum number of messages to be
  16. # processed per actor before the thread jumps to the next actor.
  17. # Set to 1 for as fair as possible.
  18. throughput = 100
  19. }

For more options, see the default-dispatcher section of the Configuration.

Types of dispatchers

There are 4 different types of message dispatchers:

  • Dispatcher

    • This is an event-based dispatcher that binds a set of Actors to a thread pool. It is the default dispatcher used if one is not specified.

    • Sharability: Unlimited

    • Mailboxes: Any, creates one per Actor

    • Use cases: Default dispatcher, Bulkheading

    • Driven by: java.util.concurrent.ExecutorService

      specify using "executor" using "fork-join-executor", "thread-pool-executor" or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator

  • PinnedDispatcher

    • This dispatcher dedicates a unique thread for each actor using it; i.e. each actor will have its own thread pool with only one thread in the pool.

    • Sharability: None

    • Mailboxes: Any, creates one per Actor

    • Use cases: Bulkheading

    • Driven by: Any akka.dispatch.ThreadPoolExecutorConfigurator

      by default a "thread-pool-executor"

  • BalancingDispatcher

    • This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.

    • All the actors share a single Mailbox that they get their messages from.

    • It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.

    • Sharability: Actors of the same type only

    • Mailboxes: Any, creates one for all Actors

    • Use cases: Work-sharing

    • Driven by: java.util.concurrent.ExecutorService

      specify using "executor" using "fork-join-executor", "thread-pool-executor" or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator

    • Note that you can not use a BalancingDispatcher as a Router Dispatcher. (You can however use it for the Routees)

  • CallingThreadDispatcher

    • This dispatcher runs invocations on the current thread only. This dispatcher does not create any new threads, but it can be used from different threads concurrently for the same actor. See CallingThreadDispatcher for details and restrictions.
    • Sharability: Unlimited
    • Mailboxes: Any, creates one per Actor per Thread (on demand)
    • Use cases: Testing
    • Driven by: The calling thread (duh)

More dispatcher configuration examples

Configuring a PinnedDispatcher:

  1. my-pinned-dispatcher {
  2. executor = "thread-pool-executor"
  3. type = PinnedDispatcher
  4. }

And then using it:

  1. val myActor =
  2. context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")

Note that thread-pool-executor configuration as per the above my-thread-pool-dispatcher example is NOT applicable. This is because every actor will have its own thread pool when using PinnedDispatcher,

Mailboxes

An Akka Mailbox holds the messages that are destined for an Actor. Normally each Actor has its own mailbox, but with example a BalancingDispatcher all actors with the same BalancingDispatcher will share a single instance.

Builtin implementations

Akka comes shipped with a number of default mailbox implementations:

  • UnboundedMailbox
    • Backed by a java.util.concurrent.ConcurrentLinkedQueue
    • Blocking: No
    • Bounded: No
  • BoundedMailbox
    • Backed by a java.util.concurrent.LinkedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
  • UnboundedPriorityMailbox
    • Backed by a java.util.concurrent.PriorityBlockingQueue
    • Blocking: Yes
    • Bounded: No
  • BoundedPriorityMailbox
    • Backed by a java.util.PriorityBlockingQueue wrapped in an akka.util.BoundedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
  • Durable mailboxes, see Durable Mailboxes.

Mailbox configuration examples

How to create a PriorityMailbox:

  1. import akka.dispatch.PriorityGenerator
  2. import akka.dispatch.UnboundedPriorityMailbox
  3. import com.typesafe.config.Config
  4.  
  5. // We inherit, in this case, from UnboundedPriorityMailbox
  6. // and seed it with the priority generator
  7. class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox(
  8. // Create a new PriorityGenerator, lower prio means more important
  9. PriorityGenerator {
  10. // 'highpriority messages should be treated first if possible
  11. case 'highpriority 0
  12.  
  13. // 'lowpriority messages should be treated last if possible
  14. case 'lowpriority 2
  15.  
  16. // PoisonPill when no other left
  17. case PoisonPill 3
  18.  
  19. // We default to 1, which is in between high and low
  20. case otherwise 1
  21. })

And then add it to the configuration:

  1. prio-dispatcher {
  2. mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  3. }

And then an example on how you would use it:

  1. // We create a new Actor that just prints out what it processes
  2. val a = system.actorOf(
  3. Props(new Actor {
  4. val log: LoggingAdapter = Logging(context.system, this)
  5.  
  6. self ! 'lowpriority
  7. self ! 'lowpriority
  8. self ! 'highpriority
  9. self ! 'pigdog
  10. self ! 'pigdog2
  11. self ! 'pigdog3
  12. self ! 'highpriority
  13. self ! PoisonPill
  14.  
  15. def receive = {
  16. case x log.info(x.toString)
  17. }
  18. }).withDispatcher("prio-dispatcher"))
  19.  
  20. /*
  21. Logs:
  22. 'highpriority
  23. 'highpriority
  24. 'pigdog
  25. 'pigdog2
  26. 'pigdog3
  27. 'lowpriority
  28. 'lowpriority
  29. */

Creating your own Mailbox type

An example is worth a thousand quacks:

  1. case class MyUnboundedMailbox() extends akka.dispatch.MailboxType {
  2. import akka.actor.ActorContext
  3. import com.typesafe.config.Config
  4. import java.util.concurrent.ConcurrentLinkedQueue
  5. import akka.dispatch.{
  6. Envelope,
  7. MessageQueue,
  8. QueueBasedMessageQueue,
  9. UnboundedMessageQueueSemantics
  10. }
  11.  
  12. // This constructor signature must exist, it will be called by Akka
  13. def this(settings: ActorSystem.Settings, config: Config) = this()
  14.  
  15. // The create method is called to create the MessageQueue
  16. final override def create(owner: Option[ActorContext]): MessageQueue =
  17. new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
  18. final val queue = new ConcurrentLinkedQueue[Envelope]()
  19. }

And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration.

Note

Make sure to include a constructor which takes akka.actor.ActorSystem.Settings and com.typesafe.config.Config arguments, as this constructor is invoked reflectively to construct your mailbox type. The config passed in as second argument is that section from the configuration which describes the dispatcher using this mailbox type; the mailbox type will be instantiated once for each dispatcher using it.