Dispatchers (Java)

Dispatchers (Java)

An Akka MessageDispatcher is what makes Akka Actors “tick”, it is the engine of the machine so to speak. All MessageDispatcher implementations are also an ExecutionContext, which means that they can be used to execute arbitrary code, for instance Futures (Java).

Default dispatcher

Every ActorSystem will have a default dispatcher that will be used in case nothing else is configured for an Actor. The default dispatcher can be configured, and is by default a Dispatcher with a “fork-join-executor”, which gives excellent performance in most cases.

Setting the dispatcher for an Actor

So in case you want to give your Actor a different dispatcher than the default, you need to do two things, of which the first is:

ActorRef myActor =
  system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
    "myactor3");

Note

The “dispatcherId” you specify in withDispatcher is in fact a path into your configuration. So in this example it’s a top-level section, but you could for instance put it as a sub-section, where you’d use periods to denote sub-sections, like this: "foo.bar.my-dispatcher"

And then you just need to configure that dispatcher in your configuration:

my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

And here’s another example that uses the “thread-pool-executor”:

my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

For more options, see the default-dispatcher section of the Configuration.

Types of dispatchers

There are 4 different types of message dispatchers:

  • Dispatcher

    • Sharability: Unlimited

    • Mailboxes: Any, creates one per Actor

    • Use cases: Default dispatcher, Bulkheading

    • Driven by: java.util.concurrent.ExecutorService

      specify using “executor” using “fork-join-executor”, “thread-pool-executor” or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator

  • PinnedDispatcher

    • Sharability: None

    • Mailboxes: Any, creates one per Actor

    • Use cases: Bulkheading

    • Driven by: Any akka.dispatch.ThreadPoolExecutorConfigurator

      by default a “thread-pool-executor”

  • BalancingDispatcher

    • Sharability: Actors of the same type only

    • Mailboxes: Any, creates one for all Actors

    • Use cases: Work-sharing

    • Driven by: java.util.concurrent.ExecutorService

      specify using “executor” using “fork-join-executor”, “thread-pool-executor” or the FQCN of an akka.dispatcher.ExecutorServiceConfigurator

  • CallingThreadDispatcher

    • Sharability: Unlimited
    • Mailboxes: Any, creates one per Actor per Thread (on demand)
    • Use cases: Testing
    • Driven by: The calling thread (duh)

More dispatcher configuration examples

Configuring a PinnedDispatcher:

my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

And then using it:

ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)
    .withDispatcher("my-pinned-dispatcher"));

Mailboxes

An Akka Mailbox holds the messages that are destined for an Actor. Normally each Actor has its own mailbox, but with example a BalancingDispatcher all actors with the same BalancingDispatcher will share a single instance.

Builtin implementations

Akka comes shipped with a number of default mailbox implementations:

  • UnboundedMailbox
    • Backed by a java.util.concurrent.ConcurrentLinkedQueue
    • Blocking: No
    • Bounded: No
  • BoundedMailbox
    • Backed by a java.util.concurrent.LinkedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
  • UnboundedPriorityMailbox
    • Backed by a java.util.concurrent.PriorityBlockingQueue
    • Blocking: Yes
    • Bounded: No
  • BoundedPriorityMailbox
    • Backed by a java.util.PriorityBlockingQueue wrapped in an akka.util.BoundedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
  • Durable mailboxes, see Durable Mailboxes.

Mailbox configuration examples

How to create a PriorityMailbox:

public static class MyPrioMailbox extends UnboundedPriorityMailbox {
  public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation
    // Create a new PriorityGenerator, lower prio means more important
    super(new PriorityGenerator() {
      @Override
      public int gen(Object message) {
        if (message.equals("highpriority"))
          return 0; // 'highpriority messages should be treated first if possible
        else if (message.equals("lowpriority"))
          return 2; // 'lowpriority messages should be treated last if possible
        else if (message.equals(Actors.poisonPill()))
          return 3; // PoisonPill when no other left
        else
          return 1; // By default they go between high and low prio
      }
    });
  }
}

And then add it to the configuration:

prio-dispatcher {
  mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
}

And then an example on how you would use it:

  // We create a new Actor that just prints out what it processes
ActorRef myActor = system.actorOf(
    new Props().withCreator(new UntypedActorFactory() {
      public UntypedActor create() {
        return new UntypedActor() {
          LoggingAdapter log =
                  Logging.getLogger(getContext().system(), this);
          {
            getSelf().tell("lowpriority");
            getSelf().tell("lowpriority");
            getSelf().tell("highpriority");
            getSelf().tell("pigdog");
            getSelf().tell("pigdog2");
            getSelf().tell("pigdog3");
            getSelf().tell("highpriority");
            getSelf().tell(Actors.poisonPill());
          }

          public void onReceive(Object message) {
            log.info(message.toString());
          }
        };
      }
    }).withDispatcher("prio-dispatcher"));

/*
Logs:
  'highpriority
  'highpriority
  'pigdog
  'pigdog2
  'pigdog3
  'lowpriority
  'lowpriority
*/

Note

Make sure to include a constructor which takes akka.actor.ActorSystem.Settings and com.typesafe.config.Config arguments, as this constructor is invoked reflectively to construct your mailbox type. The config passed in as second argument is that section from the configuration which describes the dispatcher using this mailbox type; the mailbox type will be instantiated once for each dispatcher using it.

Contents