Running a Projection
Once you have decided how you want to build your projection, the next step is to run it. Typically, you run it in a distributed fashion in order to spread the load over the different nodes in an Akka Cluster. However, it’s also possible to run it as a single instance (when not clustered) or as a single instance in a Cluster Singleton.
Dependencies
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
  mavenCentral()
  maven {
    url "https://repo.akka.io/maven"
  }
}
To distribute the projection over the cluster we recommend the use of ShardedDaemonProcess. Add the following dependency to your project if you are not already using Akka Cluster Sharding:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.10.0"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-sharding-typed_${scala.binary.version}</artifactId>
    <version>2.10.0</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.typesafe.akka:akka-cluster-sharding-typed_${versions.ScalaBinary}:2.10.0"
}
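The examples further down also use the event sourced source provider and the Cassandra offset store from Akka Projections. As a hedged sbt sketch, assuming the standard Akka Projections module names and a placeholder version (use the release that matches your Akka version):

// Sketch only: module names from the Akka Projections family; the version is a placeholder.
val AkkaProjectionVersion = "1.6.0" // placeholder, pick the version matching your Akka version
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
  "com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionVersion)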
Akka Projections requires Akka 2.10.0 or later, see Akka version.
For more information on using Akka Cluster consult Akka’s reference documentation on Akka Cluster and Akka Cluster Sharding.
Running with Sharded Daemon Process
The Sharded Daemon Process can be used to distribute n instances of a given Projection across the cluster. Therefore, it’s important that each Projection instance consumes a subset of the stream of envelopes.
How the subset is created depends on the kind of source we consume. If it’s an Alpakka Kafka source, this is done by Kafka consumer groups. When consuming from Akka Persistence Journal, the events must be partitioned by tagging them as demonstrated in the example below, or by the built-in slices in Projections R2DBC.
Tagging Events in EventSourcedBehavior
Before we can consume the events, the EventSourcedBehavior must tag each event with one of the projection tags.
- Scala
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey

val tags = Vector.tabulate(5)(i => s"carts-$i")

val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart")

def init(system: ActorSystem[_]): Unit = {
  ClusterSharding(system).init(Entity(EntityKey) { entityContext =>
    val i = math.abs(entityContext.entityId.hashCode % tags.size)
    val selectedTag = tags(i)
    ShoppingCart(entityContext.entityId, selectedTag)
  }.withRole("write-model"))
}

def apply(cartId: String, projectionTag: String): Behavior[Command] = {
  EventSourcedBehavior
    .withEnforcedReplies[Command, Event, State](
      PersistenceId(EntityKey.name, cartId),
      State.empty,
      (state, command) =>
        // The shopping cart behavior changes if it's checked out or not.
        // The commands are handled differently for each case.
        if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command)
        else openShoppingCart(cartId, state, command),
      (state, event) => handleEvent(state, event))
    .withTagger(_ => Set(projectionTag))
    .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100))
    .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
}
- Java
public static final List<String> tags =
    Collections.unmodifiableList(
        Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4"));

@Override
public Set<String> tagsFor(Event event) {
  int n = Math.abs(cartId.hashCode() % tags.size());
  String selectedTag = tags.get(n);
  return Collections.singleton(selectedTag);
}
In the above example, we created a Vector[String] (Scala) / List<String> (Java) of tags, from carts-0 to carts-4. Each entity instance will tag its events using one of those tags. The tag is selected based on the modulo of the entity id’s hash code (a stable identifier) and the total number of tags. Effectively, this slices the journal into different tags (i.e. from carts-0 to carts-4). Note the .withTagger in the initialization of the EventSourcedBehavior (the tagsFor override in the Java example).
The number of tags should be more than the number of nodes that you want to distribute the load over. It’s not easy to change this afterwards without system downtime. Fewer tags than the number of nodes will result in not hosting a Projection instance on some nodes. More tags than the number of nodes means that each node is hosting more than one Projection instance, which is fine. It’s good to start with more tags than nodes to have some room for scaling out to more nodes later if needed. As a rule of thumb, the number of tags should be a factor of ten greater than the planned maximum number of cluster nodes. It doesn’t have to be exact.
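For example, with a planned maximum of 10 nodes, roughly 100 tags leave plenty of headroom. A small Scala sketch of generating such a tag set (the numbers and the carts- prefix are only illustrative; the examples on this page use 5 tags):

// Illustrative sizing only: roughly 10x the planned maximum number of nodes.
val plannedMaxNodes = 10
val numberOfTags = plannedMaxNodes * 10 // 100 tags
val tags = Vector.tabulate(numberOfTags)(i => s"carts-$i")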
When using slices with Projections R2DBC it is possible to dynamically change the number of projection instances at runtime.
We will use those tags to query the journal, create as many Projection instances as there are tags, and distribute them across the cluster.
When using the Akka Persistence Cassandra plugin you should not use too many tags for each event. Each tag results in a copy of the event in a separate table, which can impact write performance. Typically, you would use one tag per event, as illustrated here. Additional filtering of events can be done in the Projection handler if it doesn’t have to act on certain events. The JDBC plugin doesn’t have this constraint.
See also the Akka reference documentation for tagging.
Event Sourced Provider per tag
We can use the EventSourcedProvider to consume the ShoppingCart events.
- Scala
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import docs.eventsourced.ShoppingCart

def sourceProvider(tag: String) =
  EventSourcedProvider.eventsByTag[ShoppingCart.Event](
    system = system,
    readJournalPluginId = CassandraReadJournal.Identifier,
    tag = tag)
- Java
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.Offset;
import akka.projection.javadsl.SourceProvider;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.eventsourced.EventEnvelope;

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}
Note that we define a method that builds a new SourceProvider for each passed tag.
Building the Projection instances
Next we create a method to return Projection instances. Again, we pass a tag that is used to initialize the SourceProvider and as the key in the ProjectionId.
- Scala
import akka.projection.ProjectionId
import akka.projection.cassandra.scaladsl.CassandraProjection

def projection(tag: String) =
  CassandraProjection
    .atLeastOnce(
      projectionId = ProjectionId("shopping-carts", tag),
      sourceProvider(tag),
      handler = () => new ShoppingCartHandler)
    .withSaveOffset(100, 500.millis)
- Java
import akka.projection.cassandra.javadsl.CassandraProjection;
import akka.projection.Projection;
import akka.projection.ProjectionId;

int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
          ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new)
      .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
}
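The ShoppingCartHandler used above is defined elsewhere. As a minimal Scala sketch of what such a handler could look like (the logging-only body is an assumption for illustration; a real handler would update a read-side representation):

import scala.concurrent.Future
import akka.Done
import akka.projection.eventsourced.EventEnvelope
import akka.projection.scaladsl.Handler
import docs.eventsourced.ShoppingCart

// Placeholder handler sketch: processes each envelope and signals completion with Done.
class ShoppingCartHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
  override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    println(s"Processed event: ${envelope.event}") // replace with a real read-side update
    Future.successful(Done)
  }
}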
Initializing the Sharded Daemon
Once we have the tags, the SourceProvider and the Projection of our choice, we can glue all the pieces together using the Sharded Daemon Process and let it be distributed across the cluster.
- Scala
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.projection.ProjectionBehavior

ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
  name = "shopping-carts",
  numberOfInstances = ShoppingCart.tags.size,
  behaviorFactory = (i: Int) => ProjectionBehavior(projection(ShoppingCart.tags(i))),
  stopMessage = ProjectionBehavior.Stop)
- Java
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.projection.ProjectionBehavior;

ShardedDaemonProcess.get(system)
    .init(
        ProjectionBehavior.Command.class,
        "shopping-carts",
        ShoppingCart.tags.size(),
        id -> ProjectionBehavior.create(projection(ShoppingCart.tags.get((Integer) id))),
        ProjectionBehavior.stopMessage());
For this example, we configure the ShardedDaemonProcess with as many instances as there are tags, and the behavior factory returns a ProjectionBehavior wrapping a different Projection instance for each of them. Finally, the ShardedDaemonProcess is configured to use ProjectionBehavior.Stop as its stop message.
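If the projection instances should only run on nodes with a certain cluster role, that can be expressed through the Sharded Daemon Process settings. A Scala sketch, assuming a hypothetical "read-model" role and the init overload that accepts settings and an optional stop message:

import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.projection.ProjectionBehavior

// Sketch: restrict projection instances to nodes with the hypothetical "read-model" role.
ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
  name = "shopping-carts",
  numberOfInstances = ShoppingCart.tags.size,
  behaviorFactory = (i: Int) => ProjectionBehavior(projection(ShoppingCart.tags(i))),
  settings = ShardedDaemonProcessSettings(system).withRole("read-model"),
  stopMessage = Some(ProjectionBehavior.Stop))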
Projection Behavior
The ProjectionBehavior is an Actor Behavior that knows how to manage the Projection lifecycle. The Projection starts to consume the events as soon as the actor is spawned and will restart the source in case of failures (see Projection Settings).
For graceful stop it is recommended to use the ProjectionBehavior.Stop message (ProjectionBehavior.stopMessage() from Java).
Running with local Actor
You can spawn the ProjectionBehavior as any other Behavior. This can be useful for testing or when running a local ActorSystem without Akka Cluster.
- Scala
def sourceProvider(tag: String) =
  EventSourcedProvider.eventsByTag[ShoppingCart.Event](
    system = system,
    readJournalPluginId = CassandraReadJournal.Identifier,
    tag = tag)

def projection(tag: String) =
  CassandraProjection.atLeastOnce(
    projectionId = ProjectionId("shopping-carts", tag),
    sourceProvider(tag),
    handler = () => new ShoppingCartHandler)

val projection1 = projection("carts-1")

context.spawn(ProjectionBehavior(projection1), projection1.projectionId.id)
- Java
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
      ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");

ActorRef<ProjectionBehavior.Command> projection1Ref =
    context.spawn(ProjectionBehavior.create(projection1), projection1.projectionId().id());
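To stop such a locally spawned projection gracefully, keep the ActorRef returned by spawn and send it the stop message rather than stopping the actor abruptly. A minimal Scala sketch (the projectionRef name is only for illustration):

// Keep the ActorRef from spawn and send ProjectionBehavior.Stop when the projection
// should shut down, letting it stop its source and complete in-flight work first.
val projectionRef = context.spawn(ProjectionBehavior(projection1), projection1.projectionId.id)
projectionRef ! ProjectionBehavior.Stop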
Be aware that the projection and its offset storage are keyed by the ProjectionId. If more than one instance with the same ProjectionId runs concurrently, they will overwrite each other's stored offsets, with undefined and unpredictable results.
Running in Cluster Singleton
If you know that you only need one or a few projection instances, an alternative to the Sharded Daemon Process is to use an Akka Cluster Singleton.
- Scala
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.SingletonActor

def sourceProvider(tag: String) =
  EventSourcedProvider.eventsByTag[ShoppingCart.Event](
    system = system,
    readJournalPluginId = CassandraReadJournal.Identifier,
    tag = tag)

def projection(tag: String) =
  CassandraProjection.atLeastOnce(
    projectionId = ProjectionId("shopping-carts", tag),
    sourceProvider(tag),
    handler = () => new ShoppingCartHandler)

val projection1 = projection("carts-1")

ClusterSingleton(system).init(
  SingletonActor(ProjectionBehavior(projection1), projection1.projectionId.id)
    .withStopMessage(ProjectionBehavior.Stop))
- Java
import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.SingletonActor;

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
  return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
  return CassandraProjection.atLeastOnce(
      ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}

Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");

ActorRef<ProjectionBehavior.Command> projection1Ref =
    ClusterSingleton.get(system)
        .init(
            SingletonActor.of(
                    ProjectionBehavior.create(projection1), projection1.projectionId().id())
                .withStopMessage(ProjectionBehavior.stopMessage()));
Be aware that all projection instances running with Cluster Singleton will run on the same node in the Cluster.