Storage backends for state stores are pluggable in the Akka persistence extension. This documentation describes how to build a new storage backend for durable state.
Applications can provide their own plugins by implementing a plugin API and activating them by configuration. Plugin development requires the following imports:
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult }
import com.typesafe.config.Config

import scala.concurrent.Future

class MyStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String)
    extends DurableStateUpdateStore[A] {

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail.
   */
  override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ???

  /**
   * Deprecated. Use the deleteObject overload with revision instead.
   */
  override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0)

  /**
   * Will delete the state by setting it to the empty state and the revision number will be incremented by 1.
   */
  override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ???

  /**
   * Returns the current state for the given persistence id.
   */
  override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???
}
import akka.Done;
import akka.actor.ExtendedActorSystem;
import akka.persistence.state.javadsl.DurableStateUpdateStore;
import akka.persistence.state.javadsl.GetObjectResult;
import com.typesafe.config.Config;

import java.util.concurrent.CompletionStage;

class MyJavaStateStore<A> implements DurableStateUpdateStore<A> {

  private ExtendedActorSystem system;
  private Config config;
  private String cfgPath;

  public MyJavaStateStore(ExtendedActorSystem system, Config config, String cfgPath) {
    this.system = system;
    this.config = config;
    this.cfgPath = cfgPath;
  }

  /** Returns the current state for the given persistence id. */
  @Override
  public CompletionStage<GetObjectResult<A>> getObject(String persistenceId) {
    // implement getObject here
    return null;
  }

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * <p>In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist
   * will fail.
   */
  @Override
  public CompletionStage<Done> upsertObject(
      String persistenceId, long revision, A value, String tag) {
    // implement upsertObject here
    return null;
  }

  /** Deprecated. Use the deleteObject overload with revision instead. */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId) {
    return deleteObject(persistenceId, 0);
  }

  /**
   * Will delete the state by setting it to the empty state and the revision number will be
   * incremented by 1.
   */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId, long revision) {
    // implement deleteObject here
    return null;
  }
}
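The revision rule described in the comments above (an update is accepted only when the incoming revision is exactly one more than the stored one) can be illustrated with a small in-memory sketch. This is not an Akka plugin: `InMemoryStateStoreSketch` and `Entry` are hypothetical names, the class deliberately uses only the JDK and does not implement `DurableStateUpdateStore`, and it assumes that a persistence id with no record has revision 0, so the first accepted write carries revision 1. It also assumes that the `revision` passed to `deleteObject` is the new revision, checked the same way as for an upsert.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, JDK-only stand-in that mirrors the method names of the plugin API.
// It does NOT implement the Akka DurableStateUpdateStore interface.
final class InMemoryStateStoreSketch<A> {

  // One stored record: the revision plus the value (empty after a delete).
  static final class Entry<T> {
    final long revision;
    final Optional<T> value;

    Entry(long revision, Optional<T> value) {
      this.revision = revision;
      this.value = value;
    }
  }

  private final Map<String, Entry<A>> records = new ConcurrentHashMap<>();

  // Returns the stored entry, or revision 0 with an empty value for an unknown id (assumption).
  CompletionStage<Entry<A>> getObject(String persistenceId) {
    Entry<A> missing = new Entry<A>(0L, Optional.empty());
    return CompletableFuture.completedFuture(records.getOrDefault(persistenceId, missing));
  }

  CompletionStage<Void> upsertObject(String persistenceId, long revision, A value, String tag) {
    return write(persistenceId, revision, Optional.of(value));
  }

  // Deleting keeps the record but replaces the value with the empty state.
  CompletionStage<Void> deleteObject(String persistenceId, long revision) {
    return write(persistenceId, revision, Optional.empty());
  }

  // Accepts the write only if the incoming revision is exactly the stored revision + 1.
  private CompletionStage<Void> write(String persistenceId, long revision, Optional<A> value) {
    try {
      // compute() runs atomically per key, so the check and the write cannot interleave.
      records.compute(persistenceId, (id, existing) -> {
        long current = existing == null ? 0L : existing.revision;
        if (revision != current + 1) {
          throw new IllegalStateException(
              "expected revision " + (current + 1) + " but got " + revision);
        }
        return new Entry<A>(revision, value);
      });
      return CompletableFuture.completedFuture(null);
    } catch (IllegalStateException e) {
      return CompletableFuture.failedFuture(e);
    }
  }

  public static void main(String[] args) {
    InMemoryStateStoreSketch<String> store = new InMemoryStateStoreSketch<>();
    store.upsertObject("cart-1", 1L, "one item", "").toCompletableFuture().join();
    store.upsertObject("cart-1", 2L, "two items", "").toCompletableFuture().join();
    // A writer that repeats revision 2 is rejected because revision 3 is expected.
    boolean rejected = store.upsertObject("cart-1", 2L, "stale", "")
        .toCompletableFuture().isCompletedExceptionally();
    System.out.println("stale write rejected: " + rejected);
  }
}
```

In a real plugin the same check would typically be pushed into the database, for example as a conditional update on the revision column, so that concurrent writers are rejected by the store itself rather than by application code.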
A durable state store plugin may also extend DurableStateUpdateWithChangeEventStore to store an additional change event together with each state update or deletion.
DurableStateUpdateWithChangeEventStore is an interface and the methods to be implemented are:
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.persistence.state.scaladsl.{ DurableStateUpdateWithChangeEventStore, GetObjectResult }
import com.typesafe.config.Config

import scala.concurrent.Future

class MyChangeEventStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String)
    extends DurableStateUpdateWithChangeEventStore[A] {

  /**
   * The `changeEvent` is written to the event journal.
   * Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`.
   *
   * @param revision sequence number for optimistic locking. starts at 1.
   */
  override def upsertObject(
      persistenceId: String,
      revision: Long,
      value: A,
      tag: String,
      changeEvent: Any): Future[Done] = ???

  override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = ???

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail.
   */
  override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ???

  /**
   * Deprecated. Use the deleteObject overload with revision instead.
   */
  override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0)

  /**
   * Will delete the state by setting it to the empty state and the revision number will be incremented by 1.
   */
  override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ???

  /**
   * Returns the current state for the given persistence id.
   */
  override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???
}
import akka.Done;
import akka.actor.ExtendedActorSystem;
import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore;
import akka.persistence.state.javadsl.GetObjectResult;
import com.typesafe.config.Config;

import java.util.concurrent.CompletionStage;

class MyChangeEventJavaStateStore<A> implements DurableStateUpdateWithChangeEventStore<A> {

  private ExtendedActorSystem system;
  private Config config;
  private String cfgPath;

  public MyChangeEventJavaStateStore(ExtendedActorSystem system, Config config, String cfgPath) {
    this.system = system;
    this.config = config;
    this.cfgPath = cfgPath;
  }

  /**
   * Will delete the state by setting it to the empty state and the revision number will be
   * incremented by 1.
   */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId, long revision) {
    // implement deleteObject here
    return null;
  }

  @Override
  public CompletionStage<Done> deleteObject(
      String persistenceId, long revision, Object changeEvent) {
    // implement deleteObject here
    return null;
  }

  /** Returns the current state for the given persistence id. */
  @Override
  public CompletionStage<GetObjectResult<A>> getObject(String persistenceId) {
    // implement getObject here
    return null;
  }

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * <p>In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist
   * will fail.
   */
  @Override
  public CompletionStage<Done> upsertObject(
      String persistenceId, long revision, A value, String tag) {
    // implement upsertObject here
    return null;
  }

  /** Deprecated. Use the deleteObject overload with revision instead. */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId) {
    return deleteObject(persistenceId, 0);
  }

  @Override
  public CompletionStage<Done> upsertObject(
      String persistenceId, long revision, A value, String tag, Object changeEvent) {
    // implement upsertObject here
    return null;
  }
}
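The relationship between the state record and the journal described in the comments above (same `persistenceId`, with `revision` used as `sequenceNr`) can be sketched with plain JDK collections. Everything here is illustrative: `ChangeEventStoreSketch`, `JournalEntry` and `journalFor` are hypothetical names, the class does not implement `DurableStateUpdateWithChangeEventStore`, and a single `synchronized` lock stands in for whatever mechanism a real backend would use to write the state and the change event atomically. Unknown persistence ids are assumed to be at revision 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical, JDK-only stand-in; it does NOT implement the Akka plugin interface.
final class ChangeEventStoreSketch<A> {

  // A journal row: the revision of the state change doubles as the sequence number.
  static final class JournalEntry {
    final String persistenceId;
    final long sequenceNr;
    final Object event;

    JournalEntry(String persistenceId, long sequenceNr, Object event) {
      this.persistenceId = persistenceId;
      this.sequenceNr = sequenceNr;
      this.event = event;
    }
  }

  private final Map<String, Long> revisions = new HashMap<>();
  private final Map<String, A> states = new HashMap<>();
  private final List<JournalEntry> journal = new ArrayList<>();

  CompletionStage<Void> upsertObject(
      String persistenceId, long revision, A value, String tag, Object changeEvent) {
    return write(persistenceId, revision, value, changeEvent);
  }

  CompletionStage<Void> deleteObject(String persistenceId, long revision, Object changeEvent) {
    // A null value marks the empty state in this sketch.
    return write(persistenceId, revision, null, changeEvent);
  }

  // Updates the state and appends the change event as one step, or does neither.
  private synchronized CompletionStage<Void> write(
      String persistenceId, long revision, A valueOrNull, Object changeEvent) {
    long current = revisions.getOrDefault(persistenceId, 0L);
    if (revision != current + 1) {
      return CompletableFuture.failedFuture(new IllegalStateException(
          "expected revision " + (current + 1) + " but got " + revision));
    }
    if (valueOrNull == null) {
      states.remove(persistenceId);
    } else {
      states.put(persistenceId, valueOrNull);
    }
    revisions.put(persistenceId, revision);
    journal.add(new JournalEntry(persistenceId, revision, changeEvent));
    return CompletableFuture.completedFuture(null);
  }

  // Returns a copy of the journal entries recorded for one persistence id, in write order.
  synchronized List<JournalEntry> journalFor(String persistenceId) {
    List<JournalEntry> result = new ArrayList<>();
    for (JournalEntry entry : journal) {
      if (entry.persistenceId.equals(persistenceId)) {
        result.add(entry);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    ChangeEventStoreSketch<String> store = new ChangeEventStoreSketch<>();
    store.upsertObject("cart-1", 1L, "one item", "", "ItemAdded").toCompletableFuture().join();
    store.deleteObject("cart-1", 2L, "CartCleared").toCompletableFuture().join();
    for (JournalEntry entry : store.journalFor("cart-1")) {
      System.out.println(entry.sequenceNr + " -> " + entry.event);
    }
  }
}
```

A rejected write leaves both the state and the journal untouched, which is the property a real implementation would most likely want to preserve, for instance by performing both writes in a single database transaction.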
State Store provider
To create the plugin itself, a DurableStateStoreProvider needs to be implemented:
import akka.actor.ExtendedActorSystem
import akka.persistence.state.DurableStateStoreProvider
import akka.persistence.state.javadsl.{ DurableStateStore => JDurableStateStore }
import akka.persistence.state.scaladsl.DurableStateStore
import com.typesafe.config.Config

class MyStateStoreProvider(system: ExtendedActorSystem, config: Config, cfgPath: String)
    extends DurableStateStoreProvider {

  /**
   * The `DurableStateStore` implementation for the Scala API.
   * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#durableStateStoreFor]].
   */
  override def scaladslDurableStateStore(): DurableStateStore[Any] =
    new MyStateStore(system, config, cfgPath)

  /**
   * The `DurableStateStore` implementation for the Java API.
   * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#getDurableStateStoreFor]].
   */
  override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
    new MyJavaStateStore(system, config, cfgPath)
}
The plugin is activated by configuration. For the Scala provider:

# Path to the state store plugin to be used
akka.persistence.state.plugin = "my-state-store"

# My custom state store plugin
my-state-store {
  # Class name of the plugin.
  class = "docs.persistence.state.MyStateStoreProvider"
}

For a provider written in Java:

# Path to the state store plugin to be used
akka.persistence.state.plugin = "my-java-state-store"

# My custom state store plugin
my-java-state-store {
  # Class name of the plugin.
  class = "docs.persistence.state.MyJavaStateStoreProvider"
}