DurableStateStore
How to get the DurableStateStore
The DurableStateStore for the JDBC plugin is obtained through the DurableStateStoreRegistry extension.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) - Java
-
source
import akka.persistence.state.DurableStateStoreRegistry; import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier());
APIs supported by DurableStateStore
The plugin supports the following APIs:
getObject
getObject(persistenceId) returns GetObjectResult(value, revision), where value is an Option (Optional in Java) that holds the object's value if an object with the passed-in persistenceId exists. Otherwise value is empty.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val futureResult: Future[GetObjectResult[String]] = store.getObject("InvalidPersistenceId") futureResult.futureValue.value shouldBe None - Java
-
source
import akka.persistence.state.DurableStateStoreRegistry; import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import akka.persistence.state.javadsl.GetObjectResult; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<GetObjectResult<String>> futureResult = store.getObject("InvalidPersistenceId"); try { GetObjectResult<String> result = futureResult.toCompletableFuture().get(); assert !result.value().isPresent(); } catch (Exception e) { // handle exceptions }
upsertObject
upsertObject(persistenceId, revision, value, tag) inserts the record if the persistenceId does not exist in the database; otherwise it updates the record with the latest revision passed as revision. The update succeeds only if the incoming revision is 1 more than the already existing one. This snippet is an example of a sequence of upsertObject and getObject calls.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val v: Future[GetObjectResult[String]] = for { n <- store.upsertObject("p234", 1, "a valid string", "t123") _ = n shouldBe akka.Done g <- store.getObject("p234") _ = g.value shouldBe Some("a valid string") u <- store.upsertObject("p234", 2, "updated valid string", "t123") _ = u shouldBe akka.Done h <- store.getObject("p234") } yield h v.futureValue.value shouldBe Some("updated valid string") - Java
-
source
import akka.persistence.state.DurableStateStoreRegistry; import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import akka.persistence.state.javadsl.GetObjectResult; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<GetObjectResult<String>> r = store .upsertObject("p234", 1, "a valid string", "t123") .thenCompose(d -> store.getObject("p234")) .thenCompose(o -> store.upsertObject("p234", 2, "updated valid string", "t123")) .thenCompose(d -> store.getObject("p234")); try { assert r.toCompletableFuture().get().value().get().equals("updated valid string"); } catch (Exception e) { // handle exceptions }
deleteObject
deleteObject(persistenceId) deletes the record with the given persistenceId.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) store.deleteObject("p123", 0L).futureValue shouldBe Done store.getObject("p123").futureValue.value shouldBe None - Java
-
source
import akka.persistence.state.DurableStateStoreRegistry; import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); CompletionStage<Done> futureResult = store.deleteObject("p123"); try { assert futureResult.toCompletableFuture().get().equals(Done.getInstance()); } catch (Exception e) { // handle exceptions }
currentChanges
currentChanges(tag, offset) gets a source of the most recent changes made to objects with the given tag since the passed-in offset. This API returns changes that occurred up to when the Source returned by this call is materialized.
- Scala
-
source
import akka.NotUsed import akka.stream.scaladsl.Source import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.currentChanges("tag-1", NoOffset) - Java
-
source
import akka.NotUsed; import akka.stream.javadsl.Source; import akka.persistence.state.DurableStateStoreRegistry; import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import akka.persistence.query.DurableStateChange; import akka.persistence.query.NoOffset; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); Source<DurableStateChange<String>, NotUsed> willCompleteTheStream = store.currentChanges("tag-1", NoOffset.getInstance());
changes
changes(tag, offset) gets a source of the most recent changes made to objects with the given tag since the passed-in offset. The returned source never terminates; it effectively watches for changes to the objects and emits them as they happen.
- Scala
-
source
import akka.NotUsed import akka.stream.scaladsl.Source import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willNotCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.changes("tag-1", NoOffset) - Java
-
source
import akka.NotUsed; import akka.stream.javadsl.Source; import akka.persistence.state.DurableStateStoreRegistry; import akka.persistence.jdbc.state.javadsl.JdbcDurableStateStore; import akka.persistence.query.DurableStateChange; import akka.persistence.query.NoOffset; @SuppressWarnings("unchecked") JdbcDurableStateStore<String> store = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( JdbcDurableStateStore.class, JdbcDurableStateStore.Identifier()); Source<DurableStateChange<String>, NotUsed> willNotCompleteTheStream = store.changes("tag-1", NoOffset.getInstance());