State changes from Akka Persistence Durable State

A typical source for Projections is the change stored with DurableStateBehaviorDurableStateBehavior in Akka Persistence. Durable state changes can be tagged and then consumed with the changes query.

Akka Projections has integration with changes, which is described here.

Dependencies

To use the Durable State module of Akka Projections, add the following dependency in your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-durable-state" % "1.2.2"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-durable-state_${scala.binary.version}</artifactId>
    <version>1.2.2</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-durable-state_${versions.ScalaBinary}:1.2.2"
}

Akka Projections requires Akka 2.6.16 or later, see Akka version.

Project Info: Akka Projections Durable State
Artifact
com.lightbend.akka
akka-projection-durable-state
1.2.2
JDK versions
AdoptOpenJDK 8
AdoptOpenJDK 11
Scala versions2.13.3, 2.12.13
JPMS module nameakka.projection.durable-state
License
Readiness level
Since 1.2.2, 2021-08-19
Home pagehttps://akka.io
API documentation
Forums
Release notesGitHub releases
IssuesGitHub issues
Sourceshttps://github.com/akka/akka-projection.git

Transitive dependencies

The table below shows the akka-projection-durable-state direct dependencies.The second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.lightbend.akkaakka-projection-core_2.131.2.2
com.typesafe.akkaakka-persistence-query_2.132.6.16
org.scala-langscala-library2.13.3
Dependency tree
com.lightbend.akka    akka-projection-core_2.13    1.2.2
    com.typesafe.akka    akka-actor-typed_2.13    2.6.16    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.6.16    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.slf4j    slf4j-api    1.7.31
        org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.slf4j    slf4j-api    1.7.31
    com.typesafe.akka    akka-persistence-query_2.13    2.6.16    Apache-2.0
        com.typesafe.akka    akka-persistence_2.13    2.6.16    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.6.16    Apache-2.0
                com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.6.16    Apache-2.0
                com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.3    CC0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.6.16    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.6.16    Apache-2.0
            com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.3    CC0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.6.16    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.16    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.16    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
com.typesafe.akka    akka-persistence-query_2.13    2.6.16    Apache-2.0
    com.typesafe.akka    akka-persistence_2.13    2.6.16    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.6.16    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.6.16    Apache-2.0
            com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.3    CC0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.16    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.16    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.16    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
org.scala-lang    scala-library    2.13.3    Apache-2.0

SourceProvider for changesByTag

A SourceProviderSourceProvider defines the source of the event envelopes that the Projection will process. A SourceProvider for the changes query can be defined with the DurableStateStoreProviderDurableStateStoreProvider like this:

Scala
sourceimport akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.projection.state.scaladsl.DurableStateSourceProvider
import akka.projection.scaladsl.SourceProvider
val sourceProvider: SourceProvider[Offset, DurableStateChange[AccountEntity.Account]] =
  DurableStateSourceProvider
    .changesByTag[AccountEntity.Account](system, JdbcDurableStateStore.Identifier, "bank-accounts-1")
Java
sourceimport akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore;
import akka.persistence.query.DurableStateChange;
import akka.persistence.query.Offset;
import akka.projection.state.javadsl.DurableStateSourceProvider;
import akka.projection.javadsl.SourceProvider;
SourceProvider<Offset, DurableStateChange<AccountEntity.Account>> sourceProvider =
    DurableStateSourceProvider.changesByTag(
        system, JdbcDurableStateStore.Identifier(), "bank-accounts-1");

This example is using the DurableStateStore JDBC plugin for Akka Persistence. You will use the same plugin that you configured for the write side. The one that is used by the DurableStateBehavior.

This source is consuming all the changes from the Account DurableStateBehavior that are tagged with "bank-accounts-1". In a production application, you would need to start as many instances as the number of different tags you used. That way you consume the changes in all entities.

The DurableStateChange[AccountEntity.Account]DurableStateChange<AccountEntity.Account> is what the Projection handler will process. It contains the State and additional meta data, such as the offset that will be stored by the Projection. See DurableStateChangeDurableStateChange for full details of what it contains.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.