Cluster Sharding

Dependency

To use Akka Cluster Sharding Typed, you must add the following dependency to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-sharding-typed_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-cluster-sharding-typed_2.12:2.5.32"
}

Introduction

For an introduction to Sharding concepts see Cluster Sharding. This documentation shows how to use the typed Cluster Sharding API.

Note

This module is ready to be used in production, but it is still marked as "may change". This means that the API or semantics can change without warning or a deprecation period. Such changes will be collected and made in Akka 2.6.0 rather than in 2.5.x patch releases.

Basic example

Sharding is accessed via the ClusterSharding extension:

Scala
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EntityRef

val sharding = ClusterSharding(system)
Java
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.Entity;

ClusterSharding sharding = ClusterSharding.get(system);

Sharding is commonly used with persistence, but any Behavior can be used as a sharded entity, for example a basic counter:

Scala
trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand

def counter(entityId: String, value: Int): Behavior[CounterCommand] =
  Behaviors.receiveMessage[CounterCommand] {
    case Increment =>
      counter(entityId, value + 1)
    case GetValue(replyTo) =>
      replyTo ! value
      Behaviors.same
  }
Java
interface CounterCommand {}

public static class Increment implements CounterCommand {}

public static class GetValue implements CounterCommand {
  private final ActorRef<Integer> replyTo;

  public GetValue(ActorRef<Integer> replyTo) {
    this.replyTo = replyTo;
  }
}

public static Behavior<CounterCommand> counter(String entityId, Integer value) {
  return Behaviors.receive(CounterCommand.class)
      .onMessage(
          Increment.class,
          (ctx, msg) -> {
            return counter(entityId, value + 1);
          })
      .onMessage(
          GetValue.class,
          (ctx, msg) -> {
            msg.replyTo.tell(value);
            return Behaviors.same();
          })
      .build();
}

Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.

Scala
val TypeKey = EntityTypeKey[CounterCommand]("Counter")

val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] =
  sharding.init(Entity(typeKey = TypeKey, createBehavior = ctx => counter(ctx.entityId, 0)))
Java
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");

ActorRef<ShardingEnvelope<CounterCommand>> shardRegion =
    sharding.init(Entity.of(typeKey, ctx -> counter(ctx.getEntityId(), 0)));

Messages to a specific entity are then sent via an EntityRef. It is also possible to wrap messages in a ShardingEnvelope, or to define extractor functions, and send messages directly to the shard region.

Scala
// With an EntityRef
val counterOne: EntityRef[CounterCommand] = sharding.entityRefFor(TypeKey, "counter-1")
counterOne ! Increment

// Entity id is specified via a `ShardingEnvelope`
shardRegion ! ShardingEnvelope("counter-1", Increment)
Java
// With an EntityRef
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-1");
counterOne.tell(new Increment());

// Entity id is specified via a `ShardingEnvelope`
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
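
As an illustration of what an extractor function might compute, the following plain-Java sketch maps an entity identifier to a shard identifier by taking the hash code modulo a fixed number of shards. It does not use any Akka API: the class ShardIdSketch, the method shardId and the shard count of 10 are hypothetical names and values chosen for this example, and the hashing scheme is an assumption rather than a description of a particular extractor.

```java
// Illustrative sketch only: this is not Akka source code.
final class ShardIdSketch {
  private ShardIdSketch() {}

  // Maps an entity id to one of `numberOfShards` shard ids using its hash code.
  static String shardId(String entityId, int numberOfShards) {
    // The remainder is taken before Math.abs, so Math.abs never sees Integer.MIN_VALUE.
    return String.valueOf(Math.abs(entityId.hashCode() % numberOfShards));
  }

  public static void main(String[] args) {
    for (String id : new String[] {"counter-1", "counter-2", "counter-3"}) {
      System.out.println(id + " -> shard " + shardId(id, 10));
    }
  }
}
```

Because the result depends only on the entity identifier and the number of shards, every caller computes the same shard for the same entity, provided they agree on the number of shards.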

Persistence example

When using sharding, entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of an actor after it has moved.

Akka Persistence is based on the single-writer principle: for a particular persistenceId, only one persistent actor instance should be active. If multiple instances were to persist events at the same time, the events would be interleaved and might not be interpreted correctly on replay. Cluster sharding is typically used together with persistence to ensure that there is only one active entity for each persistenceId (entityId).

Here is an example of a persistent actor that is used as a sharded entity:

Scala
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.scaladsl.Effect

object HelloWorld {

  // Command
  trait Command
  final case class Greet(whom: String)(val replyTo: ActorRef[Greeting]) extends Command
  // Response
  final case class Greeting(whom: String, numberOfPeople: Int)

  // Event
  final case class Greeted(whom: String)

  // State
  private final case class KnownPeople(names: Set[String]) {
    def add(name: String): KnownPeople = copy(names = names + name)

    def numberOfPeople: Int = names.size
  }

  private val commandHandler: (KnownPeople, Command) => Effect[Greeted, KnownPeople] = { (_, cmd) =>
    cmd match {
      case cmd: Greet => greet(cmd)
    }
  }

  private def greet(cmd: Greet): Effect[Greeted, KnownPeople] =
    Effect.persist(Greeted(cmd.whom)).thenRun(state => cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople))

  private val eventHandler: (KnownPeople, Greeted) => KnownPeople = { (state, evt) =>
    state.add(evt.whom)
  }

  val entityTypeKey: EntityTypeKey[Command] =
    EntityTypeKey[Command]("HelloWorld")

  def persistentEntity(entityId: String): Behavior[Command] =
    EventSourcedEntity(
      entityTypeKey = entityTypeKey,
      entityId = entityId,
      emptyState = KnownPeople(Set.empty),
      commandHandler,
      eventHandler)

}
Java
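import akka.actor.typed.ActorRef;
import akka.actor.typed.javadsl.ActorContext;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntity;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;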

public static class HelloWorld
    extends EventSourcedEntity<HelloWorld.Command, HelloWorld.Greeted, HelloWorld.KnownPeople> {

  // Command
  interface Command {}

  public static final class Greet implements Command {
    public final String whom;
    public final ActorRef<Greeting> replyTo;

    public Greet(String whom, ActorRef<Greeting> replyTo) {
      this.whom = whom;
      this.replyTo = replyTo;
    }
  }

  // Response
  public static final class Greeting {
    public final String whom;
    public final int numberOfPeople;

    public Greeting(String whom, int numberOfPeople) {
      this.whom = whom;
      this.numberOfPeople = numberOfPeople;
    }
  }

  // Event
  public static final class Greeted {
    public final String whom;

    public Greeted(String whom) {
      this.whom = whom;
    }
  }

  // State
  static final class KnownPeople {
    private Set<String> names = Collections.emptySet();

    KnownPeople() {}

    private KnownPeople(Set<String> names) {
      this.names = names;
    }

    KnownPeople add(String name) {
      Set<String> newNames = new HashSet<>(names);
      newNames.add(name);
      return new KnownPeople(newNames);
    }

    int numberOfPeople() {
      return names.size();
    }
  }

  public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
      EntityTypeKey.create(Command.class, "HelloWorld");

  public HelloWorld(ActorContext<Command> ctx, String entityId) {
    super(ENTITY_TYPE_KEY, entityId);
  }

  @Override
  public KnownPeople emptyState() {
    return new KnownPeople();
  }

  @Override
  public CommandHandler<Command, Greeted, KnownPeople> commandHandler() {
    return newCommandHandlerBuilder().forAnyState().onCommand(Greet.class, this::greet).build();
  }

  private Effect<Greeted, KnownPeople> greet(KnownPeople state, Greet cmd) {
    return Effect()
        .persist(new Greeted(cmd.whom))
        .thenRun(newState -> cmd.replyTo.tell(new Greeting(cmd.whom, newState.numberOfPeople())));
  }

  @Override
  public EventHandler<KnownPeople, Greeted> eventHandler() {
    return (state, evt) -> state.add(evt.whom);
  }
}

Note that EventSourcedEntity is used in this example. Any Behavior can be used as a sharded entity actor, but the combination of sharding and persistent actors is very common, and therefore the EventSourcedEntity factory (Scala) or class (Java) is provided as a convenience. It selects the persistenceId automatically from the EntityTypeKey and entityId constructor parameters by using EntityTypeKey.persistenceIdFrom.

To initialize and use the entity:

Scala
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

class HelloWorldService(system: ActorSystem[_]) {
  import system.executionContext

  // registration at startup
  private val sharding = ClusterSharding(system)

  sharding.init(
    Entity(
      typeKey = HelloWorld.entityTypeKey,
      createBehavior = entityContext => HelloWorld.persistentEntity(entityContext.entityId)))

  private implicit val askTimeout: Timeout = Timeout(5.seconds)

  def greet(worldId: String, whom: String): Future[Int] = {
    val entityRef = sharding.entityRefFor(HelloWorld.entityTypeKey, worldId)
    val greeting = entityRef ? HelloWorld.Greet(whom)
    greeting.map(_.numberOfPeople)
  }

}
Java
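import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.util.Timeout;
import java.time.Duration;
import java.util.concurrent.CompletionStage;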

public static class HelloWorldService {
  private final ActorSystem<?> system;
  private final ClusterSharding sharding;
  private final Timeout askTimeout = Timeout.create(Duration.ofSeconds(5));

  // registration at startup
  public HelloWorldService(ActorSystem<?> system) {
    this.system = system;
    sharding = ClusterSharding.get(system);

    sharding.init(
        Entity.ofPersistentEntity(
            HelloWorld.ENTITY_TYPE_KEY,
            ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));
  }

  // usage example
  public CompletionStage<Integer> sayHello(String worldId, String whom) {
    EntityRef<HelloWorld.Command> entityRef =
        sharding.entityRefFor(HelloWorld.ENTITY_TYPE_KEY, worldId);
    CompletionStage<HelloWorld.Greeting> result =
        entityRef.ask(replyTo -> new HelloWorld.Greet(whom, replyTo), askTimeout);
    return result.thenApply(greeting -> greeting.numberOfPeople);
  }
}

Sending messages to persistent entities is the same as if the entity wasn’t persistent. The only difference is that when an entity is moved to another node, its state is restored there. In the above example ask is used, but tell or any of the other Interaction Patterns can be used.

See persistence for more details.
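
To make the recovery step concrete, here is a minimal plain-Java sketch of event replay. It is a simplified model and not the Akka Persistence API: ReplaySketch, Journal and Entity are hypothetical names, the journal is an in-memory list, and each event is reduced to the name that was greeted.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of event replay; none of these types are Akka classes.
final class ReplaySketch {

  // Hypothetical in-memory journal holding the events of one persistenceId.
  static final class Journal {
    private final List<String> events = new ArrayList<>();

    void append(String greetedName) {
      events.add(greetedName);
    }

    List<String> read() {
      return new ArrayList<>(events);
    }
  }

  // Hypothetical entity that rebuilds its state from the journal when it starts.
  static final class Entity {
    private final Journal journal;
    private final Set<String> names = new LinkedHashSet<>();

    Entity(Journal journal) {
      this.journal = journal;
      // Recovery: replay every stored event through the event handler.
      for (String event : journal.read()) {
        applyEvent(event);
      }
    }

    // Command handler: store the event first, then update the state.
    int greet(String whom) {
      journal.append(whom);
      applyEvent(whom);
      return names.size();
    }

    // Event handler: the only place where the state changes.
    private void applyEvent(String whom) {
      names.add(whom);
    }

    int numberOfPeople() {
      return names.size();
    }
  }

  public static void main(String[] args) {
    Journal journal = new Journal();
    Entity onFirstNode = new Entity(journal);
    onFirstNode.greet("Alice");
    onFirstNode.greet("Bob");

    // Simulate a move: the old instance is discarded and a new one is started.
    Entity onSecondNode = new Entity(journal);
    System.out.println(onSecondNode.numberOfPeople()); // prints 2
  }
}
```

The new instance never sees the old instance's memory. It reaches the same state only because both apply the same events in the same order, which is also why a single writer per persistenceId matters.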

Passivation

If the state of the entities is persistent, you may stop entities that are not used to reduce memory consumption. This is done by the application-specific implementation of the entity actors, for example by defining a receive timeout (context.setReceiveTimeout). If a message is already enqueued to the entity when it stops itself, the enqueued message in the mailbox will be dropped. To support graceful passivation without losing such messages, the entity actor can send ClusterSharding.Passivate to the ActorRef[ShardCommand] (Scala) or ActorRef<ShardCommand> (Java) that was passed in to the factory method when creating the entity. The optional stopMessage will be sent back to the entity, which is then supposed to stop itself; otherwise it will be stopped automatically. Incoming messages will be buffered by the Shard between reception of Passivate and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity.

Scala
trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand

case object Idle extends CounterCommand
case object GoodByeCounter extends CounterCommand

@silent
def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
  Behaviors.setup { ctx =>
    def become(value: Int): Behavior[CounterCommand] =
      Behaviors.receiveMessage[CounterCommand] {
        case Increment =>
          become(value + 1)
        case GetValue(replyTo) =>
          replyTo ! value
          Behaviors.same
        case Idle =>
          // after receive timeout
          shard ! ClusterSharding.Passivate(ctx.self)
          Behaviors.same
        case GoodByeCounter =>
          // the stopMessage, used for rebalance and passivate
          Behaviors.stopped
      }

    ctx.setReceiveTimeout(30.seconds, Idle)
    become(0)
  }
}

sharding.init(
  Entity(typeKey = TypeKey, createBehavior = ctx => counter2(ctx.shard, ctx.entityId))
    .withStopMessage(GoodByeCounter))
Java
interface CounterCommand {}

public static class Increment implements CounterCommand {}

public static class GetValue implements CounterCommand {
  private final ActorRef<Integer> replyTo;

  public GetValue(ActorRef<Integer> replyTo) {
    this.replyTo = replyTo;
  }
}
public static class Idle implements CounterCommand {}

public static class GoodByeCounter implements CounterCommand {}

public static Behavior<CounterCommand> counter2(
    ActorRef<ClusterSharding.ShardCommand> shard, String entityId) {
  return Behaviors.setup(
      ctx -> {
        ctx.setReceiveTimeout(Duration.ofSeconds(30), new Idle());
        return counter2(shard, entityId, 0);
      });
}

private static Behavior<CounterCommand> counter2(
    ActorRef<ClusterSharding.ShardCommand> shard, String entityId, Integer value) {
  return Behaviors.receive(CounterCommand.class)
      .onMessage(
          Increment.class,
          (ctx, msg) -> {
            return counter2(shard, entityId, value + 1);
          })
      .onMessage(
          GetValue.class,
          (ctx, msg) -> {
            msg.replyTo.tell(value);
            return Behaviors.same();
          })
      .onMessage(
          Idle.class,
          (ctx, msg) -> {
            // after receive timeout
            shard.tell(new ClusterSharding.Passivate<>(ctx.getSelf()));
            return Behaviors.same();
          })
      .onMessage(
          GoodByeCounter.class,
          (ctx, msg) -> {
            // the stopMessage, used for rebalance and passivate
            return Behaviors.stopped();
          })
      .build();
}

EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");

sharding.init(
    Entity.of(typeKey, ctx -> counter2(ctx.getShard(), ctx.getEntityId()))
        .withStopMessage(new GoodByeCounter()));

Note that in the above example the stopMessage is specified as GoodByeCounter. That message will be sent to the entity when it’s supposed to stop itself due to rebalance or passivation. If the stopMessage is not defined, the entity will be stopped automatically without receiving a specific message. It can be useful to define a custom stop message if the entity needs to perform some asynchronous cleanup or interactions before stopping.
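
The buffering that happens between Passivate and termination can be pictured with the following plain-Java model. It is a sketch under simplifying assumptions and not how the Shard is implemented: PassivationSketch and ShardModel are hypothetical names, the model handles a single entity, and messages are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model of graceful passivation; none of these types are Akka classes.
final class PassivationSketch {

  // Hypothetical stand-in for a shard that manages one entity.
  static final class ShardModel {
    // One inner list per entity incarnation, holding the messages it received.
    private final List<List<String>> incarnations = new ArrayList<>();
    private final Queue<String> buffer = new ArrayDeque<>();
    private List<String> current = null;
    private boolean passivating = false;

    void deliver(String message) {
      if (passivating) {
        // The entity is about to stop: hold the message back.
        buffer.add(message);
        return;
      }
      if (current == null) {
        // No live entity: start a new incarnation on demand.
        current = new ArrayList<>();
        incarnations.add(current);
      }
      current.add(message);
    }

    // The entity asked to be passivated.
    void passivate() {
      passivating = true;
    }

    // The entity has stopped: hand buffered messages to a new incarnation.
    void entityTerminated() {
      passivating = false;
      current = null;
      while (!buffer.isEmpty()) {
        deliver(buffer.poll());
      }
    }

    List<List<String>> incarnations() {
      return incarnations;
    }
  }

  public static void main(String[] args) {
    ShardModel shard = new ShardModel();
    shard.deliver("Increment");
    shard.passivate();
    shard.deliver("GetValue"); // buffered, not lost
    shard.entityTerminated();
    System.out.println(shard.incarnations()); // prints [[Increment], [GetValue]]
  }
}
```

In this model a message that arrives after Passivate is never handed to the stopping entity, so it cannot be dropped with that entity's mailbox.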

Automatic Passivation

The entities can be configured to be automatically passivated if they haven’t received a message for a while using the akka.cluster.sharding.passivate-idle-entity-after setting, or by explicitly setting ClusterShardingSettings.passivateIdleEntityAfter to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the ActorRef of the actor or messages that it sends to itself are not counted as activity. By default automatic passivation is disabled.
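
The distinction between messages that count as activity and messages that do not can be sketched in plain Java as follows. This is an illustrative model only: IdlePassivationSketch, IdleTracker and their methods are hypothetical names, time is passed in explicitly as milliseconds, and the two-minute timeout is an arbitrary example value.

```java
import java.time.Duration;

// Simplified model of idle tracking; none of these types are Akka classes.
final class IdlePassivationSketch {

  // Hypothetical tracker for a single entity.
  static final class IdleTracker {
    private final Duration passivateAfter;
    private long lastActivityMillis;

    IdleTracker(Duration passivateAfter, long nowMillis) {
      this.passivateAfter = passivateAfter;
      this.lastActivityMillis = nowMillis;
    }

    // Messages routed through sharding count as activity.
    void onShardedMessage(long nowMillis) {
      lastActivityMillis = nowMillis;
    }

    // Messages sent straight to the actor, or by the actor to itself,
    // deliberately leave the idle timer untouched.
    void onDirectMessage(long nowMillis) {
      // intentionally empty
    }

    boolean shouldPassivate(long nowMillis) {
      return nowMillis - lastActivityMillis >= passivateAfter.toMillis();
    }
  }

  public static void main(String[] args) {
    IdleTracker tracker = new IdleTracker(Duration.ofMinutes(2), 0L);
    tracker.onDirectMessage(100_000L);
    System.out.println(tracker.shouldPassivate(120_000L)); // prints true

    tracker.onShardedMessage(120_000L);
    System.out.println(tracker.shouldPassivate(200_000L)); // prints false
  }
}
```

An entity that is kept busy only by timers or by direct references can therefore still be passivated, which is worth keeping in mind when choosing the timeout.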
