Sharded Daemon Process

Module info

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

To use Akka Sharded Daemon Process, you must add the following dependency in your project:

sbt
val AkkaVersion = "2.9.2"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.9.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-sharding-typed_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.9.2")

  implementation "com.typesafe.akka:akka-cluster-sharding-typed_${versions.ScalaBinary}"
}
Project Info: Akka Cluster Sharding (typed)
Artifact
com.typesafe.akka
akka-cluster-sharding-typed
2.9.2
JDK versions
Eclipse Temurin JDK 11
Eclipse Temurin JDK 17
Scala versions2.13.12, 3.3.1
JPMS module nameakka.cluster.sharding.typed
License
Readiness level
Supported, support is available from Lightbend
Since 2.6.0, 2019-11-06
Home pagehttps://akka.io/
API documentation
Forums
Release notesakka.io blog
IssuesGithub issues
Sourceshttps://github.com/akka/akka

Introduction

Sharded Daemon Process provides a way to run N actors, each given a numeric id starting from 0 that are then kept alive and balanced across the cluster. When a rebalance is needed the actor is stopped and, triggered by a keep alive from a Cluster Singleton (the keep alive should be seen as an implementation detail and may change in future versions).

The intended use case is for splitting data processing workloads across a set number of workers that each get to work on a subset of the data that needs to be processed. This is commonly needed to create projections based on the event streams available from all the EventSourcedBehaviors in a CQRS application. Events are tagged with one out of N tags used to split the workload of consuming and updating a projection between N workers.

For cases where a single actor needs to be kept alive see Cluster Singleton

Basic example

To set up a set of actors running with Sharded Daemon process each node in the cluster needs to run the same initialization when starting up:

Scala
sourceval tags = Vector("tag-1", "tag-2", "tag-3")
ShardedDaemonProcess(system).init("TagProcessors", tags.size, id => TagProcessor(tags(id)))
Java
sourceList<String> tags = Arrays.asList("tag-1", "tag-2", "tag-3");
ShardedDaemonProcess.get(system)
    .init(
        TagProcessor.Command.class,
        "TagProcessors",
        tags.size(),
        id -> TagProcessor.create(tags.get(id)));

An additional factory method is provided for further configurability and providing a graceful stop message for the actor.

Addressing the actors

In use cases where you need to send messages to the daemon process actors it is recommended to use the system receptionist either with a single ServiceKey which all daemon process actors register themeselves to for broadcasts or individual keys if more fine grained messaging is needed.

Dynamic scaling of number of workers

Starting the sharded daemon process with initWithContext returns an ActorRef[ShardedDaemonProcessCommand] that accepts a ChangeNumberOfProcessesChangeNumberOfProcesses command to rescale the process to a new number of workers.

The rescaling process among other things includes the process actors stopping themselves in response to a stop message so may be a relatively slow operation. If a subsequent request to rescale is sent while one is in progress it is responded to with a failure response.

A rolling upgrade switching from a static number of workers to a dynamic number is possible. It is not safe to do a rolling upgrade from dynamic number of workers to static without a full cluster shutdown.

Colocate processes

When using the default shard allocation strategy the processes for different names are allocated independent of each other, i.e. the same process index for different process names may be allocated to different nodes. Colocating processes can be useful to share resources, such as Projections with EventsBySliceFirehoseQuery

To colocate such processes you can use the ConsistentHashingShardAllocationStrategyConsistentHashingShardAllocationStrategy as shardAllocationStrategy parameter of the init or initWithContext methods.

Note

Create a new instance of the ConsistentHashingShardAllocationStrategy for each Sharded Daemon Process name, i.e. a ConsistentHashingShardAllocationStrategy instance must not be shared.

The shard identifier that is used by Sharded Daemon Process is the same as the process index, i.e. processes with the same index will be colocated.

The allocation strategy is using Consistent Hashing of the Cluster membership ring to assign a shard to a node. When adding or removing nodes it will rebalance according to the new consistent hashing, but that means that only a few shards will be rebalanced and others remain on the same location. When there are changes to the Cluster membership the shards may be on different nodes for a while, but eventually, when the membership is stable, the shards with the same identifier will end up on the same node.

Scalability

This cluster tool is intended for up to thousands of processes. Running with larger sets of processes might cause problems with Akka Distributed Data replication or process keepalive messages.

Configuration

The following configuration properties are read by the ShardedDaemonProcessSettingsShardedDaemonProcessSettings when created with a ActorSystemActorSystem parameter:

sourceakka.cluster.sharded-daemon-process {
  # Settings for the sharded daemon process internal usage of sharding are using the akka.cluste.sharding defaults.
  # Some of the settings can be overridden specifically for the sharded daemon process here. For example can the
  # `role` setting limit what nodes the daemon processes and the keep alive pingers will run on.
  # Some settings can not be changed (remember-entities and related settings, passivation, number-of-shards),
  # overriding those settings will be ignored.
  sharding = ${akka.cluster.sharding}

  # Each entity is pinged at this interval from a few nodes in the
  # cluster to trigger a start if it has stopped, for example during
  # rebalancing.
  # See also keep-alive-from-number-of-nodes and keep-alive-throttle-interval
  # Note: How the set of actors is kept alive may change in the future meaning this setting may go away.
  keep-alive-interval = 10s

  # Keep alive messages from this number of nodes.
  keep-alive-from-number-of-nodes = 3

  # Keep alive messages are sent with this delay between each message.
  keep-alive-throttle-interval = 100 ms
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.