Cluster Sharding
Dependency
To use Akka Cluster Sharding Typed, you must add the following dependency in your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.5.32"
Introduction
For an introduction to Sharding concepts see Cluster Sharding. This documentation shows how to use the typed Cluster Sharding API.
This module is ready to be used in production, but it is still marked as may change. This means that the API or semantics can change without warning or a deprecation period, but such changes will be collected and made in Akka 2.6.0 rather than in 2.5.x patch releases.
Basic example
Sharding is accessed via the ClusterSharding extension:
    import akka.cluster.sharding.typed.ShardingEnvelope
    import akka.cluster.sharding.typed.scaladsl.ClusterSharding
    import akka.cluster.sharding.typed.scaladsl.Entity
    import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
    import akka.cluster.sharding.typed.scaladsl.EntityRef

    // `system` is the application's typed ActorSystem
    val sharding = ClusterSharding(system)
Sharding is commonly used with persistence, but any Behavior can be used with sharding, e.g. a basic counter:
    import akka.actor.typed.ActorRef
    import akka.actor.typed.Behavior
    import akka.actor.typed.scaladsl.Behaviors

    trait CounterCommand
    case object Increment extends CounterCommand
    final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand

    def counter(entityId: String, value: Int): Behavior[CounterCommand] =
      Behaviors.receiveMessage[CounterCommand] {
        case Increment =>
          counter(entityId, value + 1)
        case GetValue(replyTo) =>
          replyTo ! value
          Behaviors.same
      }
Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.
    val TypeKey = EntityTypeKey[CounterCommand]("Counter")

    val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] =
      sharding.init(Entity(typeKey = TypeKey, createBehavior = ctx => counter(ctx.entityId, 0)))
Messages to a specific entity are then sent via an EntityRef. It is also possible to wrap messages in a ShardingEnvelope or define extractor functions and send messages directly to the shard region.
    // With an EntityRef
    val counterOne: EntityRef[CounterCommand] = sharding.entityRefFor(TypeKey, "counter-1")
    counterOne ! Increment

    // Entity id is specified via a `ShardingEnvelope`
    shardRegion ! ShardingEnvelope("counter-1", Increment)
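The role of an extractor function can be sketched without any Akka dependency. The following is a minimal model only: `Envelope`, `Extractor` and `HashingExtractor` are hypothetical stand-ins rather than the library's types, and hashing the entity id into a fixed number of shards is an assumed strategy chosen for illustration.

```scala
// Hypothetical stand-in for an envelope that carries the entity id with the message.
final case class Envelope[M](entityId: String, message: M)

// An extractor answers three questions for every incoming message:
// which entity it is for, which shard that entity lives in, and what the payload is.
trait Extractor[E, M] {
  def entityId(envelope: E): String
  def shardId(entityId: String): String
  def unwrap(envelope: E): M
}

// Assumed strategy for this sketch: hash the entity id into a fixed number of shards.
final class HashingExtractor[M](numberOfShards: Int) extends Extractor[Envelope[M], M] {
  def entityId(envelope: Envelope[M]): String = envelope.entityId
  def shardId(entityId: String): String =
    math.abs(entityId.hashCode % numberOfShards).toString
  def unwrap(envelope: Envelope[M]): M = envelope.message
}
```

Because the shard id depends only on the entity id, every message for "counter-1" resolves to the same shard no matter which node computes it.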
Persistence example
When using sharding, entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of an actor after it has moved.
Akka Persistence is based on the single-writer principle: for a particular persistenceId, only one persistent actor instance should be active. If multiple instances were to persist events at the same time, the events would be interleaved and might not be interpreted correctly on replay. Cluster sharding is typically used together with persistence to ensure that there is only one active entity for each persistenceId (entityId).
Here is an example of a persistent actor that is used as a sharded entity:
    import akka.actor.typed.ActorRef
    import akka.actor.typed.Behavior
    import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
    import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
    import akka.persistence.typed.scaladsl.Effect

    object HelloWorld {

      // Command
      trait Command
      final case class Greet(whom: String)(val replyTo: ActorRef[Greeting]) extends Command
      // Response
      final case class Greeting(whom: String, numberOfPeople: Int)

      // Event
      final case class Greeted(whom: String)

      // State
      private final case class KnownPeople(names: Set[String]) {
        def add(name: String): KnownPeople = copy(names = names + name)
        def numberOfPeople: Int = names.size
      }

      private val commandHandler: (KnownPeople, Command) => Effect[Greeted, KnownPeople] = { (_, cmd) =>
        cmd match {
          case cmd: Greet => greet(cmd)
        }
      }

      private def greet(cmd: Greet): Effect[Greeted, KnownPeople] =
        Effect.persist(Greeted(cmd.whom)).thenRun(state => cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople))

      private val eventHandler: (KnownPeople, Greeted) => KnownPeople = { (state, evt) =>
        state.add(evt.whom)
      }

      val entityTypeKey: EntityTypeKey[Command] =
        EntityTypeKey[Command]("HelloWorld")

      def persistentEntity(entityId: String): Behavior[Command] =
        EventSourcedEntity(
          entityTypeKey = entityTypeKey,
          entityId = entityId,
          emptyState = KnownPeople(Set.empty),
          commandHandler,
          eventHandler)
    }
Note that EventSourcedEntity is used in this example. Any Behavior can be used as a sharded entity actor, but the combination of sharding and persistent actors is very common, and therefore the EventSourcedEntity factory is provided as a convenience. It selects the persistenceId automatically from the EntityTypeKey and entityId parameters by using EntityTypeKey.persistenceIdFrom.
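The idea behind deriving the persistenceId from the type key and the entity id can be illustrated with a small, self-contained sketch. `PersistenceIdSketch` is a hypothetical helper, not the library's implementation, and the "|" separator is an assumption made for this example.

```scala
// Hypothetical helper that mirrors the idea behind EntityTypeKey.persistenceIdFrom.
// The "|" separator is an assumption made for this sketch.
object PersistenceIdSketch {
  val Separator = "|"

  // Combine the entity type name and the entity id into one unique identifier.
  def persistenceIdFrom(typeName: String, entityId: String): String = {
    require(
      !entityId.contains(Separator),
      s"entityId [$entityId] must not contain [$Separator]")
    typeName + Separator + entityId
  }
}

// Example: the "HelloWorld" entity type with entity id "world-1"
val id = PersistenceIdSketch.persistenceIdFrom("HelloWorld", "world-1")
// id == "HelloWorld|world-1"
```

Prefixing the entity id with the type name keeps identifiers unique across entity types, so two entity types can use the same entity id without sharing an event journal. The sketch rejects ids containing the separator so that two different (type, id) pairs cannot produce the same string.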
To initialize and use the entity:
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.scaladsl.ClusterSharding
    import akka.cluster.sharding.typed.scaladsl.Entity
    import akka.util.Timeout

    class HelloWorldService(system: ActorSystem[_]) {
      import system.executionContext

      // registration at startup
      private val sharding = ClusterSharding(system)

      sharding.init(
        Entity(
          typeKey = HelloWorld.entityTypeKey,
          createBehavior = entityContext => HelloWorld.persistentEntity(entityContext.entityId)))

      private implicit val askTimeout: Timeout = Timeout(5.seconds)

      def greet(worldId: String, whom: String): Future[Int] = {
        val entityRef = sharding.entityRefFor(HelloWorld.entityTypeKey, worldId)
        val greeting = entityRef ? HelloWorld.Greet(whom)
        greeting.map(_.numberOfPeople)
      }
    }
Sending messages to persistent entities is the same as if the entity wasn’t persistent. The only difference is that when an entity is moved, its state will be restored. In the above example ask is used, but tell or any of the other Interaction Patterns can be used.
See persistence for more details.
Passivation
If the state of the entities is persistent, you may stop entities that are not used to reduce memory consumption. This is done by the application-specific implementation of the entity actors, for example by defining a receive timeout (context.setReceiveTimeout). If a message is already enqueued to the entity when it stops itself, the enqueued message in the mailbox will be dropped. To support graceful passivation without losing such messages, the entity actor can send ClusterSharding.Passivate to the ActorRef[ShardCommand] that was passed in to the factory method when creating the entity. The optional stopMessage will be sent back to the entity, which is then supposed to stop itself; otherwise it will be stopped automatically. Incoming messages will be buffered by the Shard between reception of Passivate and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity.
    import scala.concurrent.duration._

    trait CounterCommand
    case object Increment extends CounterCommand
    final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
    case object Idle extends CounterCommand
    case object GoodByeCounter extends CounterCommand

    def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
      Behaviors.setup { ctx =>
        def become(value: Int): Behavior[CounterCommand] =
          Behaviors.receiveMessage[CounterCommand] {
            case Increment =>
              become(value + 1)
            case GetValue(replyTo) =>
              replyTo ! value
              Behaviors.same
            case Idle =>
              // after receive timeout
              shard ! ClusterSharding.Passivate(ctx.self)
              Behaviors.same
            case GoodByeCounter =>
              // the stopMessage, used for rebalance and passivate
              Behaviors.stopped
          }

        ctx.setReceiveTimeout(30.seconds, Idle)
        become(0)
      }
    }

    sharding.init(
      Entity(typeKey = TypeKey, createBehavior = ctx => counter2(ctx.shard, ctx.entityId))
        .withStopMessage(GoodByeCounter))
Note that in the above example the stopMessage is specified as GoodByeCounter. That message will be sent to the entity when it’s supposed to stop itself due to rebalance or passivation. If the stopMessage is not defined, the entity will be stopped automatically without receiving a specific message. It can be useful to define a custom stop message if the entity needs to perform some asynchronous cleanup or interactions before stopping.
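The buffering that happens between Passivate and termination can be modelled in plain Scala. The following is a simplified sketch for a single entity; `ShardModel` and its methods are hypothetical names, and messages are plain strings to keep the example free of Akka dependencies. It is not how the Shard is implemented.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a shard's bookkeeping for ONE entity.
final class ShardModel(stopMessage: String) {
  private var passivating = false
  private val buffered = mutable.Queue.empty[String]

  // Every message handed to the entity, in delivery order.
  val delivered = mutable.ArrayBuffer.empty[String]
  // Counts how many times the entity has been started.
  var incarnation = 1

  // The entity asked to be passivated: send it the stop message
  // and start buffering anything that arrives afterwards.
  def passivate(): Unit = {
    passivating = true
    delivered += stopMessage
  }

  // A message for the entity arrives through sharding.
  def receive(msg: String): Unit =
    if (passivating) buffered.enqueue(msg) else delivered += msg

  // The entity has terminated: buffered messages go to a new incarnation.
  def entityTerminated(): Unit = {
    passivating = false
    if (buffered.nonEmpty) {
      incarnation += 1
      while (buffered.nonEmpty) delivered += buffered.dequeue()
    }
  }
}

val shard = new ShardModel("GoodByeCounter")
shard.receive("Increment")   // delivered directly
shard.passivate()            // stop message sent back to the entity
shard.receive("Increment")   // buffered while the entity is stopping
shard.entityTerminated()     // buffered message reaches incarnation 2
```

The second Increment is not lost: it is held while the entity stops and is delivered once a new incarnation has been started.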
Automatic Passivation
The entities can be configured to be automatically passivated if they haven’t received a message for a while using the akka.cluster.sharding.passivate-idle-entity-after setting, or by explicitly setting ClusterShardingSettings.passivateIdleEntityAfter to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the ActorRef of the actor or messages that it sends to itself are not counted as activity. By default automatic passivation is disabled.
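As an illustration, the setting named above could be placed in application.conf as follows. The 120s value is only an example chosen for this sketch, not a recommendation.

```hocon
akka.cluster.sharding {
  # Passivate entities that have not received a message through sharding
  # for this long. The value below is an example only.
  passivate-idle-entity-after = 120s
}
```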