Persistence

Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted but never its current state directly (except for optional snapshots). These changes are only ever appended to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying stored changes, from which they can rebuild internal state. Replay can cover the full history of changes or start from a snapshot, which can dramatically reduce recovery times. Akka persistence also provides point-to-point communication channels with at-least-once message delivery semantics.

Warning

This module is marked as “experimental” as of its introduction in Akka 2.3.0. We will continue to improve this API based on our users’ feedback, which implies that while we try to keep incompatible changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the contents of the akka.persistence package.

Akka persistence is inspired by and the official replacement of the eventsourced library. It follows the same concepts and architecture as eventsourced but differs significantly at the API and implementation level. See also the Migration Guide Eventsourced to Akka Persistence 2.3.x.

Dependencies

Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:

"com.typesafe.akka" %% "akka-persistence-experimental" % "2.3.3"

Architecture

  • Processor: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal before its receive method is called. When a processor is started or restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages.
  • View: A view is a persistent, stateful actor that receives journaled messages that have been written by another processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's replicated message stream.
  • Streams: Messages written by a processor can be published in compliance with the Reactive Streams specification. Only those messages that are explicitly requested by downstream consumers are actually pulled from a processor's journal.
  • Channel: Channels are used by processors and views to communicate with other actors. They prevent replayed messages from being redundantly delivered to these actors and provide at-least-once message delivery semantics, even in case of sender and receiver JVM crashes.
  • Journal: A journal stores the sequence of messages sent to a processor. An application can control which messages are journaled and which are received by the processor without being journaled. The storage backend of a journal is pluggable. The default journal storage plugin writes to the local filesystem, replicated journals are available as Community plugins.
  • Snapshot store: A snapshot store persists snapshots of a processor's or a view's internal state. Snapshots are used for optimizing recovery times. The storage backend of a snapshot store is pluggable. The default snapshot storage plugin writes to the local filesystem.
  • Event sourcing: Based on the building blocks described above, Akka persistence provides abstractions for the development of event sourced applications (see section Event sourcing).

Processors

A processor can be implemented by extending the Processor trait and implementing the receive method.

import akka.persistence.{ Persistent, PersistenceFailure, Processor }

class MyProcessor extends Processor {
  def receive = {
    case Persistent(payload, sequenceNr) =>
      // message successfully written to journal
    case PersistenceFailure(payload, sequenceNr, cause) =>
      // message failed to be written to journal
    case other =>
      // message not written to journal
  }
}

Processors only write messages of type Persistent to the journal, others are received without being persisted. When a processor's receive method is called with a Persistent message it can safely assume that this message has been successfully written to the journal. If a journal fails to write a Persistent message then the processor is stopped, by default. If a processor should continue running on persistence failures it must handle PersistenceFailure messages. In this case, a processor may want to inform the sender about the failure, so that the sender can re-send the message, if needed.
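A sketch of a processor that informs senders about persistence failures so they can re-send (the ack/reject reply payloads are illustrative, not part of the API):

class ReportingProcessor extends Processor {
  def receive = {
    case Persistent(payload, sequenceNr) =>
      sender ! s"ack-$sequenceNr"      // journaled successfully
    case PersistenceFailure(payload, sequenceNr, cause) =>
      sender ! s"rejected-$sequenceNr" // sender may decide to re-send the payload
  }
}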

A Processor itself is an Actor and can therefore be instantiated with actorOf.

import akka.actor.Props

val processor = system.actorOf(Props[MyProcessor], name = "myProcessor")

processor ! Persistent("foo") // will be journaled
processor ! "bar" // will not be journaled

Recovery

By default, a processor is automatically recovered on start and on restart by replaying journaled messages. New messages sent to a processor during recovery do not interfere with replayed messages. New messages will only be received by a processor after recovery completes.

Recovery customization

Automated recovery on start can be disabled by overriding preStart with an empty implementation.

override def preStart() = ()

In this case, a processor must be recovered explicitly by sending it a Recover() message.

processor ! Recover()

If not overridden, preStart sends a Recover() message to self. Applications may also override preStart to define further Recover() parameters, such as an upper sequence number bound.

override def preStart() {
  self ! Recover(toSequenceNr = 457L)
}

Upper sequence number bounds can be used to recover a processor to past state instead of current state. Automated recovery on restart can be disabled by overriding preRestart with an empty implementation.

override def preRestart(reason: Throwable, message: Option[Any]) = ()

Recovery status

A processor can query its own recovery status via the methods

def recoveryRunning: Boolean
def recoveryFinished: Boolean

Sometimes there is a need for performing additional initialization when recovery has completed, before processing any other message sent to the processor. The processor can send itself a message from preStart; it will be stashed and received after recovery. The mailbox may contain other messages queued ahead of that message, so you need to stash incoming messages until you receive it.

override def preStart(): Unit = {
  super.preStart()
  self ! "FIRST"
}

def receive = initializing.orElse(active)

def initializing: Receive = {
  case "FIRST" =>
    recoveryCompleted()
    context.become(active)
    unstashAll()
  case other if recoveryFinished =>
    stash()
}

def recoveryCompleted(): Unit = {
  // perform init after recovery, before any other messages
  // ...
}

def active: Receive = {
  case Persistent(msg, _) => //...
}

Failure handling

A persistent message that caused an exception will be received again by a processor after restart. To prevent a replay of that message during recovery it can be deleted.

override def preRestart(reason: Throwable, message: Option[Any]) {
  message match {
    case Some(p: Persistent) => deleteMessage(p.sequenceNr)
    case _                   =>
  }
  super.preRestart(reason, message)
}

Message deletion

A processor can delete a single message by calling the deleteMessage method with the sequence number of that message as argument. An optional permanent parameter specifies whether the message shall be permanently deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions to Akka persistence will allow replaying messages that have been marked as deleted, which can be useful for debugging purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number, processors should call the deleteMessages method.
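A minimal sketch of these calls inside a processor (the sequence numbers are illustrative; deleteMessages also accepts an optional permanent parameter):

deleteMessage(12L)                    // mark message 12 as deleted (not replayed)
deleteMessage(13L, permanent = true)  // permanently delete message 13 from the journal
deleteMessages(toSequenceNr = 42L)    // delete all messages up to 42 (inclusive)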

Identifiers

A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the String representation of the processor's path without the address part and can be obtained via the processorId method.

def processorId: String

Applications can customize a processor's id by specifying an actor name during processor creation as shown in section Processors. This changes that processor's name in its actor hierarchy and hence influences only part of the processor id. To fully customize a processor's id, the processorId method must be overridden.

override def processorId = "my-stable-processor-id"

Overriding processorId is the recommended way to generate stable identifiers.

Views

Views can be implemented by extending the View trait and implementing the receive and the processorId methods.

class MyView extends View {
  def processorId: String = "some-processor-id"

  def receive: Actor.Receive = {
    case Persistent(payload, sequenceNr) => // ...
  }
}

The processorId identifies the processor from which the view receives journaled messages. It is not necessary that the referenced processor is actually running. Views read messages from a processor's journal directly. When a processor is started later and begins to write new messages, the corresponding view is updated automatically, by default.

Updates

The default update interval of all views of an actor system is configurable:

akka.persistence.view.auto-update-interval = 5s

View implementation classes may also override the autoUpdateInterval method to return a custom update interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an Update message.

val view = system.actorOf(Props[MyView])
view ! Update(await = true)

If the await parameter is set to true, messages that follow the Update request are processed when the incremental message replay, triggered by that update request, has completed. If set to false (default), messages following the update request may interleave with the replayed message stream. Automated updates always run with await = false.

Automated updates of all views of an actor system can be turned off by configuration:

akka.persistence.view.auto-update = off

Implementation classes may override the configured default value by overriding the autoUpdate method. To limit the number of replayed messages per update request, applications can configure a custom akka.persistence.view.auto-update-replay-max value or override the autoUpdateReplayMax method. The number of replayed messages for manual updates can be limited with the replayMax parameter of the Update message.
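A sketch of these overrides on a view (the values are illustrative):

import scala.concurrent.duration._

class MyQuickView extends View {
  def processorId: String = "some-processor-id"

  override def autoUpdateInterval: FiniteDuration = 1.second // custom update interval
  override def autoUpdateReplayMax: Long = 100L // replay at most 100 messages per update

  def receive: Actor.Receive = {
    case Persistent(payload, _) => // ...
  }
}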

Recovery

Initial recovery of views works in the very same way as for Processors (i.e. by sending a Recover message to self). The maximum number of replayed messages during initial recovery is determined by autoUpdateReplayMax. Further possibilities to customize initial recovery are explained in section Processors.

Identifiers

A view must have an identifier that doesn't change across different actor incarnations. It defaults to the String representation of the actor path without the address part and can be obtained via the viewId method.

Applications can customize a view's id by specifying an actor name during view creation. This changes that view's name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the viewId method must be overridden. Overriding viewId is the recommended way to generate stable identifiers.
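A minimal sketch, analogous to the processorId example above:

override def viewId = "my-stable-view-id"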

The viewId must differ from the referenced processorId, unless snapshots of a view and its processor are meant to be shared (which is usually not what applications want).

Streams

TODO: rename *producer* to *publisher*.

A Reactive Streams Producer can be created from a processor's message stream via the PersistentFlow extension of the Akka Streams Scala DSL:

import org.reactivestreams.api.Producer

import akka.persistence.Persistent
import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings }
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

val materializer = FlowMaterializer(MaterializerSettings())

val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id")
val producer: Producer[Persistent] = flow.toProducer(materializer)

The created flow object is of type Flow[Persistent] and can be composed with other flows using Flow combinators (= methods defined on Flow). Calling the toProducer method on flow creates a producer of type Producer[Persistent].

A persistent message producer only reads from a processor's journal when explicitly requested by downstream consumers. In order to avoid frequent, fine grained read access to a processor's journal, the producer tries to buffer persistent messages in memory from which it serves downstream requests. The maximum buffer size per producer is configurable with a PersistentPublisherSettings configuration object.

PersistentFlow.fromProcessor("some-processor-id", PersistentPublisherSettings(maxBufferSize = 200))

Other PersistentPublisherSettings parameters are:

  • fromSequenceNr: specifies from which sequence number the persistent message stream shall start (defaults to 1L). Please note that specifying fromSequenceNr is much more efficient than using the drop(Int) combinator, especially for larger sequence numbers.
  • idle: an optional parameter that specifies how long a producer shall wait after a journal read attempt didn't return any new persistent messages. If not defined, the producer uses the akka.persistence.view.auto-update-interval configuration parameter; otherwise it uses the defined idle parameter (see the sketch below).
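A sketch combining these parameters (the values are illustrative):

import scala.concurrent.duration._

PersistentFlow.fromProcessor("some-processor-id", PersistentPublisherSettings(
  fromSequenceNr = 1000L,
  maxBufferSize = 200,
  idle = Some(1.second)))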

Here are two examples of how persistent message producers can be connected to downstream consumers using the Akka Streams Scala DSL and its PersistentFlow extension.

// 1 producer and 2 consumers:
val producer1: Producer[Persistent] =
  PersistentFlow.fromProcessor("processor-1").toProducer(materializer)
Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer)
Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer)

// 2 producers (merged) and 1 consumer:
val producer2: Producer[Persistent] =
  PersistentFlow.fromProcessor("processor-2").toProducer(materializer)
val producer3: Producer[Persistent] =
  PersistentFlow.fromProcessor("processor-3").toProducer(materializer)
Flow(producer2).merge(producer3).foreach { p =>
  println(s"consumer-3: ${p.payload}")
}.consume(materializer)

Channels

Channels are special actors that are used by processors or views to communicate with other actors (channel destinations). The following discusses channels in context of processors but this is also applicable to views.

Channels prevent redundant delivery of replayed messages to destinations during processor recovery: a channel withholds (retains) a replayed message if its delivery has already been confirmed by a destination.

import akka.actor.{ Actor, Props }
import akka.persistence.{ Channel, ConfirmablePersistent, Deliver, Persistent, Processor }

class MyProcessor extends Processor {
  val destination = context.actorOf(Props[MyDestination])
  val channel = context.actorOf(Channel.props(), name = "myChannel")

  def receive = {
    case p @ Persistent(payload, _) =>
      channel ! Deliver(p.withPayload(s"processed ${payload}"), destination.path)
  }
}

class MyDestination extends Actor {
  def receive = {
    case p @ ConfirmablePersistent(payload, sequenceNr, redeliveries) =>
      // ...
      p.confirm()
  }
}

A channel is ready to use once it has been created, no recovery or further activation is needed. A Deliver request instructs a channel to send a Persistent message to a destination. A destination is provided as ActorPath and messages are sent by the channel via that path's ActorSelection. Sender references are preserved by a channel, therefore, a destination can reply to the sender of a Deliver request.

Note

Sending via a channel has at-least-once delivery semantics (by virtue of either the sending actor or the channel being persistent), which means that the semantics do not match those of a normal ActorRef send operation:

  • it is not at-most-once delivery
  • message order for the same sender–receiver pair is not retained due to possible resends
  • after a crash and restart of the destination, messages are still delivered (to the new actor incarnation)

These semantics match precisely what an ActorPath represents (see Actor Lifecycle), therefore you need to supply a path and not a reference when constructing Deliver messages.

If a processor wants to reply to a Persistent message sender it should use the sender path as channel destination.

channel ! Deliver(p.withPayload(s"processed ${payload}"), sender.path)

Persistent messages delivered by a channel are of type ConfirmablePersistent. ConfirmablePersistent extends Persistent by adding the methods confirm and redeliveries (see also Message re-delivery). A channel destination confirms the delivery of a ConfirmablePersistent message by calling confirm() on that message. This asynchronously writes a confirmation entry to the journal. Replayed messages internally contain confirmation entries which allows a channel to decide if it should retain these messages or not.

A Processor can also be used as a channel destination, i.e. it can persist ConfirmablePersistent messages too.

Message re-delivery

Channels re-deliver messages to destinations if they do not confirm delivery within a configurable timeout. This timeout can be specified as redeliverInterval when creating a channel, optionally together with the maximum number of re-deliveries a channel should attempt for each unconfirmed message. The number of re-delivery attempts can be obtained via the redeliveries method on ConfirmablePersistent or by pattern matching.

import scala.concurrent.duration._

context.actorOf(Channel.props(
  ChannelSettings(redeliverInterval = 30 seconds, redeliverMax = 15)),
  name = "myChannel")

A channel keeps messages in memory until their successful delivery has been confirmed or the maximum number of re-deliveries is reached. To be notified about messages that have reached the maximum number of re-deliveries, applications can register a listener at channel creation.

class MyListener extends Actor {
  def receive = {
    case RedeliverFailure(messages) => // ...
  }
}

val myListener = context.actorOf(Props[MyListener])
val myChannel = context.actorOf(Channel.props(
  ChannelSettings(redeliverFailureListener = Some(myListener))))

A listener receives RedeliverFailure notifications containing all messages that could not be delivered. On receiving a RedeliverFailure message, an application may decide to restart the sending processor to enforce a re-send of these messages to the channel or confirm these messages to prevent further re-sends. The sending processor can also be restarted any time later to re-send unconfirmed messages.

This combination of

  • message persistence by sending processors
  • message replays by sending processors
  • message re-deliveries by channels and
  • application-level confirmations (acknowledgements) by destinations

enables channels to provide at-least-once message delivery semantics. Possible duplicates can be detected by destinations by tracking message sequence numbers. Message sequence numbers are generated per sending processor. Depending on how a processor routes outbound messages to destinations, they may either see a contiguous message sequence or a sequence with gaps.
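A minimal sketch of duplicate detection at a destination, assuming a single sending processor that produces a contiguous sequence:

import akka.actor.Actor
import akka.persistence.ConfirmablePersistent

class DedupDestination extends Actor {
  var highestSeqNr = 0L // highest sequence number processed so far

  def receive = {
    case p @ ConfirmablePersistent(payload, sequenceNr, _) =>
      if (sequenceNr > highestSeqNr) {
        highestSeqNr = sequenceNr
        // process payload ...
      } // else: duplicate delivery, skip processing
      p.confirm() // confirm in both cases to stop further re-deliveries
  }
}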

Warning

If a processor emits more than one outbound message per inbound Persistent message it must use a separate channel for each outbound message to ensure that confirmations are uniquely identifiable, otherwise, at-least-once message delivery semantics do not apply. This rule has been introduced to avoid writing additional outbound message identifiers to the journal which would decrease the overall throughput. It is furthermore recommended to collapse multiple outbound messages to the same destination into a single outbound message, otherwise, if sent via multiple channels, their ordering is not defined.

If an application wants to have more control over how sequence numbers are assigned to messages, it should use an application-specific sequence number generator and include the generated sequence numbers in the payload of Persistent messages.

Persistent channels

Channels created with Channel.props do not persist messages. These channels are usually used in combination with a sending processor that takes care of persistence, hence, channel-specific persistence is not necessary in this case. They are referred to as transient channels in the following.

Persistent channels are like transient channels but additionally persist messages before delivering them. Messages that have been persisted by a persistent channel are deleted when destinations confirm their delivery. A persistent channel can be created with PersistentChannel.props and configured with a PersistentChannelSettings object.

val channel = context.actorOf(PersistentChannel.props(
  PersistentChannelSettings(redeliverInterval = 30 seconds, redeliverMax = 15)),
  name = "myPersistentChannel")

channel ! Deliver(Persistent("example"), destination.path)

A persistent channel is useful for delivery of messages to slow destinations or destinations that are unavailable for a long time. It can constrain the number of pending confirmations based on the pendingConfirmationsMax and pendingConfirmationsMin parameters of PersistentChannelSettings.

PersistentChannelSettings(
  pendingConfirmationsMax = 10000,
  pendingConfirmationsMin = 2000)

It suspends delivery when the number of pending confirmations reaches pendingConfirmationsMax and resumes delivery again when this number falls below pendingConfirmationsMin. This prevents both flooding destinations with more messages than they can process and unbounded memory consumption by the channel. A persistent channel continues to persist new messages even when message delivery is temporarily suspended.

Standalone usage

Applications may also use channels standalone. Transient channels can be used standalone if re-delivery attempts to destinations are required but message loss in case of a sender JVM crash is not an issue. If message loss in case of a sender JVM crash is an issue, persistent channels should be used. In this case, applications may want to receive replies from the channel whether messages have been successfully persisted or not. This can be enabled by creating the channel with the replyPersistent configuration parameter set to true:

PersistentChannelSettings(replyPersistent = true)

With this setting, the channel replies to the sender with either the successfully persisted message or a PersistenceFailure message. In the latter case, the sender should re-send the message, as sketched below.
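A sketch of a sender handling these replies (the channel and destination references are assumed to be in scope):

def receive = {
  case p: Persistent =>
    // message was successfully persisted by the channel
  case PersistenceFailure(payload, _, _) =>
    // persistence failed: re-send the message to the channel
    channel ! Deliver(Persistent(payload), destination.path)
}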

Identifiers

In the same way as Processors and Views, channels also have an identifier that defaults to a channel's path. A channel identifier can therefore be customized by using a custom actor name at channel creation. This changes that channel's name in its actor hierarchy and hence influences only part of the channel identifier. To fully customize a channel identifier, it should be provided as an argument to Channel.props(String) or PersistentChannel.props(String). This is the recommended way to generate stable identifiers.

context.actorOf(Channel.props("my-stable-channel-id"))

Persistent messages

Payload

The payload of a Persistent message can be obtained via its

def payload: Any

method or by pattern matching

case Persistent(payload, _) =>

Inside processors, new persistent messages are derived from the current persistent message before sending them via a channel, either by calling p.withPayload(...) or Persistent(...), where the latter uses the implicit currentPersistentMessage made available by Processor.

implicit def currentPersistentMessage: Option[Persistent]

This is necessary for delivery confirmations to work properly. Both ways are equivalent but we recommend using p.withPayload(...) for clarity.

Sequence number

The sequence number of a Persistent message can be obtained via its

def sequenceNr: Long

method or by pattern matching

case Persistent(_, sequenceNr) =>

Persistent messages are assigned sequence numbers on a per-processor basis (or per channel basis if used standalone). A sequence starts at 1L and doesn't contain gaps unless a processor deletes messages.

Snapshots

Snapshots can dramatically reduce recovery times of processors and views. The following discusses snapshots in context of processors but this is also applicable to views.

Processors can save snapshots of internal state by calling the saveSnapshot method. If saving a snapshot succeeds, the processor receives a SaveSnapshotSuccess message, otherwise a SaveSnapshotFailure message.

class MyProcessor extends Processor {
  var state: Any = _

  def receive = {
    case "snap"                                => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata)         => // ...
    case SaveSnapshotFailure(metadata, reason) => // ...
  }
}

where metadata is of type SnapshotMetadata:

case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Long = 0L)

During recovery, the processor is offered a previously saved snapshot via a SnapshotOffer message from which it can initialize internal state.

class MyProcessor extends Processor {
  var state: Any = _

  def receive = {
    case SnapshotOffer(metadata, offeredSnapshot) => state = offeredSnapshot
    case Persistent(payload, sequenceNr)          => // ...
  }
}

The replayed messages that follow the SnapshotOffer message, if any, are younger than the offered snapshot. They finally recover the processor to its current (i.e. latest) state.

In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots and at least one of these snapshots matches the SnapshotSelectionCriteria that can be specified for recovery.

processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(
  maxSequenceNr = 457L,
  maxTimestamp = System.currentTimeMillis))

If not specified, they default to SnapshotSelectionCriteria.Latest which selects the latest (= youngest) snapshot. To disable snapshot-based recovery, applications should use SnapshotSelectionCriteria.None. A recovery where no saved snapshot matches the specified SnapshotSelectionCriteria will replay all journaled messages.

Snapshot deletion

A processor can delete individual snapshots by calling the deleteSnapshot method with the sequence number and the timestamp of a snapshot as arguments. To bulk-delete snapshots matching SnapshotSelectionCriteria, processors should use the deleteSnapshots method.
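A minimal sketch of these calls inside a processor (the sequence number and timestamp values are illustrative):

// delete a single snapshot identified by sequence number and timestamp
deleteSnapshot(sequenceNr = 100L, timestamp = 1400000000000L)
// bulk-delete all snapshots up to sequence number 100
deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = 100L))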

Event sourcing

In all the examples so far, messages that change a processor's state have been sent as Persistent messages by an application, so that they can be replayed during recovery. From this point of view, the journal acts as a write-ahead log for whatever Persistent messages a processor receives. This is also known as command sourcing. Commands, however, may fail and some applications cannot tolerate command failures during recovery.

For these applications Event Sourcing is a better choice. Applied to Akka persistence, the basic idea behind event sourcing is quite simple. A processor receives a (non-persistent) command which is first validated to determine whether it can be applied to the current state. Here, validation can mean anything, from simple inspection of a command message's fields up to a conversation with several external services, for example. If validation succeeds, events are generated from the command, representing the effect of the command. These events are then persisted and, after successful persistence, used to change a processor's state. When the processor needs to be recovered, only the persisted events are replayed, which we know can be successfully applied. In other words, events cannot fail when being replayed to a processor, in contrast to commands. Eventsourced processors may of course also process commands that do not change application state, such as query commands.

Akka persistence supports event sourcing with the EventsourcedProcessor trait (which implements event sourcing as a pattern on top of command sourcing). A processor that extends this trait does not handle Persistent messages directly but uses the persist method to persist and handle events. The behavior of an EventsourcedProcessor is defined by implementing receiveRecover and receiveCommand. This is demonstrated in the following example.

import akka.actor._
import akka.persistence._

case class Cmd(data: String)
case class Evt(data: String)

case class ExampleState(events: List[String] = Nil) {
  def update(evt: Evt) = copy(evt.data :: events)
  def size = events.length
  override def toString: String = events.reverse.toString
}

class ExampleProcessor extends EventsourcedProcessor {
  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.update(event)

  def numEvents =
    state.size

  val receiveRecover: Receive = {
    case evt: Evt                                 => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
  }

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case "print" => println(state)
  }

}

The example defines two data types, Cmd and Evt, to represent commands and events, respectively. The state of the ExampleProcessor is a list of persisted event data contained in ExampleState.

The processor's receiveRecover method defines how state is updated during recovery by handling Evt and SnapshotOffer messages. The processor's receiveCommand method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling persist with an event (or a sequence of events) as first argument and an event handler as second argument.

The persist method persists events asynchronously and the event handler is executed for successfully persisted events. Successfully persisted events are internally sent back to the processor as individual messages that trigger event handler executions. An event handler may close over processor state and mutate it. The sender of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender of a command (not shown).

The main responsibility of an event handler is changing processor state using event data and notifying others about successful state changes by publishing events.

When persisting events with persist it is guaranteed that the processor will not receive further commands between the persist call and the execution(s) of the associated event handler. This also holds for multiple persist calls in context of a single command.

The easiest way to run this example yourself is to download Typesafe Activator and open the tutorial named Akka Persistence Samples with Scala. It contains instructions on how to run the EventsourcedExample.

Note

It's also possible to switch between different command handlers during normal processing and recovery with context.become() and context.unbecome(). To get the actor into the same state after recovery you need to take special care to perform the same state transitions with become and unbecome in the receiveRecover method as you would have done in the command handler.
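A sketch of such mirrored transitions, assuming an application-defined Activated event:

case object Activated

class BecomeProcessor extends EventsourcedProcessor {
  val active: Receive = {
    case cmd => // handle commands in the active state ...
  }

  val receiveCommand: Receive = {
    case "activate" => persist(Activated) { _ => context.become(active) }
  }

  val receiveRecover: Receive = {
    case Activated => context.become(active) // same transition as in the command handler
    case evt       => // update state ...
  }
}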

Reliable event delivery

Sending events from an event handler to another actor has at-most-once delivery semantics. For at-least-once delivery, Channels must be used. In this case, replayed events (received by receiveRecover) must also be sent to a channel, as shown in the following example:

class MyEventsourcedProcessor(destination: ActorRef) extends EventsourcedProcessor {
  val channel = context.actorOf(Channel.props("channel"))

  def handleEvent(event: String) = {
    // update state
    // ...
    // reliably deliver events
    channel ! Deliver(Persistent(event), destination.path)
  }

  def receiveRecover: Receive = {
    case event: String => handleEvent(event)
  }

  def receiveCommand: Receive = {
    case "cmd" => {
      // ...
      persist("evt")(handleEvent)
    }
  }
}

In larger integration scenarios, channel destinations may be actors that submit received events to an external message broker, for example. After having successfully submitted an event, they should call confirm() on the received ConfirmablePersistent message.
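A sketch of such a destination, where publishToBroker stands in for a hypothetical, application-specific broker client:

import akka.actor.Actor
import akka.persistence.ConfirmablePersistent

class BrokerGateway extends Actor {
  def receive = {
    case p @ ConfirmablePersistent(event, _, _) =>
      publishToBroker(event) // submit the event to an external message broker
      p.confirm()            // confirm only after successful submission
  }

  // hypothetical placeholder for an application-specific broker client
  def publishToBroker(event: Any): Unit = ()
}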

Batch writes

To optimize throughput, a Processor internally batches received Persistent messages under high load before writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads to a configurable maximum size (default is 200) under high load.

akka.persistence.journal.max-message-batch-size = 200

A new batch write is triggered by a processor as soon as a batch reaches the maximum size or if the journal completed writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.

Applications that want to have more explicit control over batch writes and batch sizes can send processors PersistentBatch messages.

import akka.actor.{ ActorSystem, Props }
import akka.persistence.{ Persistent, PersistentBatch, Processor }

class MyProcessor extends Processor {
  def receive = {
    case Persistent("a", _) => // ...
    case Persistent("b", _) => // ...
  }
}

val system = ActorSystem("example")
val processor = system.actorOf(Props[MyProcessor])

processor ! PersistentBatch(List(Persistent("a"), Persistent("b")))

Persistent messages contained in a PersistentBatch are always written atomically, even if the batch size is greater than max-message-batch-size. Also, a PersistentBatch is written isolated from other batches. Persistent messages contained in a PersistentBatch are received individually by a processor.

PersistentBatch messages, for example, are used internally by an EventsourcedProcessor to ensure atomic writes of events. All events that are persisted in context of a single command are written as a single batch to the journal (even if persist is called multiple times per command). The recovery of an EventsourcedProcessor will therefore never be done partially (with only a subset of events persisted by a single command).

Confirmation and deletion operations performed by Channels are also batched. The maximum confirmation and deletion batch sizes are configurable with akka.persistence.journal.max-confirmation-batch-size and akka.persistence.journal.max-deletion-batch-size, respectively.
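For example (the values shown are illustrative, not the defaults):

akka.persistence.journal.max-confirmation-batch-size = 5000
akka.persistence.journal.max-deletion-batch-size = 5000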

Storage plugins

Storage backends for journals and snapshot stores are pluggable in Akka persistence. The default journal plugin writes messages to LevelDB (see Local LevelDB journal). The default snapshot store plugin writes snapshots as individual files to the local filesystem (see Local snapshot store). Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following imports:

import scala.concurrent.Future
import scala.collection.immutable.Seq
import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._

Journal plugin API

A journal plugin either extends SyncWriteJournal or AsyncWriteJournal. SyncWriteJournal is an actor that should be extended when the storage backend API only supports synchronous, blocking writes. In this case, the methods to be implemented are:

/**
 * Plugin API: synchronously writes a batch of persistent messages to the journal.
 * The batch write must be atomic i.e. either all persistent messages in the batch
 * are written or none.
 */
def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit

/**
 * Plugin API: synchronously writes a batch of delivery confirmations to the journal.
 */
def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Unit

/**
 * Plugin API: synchronously deletes messages identified by `messageIds` from the
 * journal. If `permanent` is set to `false`, the persistent messages are marked as
 * deleted, otherwise they are permanently deleted.
 */
def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit

/**
 * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`
 * (inclusive). If `permanent` is set to `false`, the persistent messages are marked
 * as deleted, otherwise they are permanently deleted.
 */
def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Unit

AsyncWriteJournal is an actor that should be extended if the storage backend API supports asynchronous, non-blocking writes. In this case, the methods to be implemented are:

/**
 * Plugin API: asynchronously writes a batch of persistent messages to the journal.
 * The batch write must be atomic i.e. either all persistent messages in the batch
 * are written or none.
 */
def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit]

/**
 * Plugin API: asynchronously writes a batch of delivery confirmations to the journal.
 */
def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Future[Unit]

/**
 * Plugin API: asynchronously deletes messages identified by `messageIds` from the
 * journal. If `permanent` is set to `false`, the persistent messages are marked as
 * deleted, otherwise they are permanently deleted.
 */
def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit]

/**
 * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
 * (inclusive). If `permanent` is set to `false`, the persistent messages are marked
 * as deleted, otherwise they are permanently deleted.
 */
def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit]

Message replays and sequence number recovery are always asynchronous, therefore, any journal plugin must implement:

/**
 * Plugin API: asynchronously replays persistent messages. Implementations replay
 * a message by calling `replayCallback`. The returned future must be completed
 * when all messages (matching the sequence number bounds) have been replayed.
 * The future must be completed with a failure if any of the persistent messages
 * could not be replayed.
 *
 * The `replayCallback` must also be called with messages that have been marked
 * as deleted. In this case a replayed message's `deleted` method must return
 * `true`.
 *
 * The channel ids of delivery confirmations that are available for a replayed
 * message must be contained in that message's `confirms` sequence.
 *
 * @param processorId processor id.
 * @param fromSequenceNr sequence number where replay should start (inclusive).
 * @param toSequenceNr sequence number where replay should end (inclusive).
 * @param max maximum number of messages to be replayed.
 * @param replayCallback called to replay a single message. Can be called from any
 *                       thread.
 *
 * @see [[AsyncWriteJournal]]
 * @see [[SyncWriteJournal]]
 */
def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr => Unit): Future[Unit]

/**
 * Plugin API: asynchronously reads the highest stored sequence number for the
 * given `processorId`.
 *
 * @param processorId processor id.
 * @param fromSequenceNr hint where to start searching for the highest sequence
 *                       number.
 */
def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long]

A journal plugin can be activated with the following minimal configuration:

# Path to the journal plugin to be used
akka.persistence.journal.plugin = "my-journal"

# My custom journal plugin
my-journal {
  # Class name of the plugin.
  class = "docs.persistence.MyJournal"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.actor.default-dispatcher"
}

The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher for SyncWriteJournal plugins and akka.actor.default-dispatcher for AsyncWriteJournal plugins.

Snapshot store plugin API

A snapshot store plugin must extend the SnapshotStore actor and implement the following methods:

/**
 * Plugin API: asynchronously loads a snapshot.
 *
 * @param processorId processor id.
 * @param criteria selection criteria for loading.
 */
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

/**
 * Plugin API: asynchronously saves a snapshot.
 *
 * @param metadata snapshot metadata.
 * @param snapshot snapshot.
 */
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]

/**
 * Plugin API: called after successful saving of a snapshot.
 *
 * @param metadata snapshot metadata.
 */
def saved(metadata: SnapshotMetadata)

/**
 * Plugin API: deletes the snapshot identified by `metadata`.
 *
 * @param metadata snapshot metadata.
 */
def delete(metadata: SnapshotMetadata)

/**
 * Plugin API: deletes all snapshots matching `criteria`.
 *
 * @param processorId processor id.
 * @param criteria selection criteria for deleting.
 */
def delete(processorId: String, criteria: SnapshotSelectionCriteria)

A snapshot store plugin can be activated with the following minimal configuration:

# Path to the snapshot store plugin to be used
akka.persistence.snapshot-store.plugin = "my-snapshot-store"

# My custom snapshot store plugin
my-snapshot-store {
  # Class name of the plugin.
  class = "docs.persistence.MySnapshotStore"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}

The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher.

Pre-packaged plugins

Local LevelDB journal

The default journal plugin is akka.persistence.journal.leveldb which writes messages to a local LevelDB instance. The default location of the LevelDB files is a directory named journal in the current working directory. This location can be changed by configuration where the specified path can be relative or absolute:

akka.persistence.journal.leveldb.dir = "target/journal"

With this plugin, each actor system runs its own private LevelDB instance.

Shared LevelDB journal

A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for example, allows processors to failover to a backup node and continue using the shared journal instance from the backup node.

Warning

A shared LevelDB instance is a single point of failure and should therefore only be used for testing purposes. Highly-available, replicated journals are available as Community plugins.

A shared LevelDB instance is started by instantiating the SharedLeveldbStore actor.

import akka.persistence.journal.leveldb.SharedLeveldbStore

val store = system.actorOf(Props[SharedLeveldbStore], "store")

By default, the shared instance writes journaled messages to a local directory named journal in the current working directory. The storage location can be changed by configuration:

akka.persistence.journal.leveldb-shared.store.dir = "target/shared"

Actor systems that use a shared LevelDB store must activate the akka.persistence.journal.leveldb-shared plugin.

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"

This plugin must be initialized by injecting the (remote) SharedLeveldbStore actor reference. Injection is done by calling the SharedLeveldbJournal.setStore method with the actor reference as argument.

trait SharedStoreUsage extends Actor {
  override def preStart(): Unit = {
    context.actorSelection("akka.tcp://example@127.0.0.1:2552/user/store") ! Identify(1)
  }

  def receive = {
    case ActorIdentity(1, Some(store)) =>
      SharedLeveldbJournal.setStore(store, context.system)
  }
}

Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent, i.e. only the first injection is used.

Local snapshot store

The default snapshot store plugin is akka.persistence.snapshot-store.local. It writes snapshot files to the local filesystem. The default storage location is a directory named snapshots in the current working directory. This can be changed by configuration where the specified path can be relative or absolute:

akka.persistence.snapshot-store.local.dir = "target/snapshots"

Custom serialization

Serialization of snapshots and payloads of Persistent messages is configurable with Akka's Serialization infrastructure. For example, if an application wants to serialize

  • payloads of type MyPayload with a custom MyPayloadSerializer and
  • snapshots of type MySnapshot with a custom MySnapshotSerializer

it must add

akka.actor {
  serializers {
    my-payload = "docs.persistence.MyPayloadSerializer"
    my-snapshot = "docs.persistence.MySnapshotSerializer"
  }
  serialization-bindings {
    "docs.persistence.MyPayload" = my-payload
    "docs.persistence.MySnapshot" = my-snapshot
  }
}

to the application configuration. If not specified, a default serializer is used.
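A minimal sketch of such a serializer implementing akka.serialization.Serializer (the identifier value and UTF-8 encoding are illustrative):

import akka.serialization.Serializer

case class MyPayload(data: String)

class MyPayloadSerializer extends Serializer {
  def identifier: Int = 77465 // unique serializer id (low values are reserved by Akka)
  def includeManifest: Boolean = false

  def toBinary(o: AnyRef): Array[Byte] = o match {
    case MyPayload(data) => data.getBytes("UTF-8")
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
    MyPayload(new String(bytes, "UTF-8"))
}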

Testing

When running tests with LevelDB default settings in sbt, make sure to set fork := true in your sbt project; otherwise, you'll see an UnsatisfiedLinkError. Alternatively, you can switch to a LevelDB Java port by setting

akka.persistence.journal.leveldb.native = off

or

akka.persistence.journal.leveldb-shared.store.native = off

in your Akka configuration. The LevelDB Java port is for testing purposes only.

Miscellaneous

State machines

State machines can be persisted by mixing the FSM trait into processors.

import akka.actor.FSM
import akka.persistence.{ Processor, Persistent }

class PersistentDoor extends Processor with FSM[String, Int] {
  startWith("closed", 0)

  when("closed") {
    case Event(Persistent("open", _), counter) =>
      goto("open") using (counter + 1) replying (counter)
  }

  when("open") {
    case Event(Persistent("close", _), counter) =>
      goto("closed") using (counter + 1) replying (counter)
  }
}

Configuration

There are several configuration properties for the persistence module, please refer to the reference configuration.
