This documentation covers version 2.10.4+3-f7095f01-SNAPSHOT; the current version is 2.10.4.
Building a storage backend for Durable State
Storage backends for state stores are pluggable in the Akka persistence extension. This documentation describes how to build a new storage backend for durable state.
Applications can provide their own plugins by implementing a plugin API and activating them by configuration. Plugin development requires the following imports:
Scala:

    import akka.persistence._
    import akka.persistence.state.scaladsl.DurableStateUpdateStore
    import akka.persistence.state.scaladsl.GetObjectResult
State Store plugin API
A durable state store plugin extends DurableStateUpdateStore. DurableStateUpdateStore is an interface and the methods to be implemented are:
Scala:

    class MyStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String)
        extends DurableStateUpdateStore[A] {

      /**
       * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
       *
       * In case of an existing persistence id, the record will be updated only if the revision
       * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail.
       */
      override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ???

      /**
       * Deprecated. Use the deleteObject overload with revision instead.
       */
      override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0)

      /**
       * Will delete the state by setting it to the empty state and the revision number will be incremented by 1.
       */
      override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ???

      /**
       * Returns the current state for the given persistence id.
       */
      override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???
    }
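The revision rule described in the `upsertObject` comment can be illustrated with a small in-memory model. This is only a sketch and does not use the Akka API: `Record`, `InMemoryRevisionStore`, `upsert` and `get` are hypothetical names, the result type is `Future[Unit]` rather than `Future[Done]`, and the sketch assumes that any revision is accepted for a new persistence id.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

// Hypothetical record type for illustration; not an Akka class.
final case class Record[A](revision: Long, value: A, tag: String)

// Hypothetical in-memory model of the optimistic revision check.
final class InMemoryRevisionStore[A] {
  private val records = TrieMap.empty[String, Record[A]]

  // Inserts a new persistence id, or updates an existing one only when
  // `revision` is exactly one more than the stored revision.
  def upsert(persistenceId: String, revision: Long, value: A, tag: String): Future[Unit] = {
    val next = Record(revision, value, tag)
    val stored = records.get(persistenceId) match {
      case None =>
        // Assumption: any revision is accepted for a new persistence id.
        records.putIfAbsent(persistenceId, next).isEmpty
      case Some(current) if revision == current.revision + 1 =>
        // Atomic compare-and-replace, so a concurrent writer cannot be overwritten.
        records.replace(persistenceId, current, next)
      case Some(_) =>
        false
    }
    if (stored) Future.successful(())
    else Future.failed(new IllegalStateException(s"Revision $revision rejected for [$persistenceId]"))
  }

  def get(persistenceId: String): Option[Record[A]] = records.get(persistenceId)
}

object RevisionRuleDemo {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryRevisionStore[String]
    store.upsert("cart-1", 1L, "empty", "")    // inserted: new persistence id
    store.upsert("cart-1", 2L, "one item", "") // accepted: 2 == 1 + 1
    store.upsert("cart-1", 4L, "skipped", "")  // failed Future: 4 != 2 + 1
    println(store.get("cart-1"))
  }
}
```

A real plugin would typically push the same check into the database, for example as a conditional update on the revision column, so that the comparison and the write happen atomically.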
A durable state store plugin may also extend DurableStateUpdateWithChangeEventStore to store an additional change event. DurableStateUpdateWithChangeEventStore is an interface and the methods to be implemented are:
Scala:

    class MyChangeEventStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String)
        extends DurableStateUpdateWithChangeEventStore[A] {

      /**
       * The `changeEvent` is written to the event journal.
       * Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`.
       *
       * @param revision sequence number for optimistic locking. starts at 1.
       */
      override def upsertObject(
          persistenceId: String,
          revision: Long,
          value: A,
          tag: String,
          changeEvent: Any): Future[Done] = ???

      override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = ???

      /**
       * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
       *
       * In case of an existing persistence id, the record will be updated only if the revision
       * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail.
       */
      override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ???

      /**
       * Deprecated. Use the deleteObject overload with revision instead.
       */
      override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0)

      /**
       * Will delete the state by setting it to the empty state and the revision number will be incremented by 1.
       */
      override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ???

      /**
       * Returns the current state for the given persistence id.
       */
      override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???
    }
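The relationship between the stored state and the change event, where the journal entry reuses the `persistenceId` and takes the `revision` as its `sequenceNr`, might be modelled as below. This is a rough in-memory sketch under stated assumptions, not the Akka implementation: `StoredState`, `JournalEntry`, `InMemoryChangeEventStore` and its methods are hypothetical names, and a single lock stands in for whatever transaction a real backend would use to write both records together.

```scala
import scala.collection.mutable
import scala.concurrent.Future

// Hypothetical types for illustration; none of these are Akka classes.
final case class StoredState[A](revision: Long, value: A)
final case class JournalEntry(persistenceId: String, sequenceNr: Long, event: Any)

final class InMemoryChangeEventStore[A] {
  private val states = mutable.Map.empty[String, StoredState[A]]
  private var journal = Vector.empty[JournalEntry]

  // Writes the new state and appends the change event in one critical section.
  // The journal entry uses `revision` as its sequence number.
  def upsertWithEvent(persistenceId: String, revision: Long, value: A, changeEvent: Any): Future[Unit] =
    synchronized {
      // For an existing id the next revision must be stored revision + 1.
      val expected = states.get(persistenceId).map(_.revision + 1)
      if (expected.forall(_ == revision)) {
        states.put(persistenceId, StoredState(revision, value))
        journal :+= JournalEntry(persistenceId, revision, changeEvent)
        Future.successful(())
      } else {
        // Rejected: neither the state nor the journal is modified.
        Future.failed(new IllegalStateException(s"Revision $revision rejected for [$persistenceId]"))
      }
    }

  def stateFor(persistenceId: String): Option[StoredState[A]] =
    synchronized(states.get(persistenceId))

  def eventsFor(persistenceId: String): Vector[JournalEntry] =
    synchronized(journal.filter(_.persistenceId == persistenceId))
}
```

The point of the sketch is that a rejected revision leaves both the state and the journal untouched, so the sequence numbers in the journal stay aligned with the accepted revisions.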
State Store provider
A DurableStateStoreProvider needs to be implemented to be able to create the plugin itself:
Scala:

    class MyStateStoreProvider(system: ExtendedActorSystem, config: Config, cfgPath: String)
        extends DurableStateStoreProvider {

      /**
       * The `DurableStateStore` implementation for the Scala API.
       * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#durableStateStoreFor]].
       */
      override def scaladslDurableStateStore(): DurableStateStore[Any] =
        new MyStateStore(system, config, cfgPath)

      /**
       * The `DurableStateStore` implementation for the Java API.
       * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#getDurableStateStoreFor]].
       */
      override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
        new MyJavaStateStore(system, config, cfgPath)
    }
Configure the State Store
A durable state store plugin can be activated with the following minimal configuration: