Building a storage backend for Durable State

Storage backends for state stores are pluggable in the Akka persistence extension. This documentation described how to build a new storage backend for durable state.

Applications can provide their own plugins by implementing a plugin API and activating them by configuration. Plugin development requires the following imports:

Scala
sourceimport akka.persistence._
import akka.persistence.state.scaladsl.DurableStateUpdateStore
import akka.persistence.state.scaladsl.GetObjectResult
Java
source
import akka.Done; import akka.actor.ExtendedActorSystem; import akka.persistence.state.javadsl.DurableStateUpdateStore; import akka.persistence.state.javadsl.GetObjectResult; import com.typesafe.config.Config; import java.util.concurrent.CompletionStage;

State Store plugin API

A durable state store plugin extends DurableStateUpdateStore.

DurableStateUpdateStore is an interface and the methods to be implemented are:

Scala
sourceclass MyStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String) extends DurableStateUpdateStore[A] {

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail.
   */
  override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ???

  /**
   * Deprecated. Use the deleteObject overload with revision instead.
   */
  override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0)

  /**
   * Will delete the state by setting it to the empty state and the revision number will be incremented by 1.
   */
  override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ???

  /**
   * Returns the current state for the given persistence id.
   */
  override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???
}
Java
sourceclass MyJavaStateStore<A> implements DurableStateUpdateStore<A> {

  private ExtendedActorSystem system;
  private Config config;
  private String cfgPath;

  public MyJavaStateStore(ExtendedActorSystem system, Config config, String cfgPath) {
    this.system = system;
    this.config = config;
    this.cfgPath = cfgPath;
  }

  /** Returns the current state for the given persistence id. */
  @Override
  public CompletionStage<GetObjectResult<A>> getObject(String persistenceId) {
    // implement getObject here
    return null;
  }

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * <p>In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist
   * will fail.
   */
  @Override
  public CompletionStage<Done> upsertObject(
      String persistenceId, long revision, Object value, String tag) {
    // implement upsertObject here
    return null;
  }

  /** Deprecated. Use the deleteObject overload with revision instead. */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId) {
    return deleteObject(persistenceId, 0);
  }

  /**
   * Will delete the state by setting it to the empty state and the revision number will be
   * incremented by 1.
   */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId, long revision) {
    // implement deleteObject here
    return null;
  }
}

A durable state store plugin may also extend DurableStateUpdateWithChangeEventStore to store additional change event.

DurableStateUpdateWithChangeEventStore is an interface and the methods to be implemented are:

Scala
sourceclass MyChangeEventStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String)
    extends DurableStateUpdateWithChangeEventStore[A] {

  /**
   * The `changeEvent` is written to the event journal.
   * Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`.
   *
   * @param revision sequence number for optimistic locking. starts at 1.
   */
  override def upsertObject(
      persistenceId: String,
      revision: Long,
      value: A,
      tag: String,
      changeEvent: Any): Future[Done] = ???

  override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = ???

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist will fail.
   */
  override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ???

  /**
   * Deprecated. Use the deleteObject overload with revision instead.
   */
  override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0)

  /**
   * Will delete the state by setting it to the empty state and the revision number will be incremented by 1.
   */
  override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ???

  /**
   * Returns the current state for the given persistence id.
   */
  override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???

}
Java
sourceclass MyChangeEventJavaStateStore<A> implements DurableStateUpdateWithChangeEventStore<A> {

  private ExtendedActorSystem system;
  private Config config;
  private String cfgPath;

  public MyChangeEventJavaStateStore(ExtendedActorSystem system, Config config, String cfgPath) {
    this.system = system;
    this.config = config;
    this.cfgPath = cfgPath;
  }

  /**
   * Will delete the state by setting it to the empty state and the revision number will be
   * incremented by 1.
   */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId, long revision) {
    // implement deleteObject here
    return null;
  }

  @Override
  public CompletionStage<Done> deleteObject(
      String persistenceId, long revision, Object changeEvent) {
    // implement deleteObject here
    return null;
  }

  /** Returns the current state for the given persistence id. */
  @Override
  public CompletionStage<GetObjectResult<A>> getObject(String persistenceId) {
    // implement getObject here
    return null;
  }

  /**
   * Will persist the latest state. If it’s a new persistence id, the record will be inserted.
   *
   * <p>In case of an existing persistence id, the record will be updated only if the revision
   * number of the incoming record is 1 more than the already existing record. Otherwise persist
   * will fail.
   */
  @Override
  public CompletionStage<Done> upsertObject(
      String persistenceId, long revision, Object value, String tag) {
    // implement upsertObject here
    return null;
  }

  /** Deprecated. Use the deleteObject overload with revision instead. */
  @Override
  public CompletionStage<Done> deleteObject(String persistenceId) {
    return deleteObject(persistenceId, 0);
  }

  @Override
  public CompletionStage<Done> upsertObject(
      String persistenceId, long revision, A value, String tag, Object changeEvent) {
    // implement deleteObject here
    return null;
  }
}

State Store provider

A DurableStateStoreProvider needs to be implemented to be able to create the plugin itself:

Scala
sourceclass MyStateStoreProvider(system: ExtendedActorSystem, config: Config, cfgPath: String)
    extends DurableStateStoreProvider {

  /**
   * The `DurableStateStore` implementation for the Scala API.
   * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#durableStateStoreFor]].
   */
  override def scaladslDurableStateStore(): DurableStateStore[Any] = new MyStateStore(system, config, cfgPath)

  /**
   * The `DurableStateStore` implementation for the Java API.
   * This corresponds to the instance that is returned by [[DurableStateStoreRegistry#getDurableStateStoreFor]].
   */
  override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = new MyJavaStateStore(system, config, cfgPath)
}
Java
sourceclass MyJavaStateStoreProvider implements DurableStateStoreProvider {

  private ExtendedActorSystem system;
  private Config config;
  private String cfgPath;

  public MyJavaStateStoreProvider(ExtendedActorSystem system, Config config, String cfgPath) {
    this.system = system;
    this.config = config;
    this.cfgPath = cfgPath;
  }

  @Override
  public DurableStateStore<Object> scaladslDurableStateStore() {
    return new MyStateStore<>(this.system, this.config, this.cfgPath);
  }

  @Override
  public akka.persistence.state.javadsl.DurableStateStore<Object> javadslDurableStateStore() {
    return new MyJavaStateStore<>(this.system, this.config, this.cfgPath);
  }
}

Configure the State Store

A durable state store plugin can be activated with the following minimal configuration:

Scala
source# Path to the state store plugin to be used
akka.persistence.state.plugin = "my-state-store"

# My custom state store plugin
my-state-store {
  # Class name of the plugin.
  class = "docs.persistence.state.MyStateStoreProvider"
}
Java
source# Path to the state store plugin to be used
akka.persistence.state.plugin = "my-java-state-store"

# My custom state store plugin
my-java-state-store {
  # Class name of the plugin.
  class = "docs.persistence.state.MyJavaStateStoreProvider"
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.