Persistence Query
Dependency
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project> ... <repositories> <repository> <id>akka-repository</id> <name>Akka library repository</name> <url>https://repo.akka.io/maven</url> </repository> </repositories> </project>
- Gradle
repositories { mavenCentral() maven { url "https://repo.akka.io/maven" } }
To use Persistence Query, you must add the following dependency in your project:
- sbt
val AkkaVersion = "2.9.1" libraryDependencies += "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-bom_${scala.binary.version}</artifactId> <version>2.9.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-persistence-query_${scala.binary.version}</artifactId> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.9.1") implementation "com.typesafe.akka:akka-persistence-query_${versions.ScalaBinary}" }
This will also add dependency on the Akka Persistence module.
Introduction
Akka persistence query provides a query interface to Durable State Behaviors. These queries are based on asynchronous streams. These streams are similar to the ones offered in the Event Sourcing based implementation. Various state store plugins can implement these interfaces to expose their query capabilities.
One of the rationales behind having a separate query module for Akka Persistence is for implementing the so-called query side or read side in the popular CQRS architecture pattern - in which the writing side of the application implemented using Akka persistence, is completely separated from the query side.
When using the R2DBC plugin an alternative to using Akka persistence query or Projection is to store the query representation directly from the write side.
Using query with Akka Projections
Akka Persistence and Akka Projections together can be used to develop a CQRS application. In the application the durable state is stored in a database and fetched as an asynchronous stream to the user. Currently queries on durable state, provided by the DurableStateStoreQuery
interface, is used to implement tag based searches in Akka Projections.
At present the query is based on tags. So if you have not tagged your objects, this query cannot be used.
The example below shows how to get the DurableStateStoreQuery
from the DurableStateStoreRegistry
extension.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.query.scaladsl.DurableStateStoreQuery import akka.persistence.query.DurableStateChange import akka.persistence.query.UpdatedDurableState val durableStateStoreQuery = DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId) val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset) source.map { case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value) case _: DeletedDurableState[_] => None }
- Java
-
source
import akka.persistence.state.DurableStateStoreRegistry; import akka.persistence.query.javadsl.DurableStateStoreQuery; import akka.persistence.query.DurableStateChange; import akka.persistence.query.UpdatedDurableState; DurableStateStoreQuery<Record> durableStateStoreQuery = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor(DurableStateStoreQuery.class, pluginId); Source<DurableStateChange<Record>, NotUsed> source = durableStateStoreQuery.changes("tag", offset); source.map( chg -> { if (chg instanceof UpdatedDurableState) { UpdatedDurableState<Record> upd = (UpdatedDurableState<Record>) chg; return upd.value(); } else { throw new IllegalArgumentException("Unexpected DurableStateChange " + chg.getClass()); } });
The DurableStateChange
DurableStateChange
elements can be UpdatedDurableState
or DeletedDurableState
.