Persistence

Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted but never its current state directly (except for optional snapshots). These changes are only ever appended to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can be either the full history of changes or starting from a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point communication with at-least-once message delivery semantics.
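
As a rough illustration of this idea, the following plain Scala sketch (no Akka API involved; the names Deposited, Account and ReplaySketch are hypothetical) rebuilds state by folding over an append-only sequence of changes:

```scala
// Hypothetical domain types, used only for this illustration.
final case class Deposited(amount: Int)
final case class Account(balance: Int) {
  def updated(evt: Deposited): Account = copy(balance = balance + evt.amount)
}

object ReplaySketch {
  // The "journal" is an append-only sequence; existing entries are never modified.
  def append(journal: Vector[Deposited], evt: Deposited): Vector[Deposited] =
    journal :+ evt

  // Recovery replays the stored changes in order, starting from an initial state.
  def recover(initial: Account, journal: Seq[Deposited]): Account =
    journal.foldLeft(initial)((state, evt) => state.updated(evt))
}
```

Starting the fold from a saved intermediate state and replaying only the later entries gives the same result, which is the role a snapshot plays.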

Warning

This module is marked as “experimental” as of its introduction in Akka 2.3.0. We will continue to improve this API based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the contents of the akka.persistence package.

Akka persistence is inspired by, and is the official replacement for, the eventsourced library. It follows the same concepts and architecture as eventsourced but differs significantly at the API and implementation level. See also Migration Guide Eventsourced to Akka Persistence 2.3.x.

Changes in Akka 2.3.4

In Akka 2.3.4 several of the concepts of the earlier versions were collapsed and simplified. In essence: Processor and EventsourcedProcessor are replaced by PersistentActor. Channel and PersistentChannel are replaced by AtLeastOnceDelivery. View is replaced by PersistentView.

See full details of the changes in the Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x). The old classes are still included, and deprecated, for a while to make the transition smooth. In case you need the old documentation it is located here.

Dependencies

Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:

"com.typesafe.akka" %% "akka-persistence-experimental" % "2.4-SNAPSHOT"

The Akka persistence extension comes with a few built-in persistence plugins, including an in-memory heap-based journal, a local file-system-based snapshot store, and a LevelDB-based journal.

LevelDB based plugins will require the following additional dependency declaration:

"org.iq80.leveldb"            % "leveldb"          % "0.7"
"org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"

Architecture

  • PersistentActor: Is a persistent, stateful actor. It is able to persist events to a journal and can react to them in a thread-safe manner. It can be used to implement both command as well as event sourced actors. When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can recover internal state from these messages.
  • PersistentView: A view is a persistent, stateful actor that receives journaled messages that have been written by another persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream.
  • AtLeastOnceDelivery: To send messages with at-least-once delivery semantics to destinations, also in case of sender and receiver JVM crashes.
  • Journal: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem, and replicated journals are available as Community plugins.
  • Snapshot store: A snapshot store persists snapshots of a persistent actor's or a view's internal state. Snapshots are used for optimizing recovery times. The storage backend of a snapshot store is pluggable. Persistence extension comes with a "local" snapshot storage plugin, which writes to the local filesystem, and replicated snapshot stores are available as Community plugins.

Event sourcing

The basic idea behind Event Sourcing is quite simple. A persistent actor receives a (non-persistent) command, which is first validated to check whether it can be applied to the current state. Here, validation can mean anything from simple inspection of a command message's fields up to a conversation with several external services. If validation succeeds, events are generated from the command, representing the effect of the command. These events are then persisted and, after successful persistence, used to change the actor's state. When the persistent actor needs to be recovered, only the persisted events are replayed, and these are known to apply successfully. In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands. Event sourced actors may of course also process commands that do not change application state, such as query commands.
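
The split between fallible commands and infallible events can be sketched in plain Scala, independently of Akka. All names below (Withdraw, Withdrawn, Balance, CommandHandlingSketch) are hypothetical, and persistence is only implied by the returned event list:

```scala
// Hypothetical command, event and state types for illustration only.
final case class Withdraw(amount: Int)
final case class Withdrawn(amount: Int)
final case class Balance(value: Int) {
  // Applying an event never fails: it only records what was already decided.
  def applied(evt: Withdrawn): Balance = Balance(value - evt.amount)
}

object CommandHandlingSketch {
  // Validation inspects the command against the current state and either
  // rejects it or produces the events that describe its effect.
  def validate(state: Balance, cmd: Withdraw): Either[String, List[Withdrawn]] =
    if (cmd.amount <= 0) Left("amount must be positive")
    else if (cmd.amount > state.value) Left("insufficient funds")
    else Right(List(Withdrawn(cmd.amount)))

  // Returns the new state plus the events that would have to be persisted.
  def handle(state: Balance, cmd: Withdraw): (Balance, List[Withdrawn]) =
    validate(state, cmd) match {
      case Right(events) => (events.foldLeft(state)((s, e) => s.applied(e)), events)
      case Left(_)       => (state, Nil) // a rejected command produces no events
    }
}
```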

Akka persistence supports event sourcing with the PersistentActor trait. An actor that extends this trait uses the persist method to persist and handle events. The behavior of a PersistentActor is defined by implementing receiveRecover and receiveCommand. This is demonstrated in the following example.

import akka.actor._
import akka.persistence._

case class Cmd(data: String)
case class Evt(data: String)

case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}

class ExamplePersistentActor extends PersistentActor {
  override def persistenceId = "sample-id-1"

  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.updated(event)

  def numEvents =
    state.size

  val receiveRecover: Receive = {
    case evt: Evt                                 => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
  }

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case "print" => println(state)
  }

}

The example defines two data types, Cmd and Evt to represent commands and events, respectively. The state of the ExamplePersistentActor is a list of persisted event data contained in ExampleState.

The persistent actor's receiveRecover method defines how state is updated during recovery by handling Evt and SnapshotOffer messages. The persistent actor's receiveCommand method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling persist with an event (or a sequence of events) as first argument and an event handler as second argument.

The persist method persists events asynchronously and the event handler is executed for successfully persisted events. Successfully persisted events are internally sent back to the persistent actor as individual messages that trigger event handler executions. An event handler may close over persistent actor state and mutate it. The sender of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender of a command (not shown).

The main responsibility of an event handler is changing persistent actor state using event data and notifying others about successful state changes by publishing events.

When persisting events with persist it is guaranteed that the persistent actor will not receive further commands between the persist call and the execution(s) of the associated event handler. This also holds for multiple persist calls in context of a single command.

If persistence of an event fails, onPersistFailure will be invoked (logging the error by default) and the actor will unconditionally be stopped. If persistence of an event is rejected before it is stored, e.g. due to serialization error, onPersistRejected will be invoked (logging a warning by default) and the actor continues with next message.

The easiest way to run this example yourself is to download Typesafe Activator and open the tutorial named Akka Persistence Samples with Scala. It contains instructions on how to run the PersistentActorExample.

Note

It's also possible to switch between different command handlers during normal processing and recovery with context.become() and context.unbecome(). To get the actor into the same state after recovery you need to take special care to perform the same state transitions with become and unbecome in the receiveRecover method as you would have done in the command handler. Note that when using become from receiveRecover it will still only use the receiveRecover behavior when replaying the events. When replay is completed it will use the new behavior.

Identifiers

A persistent actor must have an identifier that doesn't change across different actor incarnations. The identifier must be defined with the persistenceId method.

override def persistenceId = "my-stable-persistence-id"

Recovery

By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages. New messages sent to a persistent actor during recovery do not interfere with replayed messages. They are cached and received by a persistent actor after recovery phase completes.

Recovery customization

Applications may also customise how recovery is performed by returning a customised Recovery object in the recovery method of a PersistentActor, for example by setting an upper bound on the replay, which allows the actor to be replayed to a certain point "in the past" instead of to its most up-to-date state:

override def recovery = Recovery(toSequenceNr = 457L)
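
The effect of such an upper bound can be pictured with a small plain Scala model. This is only a sketch: Entry and BoundedReplaySketch are hypothetical names, and the bound is assumed to be inclusive.

```scala
// Hypothetical journal entry: an event payload tagged with its sequence number.
final case class Entry(sequenceNr: Long, payload: String)

object BoundedReplaySketch {
  // Replays only entries up to and including the given bound, in sequence order.
  def replay(journal: Seq[Entry], toSequenceNr: Long): List[String] =
    journal
      .filter(_.sequenceNr <= toSequenceNr)
      .sortBy(_.sequenceNr)
      .map(_.payload)
      .toList
}
```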

Recovery can be disabled by returning Recovery.none in the recovery method of a PersistentActor:

override def recovery = Recovery.none

Recovery status

A persistent actor can query its own recovery status via the methods

def recoveryRunning: Boolean
def recoveryFinished: Boolean

Sometimes there is a need for performing additional initialization when the recovery has completed, before processing any other message sent to the persistent actor. The persistent actor will receive a special RecoveryCompleted message right after recovery and before any other received messages.

override def receiveRecover: Receive = {
  case RecoveryCompleted =>
  // perform init after recovery, before any other messages
  //...
  case evt               => //...
}

override def receiveCommand: Receive = {
  case msg => //...
}

If there is a problem with recovering the state of the actor from the journal, onReplayFailure is called (logging the error by default) and the actor will be stopped.

Relaxed local consistency requirements and high throughput use-cases

If faced with relaxed local consistency requirements and high throughput demands, PersistentActor and its persist method may sometimes not be enough to consume incoming Commands at a high rate, because the actor has to wait until all Events related to a given Command are processed before it can start processing the next Command. While this abstraction is very useful for most cases, sometimes you may be faced with relaxed requirements about consistency: for example, you may want to process commands as fast as you can, assuming that each Event will eventually be persisted and handled properly in the background, and react retroactively to persistence failures if needed.

The persistAsync method provides a tool for implementing high-throughput persistent actors. It will not stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks.

In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.).

class MyPersistentActor extends PersistentActor {

  override def persistenceId = "my-stable-persistence-id"

  override def receiveRecover: Receive = {
    case _ => // handle recovery here
  }

  override def receiveCommand: Receive = {
    case c: String => {
      sender() ! c
      persistAsync(s"evt-$c-1") { e => sender() ! e }
      persistAsync(s"evt-$c-2") { e => sender() ! e }
    }
  }
}

// usage
persistentActor ! "a"
persistentActor ! "b"

// possible order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-b-1
// evt-b-2

Note

In order to implement the pattern known as "command sourcing" simply call persistAsync(cmd)(...) right away on all incoming messages, and handle them in the callback.

Warning

The callback will not be invoked if the actor is restarted (or stopped) between the call to persistAsync and the journal's confirmation of the write.

Deferring actions until preceding persist handlers have executed

Sometimes when working with persistAsync you may find that it would be nice to define some actions in terms of "happens-after the previous persistAsync handlers have been invoked". PersistentActor provides a utility method called deferAsync, which works similarly to persistAsync yet does not persist the passed-in event. It is recommended to use it for read operations, and for actions which do not have corresponding events in your domain model.

Using this method is very similar to the persist family of methods, yet it does not persist the passed in event. It will be kept in memory and used when invoking the handler.

class MyPersistentActor extends PersistentActor {

  override def persistenceId = "my-stable-persistence-id"

  override def receiveRecover: Receive = {
    case _ => // handle recovery here
  }

  override def receiveCommand: Receive = {
    case c: String => {
      sender() ! c
      persistAsync(s"evt-$c-1") { e => sender() ! e }
      persistAsync(s"evt-$c-2") { e => sender() ! e }
      deferAsync(s"evt-$c-3") { e => sender() ! e }
    }
  }
}

Notice that the sender() is safe to access in the handler callback, and will be pointing to the original sender of the command for which this deferAsync handler was called.

The calling side will get the responses in this (guaranteed) order:

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3

Warning

The callback will not be invoked if the actor is restarted (or stopped) between the call to deferAsync and the point at which the journal has processed and confirmed all preceding writes.

Nested persist calls

It is possible to call persist and persistAsync inside their respective callback blocks and they will properly retain both the thread safety (including the right value of sender()) as well as stashing guarantees.

In general it is encouraged to create command handlers which do not need to resort to nested event persisting, however there are situations where it may be useful. It is important to understand the ordering of callback execution in those situations, as well as their implication on the stashing behaviour (that persist() enforces). In the following example two persist calls are issued, and each of them issues another persist inside its callback:

override def receiveCommand: Receive = {
  case c: String =>
    sender() ! c

    // event names match the ordering listed below ("a-outer-1", "a-inner-1", ...)
    persist(s"$c-outer-1") { outer1 =>
      sender() ! outer1
      persist(s"$c-inner-1") { inner1 =>
        sender() ! inner1
      }
    }

    persist(s"$c-outer-2") { outer2 =>
      sender() ! outer2
      persist(s"$c-inner-2") { inner2 =>
        sender() ! inner2
      }
    }
}

When sending two commands to this PersistentActor, the persist handlers will be executed in the following order:

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// a-outer-1
// a-outer-2
// a-inner-1
// a-inner-2
// and only then process "b"
// b
// b-outer-1
// b-outer-2
// b-inner-1
// b-inner-2

First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed, the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). Only after all these handlers have been successfully invoked will the next command be delivered to the persistent actor. In other words, the stashing of incoming commands that is guaranteed by initially calling persist() on the outer layer is extended until all nested persist callbacks have been handled.

It is also possible to nest persistAsync calls, using the same pattern:

override def receiveCommand: Receive = {
  case c: String =>
    sender() ! c
    persistAsync(c + "-outer-1") { outer =>
      sender() ! outer
      persistAsync(c + "-inner-1") { inner => sender() ! inner }
    }
    persistAsync(c + "-outer-2") { outer =>
      sender() ! outer
      persistAsync(c + "-inner-2") { inner => sender() ! inner }
    }
    }
}

In this case no stashing is happening, yet the events are still persisted and callbacks executed in the expected order:

persistentActor ! "a"
persistentActor ! "b"

// order of received messages:
// a
// b
// a-outer-1
// a-outer-2
// b-outer-1
// b-outer-2
// a-inner-1
// a-inner-2
// b-inner-1
// b-inner-2

// which can be seen as the following causal relationship:
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2
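
The two orderings listed in this section can be reproduced with a toy, single-threaded model in plain Scala. It is only a sketch and does not use Akka: OrderingSketch and its stashing flag are hypothetical, the journal is modelled as a FIFO queue of pending handlers, and the non-stashing case assumes both commands arrive before any write is confirmed.

```scala
import scala.collection.mutable

// A toy, single-threaded model of callback ordering; it does not use Akka.
class OrderingSketch(stashing: Boolean) {
  private val pending = mutable.Queue.empty[(String, String => Unit)]
  private val received = mutable.ListBuffer.empty[String]

  // Stands in for persist/persistAsync: the handler runs later, when the
  // simulated journal confirms the event.
  private def persist(event: String)(handler: String => Unit): Unit =
    pending.enqueue((event, handler))

  private def onCommand(c: String): Unit = {
    received += c
    persist(s"$c-outer-1") { outer =>
      received += outer
      persist(s"$c-inner-1") { inner => received += inner }
    }
    persist(s"$c-outer-2") { outer =>
      received += outer
      persist(s"$c-inner-2") { inner => received += inner }
    }
  }

  // Runs confirmed handlers in journal order, including nested ones.
  private def drain(): Unit =
    while (pending.nonEmpty) {
      val (event, handler) = pending.dequeue()
      handler(event)
    }

  def run(commands: List[String]): List[String] = {
    commands.foreach { c =>
      onCommand(c)
      if (stashing) drain() // persist-like: finish all handlers before the next command
    }
    drain() // persistAsync-like: commands were not held back, handlers run afterwards
    received.toList
  }
}
```

With stashing enabled the model yields the persist ordering (all of "a" before "b"); with it disabled it yields the persistAsync ordering, where outer callbacks of both commands precede the inner ones.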

While it is possible to nest mixed persist and persistAsync calls while keeping their respective semantics, it is not a recommended practice, as it may lead to overly complex nesting.

Failures

If persistence of an event fails, onPersistFailure will be invoked (logging the error by default) and the actor will unconditionally be stopped.

The reason that it cannot resume when persist fails is that it is unknown whether the event was actually persisted or not, and therefore the actor is in an inconsistent state. Restarting on persistence failures will most likely fail anyway, since the journal is probably unavailable. It is better to stop the actor and start it again after a back-off timeout. The akka.persistence.BackoffSupervisor actor is provided to support such restarts.

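import scala.concurrent.duration._
import akka.actor.Props
import akka.persistence.BackoffSupervisor

val childProps = Props[MyPersistentActor]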
val props = BackoffSupervisor.props(
  childProps,
  childName = "myActor",
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2)
context.actorOf(props, name = "mySupervisor")
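
The back-off timing itself can be pictured with a small calculation. The sketch below is plain Scala and only assumes an exponential scheme (double the minimum delay on each restart, cap it at the maximum, then add a random share of up to randomFactor). The object name BackoffSketch and the rnd parameter are hypothetical, and the actual BackoffSupervisor may compute its delays differently.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Delay before the n-th restart (n = 0 for the first one): the minimum backoff
  // doubles on each attempt, is capped at the maximum, and is then stretched by
  // a random share of up to randomFactor. `rnd` is expected to lie in [0, 1).
  def delay(
    restartCount: Int,
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomFactor: Double,
    rnd: Double): FiniteDuration = {
    val exponential = minBackoff.toMillis * math.pow(2, restartCount)
    val capped = math.min(maxBackoff.toMillis.toDouble, exponential)
    (capped * (1.0 + rnd * randomFactor)).toLong.millis
  }
}
```

With the values from the example above and rnd fixed at 0, the delays would be 3, 6, 12 and 24 seconds, and 30 seconds from then on.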

If persistence of an event is rejected before it is stored, e.g. due to serialization error, onPersistRejected will be invoked (logging a warning by default) and the actor continues with next message.

If there is a problem with recovering the state of the actor from the journal when the actor is started, onReplayFailure is called (logging the error by default) and the actor will be stopped.

If deleteMessages fails, onDeleteMessagesFailure will be called (logging a warning by default) and the actor continues with the next message.

Atomic writes

Each event is of course stored atomically, but it is also possible to store several events atomically by using the persistAll or persistAllAsync method. That means that all events passed to that method are stored or none of them are stored if there is an error.

The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by persistAll.

Some journals may not support atomic writes of several events and they will then reject the persistAll command, i.e. onPersistRejected is called with an exception (typically UnsupportedOperationException).
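
The all-or-nothing behaviour can be modelled in a few lines of plain Scala. This is a sketch of the semantics only, not of any journal plugin: AtomicWriteSketch and the storable predicate (standing in for whatever makes a write fail) are hypothetical.

```scala
object AtomicWriteSketch {
  // Appends all events or none: if any event fails the `storable` check,
  // the journal is left unchanged and an error is returned instead.
  def writeAll(
    journal: Vector[String],
    events: List[String],
    storable: String => Boolean): Either[String, Vector[String]] =
    events.find(e => !storable(e)) match {
      case Some(bad) => Left(s"rejected: $bad")
      case None      => Right(journal ++ events)
    }
}
```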

Batch writes

To optimize throughput, a persistent actor internally batches events to be stored under high load before writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads to a configurable maximum size (default is 200) under high load. When using persistAsync this increases the maximum throughput dramatically.

akka.persistence.journal.max-message-batch-size = 200

A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
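
As a simplified picture of the size limit, the following plain Scala sketch splits a backlog of pending events into consecutive batches. BatchSketch is a hypothetical name, and the model ignores timing: it assumes the whole backlog has accumulated while the journal was busy.

```scala
object BatchSketch {
  // Splits a backlog of pending events into the batches that would be written
  // one after another, each limited to maxBatchSize.
  def batches[A](backlog: Vector[A], maxBatchSize: Int): List[Vector[A]] =
    backlog.grouped(maxBatchSize).toList
}
```

Under this model a backlog of 450 events with the default limit of 200 is written as three batches of 200, 200 and 50 events, while a single pending event forms a batch of its own.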

Message deletion

To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the deleteMessages method.

If the delete fails, onDeleteMessagesFailure will be called (logging a warning by default) and the actor continues with the next message.

Persistent Views

Persistent views can be implemented by extending the PersistentView trait and implementing the receive and the persistenceId methods.

class MyView extends PersistentView {
  override def persistenceId: String = "some-persistence-id"
  override def viewId: String = "some-persistence-id-view"

  def receive: Receive = {
    case payload if isPersistent =>
    // handle message from journal...
    case payload                 =>
    // handle message from user-land...
  }
}

The persistenceId identifies the persistent actor from which the view receives journaled messages. It is not necessary for the referenced persistent actor to be running. Views read messages from a persistent actor's journal directly. When a persistent actor is started later and begins to write new messages, the corresponding view is updated automatically, by default.

It is possible to determine whether a message was sent from the Journal or from another actor in user-land by calling the isPersistent method. That said, very often you don't need this information at all and can simply apply the same logic to both cases (skip the if isPersistent check).

Updates

The default update interval of all views of an actor system is configurable:

akka.persistence.view.auto-update-interval = 5s

PersistentView implementation classes may also override the autoUpdateInterval method to return a custom update interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an Update message.

val view = system.actorOf(Props[MyView])
view ! Update(await = true)

If the await parameter is set to true, messages that follow the Update request are processed only after the incremental message replay triggered by that update request has completed. If set to false (default), messages following the update request may interleave with the replayed message stream. Automated updates always run with await = false.

Automated updates of all persistent views of an actor system can be turned off by configuration:

akka.persistence.view.auto-update = off

Implementation classes may override the configured default value by overriding the autoUpdate method. To limit the number of replayed messages per update request, applications can configure a custom akka.persistence.view.auto-update-replay-max value or override the autoUpdateReplayMax method. The number of replayed messages for manual updates can be limited with the replayMax parameter of the Update message.

Recovery

Initial recovery of persistent views works in the very same way as for a persistent actor (i.e. by sending a Recover message to self). The maximum number of replayed messages during initial recovery is determined by autoUpdateReplayMax. Further possibilities to customize initial recovery are explained in section Recovery.

Identifiers

A persistent view must have an identifier that doesn't change across different actor incarnations. The identifier must be defined with the viewId method.

The viewId must differ from the referenced persistenceId, unless Snapshots of a view and its persistent actor shall be shared (which is what applications usually do not want).

Snapshots

Snapshots can dramatically reduce recovery times of persistent actors and views. The following discusses snapshots in context of persistent actors but this is also applicable to persistent views.

Persistent actors can save snapshots of internal state by calling the saveSnapshot method. If saving of a snapshot succeeds, the persistent actor receives a SaveSnapshotSuccess message, otherwise a SaveSnapshotFailure message

var state: Any = _

override def receiveCommand: Receive = {
  case "snap"                                => saveSnapshot(state)
  case SaveSnapshotSuccess(metadata)         => // ...
  case SaveSnapshotFailure(metadata, reason) => // ...
}

where metadata is of type SnapshotMetadata:

final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)

During recovery, the persistent actor is offered a previously saved snapshot via a SnapshotOffer message from which it can initialize internal state.

var state: Any = _

override def receiveRecover: Receive = {
  case SnapshotOffer(metadata, offeredSnapshot) => state = offeredSnapshot
  case RecoveryCompleted                        =>
  case event                                    => // ...
}

The replayed messages that follow the SnapshotOffer message, if any, are younger than the offered snapshot. They finally recover the persistent actor to its current (i.e. latest) state.

In general, a persistent actor is only offered a snapshot if that persistent actor has previously saved one or more snapshots and at least one of these snapshots matches the SnapshotSelectionCriteria that can be specified for recovery.

override def recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(
  maxSequenceNr = 457L,
  maxTimestamp = System.currentTimeMillis))

If not specified, the criteria default to SnapshotSelectionCriteria.Latest, which selects the latest (= youngest) snapshot. To disable snapshot-based recovery, applications should use SnapshotSelectionCriteria.None. A recovery where no saved snapshot matches the specified SnapshotSelectionCriteria will replay all journaled messages.
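
The selection rule and the replay that follows it can be modelled in plain Scala. The sketch below does not use the Akka types: Snap, Criteria and SnapshotRecoverySketch are hypothetical stand-ins, the state is a simple running sum, and both bounds are assumed to be inclusive.

```scala
// Hypothetical stand-ins for a stored snapshot and for the selection criteria.
final case class Snap(sequenceNr: Long, timestamp: Long, state: Int)
final case class Criteria(
  maxSequenceNr: Long = Long.MaxValue,
  maxTimestamp: Long = Long.MaxValue)

object SnapshotRecoverySketch {
  // Picks the youngest snapshot that satisfies both upper bounds, if any.
  def select(snapshots: Seq[Snap], c: Criteria): Option[Snap] =
    snapshots
      .filter(s => s.sequenceNr <= c.maxSequenceNr && s.timestamp <= c.maxTimestamp)
      .sortBy(_.sequenceNr)
      .lastOption

  // Starts from the selected snapshot (or from zero if none matches) and
  // replays only the younger events. Events are (sequenceNr, delta) pairs.
  def recover(snapshots: Seq[Snap], events: Seq[(Long, Int)], c: Criteria): Int = {
    val start = select(snapshots, c)
    val fromSeqNr = start.map(_.sequenceNr).getOrElse(0L)
    events
      .filter(_._1 > fromSeqNr)
      .foldLeft(start.map(_.state).getOrElse(0))((acc, e) => acc + e._2)
  }
}
```

Whichever snapshot is chosen, or if none matches, the recovered state is the same; only the number of replayed events differs.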

Snapshot deletion

A persistent actor can delete individual snapshots by calling the deleteSnapshot method with the sequence number of when the snapshot was taken.

To bulk-delete a range of snapshots matching SnapshotSelectionCriteria, persistent actors should use the deleteSnapshots method.

At-Least-Once Delivery

To send messages with at-least-once delivery semantics to destinations, you can mix the AtLeastOnceDelivery trait into your PersistentActor on the sending side. It takes care of re-sending messages when they have not been confirmed within a configurable timeout.

Note

At-least-once delivery implies that original message send order is not always preserved and the destination may receive duplicate messages. That means that the semantics do not match those of a normal ActorRef send operation:

  • it is not at-most-once delivery
  • message order for the same sender–receiver pair is not preserved due to possible resends
  • after a crash and restart of the destination messages are still delivered—to the new actor incarnation

These semantics are similar to what an ActorPath represents (see Actor Lifecycle), therefore you need to supply a path and not a reference when delivering messages. The messages are sent to the path with an actor selection.

Use the deliver method to send a message to a destination. Call the confirmDelivery method when the destination has replied with a confirmation message.

Relationship between deliver and confirmDelivery

To send messages to the destination path, use the deliver method. If the persistent actor is not currently recovering, this will send the message to the destination actor. When recovering, messages will be buffered until they have been confirmed using confirmDelivery. Once recovery has completed, if there are outstanding messages that have not been confirmed (during the message replay), the persistent actor will resend these before sending any other messages.

Deliver requires a deliveryIdToMessage function to pass the provided deliveryId into the message so that correlation between deliver and confirmDelivery is possible. The deliveryId must do the round trip. Upon receipt of the message, the destination actor will send the same deliveryId wrapped in a confirmation message back to the sender. The sender will then use it to call the confirmDelivery method to complete the delivery routine.

import akka.actor.{ Actor, ActorPath }
import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }

case class Msg(deliveryId: Long, s: String)
case class Confirm(deliveryId: Long)

sealed trait Evt
case class MsgSent(s: String) extends Evt
case class MsgConfirmed(deliveryId: Long) extends Evt

class MyPersistentActor(destination: ActorPath)
  extends PersistentActor with AtLeastOnceDelivery {

  override def persistenceId: String = "persistence-id"

  override def receiveCommand: Receive = {
    case s: String           => persist(MsgSent(s))(updateState)
    case Confirm(deliveryId) => persist(MsgConfirmed(deliveryId))(updateState)
  }

  override def receiveRecover: Receive = {
    case evt: Evt => updateState(evt)
  }

  def updateState(evt: Evt): Unit = evt match {
    case MsgSent(s) =>
      deliver(destination, deliveryId => Msg(deliveryId, s))

    case MsgConfirmed(deliveryId) => confirmDelivery(deliveryId)
  }
}

class MyDestination extends Actor {
  def receive = {
    case Msg(deliveryId, s) =>
      // ...
      sender() ! Confirm(deliveryId)
  }
}

The deliveryId generated by the persistence module is a strictly monotonically increasing sequence number without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see gaps in the sequence. It is not possible to use a custom deliveryId. However, you can send a custom correlation identifier in the message to the destination. You must then retain a mapping between the internal deliveryId (passed into the deliveryIdToMessage function) and your custom correlation id (passed into the message). You can do this by storing such a mapping in a Map(correlationId -> deliveryId), from which you can retrieve the deliveryId to be passed into the confirmDelivery method once the receiver of your message has replied with your custom correlation id.
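
A minimal sketch of such a mapping in plain Scala is shown below. CorrelationSketch and its method names are hypothetical; in a real actor the map would be part of the actor's state and the looked-up id would be passed to confirmDelivery.

```scala
object CorrelationSketch {
  // correlationId -> deliveryId, as suggested in the text above.
  type Pending = Map[String, Long]

  // Record the mapping when the message is built inside deliveryIdToMessage.
  def sent(pending: Pending, correlationId: String, deliveryId: Long): Pending =
    pending + (correlationId -> deliveryId)

  // On a reply carrying the correlation id, look up the deliveryId that should
  // be confirmed and forget the mapping.
  def replied(pending: Pending, correlationId: String): (Option[Long], Pending) =
    (pending.get(correlationId), pending - correlationId)
}
```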

The AtLeastOnceDelivery trait has a state consisting of unconfirmed messages and a sequence number. It does not store this state itself. You must persist events corresponding to the deliver and confirmDelivery invocations from your PersistentActor so that the state can be restored by calling the same methods during the recovery phase of the PersistentActor. Sometimes these events can be derived from other business level events, and sometimes you must create separate events. During recovery calls to deliver will not send out the message, but it will be sent later if no matching confirmDelivery was performed.
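
The bookkeeping described here can be pictured with a toy model in plain Scala. It is not the Akka trait: DeliveryState and its methods are hypothetical, and the first deliveryId is assumed to be 1. Replaying the same sequence of deliver and confirm calls rebuilds the same state, which is why the corresponding events must be persisted.

```scala
// A toy model of the delivery bookkeeping; it is not the Akka trait.
final case class DeliveryState(
  nextDeliveryId: Long = 1L,
  unconfirmed: Map[Long, String] = Map.empty) {

  // Models deliver: assigns the next id and remembers the message as unconfirmed.
  def deliver(payload: String): (Long, DeliveryState) =
    (nextDeliveryId,
     copy(nextDeliveryId + 1, unconfirmed + (nextDeliveryId -> payload)))

  // Models confirmDelivery: the flag is false if the id was not outstanding.
  def confirm(deliveryId: Long): (Boolean, DeliveryState) =
    (unconfirmed.contains(deliveryId), copy(unconfirmed = unconfirmed - deliveryId))

  // Messages that would be re-sent on the next redelivery tick, oldest first,
  // limited to a burst size.
  def toRedeliver(burstLimit: Int): List[(Long, String)] =
    unconfirmed.toList.sortBy(_._1).take(burstLimit)
}
```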

Support for snapshots is provided by getDeliverySnapshot and setDeliverySnapshot. The AtLeastOnceDeliverySnapshot contains the full delivery state, including unconfirmed messages. If you need a custom snapshot for other parts of the actor state you must also include the AtLeastOnceDeliverySnapshot. It is serialized using protobuf with the ordinary Akka serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot as a blob in your custom snapshot.

The interval between redelivery attempts is defined by the redeliverInterval method. The default value can be configured with the akka.persistence.at-least-once-delivery.redeliver-interval configuration key. The method can be overridden by implementation classes to return non-default values.

The maximum number of messages that will be sent at each redelivery burst is defined by the redeliveryBurstLimit method (burst frequency is half of the redelivery interval). If there are many unconfirmed messages (e.g. if the destination is not available for a long time), this helps to prevent an overwhelming number of messages from being sent at once. The default value can be configured with the akka.persistence.at-least-once-delivery.redelivery-burst-limit configuration key. The method can be overridden by implementation classes to return non-default values.

After a number of delivery attempts an AtLeastOnceDelivery.UnconfirmedWarning message will be sent to self. The re-sending will still continue, but you can choose to call confirmDelivery to cancel the re-sending. The number of delivery attempts before emitting the warning is defined by the warnAfterNumberOfUnconfirmedAttempts method. The default value can be configured with the akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts configuration key. The method can be overridden by implementation classes to return non-default values.

The AtLeastOnceDelivery trait holds messages in memory until their successful delivery has been confirmed. The maximum number of unconfirmed messages that the actor is allowed to hold in memory is defined by the maxUnconfirmedMessages method. If this limit is exceeded, the deliver method will not accept more messages and it will throw AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException. The default value can be configured with the akka.persistence.at-least-once-delivery.max-unconfirmed-messages configuration key. The method can be overridden by implementation classes to return non-default values.
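
As a sketch, the four settings described above could be overridden together in application.conf as follows. The keys are the ones named in the text; the values are illustrative only and should be tuned for your workload.

```hocon
# Illustrative values only, not recommendations.
akka.persistence.at-least-once-delivery {
  # Interval between redelivery attempts.
  redeliver-interval = 5s
  # Maximum number of messages sent in one redelivery burst.
  redelivery-burst-limit = 10000
  # Delivery attempts before an UnconfirmedWarning is sent to self.
  warn-after-number-of-unconfirmed-attempts = 5
  # Maximum number of unconfirmed messages held in memory.
  max-unconfirmed-messages = 100000
}
```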

Event Adapters

Note

Complete documentation featuring use-cases and implementation examples for this feature will follow shortly.

In long-running projects using event sourcing, the need sometimes arises to detach the data model from the domain model completely.

Event Adapters help in situations where:

  • Version Migrations – existing events stored in Version 1 should be "upcasted" to a new Version 2 representation, and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios the toJournal function is usually an identity function, whereas fromJournal is implemented as v1.Event=>v2.Event, performing the necessary mapping inside the fromJournal method. This technique is sometimes referred to as "upcasting" in other CQRS libraries.
  • Separating Domain and Data models – thanks to EventAdapters it is possible to completely separate the domain model from the model used to persist data in the Journals. For example, one may want to use case classes in the domain model but persist their protocol-buffer (or any other binary serialization format) counterparts to the Journal. A simple toJournal:MyModel=>MyDataModel and fromJournal:MyDataModel=>MyModel adapter can be used to implement this feature.
  • Journal Specialized Data Types – exposing data types understood by the underlying Journal. For example, for data stores which understand JSON it is possible to write an EventAdapter toJournal:Any=>JSON such that the Journal can directly store the JSON instead of serializing the object to its binary representation.
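
The version-migration case from the list above can be sketched as two plain functions. This is an Akka-free model: the v1 and v2 UserRenamed events and the name-splitting rule are hypothetical, and fromJournal returns a plain Seq where a real adapter would return an EventSeq.

```scala
object UpcastSketch {
  // Hypothetical old and new representations of the same event.
  object v1 { final case class UserRenamed(name: String) }
  object v2 { final case class UserRenamed(firstName: String, lastName: String) }

  // Writing side: new events are stored as they are (identity).
  def toJournal(event: Any): Any = event

  // Reading side: old events are upcasted, everything else passes through.
  def fromJournal(event: Any): Seq[Any] = event match {
    case v1.UserRenamed(name) =>
      // Illustrative mapping rule: split on the first space.
      val parts = name.split(" ", 2)
      val lastName = if (parts.length > 1) parts(1) else ""
      Seq(v2.UserRenamed(parts(0), lastName))
    case other =>
      Seq(other)
  }
}
```

With this shape the domain code only ever sees v2 events, while already stored v1 events remain untouched in the journal.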

Implementing an EventAdapter is rather straightforward:

class MyEventAdapter(system: ExtendedActorSystem) extends EventAdapter {
  override def manifest(event: Any): String =
    "" // when no manifest needed, return ""

  override def toJournal(event: Any): Any =
    event // identity

  override def fromJournal(event: Any, manifest: String): EventSeq =
    EventSeq.single(event) // identity
}

Then, in order for it to be used on events coming to and from the journal, you must bind it using the following configuration syntax:

akka.persistence.journal {
  inmem {
    event-adapters {
      tagging        = "docs.persistence.MyTaggingEventAdapter"
      user-upcasting = "docs.persistence.UserUpcastingEventAdapter"
      item-upcasting = "docs.persistence.ItemUpcastingEventAdapter"
    }

    event-adapter-bindings {
      "docs.persistence.Item"        = tagging
      "docs.persistence.TaggedEvent" = tagging
      "docs.persistence.v1.Event"    = [user-upcasting, item-upcasting]
    }
  }
}

It is possible to bind multiple adapters to one class for recovery, in which case the fromJournal methods of all bound adapters will be applied to a given matching event (in order of definition in the configuration). Each adapter may return from 0 to n adapted events (wrapped in an EventSeq), so each adapter can inspect the event and, if it should indeed adapt it, return the adapted event(s). Adapters which have nothing to contribute during this adaptation simply return EventSeq.empty. The adapted events are then delivered in order to the PersistentActor during replay.
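
The way several bound adapters combine during recovery can be modelled in a few lines of plain Scala. This is a sketch only: the event types and adapter names are hypothetical, a Seq stands in for EventSeq, and an empty Seq stands in for EventSeq.empty.

```scala
object CombinedAdaptersSketch {
  // Stand-in for the fromJournal side of an adapter.
  type FromJournal = Any => Seq[Any]

  // Hypothetical stored event and its adapted forms.
  final case class V1Event(kind: String, value: String)
  final case class UserChanged(name: String)
  final case class ItemChanged(item: String)

  // Each adapter handles what it understands and returns nothing otherwise.
  val userUpcasting: FromJournal = {
    case V1Event("user", value) => Seq(UserChanged(value))
    case _                      => Seq.empty
  }

  val itemUpcasting: FromJournal = {
    case V1Event("item", value) => Seq(ItemChanged(value))
    case _                      => Seq.empty
  }

  // All adapters see the same event; results are concatenated in binding order.
  def adapt(adapters: Seq[FromJournal], event: Any): Seq[Any] =
    adapters.flatMap(adapter => adapter(event))
}
```

Note that in this model an event that no adapter recognises yields no adapted events at all, so each stored event type should be covered by at least one adapter.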

Note

More advanced techniques utilising advanced binary serialization formats such as protocol buffers or kryo / thrift / avro will be documented very soon. Such schema evolution often needs to reach into the serialization layer, but it is much more powerful, for example in terms of flexibly removing unused or deprecated classes from your classpath.

Persistent FSM

PersistentFSMActor handles incoming messages in an FSM-like fashion. Its internal state is persisted as a sequence of changes, later referred to as domain events. The relationship between incoming messages, the FSM's states and transitions, and the persistence of domain events is defined by a DSL.

A Simple Example

To demonstrate the features of the PersistentFSMActor trait, consider an actor which represents a Web store customer. The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands:

sealed trait Command
case class AddItem(item: Item) extends Command
case object Buy extends Command
case object Leave extends Command
case object GetCurrentCart extends Command

  • AddItem - sent when the customer adds an item to the shopping cart
  • Buy - sent when the customer finishes the purchase
  • Leave - sent when the customer leaves the store without purchasing anything
  • GetCurrentCart - queries the current state of the customer's shopping cart

The customer can be in one of the following states:

sealed trait UserState extends FSMState
case object LookingAround extends UserState {
  override def identifier: String = "Looking Around"
}
case object Shopping extends UserState {
  override def identifier: String = "Shopping"
}
case object Inactive extends UserState {
  override def identifier: String = "Inactive"
}
case object Paid extends UserState {
  override def identifier: String = "Paid"
}

  • LookingAround - the customer is browsing the site, but hasn't added anything to the shopping cart
  • Shopping - the customer has recently added items to the shopping cart
  • Inactive - the customer has items in the shopping cart, but hasn't added anything recently
  • Paid - the customer has purchased the items

Note

PersistentFSMActor states must inherit from trait PersistentFsmActor.FSMState and implement the def identifier: String method. This is required in order to simplify the serialization of FSM states. String identifiers should be unique!

The customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed on the actor's start in order to restore the customer's latest state:

sealed trait DomainEvent
case class ItemAdded(item: Item) extends DomainEvent
case object OrderExecuted extends DomainEvent
case object OrderDiscarded extends DomainEvent

The customer state data represents the items in the customer's shopping cart:

case class Item(id: String, name: String, price: Float)

sealed trait ShoppingCart {
  def addItem(item: Item): ShoppingCart
  def empty(): ShoppingCart
}
case object EmptyShoppingCart extends ShoppingCart {
  def addItem(item: Item) = NonEmptyShoppingCart(item :: Nil)
  def empty() = this
}
case class NonEmptyShoppingCart(items: Seq[Item]) extends ShoppingCart {
  def addItem(item: Item) = NonEmptyShoppingCart(items :+ item)
  def empty() = EmptyShoppingCart
}

Here is how everything is wired together:

startWith(LookingAround, EmptyShoppingCart)

when(LookingAround) {
  case Event(AddItem(item), _) =>
    goto(Shopping) applying ItemAdded(item) forMax (1 seconds)
  case Event(GetCurrentCart, data) =>
    stay replying data
}

when(Shopping) {
  case Event(AddItem(item), _) =>
    stay applying ItemAdded(item) forMax (1 seconds)
  case Event(Buy, _) =>
    goto(Paid) applying OrderExecuted andThen {
      case NonEmptyShoppingCart(items) => reportActor ! PurchaseWasMade(items)
      case EmptyShoppingCart           => // do nothing...
    }
  case Event(Leave, _) =>
    stop applying OrderDiscarded andThen {
      case _ => reportActor ! ShoppingCardDiscarded
    }
  case Event(GetCurrentCart, data) =>
    stay replying data
  case Event(StateTimeout, _) =>
    goto(Inactive) forMax (2 seconds)
}

when(Inactive) {
  case Event(AddItem(item), _) =>
    goto(Shopping) applying ItemAdded(item) forMax (1 seconds)
  case Event(StateTimeout, _) =>
    stop applying OrderDiscarded andThen {
      case _ => reportActor ! ShoppingCardDiscarded
    }
}

when(Paid) {
  case Event(Leave, _) => stop()
  case Event(GetCurrentCart, data) =>
    stay replying data
}

Note

State data can only be modified directly on initialization. Later it is modified only as a result of applying domain events. Override the applyEvent method to define how state data is affected by domain events; see the example below.

override def applyEvent(event: DomainEvent, cartBeforeEvent: ShoppingCart): ShoppingCart = {
  event match {
    case ItemAdded(item) => cartBeforeEvent.addItem(item)
    case OrderExecuted   => cartBeforeEvent
    case OrderDiscarded  => cartBeforeEvent.empty()
  }
}
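
To see how replaying domain events restores the state data, the cart types and the applyEvent logic from this example can be exercised without Akka. The sketch below repeats the definitions so that it is self-contained; the replay helper is a hypothetical name that folds the events over the initial EmptyShoppingCart, which is roughly what recovery amounts to for the state data.

```scala
object CartReplaySketch {
  final case class Item(id: String, name: String, price: Float)

  sealed trait ShoppingCart {
    def addItem(item: Item): ShoppingCart
    def empty(): ShoppingCart
  }
  case object EmptyShoppingCart extends ShoppingCart {
    def addItem(item: Item): ShoppingCart = NonEmptyShoppingCart(item :: Nil)
    def empty(): ShoppingCart = this
  }
  final case class NonEmptyShoppingCart(items: Seq[Item]) extends ShoppingCart {
    def addItem(item: Item): ShoppingCart = NonEmptyShoppingCart(items :+ item)
    def empty(): ShoppingCart = EmptyShoppingCart
  }

  sealed trait DomainEvent
  final case class ItemAdded(item: Item) extends DomainEvent
  case object OrderExecuted extends DomainEvent
  case object OrderDiscarded extends DomainEvent

  // Same logic as the applyEvent override in the example.
  def applyEvent(event: DomainEvent, cartBeforeEvent: ShoppingCart): ShoppingCart =
    event match {
      case ItemAdded(item) => cartBeforeEvent.addItem(item)
      case OrderExecuted   => cartBeforeEvent
      case OrderDiscarded  => cartBeforeEvent.empty()
    }

  // Hypothetical helper: rebuild the cart from the persisted events.
  def replay(events: Seq[DomainEvent]): ShoppingCart =
    events.foldLeft(EmptyShoppingCart: ShoppingCart)((cart, event) => applyEvent(event, cart))
}
```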

Storage plugins

Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension.

A directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page; see Community plugins.

Plugins can be selected either by "default", for all persistent actors and views, or "individually", when a persistent actor or view defines its own set of plugins.

When a persistent actor or view does NOT override the journalPluginId and snapshotPluginId methods, the persistence extension will use the "default" journal and snapshot-store plugins configured in the reference.conf:

akka.persistence.journal.plugin = ""
akka.persistence.snapshot-store.plugin = ""

However, these entries are provided as empty "", and require explicit user configuration via override in the user application.conf. For an example of a journal plugin which writes messages to LevelDB see Local LevelDB journal. For an example of a snapshot store plugin which writes snapshots as individual files to the local filesystem see Local snapshot store.
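
For instance, an application.conf that selects the two pre-packaged plugins mentioned above as the defaults might look like the following sketch. It assumes the plugin ids akka.persistence.journal.leveldb and akka.persistence.snapshot-store.local, which are the config entries named in the respective plugin sections.

```hocon
# application.conf (sketch): choose the default plugins explicitly.
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
```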

Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following imports:

import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._

Journal plugin API

A journal plugin extends AsyncWriteJournal.

AsyncWriteJournal is an actor and the methods to be implemented are:

/**
 * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the
 * journal.
 *
 * The batch is only for performance reasons, i.e. all messages don't have to be written
 * atomically. Higher throughput can typically be achieved by using batch inserts of many
 * records compared to inserting records one-by-one, but this aspect depends on the
 * underlying data store and a journal implementation can implement it as efficiently as
 * possible with the assumption that the messages of the batch are unrelated.
 *
 * Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to
 * the event that was passed to the `persist` method of the `PersistentActor`, or it
 * contains several `PersistentRepr` that correspond to the events that were passed
 * to the `persistAll` method of the `PersistentActor`. All `PersistentRepr` of the
 * `AtomicWrite` must be written to the data store atomically, i.e. all or none must
 * be stored. If the journal (data store) cannot support atomic writes of multiple
 * events it should reject such writes with a `Try` `Failure` with an
 * `UnsupportedOperationException` describing the issue. This limitation should
 * also be documented by the journal plugin.
 *
 * If there are failures when storing any of the messages in the batch the returned
 * `Future` must be completed with failure. The `Future` must only be completed with
 * success when all messages in the batch have been confirmed to be stored successfully,
 * i.e. they will be readable, and visible, in a subsequent replay. If there is
 * uncertainty about if the messages were stored or not the `Future` must be completed
 * with failure.
 *
 * Data store connection problems must be signaled by completing the `Future` with
 * failure.
 *
 * The journal can also signal that it rejects individual messages (`AtomicWrite`) by
 * the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements
 * as the input `messages` `Seq`. Each `Try` element signals if the corresponding
 * `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting
 * a message means it was not stored, i.e. it must not be included in a later replay.
 * Rejecting a message is typically done before attempting to store it, e.g. because of
 * serialization error.
 *
 * Data store connection problems must not be signaled as rejections.
 *
 * Note that it is possible to reduce number of allocations by
 * caching some result `Seq` for the happy path, i.e. when no messages are rejected.
 */
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]

/**
 * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
 * (inclusive).
 */
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]

/**
 * Plugin API
 *
 * Allows plugin implementers to use `f pipeTo self` and
 * handle additional messages for implementing advanced features
 */
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior

If the storage backend API only supports synchronous, blocking writes, the methods should be implemented as:

def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
  Future.fromTry(Try {
    // blocking call here
    ???
  })
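
The difference between rejecting individual writes and failing the whole batch, as described in the asyncWriteMessages contract, can be sketched with plain Scala futures. This is an Akka-free model: FakeAtomicWrite stands in for AtomicWrite, the storeAvailable flag is a hypothetical switch that simulates a connection problem, and "only String payloads can be serialized" is an arbitrary rule chosen for illustration.

```scala
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

object WriteResultSketch {
  // Hypothetical stand-in for akka.persistence.AtomicWrite.
  final case class FakeAtomicWrite(payloads: immutable.Seq[Any])

  // A rejection is decided per write, before anything is stored.
  private def check(write: FakeAtomicWrite): Try[Unit] =
    if (write.payloads.forall(_.isInstanceOf[String])) Success(())
    else Failure(new IllegalArgumentException("payload cannot be serialized"))

  def writeAll(
      messages: immutable.Seq[FakeAtomicWrite],
      storeAvailable: Boolean): Future[immutable.Seq[Try[Unit]]] =
    if (!storeAvailable)
      // Connection problems fail the Future; they are not rejections.
      Future.failed(new java.io.IOException("data store unreachable"))
    else
      // One Try per input write, in the same order as the input.
      Future.successful(messages.map(check))
}
```

The result sequence always has as many elements as the input batch, which is what allows each result to be matched to its write.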

A journal plugin must also implement the methods defined in AsyncRecovery for replays and sequence number recovery:

/**
 * Plugin API: asynchronously replays persistent messages. Implementations replay
 * a message by calling `replayCallback`. The returned future must be completed
 * when all messages (matching the sequence number bounds) have been replayed.
 * The future must be completed with a failure if any of the persistent messages
 * could not be replayed.
 *
 * The `replayCallback` must also be called with messages that have been marked
 * as deleted. In this case a replayed message's `deleted` method must return
 * `true`.
 *
 * @param persistenceId persistent actor id.
 * @param fromSequenceNr sequence number where replay should start (inclusive).
 * @param toSequenceNr sequence number where replay should end (inclusive).
 * @param max maximum number of messages to be replayed.
 * @param replayCallback called to replay a single message. Can be called from any
 *                       thread.
 *
 * @see [[AsyncWriteJournal]]
 */
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long,
                        max: Long)(replayCallback: PersistentRepr => Unit): Future[Unit]

/**
 * Plugin API: asynchronously reads the highest stored sequence number for the
 * given `persistenceId`.
 *
 * @param persistenceId persistent actor id.
 * @param fromSequenceNr hint where to start searching for the highest sequence
 *                       number.
 */
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]
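
The semantics of the replay bounds can be illustrated with an in-memory model. This sketch is synchronous and Akka-free: Entry is a hypothetical stand-in for PersistentRepr, the journal is assumed to be ordered by sequence number, and returning 0 for an empty journal is an assumption of the model.

```scala
object ReplaySketch {
  // Hypothetical stand-in for a stored PersistentRepr.
  final case class Entry(sequenceNr: Long, payload: String, deleted: Boolean = false)

  // Replays entries within [fromSequenceNr, toSequenceNr], at most `max` of them.
  // Entries marked as deleted are still passed to the callback.
  def replay(journal: Seq[Entry], fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
      replayCallback: Entry => Unit): Unit = {
    val limit = math.min(max, Int.MaxValue.toLong).toInt
    journal.iterator
      .filter(e => e.sequenceNr >= fromSequenceNr && e.sequenceNr <= toSequenceNr)
      .take(limit)
      .foreach(replayCallback)
  }

  // Highest stored sequence number, or 0 when nothing is stored (model assumption).
  def highestSequenceNr(journal: Seq[Entry]): Long =
    journal.foldLeft(0L)((highest, e) => math.max(highest, e.sequenceNr))
}
```

A real plugin would run the same logic against its data store and complete the returned Future once the last callback has been invoked.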

A journal plugin can be activated with the following minimal configuration:

# Path to the journal plugin to be used
akka.persistence.journal.plugin = "my-journal"

# My custom journal plugin
my-journal {
  # Class name of the plugin.
  class = "docs.persistence.MyJournal"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.actor.default-dispatcher"
}

The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher for SyncWriteJournal plugins and akka.actor.default-dispatcher for AsyncWriteJournal plugins.

Snapshot store plugin API

A snapshot store plugin must extend the SnapshotStore actor and implement the following methods:

/**
 * Plugin API: asynchronously loads a snapshot.
 *
 * @param persistenceId id of the persistent actor.
 * @param criteria selection criteria for loading.
 */
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

/**
 * Plugin API: asynchronously saves a snapshot.
 *
 * @param metadata snapshot metadata.
 * @param snapshot snapshot.
 */
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]

/**
 * Plugin API: deletes the snapshot identified by `metadata`.
 *
 * @param metadata snapshot metadata.
 */
def deleteAsync(metadata: SnapshotMetadata): Future[Unit]

/**
 * Plugin API: deletes all snapshots matching `criteria`.
 *
 * @param persistenceId id of the persistent actor.
 * @param criteria selection criteria for deleting.
 */
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]

/**
 * Plugin API
 * Allows plugin implementers to use `f pipeTo self` and
 * handle additional messages for implementing advanced features
 */
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior

A snapshot store plugin can be activated with the following minimal configuration:

# Path to the snapshot store plugin to be used
akka.persistence.snapshot-store.plugin = "my-snapshot-store"

# My custom snapshot store plugin
my-snapshot-store {
  # Class name of the plugin.
  class = "docs.persistence.MySnapshotStore"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}

The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher.

Plugin TCK

In order to help developers build correct and high quality storage plugins, we provide a Technology Compatibility Kit (TCK for short).

The TCK is usable from Java as well as Scala projects. For Scala you need to include the akka-persistence-tck-experimental dependency:

"com.typesafe.akka" %% "akka-persistence-tck-experimental" % "2.4-SNAPSHOT" % "test"

To include the Journal TCK tests in your test suite simply extend the provided JournalSpec:

class MyJournalSpec extends JournalSpec(
  config = ConfigFactory.parseString(
    """
    akka.persistence.journal.plugin = "my.journal.plugin"
    """))

We also provide a simple benchmarking class, JournalPerfSpec, which includes all the tests that JournalSpec has, and also performs some longer operations on the Journal while printing its performance stats. While it is NOT aimed at providing a proper benchmarking environment, it can be used to get a rough feel for your journal's performance in the most typical scenarios.

In order to include the SnapshotStore TCK tests in your test suite simply extend the SnapshotStoreSpec:

class MySnapshotStoreSpec extends SnapshotStoreSpec(
  config = ConfigFactory.parseString(
    """
    akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin"
    """))

In case your plugin requires some setting up (starting a mock database, removing temporary files etc.) you can override the beforeAll and afterAll methods to hook into the test lifecycle:

import java.io.File

import akka.persistence.journal.JournalSpec
import com.typesafe.config.ConfigFactory
import org.iq80.leveldb.util.FileUtils

class MyJournalSpec extends JournalSpec(
  config = ConfigFactory.parseString(
    """
    akka.persistence.journal.plugin = "my.journal.plugin"
    """)) {

  // Directories that must be clean before and after the test run.
  val storageLocations = List(
    new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")),
    new File(config.getString("akka.persistence.snapshot-store.local.dir")))

  override def beforeAll(): Unit = {
    super.beforeAll()
    storageLocations foreach FileUtils.deleteRecursively
  }

  override def afterAll(): Unit = {
    storageLocations foreach FileUtils.deleteRecursively
    super.afterAll()
  }

}

We highly recommend including these specifications in your test suite, as they cover a broad range of cases you might have otherwise forgotten to test for when writing a plugin from scratch.

Pre-packaged plugins

Local LevelDB journal

The LevelDB journal plugin's config entry is akka.persistence.journal.leveldb; it writes messages to a local LevelDB instance. The default location of the LevelDB files is a directory named journal in the current working directory. This location can be changed by configuration where the specified path can be relative or absolute:

akka.persistence.journal.leveldb.dir = "target/journal"

With this plugin, each actor system runs its own private LevelDB instance.

Shared LevelDB journal

A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for example, allows persistent actors to fail over to a backup node and continue using the shared journal instance from the backup node.

Warning

A shared LevelDB instance is a single point of failure and should therefore only be used for testing purposes. Highly-available, replicated journals are available as Community plugins.

A shared LevelDB instance is started by instantiating the SharedLeveldbStore actor.

By default, the shared instance writes journaled messages to a local directory named journal in the current working directory. The storage location can be changed by configuration:

akka.persistence.journal.leveldb-shared.store.dir = "target/shared"

Actor systems that use a shared LevelDB store must activate the akka.persistence.journal.leveldb-shared plugin.

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"

This plugin must be initialized by injecting the (remote) SharedLeveldbStore actor reference. Injection is done by calling the SharedLeveldbJournal.setStore method with the actor reference as argument.

trait SharedStoreUsage extends Actor {
  override def preStart(): Unit = {
    context.actorSelection("akka.tcp://example@127.0.0.1:2552/user/store") ! Identify(1)
  }

  def receive = {
    case ActorIdentity(1, Some(store)) =>
      SharedLeveldbJournal.setStore(store, context.system)
  }
}

Internal journal commands (sent by persistent actors) are buffered until injection completes. Injection is idempotent, i.e. only the first injection is used.
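
The buffering and first-injection-wins behaviour can be modelled in a few lines. This is an illustrative, Akka-free sketch: JournalProxy, setStore, send and log are hypothetical names, and stores and commands are represented as plain strings.

```scala
object StoreInjectionSketch {
  // Hypothetical model of a journal that forwards commands to an injected store.
  final class JournalProxy {
    private var store: Option[String] = None
    private var buffered = Vector.empty[String]
    private var forwarded = Vector.empty[(String, String)]

    // Only the first injection takes effect; later calls are ignored.
    def setStore(name: String): Unit =
      if (store.isEmpty) {
        store = Some(name)
        // Flush everything that arrived before the store was known.
        forwarded = forwarded ++ buffered.map(cmd => name -> cmd)
        buffered = Vector.empty
      }

    // Commands are buffered until a store has been injected.
    def send(cmd: String): Unit = store match {
      case Some(name) => forwarded = forwarded :+ (name -> cmd)
      case None       => buffered = buffered :+ cmd
    }

    // (store, command) pairs in the order they were forwarded.
    def log: Vector[(String, String)] = forwarded
  }
}
```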

Local snapshot store

The local snapshot store plugin's config entry is akka.persistence.snapshot-store.local; it writes snapshot files to the local filesystem. The default storage location is a directory named snapshots in the current working directory. This can be changed by configuration where the specified path can be relative or absolute:

akka.persistence.snapshot-store.local.dir = "target/snapshots"

Custom serialization

Serialization of snapshots and payloads of Persistent messages is configurable with Akka's Serialization infrastructure. For example, if an application wants to serialize

  • payloads of type MyPayload with a custom MyPayloadSerializer and
  • snapshots of type MySnapshot with a custom MySnapshotSerializer

it must add

akka.actor {
  serializers {
    my-payload = "docs.persistence.MyPayloadSerializer"
    my-snapshot = "docs.persistence.MySnapshotSerializer"
  }
  serialization-bindings {
    "docs.persistence.MyPayload" = my-payload
    "docs.persistence.MySnapshot" = my-snapshot
  }
}

to the application configuration. If not specified, a default serializer is used.
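
The core of such a serializer is a pair of inverse functions between the payload and a byte array. The following Akka-free sketch shows only that round trip: MyPayload with a single String field is a hypothetical shape, and a real MyPayloadSerializer would additionally implement Akka's serializer interface and be registered through the configuration shown above.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object PayloadSerializerSketch {
  // Hypothetical payload type with a single field.
  final case class MyPayload(data: String)

  // Payload to bytes: here simply the UTF-8 encoding of the field.
  def toBinary(payload: MyPayload): Array[Byte] =
    payload.data.getBytes(UTF_8)

  // Bytes back to payload: must be the exact inverse of toBinary.
  def fromBinary(bytes: Array[Byte]): MyPayload =
    MyPayload(new String(bytes, UTF_8))
}
```

Since journaled events may be read back long after they were written, the binary format chosen here effectively becomes part of the stored data's contract.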

Testing

When running tests with LevelDB default settings in sbt, make sure to set fork := true in your sbt project; otherwise, you'll see an UnsatisfiedLinkError. Alternatively, you can switch to a LevelDB Java port by setting

akka.persistence.journal.leveldb.native = off

or

akka.persistence.journal.leveldb-shared.store.native = off

in your Akka configuration. The LevelDB Java port is for testing purposes only.

Configuration

There are several configuration properties for the persistence module; please refer to the reference configuration.

Multiple persistence plugin configurations

By default, a persistent actor or view will use the "default" journal and snapshot store plugins configured in the following sections of the reference.conf configuration resource:

# Absolute path to the default journal plugin configuration entry.
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
# Absolute path to the default snapshot store plugin configuration entry.
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

Note that in this case the actor or view overrides only the persistenceId method:

trait ActorWithDefaultPlugins extends PersistentActor {
  override def persistenceId = "123"
}

When a persistent actor or view overrides the journalPluginId and snapshotPluginId methods, the actor or view will be serviced by these specific persistence plugins instead of the defaults:

trait ActorWithOverridePlugins extends PersistentActor {
  override def persistenceId = "123"
  // Absolute path to the journal plugin configuration entry in the `reference.conf`.
  override def journalPluginId = "akka.persistence.chronicle.journal"
  // Absolute path to the snapshot store plugin configuration entry in the `reference.conf`.
  override def snapshotPluginId = "akka.persistence.chronicle.snapshot-store"
}

Note that journalPluginId and snapshotPluginId must refer to properly configured reference.conf plugin entries with the standard class property as well as settings which are specific to those plugins, i.e.:

# Configuration entry for the custom journal plugin, see `journalPluginId`.
akka.persistence.chronicle.journal {
  # Standard persistence extension property: provider FQCN.
  class = "akka.persistence.chronicle.ChronicleSyncJournal"
  # Custom setting specific for the journal `ChronicleSyncJournal`.
  folder = ${user.dir}/store/journal
  # Standard persistence extension property: plugin actor uses config injection.
  inject-config = true
} 
# Configuration entry for the custom snapshot store plugin, see `snapshotPluginId`.
akka.persistence.chronicle.snapshot-store {
  # Standard persistence extension property: provider FQCN.
  class = "akka.persistence.chronicle.ChronicleSnapshotStore"
  # Custom setting specific for the snapshot store `ChronicleSnapshotStore`.
  folder = ${user.dir}/store/snapshot
  # Standard persistence extension property: plugin actor uses config injection.
  inject-config = true
}
