Changes from Durable State
A typical source for Projections is the change stored with DurableStateBehavior
DurableStateBehavior
in Akka Persistence. Durable state changes can be tagged and then consumed with the changes query.
Akka Projections has integration with changes
, which is described here.
When using the R2DBC plugin an alternative to using a Projection is to store the query representation directly from the write side.
Dependencies
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project> ... <repositories> <repository> <id>akka-repository</id> <name>Akka library repository</name> <url>https://repo.akka.io/maven</url> </repository> </repositories> </project>
- Gradle
repositories { mavenCentral() maven { url "https://repo.akka.io/maven" } }
To use the Durable State module of Akka Projections, add the following dependency in your project:
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-durable-state" % "1.5.0"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-projection-durable-state_${scala.binary.version}</artifactId> <version>1.5.0</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-projection-durable-state_${versions.ScalaBinary}:1.5.0" }
Akka Projections requires Akka 2.9.0 or later, see Akka version.
Project Info: Akka Projections Durable State | |
---|---|
Artifact | com.lightbend.akka
akka-projection-durable-state
1.5.0
|
JDK versions | Eclipse Temurin JDK 11 Eclipse Temurin JDK 17 |
Scala versions | 2.13.12, 3.3.1 |
JPMS module name | akka.projection.durable-state |
License | |
Readiness level | Supported, Lightbend Subscription provides support
Since 1.2.2, 2021-08-19
|
Home page | https://akka.io |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/akka-projection |
Transitive dependencies
The table below shows the akka-projection-durable-state
direct dependencies.The second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.lightbend.akka akka-projection-core_2.13 1.5.0 com.typesafe.akka akka-persistence-query_2.13 2.9.0 org.scala-lang scala-library 2.13.12 - Dependency tree
com.lightbend.akka akka-projection-core_2.13 1.5.0 com.typesafe.akka akka-actor-typed_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-slf4j_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.slf4j slf4j-api 1.7.36 org.scala-lang scala-library 2.13.12 Apache-2.0 org.slf4j slf4j-api 1.7.36 com.typesafe.akka akka-persistence-query_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-persistence_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-stream_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-stream_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-stream_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-persistence-query_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-persistence_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-stream_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-stream_2.13 2.9.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.9.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.13 1.0.2 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.9.0 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0
SourceProvider for changesByTag
A SourceProvider
SourceProvider
defines the source of the envelopes that the Projection
will process. A SourceProvider
for the changes
query can be defined with the DurableStateStoreProvider
DurableStateStoreProvider
like this:
- Scala
-
source
import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.query.DurableStateChange import akka.persistence.query.Offset import akka.projection.state.scaladsl.DurableStateSourceProvider import akka.projection.scaladsl.SourceProvider val sourceProvider: SourceProvider[Offset, DurableStateChange[AccountEntity.Account]] = DurableStateSourceProvider .changesByTag[AccountEntity.Account](system, JdbcDurableStateStore.Identifier, "bank-accounts-1")
- Java
-
source
import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import akka.persistence.query.DurableStateChange; import akka.persistence.query.Offset; import akka.projection.state.javadsl.DurableStateSourceProvider; import akka.projection.javadsl.SourceProvider; SourceProvider<Offset, DurableStateChange<AccountEntity.Account>> sourceProvider = DurableStateSourceProvider.changesByTag( system, JdbcDurableStateStore.Identifier(), "bank-accounts-1");
This example is using the DurableStateStore JDBC plugin for Akka Persistence. You will use the same plugin that you configured for the write side. The one that is used by the DurableStateBehavior
.
This source is consuming all the changes from the Account
DurableStateBehavior
that are tagged with "bank-accounts-1"
. In a production application, you would need to start as many instances as the number of different tags you used. That way you consume the changes from all entities.
The DurableStateChange[AccountEntity.Account]
DurableStateChange<AccountEntity.Account>
is what the Projection
handler will process. It contains the State
and additional meta data, such as the offset that will be stored by the Projection
. See DurableStateChange
DurableStateChange
for full details of what it contains.
SourceProvider for changesBySlices
A SourceProvider
SourceProvider
defines the source of the envelopes that the Projection
will process. A SourceProvider
for the changesBySlices
query can be defined with the DurableStateStoreProvider
DurableStateStoreProvider
like this:
- Scala
-
source
import akka.persistence.query.DurableStateChange import akka.persistence.query.Offset import akka.projection.state.scaladsl.DurableStateSourceProvider import akka.projection.scaladsl.SourceProvider // Slit the slices into 4 ranges val numberOfSliceRanges: Int = 4 val sliceRanges = DurableStateSourceProvider.sliceRanges(system, R2dbcDurableStateStore.Identifier, numberOfSliceRanges) // Example of using the first slice range val minSlice: Int = sliceRanges.head.min val maxSlice: Int = sliceRanges.head.max val entityType: String = "Account" val sourceProvider: SourceProvider[Offset, DurableStateChange[AccountEntity.Account]] = DurableStateSourceProvider .changesBySlices[AccountEntity.Account]( system, R2dbcDurableStateStore.Identifier, entityType, minSlice, maxSlice)
- Java
-
source
import akka.japi.Pair; import akka.persistence.query.DurableStateChange; import akka.persistence.query.Offset; import akka.projection.eventsourced.javadsl.EventSourcedProvider; import akka.projection.javadsl.SourceProvider; import akka.projection.state.javadsl.DurableStateSourceProvider; // Slit the slices into 4 ranges int numberOfSliceRanges = 4; List<Pair<Integer, Integer>> sliceRanges = EventSourcedProvider.sliceRanges( system, R2dbcDurableStateStore.Identifier(), numberOfSliceRanges); // Example of using the first slice range int minSlice = sliceRanges.get(0).first(); int maxSlice = sliceRanges.get(0).second(); String entityType = "MyEntity"; SourceProvider<Offset, DurableStateChange<AccountEntity.Account>> sourceProvider = DurableStateSourceProvider.changesBySlices( system, R2dbcDurableStateStore.Identifier(), entityType, minSlice, maxSlice);
This example is using the R2DBC plugin for Akka Persistence. You will use the same plugin that you configured for the write side. The one that is used by the DurableStateBehavior
.
This source is consuming all the changes from the Account
DurableStateBehavior
for the given slice range. In a production application, you would need to start as many instances as the number of slice ranges. That way you consume the changes from all entities.
The DurableStateChange[AccountEntity.Account]
DurableStateChange<AccountEntity.Account>
is what the Projection
handler will process. It contains the State
and additional meta data, such as the offset that will be stored by the Projection
. See DurableStateChange
DurableStateChange
for full details of what it contains.