Persistence Query

Dependency

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

To use Persistence Query, you must add the following dependency in your project:

sbt
val AkkaVersion = "2.9.2"
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-bom_${scala.binary.version}</artifactId>
      <version>2.9.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-persistence-query_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.9.2")

  implementation "com.typesafe.akka:akka-persistence-query_${versions.ScalaBinary}"
}

This will also add dependency on the Akka Persistence module.

Introduction

Akka persistence query provides a query interface to Durable State Behaviors. These queries are based on asynchronous streams. These streams are similar to the ones offered in the Event Sourcing based implementation. Various state store plugins can implement these interfaces to expose their query capabilities.

One of the rationales behind having a separate query module for Akka Persistence is for implementing the so-called query side or read side in the popular CQRS architecture pattern - in which the writing side of the application implemented using Akka persistence, is completely separated from the query side.

Alternative

When using the R2DBC plugin an alternative to using Akka persistence query or Projection is to store the query representation directly from the write side.

Using query with Akka Projections

Akka Persistence and Akka Projections together can be used to develop a CQRS application. In the application the durable state is stored in a database and fetched as an asynchronous stream to the user. Currently queries on durable state, provided by the DurableStateStoreQuery interface, is used to implement tag based searches in Akka Projections.

At present the query is based on tags. So if you have not tagged your objects, this query cannot be used.

The example below shows how to get the DurableStateStoreQuery from the DurableStateStoreRegistry extension.

Scala
sourceimport akka.persistence.state.DurableStateStoreRegistry
import akka.persistence.query.scaladsl.DurableStateStoreQuery
import akka.persistence.query.DurableStateChange
import akka.persistence.query.UpdatedDurableState

val durableStateStoreQuery =
  DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId)
val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset)
source.map {
  case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value)
  case _: DeletedDurableState[_]                                              => None
}
Java
sourceimport akka.persistence.state.DurableStateStoreRegistry;
import akka.persistence.query.javadsl.DurableStateStoreQuery;
import akka.persistence.query.DurableStateChange;
import akka.persistence.query.UpdatedDurableState;

DurableStateStoreQuery<Record> durableStateStoreQuery =
    DurableStateStoreRegistry.get(system)
        .getDurableStateStoreFor(DurableStateStoreQuery.class, pluginId);
Source<DurableStateChange<Record>, NotUsed> source =
    durableStateStoreQuery.changes("tag", offset);
source.map(
    chg -> {
      if (chg instanceof UpdatedDurableState) {
        UpdatedDurableState<Record> upd = (UpdatedDurableState<Record>) chg;
        return upd.value();
      } else {
        throw new IllegalArgumentException("Unexpected DurableStateChange " + chg.getClass());
      }
    });

The DurableStateChangeDurableStateChange elements can be UpdatedDurableState or DeletedDurableState.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.