Cluster Usage (Scala)

Note

This module is experimental. This document describes how to use the features implemented so far. More features are coming in Akka Coltrane. Track progress of the Coltrane milestone in Assembla and the Roadmap.

For introduction to the Akka Cluster concepts please see Cluster Specification.

Preparing Your Project for Clustering

The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project:

    "com.typesafe.akka" %% "akka-cluster-experimental" % "2.1.4"

If you are using the latest nightly build you should pick a timestamped Akka version from http://repo.typesafe.com/typesafe/snapshots/com/typesafe/akka/akka-cluster-experimental_2.10/. We recommend against using SNAPSHOT in order to obtain stable builds.

A Simple Cluster Example

The following small program together with its configuration starts an ActorSystem with the Cluster enabled. It joins the cluster and logs some membership events.

Try it out:

1. Add the following application.conf in your project, place it in src/main/resources:

    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
      remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-remote-lifecycle-events = off
        netty {
          hostname = "127.0.0.1"
          port = 0
        }
      }

      cluster {
        seed-nodes = [
          "akka://ClusterSystem@127.0.0.1:2551",
          "akka://ClusterSystem@127.0.0.1:2552"]

        auto-down = on
      }
    }

To enable cluster capabilities in your Akka project you should, at a minimum, add the Remoting (Scala) settings, but with akka.cluster.ClusterActorRefProvider. The akka.cluster.seed-nodes should normally also be added to your application.conf file.

The seed nodes are configured contact points for the initial, automatic join of the cluster.

Note that if you are going to start the nodes on different machines you need to specify the IP addresses or host names of the machines in application.conf instead of 127.0.0.1.

2. Add the following main program to your project, place it in src/main/scala:

    package sample.cluster.simple

    import akka.actor._
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent._

    object SimpleClusterApp {

      def main(args: Array[String]): Unit = {

        // Override the configuration of the port
        // when specified as program argument
        if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))

        // Create an Akka system
        val system = ActorSystem("ClusterSystem")
        val clusterListener = system.actorOf(Props(new Actor with ActorLogging {
          def receive = {
            case state: CurrentClusterState ⇒
              log.info("Current members: {}", state.members)
            case MemberJoined(member) ⇒
              log.info("Member joined: {}", member)
            case MemberUp(member) ⇒
              log.info("Member is Up: {}", member)
            case UnreachableMember(member) ⇒
              log.info("Member detected as unreachable: {}", member)
            case _: ClusterDomainEvent ⇒ // ignore
          }
        }), name = "clusterListener")

        Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])
      }

    }
3. Start the first seed node. Open a sbt session in one terminal window and run:

    run-main sample.cluster.simple.SimpleClusterApp 2551

2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'.

4. Start the second seed node. Open a sbt session in another terminal window and run:

    run-main sample.cluster.simple.SimpleClusterApp 2552

2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to 'Up'.

Switch over to the first terminal window and see in the log output that the member joined.

5. Start another node. Open a sbt session in yet another terminal window and run:

    run-main sample.cluster.simple.SimpleClusterApp

Now you don't need to specify the port number, and it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows.

Start even more nodes in the same way, if you like.

6. Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. The other nodes will detect the failure after a while, which you can see in the log output in the other terminals.

Look at the source code of the program again. What it does is to create an actor and register it as subscriber of certain cluster events. It gets notified with a snapshot event, CurrentClusterState, which holds full state information of the cluster. After that it receives events for changes that happen in the cluster.

Automatic vs. Manual Joining

You may decide if joining the cluster should be done automatically or manually. By default it is automatic and you need to define the seed nodes in configuration so that a new node has an initial contact point. When a new node is started it sends a message to all seed nodes and then sends a join command to the one that answers first. If none of the seed nodes replies (they might not be started yet) it retries this procedure until it is successful or the node is shut down.

The seed nodes can be started in any order and it is not necessary to have all seed nodes running, but the node configured as the first element in the seed-nodes configuration list must be started when initially starting a cluster, otherwise the other seed-nodes will not become initialized and no other node can join the cluster. It is quickest to start all configured seed nodes at the same time (order doesn't matter), otherwise it can take up to the configured seed-node-timeout until the nodes can join.

Once more than two seed nodes have been started it is no problem to shut down the first seed node. If the first seed node is restarted it will first try to join the other seed nodes in the existing cluster.

You can disable automatic joining with configuration:

    akka.cluster.auto-join = off

Then you need to join manually, using JMX or Command Line Management. You can join any node in the cluster. It doesn't have to be configured as a seed node. If you are not using auto-join there is no need to configure seed nodes at all.

Joining can also be performed programmatically with Cluster(system).join(address).
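
A minimal sketch of such a programmatic join could look like this; the host and port of the contact point below are illustrative, use the address of a node that is already part of (or starting) the cluster:

    import akka.actor.{ ActorSystem, Address }
    import akka.cluster.Cluster

    val system = ActorSystem("ClusterSystem")
    // illustrative contact point, use the address of a running cluster node
    val contactPoint = Address("akka", "ClusterSystem", "127.0.0.1", 2551)
    Cluster(system).join(contactPoint)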

Automatic vs. Manual Downing

When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of new joining members to 'Up'. The status of the unreachable member must be changed to 'Down'. This can be performed automatically or manually. By default it must be done manually, using JMX or Command Line Management.

It can also be performed programmatically with Cluster(system).down(address).
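
For illustration, the programmatic variant could be wired into an actor that listens for UnreachableMember events and downs the member, a sketch with an assumed actor name (deciding when a member should actually be downed is up to you; unconditional downing like this is essentially what auto-down = on does for you):

    import akka.actor.{ Actor, ActorLogging }
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.{ CurrentClusterState, UnreachableMember }

    class SimpleDowningListener extends Actor with ActorLogging {
      val cluster = Cluster(context.system)

      override def preStart(): Unit = cluster.subscribe(self, classOf[UnreachableMember])
      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case state: CurrentClusterState ⇒ // ignore initial state snapshot
        case UnreachableMember(member) ⇒
          log.info("Downing unreachable member: {}", member)
          cluster.down(member.address)
      }
    }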

You can enable automatic downing with configuration:

    akka.cluster.auto-down = on

Be aware that using auto-down implies that two separate clusters will automatically be formed in case of network partition. That might be desired by some applications but not by others.

Subscribe to Cluster Events

You can subscribe to change notifications of the cluster membership by using Cluster(system).subscribe(subscriber, to). A snapshot of the full state, akka.cluster.ClusterEvent.CurrentClusterState, is sent to the subscriber as the first event, followed by events for incremental updates.

There are several types of change events; consult the API documentation of the classes that extend akka.cluster.ClusterEvent.ClusterDomainEvent for details about the events.

Worker Dial-in Example

Let's take a look at an example that illustrates how workers, here named backend, can detect and register to new master nodes, here named frontend.

The example application provides a service to transform text. When some text is sent to one of the frontend services, it will be delegated to one of the backend workers, which performs the transformation job and sends the result back to the original client. New backend nodes, as well as new frontend nodes, can be added to or removed from the cluster dynamically.

In this example the following imports are used:

    import language.postfixOps
    import scala.concurrent.duration._

    import akka.actor.Actor
    import akka.actor.ActorRef
    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.RootActorPath
    import akka.actor.Terminated
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.CurrentClusterState
    import akka.cluster.ClusterEvent.MemberUp
    import akka.cluster.Member
    import akka.cluster.MemberStatus
    import akka.pattern.ask
    import akka.util.Timeout

Messages:

    case class TransformationJob(text: String)
    case class TransformationResult(text: String)
    case class JobFailed(reason: String, job: TransformationJob)
    case object BackendRegistration

The backend worker that performs the transformation job:

    class TransformationBackend extends Actor {

      val cluster = Cluster(context.system)

      // subscribe to cluster changes, MemberUp
      // re-subscribe when restart
      override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case TransformationJob(text) ⇒ sender ! TransformationResult(text.toUpperCase)
        case state: CurrentClusterState ⇒
          state.members.filter(_.status == MemberStatus.Up) foreach register
        case MemberUp(m) ⇒ register(m)
      }

      // try to register to all nodes, even though there
      // might not be any frontend on all nodes
      def register(member: Member): Unit =
        context.actorFor(RootActorPath(member.address) / "user" / "frontend") !
          BackendRegistration
    }

Note that the TransformationBackend actor subscribes to cluster events to detect new, potential, frontend nodes, and sends them a registration message so that they know that they can use the backend worker.

The frontend that receives user jobs and delegates to one of the registered backend workers:

    class TransformationFrontend extends Actor {

      var backends = IndexedSeq.empty[ActorRef]
      var jobCounter = 0

      def receive = {
        case job: TransformationJob if backends.isEmpty ⇒
          sender ! JobFailed("Service unavailable, try again later", job)

        case job: TransformationJob ⇒
          jobCounter += 1
          backends(jobCounter % backends.size) forward job

        case BackendRegistration if !backends.contains(sender) ⇒
          context watch sender
          backends = backends :+ sender

        case Terminated(a) ⇒
          backends = backends.filterNot(_ == a)
      }
    }

Note that the TransformationFrontend actor watches the registered backends to be able to remove them from its list of available backend workers. Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of the watched actor.

This example is included in akka-samples/akka-sample-cluster and you can try it by starting nodes in different terminal windows. For example, starting 2 frontend nodes and 3 backend nodes:

    sbt

    project akka-sample-cluster-experimental

    run-main sample.cluster.transformation.TransformationFrontend 2551

    run-main sample.cluster.transformation.TransformationBackend 2552

    run-main sample.cluster.transformation.TransformationBackend

    run-main sample.cluster.transformation.TransformationBackend

    run-main sample.cluster.transformation.TransformationFrontend

Note

The above example should probably be designed as two separate clusters, frontend and backend, once there is a cluster client for decoupling them.

How To Startup when Cluster Size Reached

A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size.

With a configuration option you can define the required number of members before the leader changes member status of 'Joining' members to 'Up'.

    akka.cluster.min-nr-of-members = 3

You can start the actors in a registerOnMemberUp callback, which will be invoked when the current member status is changed to 'Up', i.e. the cluster has at least the defined number of members.

    Cluster(system) registerOnMemberUp {
      system.actorOf(Props(new FactorialFrontend(upToN, repeat = true)),
        name = "factorialFrontend")
    }

This callback can be used for other things than starting actors.
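
As a sketch of one such use (the Promise and log message below are our own illustration, not part of the sample), the callback could simply signal to other parts of the application that the cluster has reached its required size:

    import scala.concurrent.Promise
    import akka.cluster.Cluster

    val clusterReady = Promise[Unit]()
    Cluster(system) registerOnMemberUp {
      system.log.info("Cluster reached the required size, member status is Up")
      clusterReady.success(())
    }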

Cluster Singleton Pattern

For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.

This can be implemented by subscribing to LeaderChanged events, but there are several corner cases to consider. Therefore, this specific use case is made easily accessible by the Cluster Singleton Pattern in the contrib module. You can use it as is, or adjust to fit your specific needs.

Failure Detector

The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. The heartbeat arrival times are interpreted by an implementation of The Phi Accrual Failure Detector.

The suspicion level of failure is given by a value called phi. The basic idea of the phi failure detector is to express the value of phi on a scale that is dynamically adjusted to reflect current network conditions.

The value of phi is calculated as:

    phi = -log10(1 - F(timeSinceLastHeartbeat))

where F is the cumulative distribution function of a normal distribution with mean and standard deviation estimated from historical heartbeat inter-arrival times.
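
To make the formula concrete, here is an illustrative calculation of phi (a sketch, not the actual Akka implementation), assuming the mean and standard deviation have already been estimated from the heartbeat inter-arrival history, with all values in milliseconds. The normal CDF is approximated with a logistic function:

    def phi(timeSinceLastHeartbeat: Double, mean: Double, stdDeviation: Double): Double = {
      // logistic approximation of the cumulative distribution function
      // of the normal distribution
      def cdf(x: Double): Double = {
        val y = (x - mean) / stdDeviation
        1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
      }
      -math.log10(1.0 - cdf(timeSinceLastHeartbeat))
    }

    // e.g. with mean 1000 ms and standard deviation 100 ms:
    // phi(1100, 1000, 100) is about 0.8, phi(1400, 1000, 100) is about 4.7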

In the Configuration you can adjust the akka.cluster.failure-detector.threshold to define when a phi value is considered to be a failure.

A low threshold is prone to generate many false positives but ensures a quick detection in the event of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect actual crashes. The default threshold is 8 and is appropriate for most situations. However in cloud environments, such as Amazon EC2, the value could be increased to 12 in order to account for network issues that sometimes occur on such platforms.
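
Such an override is an ordinary entry in application.conf, for example with the value suggested above for cloud environments:

    akka.cluster.failure-detector.threshold = 12.0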

The following chart illustrates how phi increases with increasing time since the previous heartbeat.

[Figure phi1.png: phi as a function of time since the previous heartbeat]

Phi is calculated from the mean and standard deviation of historical inter arrival times. The previous chart is an example for standard deviation of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper, i.e. it's possible to determine failure more quickly. The curve looks like this for a standard deviation of 100 ms.

[Figure phi2.png: the same curve for a standard deviation of 100 ms]

To be able to survive sudden abnormalities, such as garbage collection pauses and transient network failures, the failure detector is configured with a margin, akka.cluster.failure-detector.acceptable-heartbeat-pause. You may want to adjust the Configuration of this depending on your environment. This is how the curve looks for acceptable-heartbeat-pause configured to 3 seconds.

[Figure phi3.png: the curve with acceptable-heartbeat-pause configured to 3 seconds]

Cluster Aware Routers

All routers can be made aware of member nodes in the cluster, i.e. deploying new routees or looking up routees on nodes in the cluster. When a node becomes unavailable or leaves the cluster the routees of that node are automatically unregistered from the router. When new nodes join the cluster additional routees are added to the router, according to the configuration.

When using a router with routees looked up on the cluster member nodes, i.e. the routees are already running, the configuration for a router looks like this:

    akka.actor.deployment {
      /statsService/workerRouter {
        router = consistent-hashing
        nr-of-instances = 100
        cluster {
          enabled = on
          routees-path = "/user/statsWorker"
          allow-local-routees = on
        }
      }
    }

It's the relative actor path defined in routees-path that identifies which actor to look up.

nr-of-instances defines total number of routees in the cluster, but there will not be more than one per node. Setting nr-of-instances to a high value will result in new routees added to the router when nodes join the cluster.

The same type of router could also have been defined in code:

    import akka.cluster.routing.ClusterRouterConfig
    import akka.cluster.routing.ClusterRouterSettings
    import akka.routing.ConsistentHashingRouter

    val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
      ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
        totalInstances = 100, routeesPath = "/user/statsWorker",
        allowLocalRoutees = true))),
      name = "workerRouter2")

When using a router with routees created and deployed on the cluster member nodes the configuration for a router looks like this:

    akka.actor.deployment {
      /singleton/statsService/workerRouter {
        router = consistent-hashing
        nr-of-instances = 100
        cluster {
          enabled = on
          max-nr-of-instances-per-node = 3
          allow-local-routees = off
        }
      }
    }

nr-of-instances defines total number of routees in the cluster, but the number of routees per node, max-nr-of-instances-per-node, will not be exceeded. Setting nr-of-instances to a high value will result in creating and deploying additional routees when new nodes join the cluster.

The same type of router could also have been defined in code:

    import akka.cluster.routing.ClusterRouterConfig
    import akka.cluster.routing.ClusterRouterSettings
    import akka.routing.ConsistentHashingRouter

    val workerRouter = context.actorOf(Props[StatsWorker].withRouter(
      ClusterRouterConfig(ConsistentHashingRouter(), ClusterRouterSettings(
        totalInstances = 100, maxInstancesPerNode = 3,
        allowLocalRoutees = false))),
      name = "workerRouter3")

See the Configuration section for further descriptions of the settings.

Router Example with Lookup of Routees

Let's take a look at how to use cluster aware routers.

The example application provides a service to calculate statistics for a text. When some text is sent to the service it splits it into words, and delegates the task of counting the number of characters in each word to a separate worker, a routee of a router. The character count for each word is sent back to an aggregator that calculates the average number of characters per word when all results have been collected.

In this example we use the following imports:

    import language.postfixOps
    import scala.concurrent.forkjoin.ThreadLocalRandom
    import scala.concurrent.duration._
    import com.typesafe.config.ConfigFactory
    import akka.actor.Actor
    import akka.actor.ActorLogging
    import akka.actor.ActorRef
    import akka.actor.ActorSystem
    import akka.actor.Address
    import akka.actor.PoisonPill
    import akka.actor.Props
    import akka.actor.ReceiveTimeout
    import akka.actor.RelativeActorPath
    import akka.actor.RootActorPath
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent._
    import akka.cluster.MemberStatus
    import akka.contrib.pattern.ClusterSingletonManager
    import akka.routing.FromConfig
    import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
    import akka.pattern.ask
    import akka.pattern.pipe
    import akka.util.Timeout

Messages:

    case class StatsJob(text: String)
    case class StatsResult(meanWordLength: Double)
    case class JobFailed(reason: String)

The worker that counts the number of characters in each word:

    class StatsWorker extends Actor {
      var cache = Map.empty[String, Int]
      def receive = {
        case word: String ⇒
          val length = cache.get(word) match {
            case Some(x) ⇒ x
            case None ⇒
              val x = word.length
              cache += (word -> x)
              x
          }

          sender ! length
      }
    }

The service that receives text from users and splits it up into words, delegates to workers and aggregates:

    class StatsService extends Actor {
      val workerRouter = context.actorOf(Props[StatsWorker].withRouter(FromConfig),
        name = "workerRouter")

      def receive = {
        case StatsJob(text) if text != "" ⇒
          val words = text.split(" ")
          val replyTo = sender // important to not close over sender
          // create actor that collects replies from workers
          val aggregator = context.actorOf(Props(
            new StatsAggregator(words.size, replyTo)))
          words foreach { word ⇒
            workerRouter.tell(
              ConsistentHashableEnvelope(word, word), aggregator)
          }
      }
    }

    class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor {
      var results = IndexedSeq.empty[Int]
      context.setReceiveTimeout(3 seconds)

      def receive = {
        case wordCount: Int ⇒
          results = results :+ wordCount
          if (results.size == expectedResults) {
            val meanWordLength = results.sum.toDouble / results.size
            replyTo ! StatsResult(meanWordLength)
            context.stop(self)
          }
        case ReceiveTimeout ⇒
          replyTo ! JobFailed("Service unavailable, try again later")
          context.stop(self)
      }
    }

Note, nothing cluster specific so far, just plain actors.

We can use these actors with two different types of router setup. Either with lookup of routees, or with create and deploy of routees. Remember, routees are the workers in this case.

We start with the router setup with lookup of routees. All nodes start StatsService and StatsWorker actors and the router is configured with routees-path:

    val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
      akka.actor.deployment {
        /statsService/workerRouter {
          router = consistent-hashing
          nr-of-instances = 100
          cluster {
            enabled = on
            routees-path = "/user/statsWorker"
            allow-local-routees = on
          }
        }
      }
      """).withFallback(ConfigFactory.load()))

    system.actorOf(Props[StatsWorker], name = "statsWorker")
    system.actorOf(Props[StatsService], name = "statsService")

This means that user requests can be sent to StatsService on any node and it will use StatsWorker on all nodes. There can only be one worker per node, but that worker could easily fan out to local children if more parallelism is needed.
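The fan-out mentioned above could look roughly like the following sketch, where the actor registered at /user/statsWorker is itself a parent that delegates to a local pool of plain StatsWorker children (the FanOutStatsWorker name and pool size are our own illustration, not part of the sample):

    import akka.actor.{ Actor, Props }
    import akka.routing.RoundRobinRouter

    class FanOutStatsWorker extends Actor {
      // a local pool of the plain StatsWorker defined above
      val localWorkers = context.actorOf(
        Props[StatsWorker].withRouter(RoundRobinRouter(nrOfInstances = 4)),
        name = "localWorkers")

      def receive = {
        case word: String ⇒ localWorkers forward word
      }
    }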

This example is included in akka-samples/akka-sample-cluster and you can try it by starting nodes in different terminal windows. For example, starting 3 service nodes and 1 client:

    sbt

    project akka-sample-cluster-experimental

    run-main sample.cluster.stats.StatsSample 2551

    run-main sample.cluster.stats.StatsSample 2552

    run-main sample.cluster.stats.StatsSampleClient

    run-main sample.cluster.stats.StatsSample

Router Example with Remote Deployed Routees

The above setup is nice for this example, but we will also take a look at how to use a single master node that creates and deploys workers. To keep track of a single master we use the Cluster Singleton Pattern in the contrib module. The ClusterSingletonManager is started on each node.

    system.actorOf(Props(new ClusterSingletonManager(
      singletonProps = _ ⇒ Props[StatsService], singletonName = "statsService",
      terminationMessage = PoisonPill)), name = "singleton")

We also need an actor on each node that keeps track of where the current single master exists and delegates jobs to the StatsService.

    class StatsFacade extends Actor with ActorLogging {
      import context.dispatcher
      val cluster = Cluster(context.system)

      var currentMaster: Option[Address] = None

      // subscribe to cluster changes, LeaderChanged
      // re-subscribe when restart
      override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged])
      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case job: StatsJob if currentMaster.isEmpty ⇒
          sender ! JobFailed("Service unavailable, try again later")
        case job: StatsJob ⇒
          implicit val timeout = Timeout(5.seconds)
          currentMaster foreach { address ⇒
            val service = context.actorFor(RootActorPath(address) /
              "user" / "singleton" / "statsService")
            service ? job recover {
              case _ ⇒ JobFailed("Service unavailable, try again later")
            } pipeTo sender
          }
        case state: CurrentClusterState ⇒ currentMaster = state.leader
        case LeaderChanged(leader) ⇒ currentMaster = leader
      }

    }

The StatsFacade receives text from users and delegates to the current StatsService, the single master. It listens to cluster events to look up the StatsService on the leader node. The master runs on the same node as the leader of the cluster members, which is nothing more than the address currently sorted first in the member ring, i.e. it can change when new nodes join or when the current leader leaves.

All nodes start StatsFacade and the ClusterSingletonManager. The router is now configured like this:

    val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
      akka.actor.deployment {
        /singleton/statsService/workerRouter {
          router = consistent-hashing
          nr-of-instances = 100
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 3
            allow-local-routees = off
          }
        }
      }
      """).withFallback(ConfigFactory.load()))

This example is included in akka-samples/akka-sample-cluster and you can try it by starting nodes in different terminal windows. For example, starting 3 service nodes and 1 client:

    run-main sample.cluster.stats.StatsSampleOneMaster 2551

    run-main sample.cluster.stats.StatsSampleOneMaster 2552

    run-main sample.cluster.stats.StatsSampleOneMasterClient

    run-main sample.cluster.stats.StatsSampleOneMaster

Note

The above example will be simplified when the cluster handles automatic actor partitioning.

Cluster Metrics

The member nodes of the cluster collect system health metrics and publish them to other nodes and to registered subscribers. This information is primarily used for load-balancing routers.

Hyperic Sigar

The built-in metrics are gathered from JMX MBeans, and optionally you can use Hyperic Sigar for a wider and more accurate range of metrics than can be retrieved from ordinary MBeans. Sigar uses a native OS library. To enable Sigar you need to add the directory of the native library to -Djava.library.path=<path_of_sigar_libs> and add the following dependency:

    "org.hyperic" % "sigar" % "1.6.4"

Adaptive Load Balancing

The AdaptiveLoadBalancingRouter performs load balancing of messages to cluster nodes based on the cluster metrics data. It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node. It can be configured to use a specific MetricsSelector to produce the probabilities, a.k.a. weights:

  • heap / HeapMetricsSelector - Used and max JVM heap memory. Weights based on remaining heap capacity; (max - used) / max
  • load / SystemLoadAverageMetricsSelector - System load average for the past 1 minute; the corresponding value can be found in the output of top on Linux systems. The system is possibly nearing a bottleneck if the system load average is nearing the number of cpus/cores. Weights based on remaining load capacity; 1 - (load / processors)
  • cpu / CpuMetricsSelector - CPU utilization in percentage, sum of User + Sys + Nice + Wait. Weights based on remaining cpu capacity; 1 - utilization
  • mix / MixMetricsSelector - Combines heap, cpu and load. Weights based on mean of remaining capacity of the combined selectors.
  • Any custom implementation of akka.cluster.routing.MetricsSelector

The collected metrics values are smoothed with exponential weighted moving average. In the Configuration you can adjust how quickly past data is decayed compared to new data.
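
The smoothing works roughly like the following sketch (our own illustration, not the actual Akka implementation), where the decay factor alpha is derived from the configured collect-interval and moving-average-half-life as described in the reference configuration at the end of this document:

    // exponentially weighted moving average of a metric value
    case class Ewma(alpha: Double, value: Double) {
      def +(newSample: Double): Ewma =
        copy(value = alpha * newSample + (1 - alpha) * value)
    }

    // with collect-interval = 3s and moving-average-half-life = 12s:
    // alpha = 1 - 0.5 ^ (collect-interval / half-life)
    val alpha = 1.0 - math.pow(0.5, 3.0 / 12.0) // about 0.16
    val smoothedHeap = Ewma(alpha, value = 256.0) + 512.0 // biased only slightly toward the new sample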

Let's take a look at this router in action.

In this example the following imports are used:

    import scala.annotation.tailrec
    import scala.concurrent.Future
    import com.typesafe.config.ConfigFactory
    import akka.actor.Actor
    import akka.actor.ActorLogging
    import akka.actor.ActorRef
    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.pattern.pipe
    import akka.routing.FromConfig

The backend worker that performs the factorial calculation:

    class FactorialBackend extends Actor with ActorLogging {

      import context.dispatcher

      def receive = {
        case (n: Int) ⇒
          Future(factorial(n)) map { result ⇒ (n, result) } pipeTo sender
      }

      def factorial(n: Int): BigInt = {
        @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
          if (n <= 1) acc
          else factorialAcc(acc * n, n - 1)
        }
        factorialAcc(BigInt(1), n)
      }

    }

The frontend that receives user jobs and delegates to the backends via the router:

    class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {

      val backend = context.actorOf(Props[FactorialBackend].withRouter(FromConfig),
        name = "factorialBackendRouter")

      override def preStart(): Unit = sendJobs()

      def receive = {
        case (n: Int, factorial: BigInt) ⇒
          if (n == upToN) {
            log.debug("{}! = {}", n, factorial)
            if (repeat) sendJobs()
          }
      }

      def sendJobs(): Unit = {
        log.info("Starting batch of factorials up to [{}]", upToN)
        1 to upToN foreach { backend ! _ }
      }
    }

As you can see, the router is defined in the same way as other routers, and in this case it's configured as follows:

    akka.actor.deployment {
      /factorialFrontend/factorialBackendRouter = {
        router = adaptive
        # metrics-selector = heap
        # metrics-selector = load
        # metrics-selector = cpu
        metrics-selector = mix
        nr-of-instances = 100
        cluster {
          enabled = on
          routees-path = "/user/factorialBackend"
          allow-local-routees = off
        }
      }
    }

Only the router type adaptive and the metrics-selector are specific to this router; everything else works in the same way as for other routers.

The same type of router could also have been defined in code:

    import akka.cluster.routing.ClusterRouterConfig
    import akka.cluster.routing.ClusterRouterSettings
    import akka.cluster.routing.AdaptiveLoadBalancingRouter
    import akka.cluster.routing.HeapMetricsSelector

    val backend = context.actorOf(Props[FactorialBackend].withRouter(
      ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
        ClusterRouterSettings(
          totalInstances = 100, routeesPath = "/user/factorialBackend",
          allowLocalRoutees = true))),
      name = "factorialBackendRouter2")

    import akka.cluster.routing.ClusterRouterConfig
    import akka.cluster.routing.ClusterRouterSettings
    import akka.cluster.routing.AdaptiveLoadBalancingRouter
    import akka.cluster.routing.SystemLoadAverageMetricsSelector

    val backend = context.actorOf(Props[FactorialBackend].withRouter(
      ClusterRouterConfig(AdaptiveLoadBalancingRouter(
        SystemLoadAverageMetricsSelector), ClusterRouterSettings(
        totalInstances = 100, maxInstancesPerNode = 3,
        allowLocalRoutees = false))),
      name = "factorialBackendRouter3")

This example is included in akka-samples/akka-sample-cluster and you can try it by starting nodes in different terminal windows. For example, starting 3 backend nodes and one frontend:

    sbt

    project akka-sample-cluster-experimental

    run-main sample.cluster.factorial.FactorialBackend 2551

    run-main sample.cluster.factorial.FactorialBackend 2552

    run-main sample.cluster.factorial.FactorialBackend

    run-main sample.cluster.factorial.FactorialFrontend

Press ctrl-c in the terminal window of the frontend to stop the factorial calculations.

Subscribe to Metrics Events

It's possible to subscribe to the metrics events directly to implement other functionality.

    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.ClusterMetricsChanged
    import akka.cluster.ClusterEvent.CurrentClusterState
    import akka.cluster.NodeMetrics
    import akka.cluster.StandardMetrics.HeapMemory
    import akka.cluster.StandardMetrics.Cpu

    class MetricsListener extends Actor with ActorLogging {
      val selfAddress = Cluster(context.system).selfAddress

      // subscribe to ClusterMetricsChanged
      // re-subscribe when restart
      override def preStart(): Unit =
        Cluster(context.system).subscribe(self, classOf[ClusterMetricsChanged])
      override def postStop(): Unit =
        Cluster(context.system).unsubscribe(self)

      def receive = {
        case ClusterMetricsChanged(clusterMetrics) ⇒
          clusterMetrics.filter(_.address == selfAddress) foreach { nodeMetrics ⇒
            logHeap(nodeMetrics)
            logCpu(nodeMetrics)
          }
        case state: CurrentClusterState ⇒ // ignore
      }

      def logHeap(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
        case HeapMemory(address, timestamp, used, committed, max) ⇒
          log.info("Used heap: {} MB", used.doubleValue / 1024 / 1024)
        case _ ⇒ // no heap info
      }

      def logCpu(nodeMetrics: NodeMetrics): Unit = nodeMetrics match {
        case Cpu(address, timestamp, Some(systemLoadAverage), cpuCombined, processors) ⇒
          log.info("Load: {} ({} processors)", systemLoadAverage, processors)
        case _ ⇒ // no cpu info
      }
    }

Custom Metrics Collector

You can plug in your own metrics collector instead of akka.cluster.SigarMetricsCollector or akka.cluster.JmxMetricsCollector. Look at those two implementations for inspiration. The implementation class can be defined in the Configuration.
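
For example, assuming a custom implementation named com.example.MyMetricsCollector (a hypothetical class name), the override in application.conf could look like this:

    akka.cluster.metrics.collector-class = "com.example.MyMetricsCollector"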

How to Test

Multi Node Testing is useful for testing cluster applications.

Set up your project according to the instructions in Multi Node Testing and Multi JVM Testing, i.e. add the sbt-multi-jvm plugin and the dependency to akka-remote-tests-experimental.
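
For reference, the test dependency could be declared roughly like this in your sbt build (the version shown is an assumption and should match the akka-cluster dependency above); see Multi JVM Testing for the sbt-multi-jvm plugin setup itself:

    libraryDependencies +=
      "com.typesafe.akka" %% "akka-remote-tests-experimental" % "2.1.4" % "test"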

First, as described in Multi Node Testing, we need some scaffolding to configure the MultiNodeSpec. Define the participating roles and their Configuration in an object extending MultiNodeConfig:

    import akka.remote.testkit.MultiNodeConfig
    import com.typesafe.config.ConfigFactory

    object StatsSampleSpecConfig extends MultiNodeConfig {
      // register the named roles (nodes) of the test
      val first = role("first")
      val second = role("second")
      val third = role("third")

      // this configuration will be used for all nodes
      // note that no fixed host names and ports are used
      commonConfig(ConfigFactory.parseString("""
        akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
        akka.remote.log-remote-lifecycle-events = off
        akka.cluster.auto-join = off
        # don't use sigar for tests, native lib not in path
        akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
        // router lookup config ...
        """))

    }

Define one concrete test class for each role/node. These will be instantiated on the different nodes (JVMs). They can be implemented differently, but often they are the same and extend an abstract test class, as illustrated here.

    // need one concrete test class per node
    class StatsSampleSpecMultiJvmNode1 extends StatsSampleSpec
    class StatsSampleSpecMultiJvmNode2 extends StatsSampleSpec
    class StatsSampleSpecMultiJvmNode3 extends StatsSampleSpec

Note the naming convention of these classes. The name of the classes must end with MultiJvmNode1, MultiJvmNode2 and so on. It's possible to define another suffix to be used by the sbt-multi-jvm plugin, but the default should be fine in most cases.

Then define the abstract test class, which extends MultiNodeSpec and takes the MultiNodeConfig as constructor parameter.

    import org.scalatest.BeforeAndAfterAll
    import org.scalatest.WordSpec
    import org.scalatest.matchers.MustMatchers
    import akka.remote.testkit.MultiNodeSpec
    import akka.testkit.ImplicitSender

    abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
      with WordSpec with MustMatchers with BeforeAndAfterAll
      with ImplicitSender {

      import StatsSampleSpecConfig._

      override def initialParticipants = roles.size

      override def beforeAll() = multiNodeSpecBeforeAll()

      override def afterAll() = multiNodeSpecAfterAll()

Most of this can of course be extracted to a separate trait to avoid repeating this in all your tests.
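
For example, a trait along these lines could hook the MultiNodeSpec callbacks into ScalaTest once, so that every concrete spec only needs to mix it in (a sketch; the trait name is our own):

    import akka.remote.testkit.MultiNodeSpecCallbacks
    import org.scalatest.BeforeAndAfterAll
    import org.scalatest.WordSpec
    import org.scalatest.matchers.MustMatchers

    trait STMultiNodeSpec extends MultiNodeSpecCallbacks
      with WordSpec with MustMatchers with BeforeAndAfterAll {

      override def beforeAll() = multiNodeSpecBeforeAll()
      override def afterAll() = multiNodeSpecAfterAll()
    }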

Typically you begin your test by starting up the cluster and let the members join, and create some actors. That can be done like this:

    "illustrate how to startup cluster" in within(15 seconds) {
      Cluster(system).subscribe(testActor, classOf[MemberUp])
      expectMsgClass(classOf[CurrentClusterState])

      val firstAddress = node(first).address
      val secondAddress = node(second).address
      val thirdAddress = node(third).address

      Cluster(system) join firstAddress

      system.actorOf(Props[StatsWorker], "statsWorker")
      system.actorOf(Props[StatsService], "statsService")

      expectMsgAllOf(
        MemberUp(Member(firstAddress, MemberStatus.Up)),
        MemberUp(Member(secondAddress, MemberStatus.Up)),
        MemberUp(Member(thirdAddress, MemberStatus.Up)))

      Cluster(system).unsubscribe(testActor)

      testConductor.enter("all-up")
    }

From the test you interact with the cluster using the Cluster extension, e.g. join.

    Cluster(system) join firstAddress

Notice how the testActor from testkit is added as subscriber to cluster changes and then used to wait for certain events, such as, in this case, all members becoming 'Up'.

The above code was running for all roles (JVMs). runOn is a convenient utility to declare that a certain block of code should only run for a specific role.

    "show usage of the statsService from one node" in within(15 seconds) {
      runOn(second) {
        assertServiceOk
      }

      testConductor.enter("done-2")
    }

    def assertServiceOk: Unit = {
      val service = system.actorFor(node(third) / "user" / "statsService")
      // eventually the service should be ok,
      // first attempts might fail because worker actors not started yet
      awaitCond {
        service ! StatsJob("this is the text that will be analyzed")
        expectMsgPF() {
          case unavailable: JobFailed ⇒ false
          case StatsResult(meanWordLength) ⇒
            meanWordLength must be(3.875 plusOrMinus 0.001)
            true
        }
      }

    }

Once again we take advantage of the facilities in testkit to verify expected behavior. Here we use the testActor as sender (via ImplicitSender) and verify the reply with expectMsgPF.

In the above code you can see node(third), which is a useful facility for getting the root actor reference of the actor system for a specific role. This can also be used to grab the akka.actor.Address of that node.

    val firstAddress = node(first).address
    val secondAddress = node(second).address
    val thirdAddress = node(third).address

JMX

Information and management of the cluster is available as JMX MBeans with the root name akka.Cluster. The JMX information can be displayed with an ordinary JMX console such as JConsole or JVisualVM.

From JMX you can:

  • see which members are part of the cluster
  • see status of this node
  • join this node to another node in cluster
  • mark any node in the cluster as down
  • tell any node in the cluster to leave

Member nodes are identified by their address, in the format akka://actor-system-name@hostname:port.

Command Line Management

The cluster can be managed with the script bin/akka-cluster provided in the Akka distribution.

Run it without parameters to see instructions about how to use the script:

    Usage: bin/akka-cluster <node-hostname:jmx-port> <command> ...

    Supported commands are:
      join <node-url> - Sends request a JOIN node with the specified URL
      leave <node-url> - Sends a request for node with URL to LEAVE the cluster
      down <node-url> - Sends a request for marking node with URL as DOWN
      member-status - Asks the member node for its current status
      members - Asks the cluster for addresses of current members
      unreachable - Asks the cluster for addresses of unreachable members
      cluster-status - Asks the cluster for its current status (member ring,
                       unavailable nodes, meta data etc.)
      leader - Asks the cluster who the current leader is
      is-singleton - Checks if the cluster is a singleton cluster (single
                     node cluster)
      is-available - Checks if the member node is available
    Where the <node-url> should be on the format of
      'akka://actor-system-name@hostname:port'

    Examples: bin/akka-cluster localhost:9999 is-available
              bin/akka-cluster localhost:9999 join akka://MySystem@darkstar:2552
              bin/akka-cluster localhost:9999 cluster-status

To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes, as described in Monitoring and Management Using JMX Technology.

Example of system properties to enable remote monitoring and management:

    java -Dcom.sun.management.jmxremote.port=9999 \
      -Dcom.sun.management.jmxremote.authenticate=false \
      -Dcom.sun.management.jmxremote.ssl=false

Configuration

There are several configuration properties for the cluster. We refer to the following reference file for more information:

    ######################################
    # Akka Cluster Reference Config File #
    ######################################

    # This is the reference config file that contains all the default settings.
    # Make your edits/overrides in your application.conf.

    akka {

      cluster {
        # Initial contact points of the cluster.
        # The nodes to join at startup if auto-join = on.
        # Comma separated full URIs defined by a string on the form of
        # "akka://system@hostname:port"
        # Leave as empty if the node should be a singleton cluster.
        seed-nodes = []

        # how long to wait for one of the seed nodes to reply to initial join request
        seed-node-timeout = 5s

        # Automatic join the seed-nodes at startup.
        # If seed-nodes is empty it will join itself and become a single node cluster.
        auto-join = on

        # Should the 'leader' in the cluster be allowed to automatically mark
        # unreachable nodes as DOWN?
        # Using auto-down implies that two separate clusters will automatically be
        # formed in case of network partition.
        auto-down = off

        # Minimum required number of members before the leader changes member status
        # of 'Joining' members to 'Up'. Typically used together with
        # 'Cluster.registerOnMemberUp' to defer some action, such as starting actors,
        # until the cluster has reached a certain size.
        min-nr-of-members = 1

        # Enable or disable JMX MBeans for management of the cluster
        jmx.enabled = on

        # how long should the node wait before starting the periodic tasks
        # maintenance tasks?
        periodic-tasks-initial-delay = 1s

        # how often should the node send out gossip information?
        gossip-interval = 1s

        # how often should the leader perform maintenance tasks?
        leader-actions-interval = 1s

        # how often should the node move nodes, marked as unreachable by the failure
        # detector, out of the membership ring?
        unreachable-nodes-reaper-interval = 1s

        # How often the current internal stats should be published.
        # A value of 0 s can be used to always publish the stats, when it happens.
        publish-stats-interval = 10s

        # The id of the dispatcher to use for cluster actors. If not specified
        # default dispatcher is used.
        # If specified you need to define the settings of the actual dispatcher.
        use-dispatcher = ""

        # Gossip to random node with newer or older state information, if any with
        # this probability. Otherwise Gossip to any random live node.
        # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always.
        gossip-different-view-probability = 0.8

        # Limit number of merge conflicts per second that are handled. If the limit is
        # exceeded the conflicting gossip messages are dropped and will reappear later.
        max-gossip-merge-rate = 5.0

        failure-detector {

          # FQCN of the failure detector implementation.
          # It must implement akka.cluster.FailureDetector and
          # have constructor with akka.actor.ActorSystem and
          # akka.cluster.ClusterSettings parameters
          implementation-class = "akka.cluster.AccrualFailureDetector"

          # how often should the node send out heartbeats?
          heartbeat-interval = 1s

          # Number of member nodes that each member will send heartbeat messages to,
          # i.e. each node will be monitored by this number of other nodes.
          monitored-by-nr-of-members = 5

          # defines the failure detector threshold
          # A low threshold is prone to generate many wrong suspicions but ensures
          # a quick detection in the event of a real crash. Conversely, a high
          # threshold generates fewer mistakes but needs more time to detect
          # actual crashes
          threshold = 8.0

          # Minimum standard deviation to use for the normal distribution in
          # AccrualFailureDetector. Too low standard deviation might result in
          # too much sensitivity for sudden, but normal, deviations in heartbeat
          # inter arrival times.
          min-std-deviation = 100 ms

          # Number of potentially lost/delayed heartbeats that will be
          # accepted before considering it to be an anomaly.
          # It is a factor of heartbeat-interval.
          # This margin is important to be able to survive sudden, occasional,
          # pauses in heartbeat arrivals, due to for example garbage collect or
          # network drop.
          acceptable-heartbeat-pause = 3s

          # Number of samples to use for calculation of mean and standard deviation of
          # inter-arrival times.
          max-sample-size = 1000

          # When a node stops sending heartbeats to another node it will end that
          # with this number of EndHeartbeat messages, which will remove the
          # monitoring from the failure detector.
          nr-of-end-heartbeats = 8

          # When no expected heartbeat message has been received an explicit
          # heartbeat request is sent to the node that should emit heartbeats.
          heartbeat-request {
            # Grace period until an explicit heartbeat request is sent
            grace-period = 10 s

            # After the heartbeat request has been sent the first failure detection
            # will start after this period, even though no heartbeat message has
            # been received.
            expected-response-after = 3 s

            # Cleanup of obsolete heartbeat requests
            time-to-live = 60 s
          }
        }

        metrics {
          # Enable or disable metrics collector for load-balancing nodes.
          enabled = on

          # FQCN of the metrics collector implementation.
          # It must implement akka.cluster.cluster.MetricsCollector and
          # have constructor with akka.actor.ActorSystem parameter.
          # The default SigarMetricsCollector uses JMX and Hyperic SIGAR, if SIGAR
          # is on the classpath, otherwise only JMX.
          collector-class = "akka.cluster.SigarMetricsCollector"

          # How often metrics are sampled on a node.
          # Shorter interval will collect the metrics more often.
          collect-interval = 3s

          # How often a node publishes metrics information.
          gossip-interval = 3s

          # How quickly the exponential weighting of past data is decayed compared to
          # new data. Set lower to increase the bias toward newer values.
          # The relevance of each data sample is halved for every passing half-life
          # duration, i.e. after 4 times the half-life, a data sample's relevance is
          # reduced to 6% of its original relevance. The initial relevance of a data
          # sample is given by 1 - 0.5 ^ (collect-interval / half-life).
          # See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
          moving-average-half-life = 12s
        }

        # If the tick-duration of the default scheduler is longer than the
        # tick-duration configured here a dedicated scheduler will be used for
        # periodic tasks of the cluster, otherwise the default scheduler is used.
        # See akka.scheduler settings for more details about the HashedWheelTimer.
        scheduler {
          tick-duration = 33ms
          ticks-per-wheel = 512
        }

        # Netty blocks when sending to broken connections, and this circuit breaker
        # is used to reduce connect attempts to broken connections.
        send-circuit-breaker {
          max-failures = 3
          call-timeout = 2 s
          reset-timeout = 30 s
        }
      }

      # Default configuration for routers
      actor.deployment.default {
        # MetricsSelector to use
        # - available: "mix", "heap", "cpu", "load"
        # - or: Fully qualified class name of the MetricsSelector class.
        #       The class must extend akka.cluster.routing.MetricsSelector
        #       and have a constructor with com.typesafe.config.Config
        #       parameter.
        # - default is "mix"
        metrics-selector = mix
      }
      actor.deployment.default.cluster {
        # enable cluster aware router that deploys to nodes in the cluster
        enabled = off

        # Maximum number of routees that will be deployed on each cluster
        # member node.
        # Note that nr-of-instances defines total number of routees, but
        # number of routees per node will not be exceeded, i.e. if you
        # define nr-of-instances = 50 and max-nr-of-instances-per-node = 2
        # it will deploy 2 routees per new member in the cluster, up to
        # 25 members.
        max-nr-of-instances-per-node = 1

        # Defines if routees are allowed to be located on the same node as
        # the head router actor, or only on remote nodes.
        # Useful for master-worker scenario where all routees are remote.
        allow-local-routees = on

        # Actor path of the routees to lookup with actorFor on the member
        # nodes in the cluster. E.g. "/user/myservice". If this isn't defined
        # the routees will be deployed instead of looked up.
        # max-nr-of-instances-per-node should not be configured (default value is 1)
        # when routees-path is defined.
        routees-path = ""

      }

    }

Cluster Scheduler

It is recommended that you change the tick-duration of the default scheduler to 33 ms or less when using cluster, if you don't need to have it configured to a longer duration for other reasons. If you don't do this, a dedicated scheduler will be used for periodic tasks of the cluster, which introduces the extra overhead of another thread.

    # shorter tick-duration of default scheduler when using cluster
    akka.scheduler.tick-duration = 33ms