Cluster Sharding

Dependency

To use Akka Cluster Sharding Typed, add the following dependency to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.5.16"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-sharding-typed_2.11</artifactId>
  <version>2.5.16</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-sharding-typed_2.11', version: '2.5.16'
}

Introduction

For an introduction to Sharding concepts see Cluster Sharding. This documentation shows how to use the typed Cluster Sharding API.

Warning

This module is currently marked as may change, in the sense that it is the subject of active research. The API or semantics can change without warning or a deprecation period, and using this module in production is not yet recommended.

Basic example

Sharding is accessed via the ClusterSharding extension:

Scala
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EntityRef

val sharding = ClusterSharding(system)
Java
import akka.cluster.sharding.typed.ClusterShardingSettings;
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;

ClusterSharding sharding = ClusterSharding.get(system);

Sharding is commonly used with persistence; however, any Behavior can be used with sharding, for example a basic counter:

Scala
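import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object GoodByeCounter extends CounterCommand

def counter(entityId: String, value: Int): Behavior[CounterCommand] = Behaviors.receiveMessage[CounterCommand] {
  case Increment ⇒
    // the new count is carried by the next behavior
    counter(entityId, value + 1)
  case GetValue(replyTo) ⇒
    replyTo ! value
    Behaviors.same
  case GoodByeCounter ⇒
    // used as the handOffStopMessage: stop so the entity can be handed off
    Behaviors.stopped
}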
Java
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;

interface CounterCommand {}
public static class Increment implements CounterCommand { }
public static class GoodByeCounter implements CounterCommand { }

public static class GetValue implements CounterCommand {
  private final ActorRef<Integer> replyTo;
  public GetValue(ActorRef<Integer> replyTo) {
    this.replyTo = replyTo;
  }
}

public static Behavior<CounterCommand> counter(String entityId, Integer value) {
  return Behaviors.receive(CounterCommand.class)
    .onMessage(Increment.class, (ctx, msg) -> {
      // the new count is carried by the next behavior
      return counter(entityId, value + 1);
    })
    .onMessage(GetValue.class, (ctx, msg) -> {
      msg.replyTo.tell(value);
      return Behaviors.same();
    })
    .onMessage(GoodByeCounter.class, (ctx, msg) -> {
      // used as the handOffStopMessage: stop so the entity can be handed off
      return Behaviors.stopped();
    })
    .build();
}

Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.

Scala
val TypeKey = EntityTypeKey[CounterCommand]("Counter")
// if an extractor is defined then the type would be ActorRef[CounterCommand]
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn(
  behavior = entityId ⇒ counter(entityId, 0),
  props = Props.empty,
  typeKey = TypeKey,
  settings = ClusterShardingSettings(system),
  maxNumberOfShards = 10,
  handOffStopMessage = GoodByeCounter)
Java
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
  entityId -> counter(entityId, 0),
  Props.empty(),
  typeKey,
  ClusterShardingSettings.create(system),
  10,
  new GoodByeCounter());
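
The `maxNumberOfShards` argument passed to `spawn` above bounds how many shards the entity identifiers are spread across. As a rough illustration only, the sketch below assumes a hash-code-based extractor that derives the shard from the entity identifier; that mapping is an assumption here and is not stated on this page. `ShardIdSketch` and `shardId` are hypothetical names, not part of the Akka API.

```java
// Sketch only: illustrates an assumed hash-code-based mapping, not Akka's own source.
final class ShardIdSketch {

  // Maps an entity id onto one of maxNumberOfShards shard ids ("0" .. "n-1").
  static String shardId(String entityId, int maxNumberOfShards) {
    // The remainder lies in (-n, n), so its absolute value is always a valid index.
    int index = Math.abs(entityId.hashCode() % maxNumberOfShards);
    return Integer.toString(index);
  }
}
```

Under this assumption, calling `ShardIdSketch.shardId("counter-1", 10)` always yields the same shard for the same identifier, which is what allows any node to work out where a given entity belongs.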

Messages to a specific entity are then sent via an EntityRef. It is also possible to wrap messages in a ShardingEnvelope or define extractor functions and send messages directly to the shard region.

Scala
// With an EntityRef
val counterOne: EntityRef[CounterCommand] = sharding.entityRefFor(TypeKey, "counter-1")
counterOne ! Increment

// Entity id is specified via a `ShardingEnvelope`
shardRegion ! ShardingEnvelope("counter-1", Increment)
Java
// With an EntityRef
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-1");
counterOne.tell(new Increment());

// Entity id is specified via a ShardingEnvelope
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
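
To make the relationship between the two sending styles concrete, here is a toy model in plain Java. It is not Akka code and runs without a cluster. `ToyShardRegion` and `ToyEntityRef` are hypothetical names invented for this sketch, and messages are plain strings to keep it short. It only shows that an entity reference fixes the entity identifier once, while the envelope style states it on every send.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not Akka code: every name here is hypothetical.
final class ToyShardRegion {
  // One counter value per entity id, created on the first message.
  private final Map<String, Integer> counters = new HashMap<>();

  // Envelope style: the caller names the target entity id explicitly.
  void tell(String entityId, String message) {
    if (message.equals("Increment")) {
      counters.merge(entityId, 1, Integer::sum);
    }
  }

  int valueOf(String entityId) {
    return counters.getOrDefault(entityId, 0);
  }

  // EntityRef style: the entity id is bound once and hidden from later sends.
  ToyEntityRef entityRefFor(String entityId) {
    return new ToyEntityRef(this, entityId);
  }
}

final class ToyEntityRef {
  private final ToyShardRegion region;
  private final String entityId;

  ToyEntityRef(ToyShardRegion region, String entityId) {
    this.region = region;
    this.entityId = entityId;
  }

  void tell(String message) {
    // Forwards to the region with the bound entity id.
    region.tell(entityId, message);
  }
}
```

In this model, `region.entityRefFor("counter-1").tell("Increment")` and `region.tell("counter-1", "Increment")` reach the same counter, mirroring the two Akka calls shown above.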

Persistence example

When using sharding, entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of an actor after it has moved. Currently, Akka Typed only has a Scala API for persistence; you can track the progress of the Java API here.

Taking the larger example from the persistence documentation and making it into a sharded entity works the same way as for a non-persistent behavior. The behavior:

Scala
def behavior(entityId: String): Behavior[BlogCommand] =
  PersistentBehaviors.receive[BlogCommand, BlogEvent, BlogState](
    persistenceId = "Blog-" + entityId,
    emptyState = BlogState.empty,
    commandHandler,
    eventHandler)

To create the entity:

Scala
val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost")
ClusterSharding(system).spawn[BlogCommand](
  behavior = entityId ⇒ InDepthPersistentBehaviorSpec.behavior(entityId),
  props = Props.empty,
  typeKey = ShardingTypeName,
  settings = ClusterShardingSettings(system),
  maxNumberOfShards = 100,
  handOffStopMessage = PassivatePost)

Sending messages to entities works the same as in the example above. The only difference is that when an entity is moved, its state will be restored. See persistence for more details.
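
The idea of restoring state after a move can be sketched without Akka: stored events are looked up by persistence id and applied in order, starting from an empty state. The plain-Java toy below assumes this replay model. `ToyJournal`, `persist`, `recover`, and `applyEvent` are hypothetical names, and the string-based state stands in for the `BlogState` used above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Akka Persistence: every name here is hypothetical.
final class ToyJournal {
  // Events stored per persistence id, in the order they were persisted.
  private final Map<String, List<String>> eventsById = new HashMap<>();

  void persist(String persistenceId, String event) {
    eventsById.computeIfAbsent(persistenceId, id -> new ArrayList<>()).add(event);
  }

  // Recovery: start from the empty state and apply each stored event in order.
  String recover(String persistenceId) {
    String state = "";
    for (String event : eventsById.getOrDefault(persistenceId, Collections.emptyList())) {
      state = applyEvent(state, event);
    }
    return state;
  }

  // Stand-in event handler: the state is the text appended so far.
  static String applyEvent(String state, String event) {
    return state.isEmpty() ? event : state + " " + event;
  }
}
```

For example, after `persist("Blog-post-1", "Hello")` and `persist("Blog-post-1", "world")`, a call to `recover("Blog-post-1")` rebuilds the state `"Hello world"` on whichever node asks for it, while an id with no stored events recovers to the empty state.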
