Building a storage backend for Durable State
Storage backends for state stores are pluggable in the Akka persistence extension. This documentation described how to build a new storage backend for durable state.
Applications can provide their own plugins by implementing a plugin API and activating them by configuration. Plugin development requires the following imports:
- Scala
-
source
import akka.persistence._ import akka.persistence.state.scaladsl.DurableStateUpdateStore import akka.persistence.state.scaladsl.GetObjectResult
- Java
-
source
import akka.Done; import akka.actor.ExtendedActorSystem; import akka.persistence.state.javadsl.DurableStateUpdateStore; import akka.persistence.state.javadsl.GetObjectResult; import com.typesafe.config.Config; import java.util.concurrent.CompletionStage;
State Store plugin API
A durable state store plugin extends DurableStateUpdateStore
.
DurableStateUpdateStore
is an interface and the methods to be implemented are:
- Scala
-
source
class MyStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String) extends DurableStateUpdateStore[A] { /** * Will persist the latest state. If it’s a new persistence id, the record will be inserted. * * In case of an existing persistence id, the record will be updated only if the revision * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail. */ override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ??? /** * Deprecated. Use the deleteObject overload with revision instead. */ override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0) /** * Will delete the state by setting it to the empty state and the revision number will be incremented by 1. */ override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ??? /** * Returns the current state for the given persistence id. */ override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ??? }
- Java
-
source
class MyJavaStateStore<A> implements DurableStateUpdateStore<A> { private ExtendedActorSystem system; private Config config; private String cfgPath; public MyJavaStateStore(ExtendedActorSystem system, Config config, String cfgPath) { this.system = system; this.config = config; this.cfgPath = cfgPath; } /** Returns the current state for the given persistence id. */ @Override public CompletionStage<GetObjectResult<A>> getObject(String persistenceId) { // implement getObject here return null; } /** * Will persist the latest state. If it’s a new persistence id, the record will be inserted. * * <p>In case of an existing persistence id, the record will be updated only if the revision * number of the incoming record is 1 more than the already existing record. Otherwise persist * will fail. */ @Override public CompletionStage<Done> upsertObject( String persistenceId, long revision, Object value, String tag) { // implement upsertObject here return null; } /** Deprecated. Use the deleteObject overload with revision instead. */ @Override public CompletionStage<Done> deleteObject(String persistenceId) { return deleteObject(persistenceId, 0); } /** * Will delete the state by setting it to the empty state and the revision number will be * incremented by 1. */ @Override public CompletionStage<Done> deleteObject(String persistenceId, long revision) { // implement deleteObject here return null; } }
A durable state store plugin may also extend DurableStateUpdateWithChangeEventStore
to store additional change event.
DurableStateUpdateWithChangeEventStore
is an interface and the methods to be implemented are:
- Scala
-
source
class MyChangeEventStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String) extends DurableStateUpdateWithChangeEventStore[A] { /** * The `changeEvent` is written to the event journal. * Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`. * * @param revision sequence number for optimistic locking. starts at 1. */ override def upsertObject( persistenceId: String, revision: Long, value: A, tag: String, changeEvent: Any): Future[Done] = ??? override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = ??? /** * Will persist the latest state. If it’s a new persistence id, the record will be inserted. * * In case of an existing persistence id, the record will be updated only if the revision * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail. */ override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ??? /** * Deprecated. Use the deleteObject overload with revision instead. */ override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0) /** * Will delete the state by setting it to the empty state and the revision number will be incremented by 1. */ override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ??? /** * Returns the current state for the given persistence id. */ override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ??? }
- Java
-
source
class MyChangeEventJavaStateStore<A> implements DurableStateUpdateWithChangeEventStore<A> { private ExtendedActorSystem system; private Config config; private String cfgPath; public MyChangeEventJavaStateStore(ExtendedActorSystem system, Config config, String cfgPath) { this.system = system; this.config = config; this.cfgPath = cfgPath; } /** * Will delete the state by setting it to the empty state and the revision number will be * incremented by 1. */ @Override public CompletionStage<Done> deleteObject(String persistenceId, long revision) { // implement deleteObject here return null; } @Override public CompletionStage<Done> deleteObject( String persistenceId, long revision, Object changeEvent) { // implement deleteObject here return null; } /** Returns the current state for the given persistence id. */ @Override public CompletionStage<GetObjectResult<A>> getObject(String persistenceId) { // implement getObject here return null; } /** * Will persist the latest state. If it’s a new persistence id, the record will be inserted. * * <p>In case of an existing persistence id, the record will be updated only if the revision * number of the incoming record is 1 more than the already existing record. Otherwise persist * will fail. */ @Override public CompletionStage<Done> upsertObject( String persistenceId, long revision, Object value, String tag) { // implement upsertObject here return null; } /** Deprecated. Use the deleteObject overload with revision instead. */ @Override public CompletionStage<Done> deleteObject(String persistenceId) { return deleteObject(persistenceId, 0); } @Override public CompletionStage<Done> upsertObject( String persistenceId, long revision, A value, String tag, Object changeEvent) { // implement deleteObject here return null; } }
State Store provider
A DurableStateStoreProvider
needs to be implemented to be able to create the plugin itself:
- Scala
-
source
class MyStateStoreProvider(system: ExtendedActorSystem, config: Config, cfgPath: String) extends DurableStateStoreProvider { /** * The `DurableStateStore` implementation for the Scala API. * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#durableStateStoreFor]]. */ override def scaladslDurableStateStore(): DurableStateStore[Any] = new MyStateStore(system, config, cfgPath) /** * The `DurableStateStore` implementation for the Java API. * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#getDurableStateStoreFor]]. */ override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = new MyJavaStateStore(system, config, cfgPath) }
- Java
-
source
class MyJavaStateStoreProvider implements DurableStateStoreProvider { private ExtendedActorSystem system; private Config config; private String cfgPath; public MyJavaStateStoreProvider(ExtendedActorSystem system, Config config, String cfgPath) { this.system = system; this.config = config; this.cfgPath = cfgPath; } @Override public DurableStateStore<Object> scaladslDurableStateStore() { return new MyStateStore<>(this.system, this.config, this.cfgPath); } @Override public akka.persistence.state.javadsl.DurableStateStore<Object> javadslDurableStateStore() { return new MyJavaStateStore<>(this.system, this.config, this.cfgPath); } }
Configure the State Store
A durable state store plugin can be activated with the following minimal configuration:
- Scala
-
source
# Path to the state store plugin to be used akka.persistence.state.plugin = "my-state-store" # My custom state store plugin my-state-store { # Class name of the plugin. class = "docs.persistence.state.MyStateStoreProvider" }
- Java
-
source
# Path to the state store plugin to be used akka.persistence.state.plugin = "my-java-state-store" # My custom state store plugin my-java-state-store { # Class name of the plugin. class = "docs.persistence.state.MyJavaStateStoreProvider" }