DurableStateStore
How to get the DurableStateStore
The DurableStateStore
for JDBC plugin is obtained through the DurableStateStoreRegistry
extension.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
- Java
APIs supported by DurableStateStore
The plugin supports the following APIs:
getObject
getObject(persistenceId)
returns GetObjectResult(value, revision)
, where value
is an Option
(Optional
in Java) and is set to the value of the object if it exists with the passed in persistenceId
. Otherwise value
is empty.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val futureResult: Future[GetObjectResult[String]] = store.getObject("InvalidPersistenceId") futureResult.futureValue.value shouldBe None
- Java
upsertObject
upsertObject(persistenceId, revision, value, tag)
inserts the record if the persistenceId
does not exist in the database. Or else it updates the record with the latest revision passed as revision
. The update succeeds only if the incoming revision
is 1 more than the already existing one. This snippet is an example of a sequnece of upsertObject
and getObject
.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.state.scaladsl.GetObjectResult val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val v: Future[GetObjectResult[String]] = for { n <- store.upsertObject("p234", 1, "a valid string", "t123") _ = n shouldBe akka.Done g <- store.getObject("p234") _ = g.value shouldBe Some("a valid string") u <- store.upsertObject("p234", 2, "updated valid string", "t123") _ = u shouldBe akka.Done h <- store.getObject("p234") } yield h v.futureValue.value shouldBe Some("updated valid string")
- Java
deleteObject
deleteObject(persistenceId)
deletes the record with the input persistenceId
.
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) store.deleteObject("p123", 0L).futureValue shouldBe Done store.getObject("p123").futureValue.value shouldBe None
- Java
currentChanges
currentChanges(tag, offset)
gets a source of the most recent changes made to objects with the given tag
since the passed in offset
. This api returns changes that occurred up to when the Source
returned by this call is materialized.
- Scala
-
source
import akka.NotUsed import akka.stream.scaladsl.Source import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.currentChanges("tag-1", NoOffset)
- Java
changes
changes(tag, offset)
gets a source of the most recent changes made to objects with the given tag
since the passed in offset
. The returned source will never terminate, it effectively watches for changes to the objects and emits changes as they happen.
- Scala
-
source
import akka.NotUsed import akka.stream.scaladsl.Source import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore import akka.persistence.query.{ DurableStateChange, NoOffset } val store = DurableStateStoreRegistry .get(system) .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) val willNotCompleteTheStream: Source[DurableStateChange[String], NotUsed] = store.changes("tag-1", NoOffset)
- Java