Persistence Query

Dependency

To use Persistence Query, you must add the following dependency in your project:

sbt
val AkkaVersion = "2.6.21"
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.6.21</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-persistence-query_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.21")

  implementation "com.typesafe.akka:akka-persistence-query_${versions.ScalaBinary}"
}

This will also add dependency on the Akka Persistence module.

Introduction

Akka persistence query provides a query interface to Durable State Behaviors. These queries are based on asynchronous streams. These streams are similar to the ones offered in the Event Sourcing based implementation. Various state store plugins can implement these interfaces to expose their query capabilities.

One of the rationales behind having a separate query module for Akka Persistence is for implementing the so-called query side or read side in the popular CQRS architecture pattern - in which the writing side of the application implemented using Akka persistence, is completely separated from the query side.

Using query with Akka Projections

Akka Persistence and Akka Projections together can be used to develop a CQRS application. In the application the durable state is stored in a database and fetched as an asynchronous stream to the user. Currently queries on durable state, provided by the DurableStateStoreQuery interface, is used to implement tag based searches in Akka Projections.

At present the query is based on tags. So if you have not tagged your objects, this query cannot be used.

The example below shows how to get the DurableStateStoreQuery from the DurableStateStoreRegistry extension.

Scala
sourceimport akka.persistence.state.DurableStateStoreRegistry
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.DurableStateChange
import akka.persistence.query.UpdatedDurableState

val durableStateStoreQuery =
  DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId)
val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset)
source.map {
  case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value)
  case _: DeletedDurableState[_]                                              => None
}
Java
sourceimport akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.query.javadsl.DurableStateStoreQuery;
import akka.persistence.query.DurableStateChange;
import akka.persistence.query.UpdatedDurableState;

DurableStateStoreQuery<Record> durableStateStoreQuery =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(DurableStateStoreQuery.class, pluginId);
Source<DurableStateChange<Record>, NotUsed> source =
    durableStateStoreQuery.changes("tag", offset);
source.map(
    chg -> {
      if (chg instanceof UpdatedDurableState) {
        UpdatedDurableState<Record> upd = (UpdatedDurableState<Record>) chg;
        return upd.value();
      } else {
        throw new IllegalArgumentException("Unexpected DurableStateChange " + chg.getClass());
      }
    });

The DurableStateChangeDurableStateChange elements can be UpdatedDurableState or DeletedDurableState.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.