Persistence

Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted by a supervisor, or migrated in a cluster. It also allows stateful actors to recover from JVM crashes, for example. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted, never its current state directly (except for optional snapshots). These changes are only ever appended to storage; nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying the stored changes to them, from which they rebuild their internal state. Replay can cover either the full history of changes or start from a snapshot of internal actor state, which can dramatically reduce recovery times.

Storage backends for state changes and snapshots are pluggable in Akka persistence. Currently, these are written to the local filesystem. Distributed and replicated storage, with the possibility of scaling writes, will be available soon.

Akka persistence is inspired by the eventsourced library. It follows the same concepts and architecture as eventsourced but differs significantly at the API and implementation level.

Warning

This module is marked as “experimental” as of its introduction in Akka 2.3.0. We will continue to improve this API based on our users’ feedback, which implies that, while we try to keep incompatible changes to a minimum, the binary compatibility guarantee for maintenance releases does not apply to the contents of the akka.persistence package.

Dependencies

Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:

"com.typesafe.akka" %% "akka-persistence-experimental" % "2.3-M1"

Architecture

  • Processor: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal before its receive method is called. When a processor is started or restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages.
  • Channel: Channels are used by processors to communicate with other actors. They prevent replayed messages from being redundantly delivered to these actors.
  • Journal: A journal stores the sequence of messages sent to a processor. An application can control which messages are stored and which are received by the processor without being journaled. The storage backend of a journal is pluggable.
  • Snapshot store: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for optimizing recovery times. The storage backend of a snapshot store is pluggable.

Configuration

By default, journaled messages are written to a directory named journal in the current working directory. This can be changed by configuration where the specified path can be relative or absolute:

akka.persistence.journal.leveldb.dir = "target/journal"

The default storage location of snapshots is a directory named snapshots in the current working directory. This can be changed by configuration where the specified path can be relative or absolute:

akka.persistence.snapshot-store.local.dir = "target/snapshots"
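The same two settings can also be supplied programmatically, which can be handy in tests. The following is a minimal sketch, assuming the Typesafe Config library that Akka depends on; the object name PersistenceDirs and the actor system name "example" are hypothetical.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{ Config, ConfigFactory }

object PersistenceDirs {
  // Override only the two storage locations; all other settings fall back
  // to whatever ConfigFactory.load() finds (application.conf, reference.conf).
  val config: Config = ConfigFactory.parseString(
    """
      |akka.persistence.journal.leveldb.dir = "target/journal"
      |akka.persistence.snapshot-store.local.dir = "target/snapshots"
    """.stripMargin).withFallback(ConfigFactory.load())

  // Hypothetical system name; processors created in this system would use the dirs above.
  def createSystem(): ActorSystem = ActorSystem("example", config)
}
```

Because the loaded configuration is only used as a fallback, the parsed values take precedence over any settings for the same paths in application.conf.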

Processors

A processor can be implemented by extending the Processor trait and implementing the receive method.

import akka.persistence.{ Persistent, PersistenceFailure, Processor }

class MyProcessor extends Processor {
  def receive = {
    case Persistent(payload, sequenceNr) => {
      // message successfully written to journal
    }
    case PersistenceFailure(payload, sequenceNr, cause) => {
      // message failed to be written to journal
    }
    case other => {
      // message not written to journal
    }
  }
}

Processors only write messages of type Persistent to the journal; other messages are received without being persisted. When a processor's receive method is called with a Persistent message, it can safely assume that this message has been successfully written to the journal. By default, a processor is stopped if the journal fails to write a Persistent message. If an application wants a processor to continue running on persistence failures, it must handle PersistenceFailure messages. In this case, a processor may want to inform the sender about the failure, so that the sender can re-send the message if needed, under the assumption that the journal has recovered from a temporary failure.

A Processor itself is an Actor and can therefore be instantiated with actorOf.

import akka.actor.{ ActorSystem, Props }
import akka.persistence.Persistent

val system = ActorSystem("example")
val processor = system.actorOf(Props[MyProcessor], name = "myProcessor")

processor ! Persistent("foo") // will be journaled
processor ! "bar" // will not be journaled

Recovery

By default, a processor is automatically recovered on start and on restart by replaying journaled messages. New messages sent to a processor during recovery do not interfere with replayed messages. New messages will only be received by that processor after recovery completes.

Recovery customization

Automated recovery on start can be disabled by overriding preStart with an empty implementation.

override def preStart() = ()

In this case, a processor must be recovered explicitly by sending it a Recover() message.

processor ! Recover()

If not overridden, preStart sends a Recover() message to self. Applications may also override preStart to pass further Recover() parameters, such as an upper sequence number bound:

override def preStart(): Unit = {
  self ! Recover(toSequenceNr = 457L)
}

Automated recovery on restart can be disabled by overriding preRestart with an empty implementation.

override def preRestart(reason: Throwable, message: Option[Any]) = ()

Recovery status

A processor can query its own recovery status via the methods:

def recoveryRunning: Boolean
def recoveryFinished: Boolean

Failure handling

A persistent message that caused an exception will be received again by a processor after restart. To prevent a replay of that message during recovery, it can be marked as deleted.

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  message match {
    case Some(p: Persistent) => deleteMessage(p) // prevent replay of the failing message
    case _                   => // nothing to delete
  }
  super.preRestart(reason, message)
}

Identifiers

A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the String representation of the processor's path and can be obtained via the processorId method.

def processorId: String

Applications can customize a processor's id by specifying an actor name during processor creation as shown in section Processors. This works well when using local actor references but may cause problems with remote actor references because their paths also contain deployment information such as host and port (and actor deployments are likely to change during the lifetime of an application). In this case, Processor implementation classes should override processorId.

override def processorId = "my-stable-processor-id"

Later versions of the Akka persistence module will likely offer a possibility to migrate processor ids.

Channels

Channels are special actors that are used by processors to communicate with other actors (channel destinations). Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed message is retained by a channel (i.e. not delivered again) if its previous delivery has been confirmed by a destination.

import akka.actor.{ Actor, Props }
import akka.persistence.{ Channel, Deliver, Persistent, Processor }

class MyProcessor extends Processor {
  val destination = context.actorOf(Props[MyDestination])
  val channel = context.actorOf(Channel.props(), name = "myChannel")

  def receive = {
    case p @ Persistent(payload, _) => {
      channel ! Deliver(p.withPayload(s"processed ${payload}"), destination)
    }
  }
}

class MyDestination extends Actor {
  def receive = {
    case p @ Persistent(payload, _) => {
      println(s"received ${payload}")
      p.confirm()
    }
  }
}

A channel is ready to use once it has been created; no recovery or further activation is needed. A Deliver request instructs a channel to send a Persistent message to a destination, forwarding the sender of the Deliver request to that destination. A processor may also reply to a message sender directly by using sender as the channel destination.

channel ! Deliver(p.withPayload(s"processed ${payload}"), sender)

Channel destinations confirm the delivery of a Persistent message by calling its confirm() method. This (asynchronously) writes a confirmation entry to the journal. Replayed messages internally contain these confirmation entries, which allows a channel to decide whether a message should be retained.

If an application crashes after a destination called confirm() but before the confirmation entry could be written to the journal, the unconfirmed message will be delivered again during the next recovery. It is then the destination's responsibility to detect the duplicate, or simply to process the message again if it is an idempotent receiver. Duplicates can be detected, for example, by tracking sequence numbers.
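A minimal sketch of the sequence-number approach, assuming that all messages reaching the destination originate from a single processor (sequence numbers are assigned per processor) and arrive in sequence-number order. SequenceNrFilter and the process call in the usage comment are hypothetical helpers, not part of Akka.

```scala
// Hypothetical helper: remembers the highest sequence number processed so far
// and reports whether a (re)delivered message is new.
final class SequenceNrFilter {
  private var highestSeen = 0L // sequence numbers start at 1L

  /** Returns true, and records the number, if `sequenceNr` has not been seen before. */
  def accept(sequenceNr: Long): Boolean =
    if (sequenceNr > highestSeen) { highestSeen = sequenceNr; true }
    else false
}

// Possible use inside a channel destination such as MyDestination:
//   case p @ Persistent(payload, sequenceNr) =>
//     if (filter.accept(sequenceNr)) process(payload) // `process` is hypothetical
//     p.confirm() // confirm duplicates too, so the lost confirmation gets written
```

Confirming a detected duplicate is deliberate: the duplicate only arrived because its earlier confirmation never reached the journal.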

Currently, channels do not store Deliver requests or retry delivery on network or destination failures. This feature (reliable channels) will be available soon.

Sender resolution

ActorRefs of Persistent message senders are also stored in the journal. Consequently, they may become invalid if an application is restarted and messages are replayed. For example, the stored ActorRef may then reference a previous incarnation of a sender, and a new incarnation of that sender cannot receive a reply from a processor. This may be acceptable for many applications, but others may require that a new sender incarnation receives the reply (to reliably resume a conversation between actors after a JVM crash, for example). Here, a channel may assist in resolving new sender incarnations when a third Deliver argument is specified:

  • Resolve.Destination if the sender of a persistent message is used as channel destination

    channel ! Deliver(p, sender, Resolve.Destination)
    
  • Resolve.Sender if the sender of a persistent message is forwarded to a destination.

    channel forward Deliver(p, destination, Resolve.Sender)
    

The default is Resolve.Off, which means no resolution. Find out more in the Deliver API docs.

Identifiers

In the same way as processors, channels also have an identifier that defaults to a channel's path. A channel identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this works well when using local actor references but may cause problems with remote actor references. In this case, an application-defined channel id should be provided as an argument to Channel.props(String):

context.actorOf(Channel.props("my-stable-channel-id"))

Persistent messages

Payload

The payload of a Persistent message can be obtained via its

def payload: Any

method or by pattern matching

case Persistent(payload, _) =>

Inside processors, new persistent messages are derived from the current persistent message before sending them via a channel, either by calling p.withPayload(...) or Persistent.create(...) where the latter uses the implicit currentPersistentMessage made available by Processor.

implicit def currentPersistentMessage: Option[Persistent]

This is necessary for delivery confirmations to work properly. Both ways are equivalent but we recommend using p.withPayload(...) for clarity.

Sequence number

The sequence number of a Persistent message can be obtained via its

def sequenceNr: Long

method or by pattern matching

case Persistent(_, sequenceNr) =>

Persistent messages are assigned sequence numbers on a per-processor basis. A sequence starts at 1L and doesn't contain gaps unless a processor marks a message as deleted.

Snapshots

Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the saveSnapshot method on Processor. If saving a snapshot succeeds, the processor receives a SaveSnapshotSuccess message; otherwise, it receives a SaveSnapshotFailure message:

import akka.persistence.{ Processor, SaveSnapshotFailure, SaveSnapshotSuccess }

class MyProcessor extends Processor {
  var state: Any = _

  def receive = {
    case "snap"                                => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata)         => // ...
    case SaveSnapshotFailure(metadata, reason) => // ...
  }
}

where metadata is of type SnapshotMetadata:

case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Long = 0L)

During recovery, the processor is offered a previously saved snapshot via a SnapshotOffer message from which it can initialize internal state.

import akka.persistence.{ Persistent, Processor, SnapshotOffer }

class MyProcessor extends Processor {
  var state: Any = _

  def receive = {
    case SnapshotOffer(metadata, offeredSnapshot) => state = offeredSnapshot
    case Persistent(payload, sequenceNr)          => // ...
  }
}

The replayed messages that follow the SnapshotOffer message, if any, are younger than the offered snapshot. They finally recover the processor to its current (i.e. latest) state.
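The effect of snapshot-based recovery can be modelled in plain Scala: starting from the snapshot state and applying only the messages with a higher sequence number should yield the same state as replaying the full history. This is a sketch only; RecoveryModel.recover and its parameters are hypothetical and not part of Akka.

```scala
object RecoveryModel {
  /**
   * Hypothetical model of processor recovery.
   * @param zero     initial state when no snapshot is offered
   * @param snapshot optional (sequenceNr, state) pair, as offered via SnapshotOffer
   * @param journal  journaled (sequenceNr, payload) pairs in sequence-number order
   * @param update   state transition applied for each replayed payload
   */
  def recover[S, P](zero: S, snapshot: Option[(Long, S)], journal: Seq[(Long, P)])(update: (S, P) => S): S = {
    val (fromSeqNr, initial) = snapshot match {
      case Some((seqNr, state)) => (seqNr, state) // start from the snapshot
      case None                 => (0L, zero)      // no snapshot: replay everything
    }
    // Only messages younger than the snapshot are replayed on top of it
    journal.collect { case (seqNr, p) if seqNr > fromSeqNr => p }.foldLeft(initial)(update)
  }
}
```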

In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots and at least one of these snapshots matches the SnapshotSelectionCriteria that can be specified for recovery.

processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(
  maxSequenceNr = 457L,
  maxTimestamp = System.currentTimeMillis))

If not specified, they default to SnapshotSelectionCriteria.Latest which selects the latest (= youngest) snapshot. To disable snapshot-based recovery, applications should use SnapshotSelectionCriteria.None. A recovery where no saved snapshot matches the specified SnapshotSelectionCriteria will replay all journaled messages.
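To make the selection rule concrete, here is a plain-Scala model that picks the youngest saved snapshot whose sequence number and timestamp do not exceed the given bounds. Meta, Criteria and select are hypothetical stand-ins for SnapshotMetadata and SnapshotSelectionCriteria, not Akka code; treating both bounds as inclusive and ordering snapshots by sequence number are assumptions of this sketch.

```scala
object SnapshotSelectionModel {
  // Hypothetical stand-ins for SnapshotMetadata and SnapshotSelectionCriteria
  final case class Meta(processorId: String, sequenceNr: Long, timestamp: Long)
  final case class Criteria(maxSequenceNr: Long, maxTimestamp: Long)

  val Latest = Criteria(Long.MaxValue, Long.MaxValue) // matches every snapshot
  val NoSnapshot = Criteria(0L, 0L)                   // matches none (sequence numbers start at 1L)

  /** Youngest matching snapshot, or None, in which case all messages are replayed. */
  def select(saved: Seq[Meta], c: Criteria): Option[Meta] = {
    val matching = saved.filter(m => m.sequenceNr <= c.maxSequenceNr && m.timestamp <= c.maxTimestamp)
    if (matching.isEmpty) None else Some(matching.maxBy(_.sequenceNr))
  }
}
```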

Event sourcing

In all the examples so far, messages that change a processor's state have been sent as Persistent messages by an application, so that they can be replayed during recovery. From this point of view, the journal acts as a write-ahead-log for whatever Persistent messages a processor receives. This is also known as command sourcing. Commands, however, may fail and some applications cannot tolerate command failures during recovery.

For these applications, Event Sourcing is a better choice. Applied to Akka persistence, the basic idea behind event sourcing is quite simple. A processor receives a (non-persistent) command, which is first validated to check whether it can be applied to the current state. Here, validation can mean anything from simple inspection of a command message's fields up to a conversation with several external services, for example. If validation succeeds, events are generated from the command, representing the effect of the command. These events are then persisted and, after successful persistence, used to change the processor's state. When the processor needs to be recovered, only the persisted events are replayed, and these are known to apply successfully. In other words, in contrast to commands, events cannot fail when being replayed to a processor. Eventsourced processors may of course also process commands that do not change application state, such as query commands.
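The command/event split can be illustrated without Akka. In this sketch the domain (an account balance) and all names are hypothetical: a command is validated against the current state and either rejected or turned into events, and only events are applied to state, so replaying them cannot fail validation.

```scala
object EventSourcingModel {
  // Hypothetical domain: an account balance
  sealed trait Command
  final case class Withdraw(amount: Long) extends Command
  final case class Deposit(amount: Long) extends Command

  sealed trait Event
  final case class Withdrawn(amount: Long) extends Event
  final case class Deposited(amount: Long) extends Event

  final case class Account(balance: Long = 0L) {
    // Applying an event never fails: it was validated before it was persisted
    def updated(e: Event): Account = e match {
      case Withdrawn(a) => copy(balance = balance - a)
      case Deposited(a) => copy(balance = balance + a)
    }
  }

  // Validation: a command yields the events representing its effect, or is rejected
  def handle(state: Account, cmd: Command): Either[String, Seq[Event]] = cmd match {
    case Deposit(a) if a > 0                        => Right(Seq(Deposited(a)))
    case Withdraw(a) if a > 0 && a <= state.balance => Right(Seq(Withdrawn(a)))
    case other                                      => Left(s"rejected: $other")
  }

  // Recovery replays persisted events only
  def replay(events: Seq[Event]): Account = events.foldLeft(Account())(_ updated _)
}
```

A rejected command produces no events, so it leaves no trace in the journal and is never seen again during recovery.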

Akka persistence supports event sourcing with the EventsourcedProcessor trait (which implements event sourcing as a pattern on top of command sourcing). A processor that extends this trait does not handle Persistent messages directly but uses the persist method to persist and handle events. The behavior of an EventsourcedProcessor is defined by implementing receiveReplay and receiveCommand. This is best explained with an example (which is also part of akka-sample-persistence).

import akka.actor._
import akka.persistence._

case class Cmd(data: String)
case class Evt(data: String)

case class ExampleState(events: List[String] = Nil) {
  def update(evt: Evt) = copy(evt.data :: events)
  def size = events.length
  override def toString: String = events.reverse.toString
}

class ExampleProcessor extends EventsourcedProcessor {
  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.update(event)

  def numEvents =
    state.size

  val receiveReplay: Receive = {
    case evt: Evt                                 => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
  }

  val receiveCommand: Receive = {
    case Cmd(data) => {
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
        if (data == "foo") context.become(otherCommandHandler)
      }
    }
    case "snap"  => saveSnapshot(state)
    case "print" => println(state)
  }

  val otherCommandHandler: Receive = {
    case Cmd("bar") => {
      persist(Evt(s"bar-${numEvents}")) { event =>
        updateState(event)
        context.unbecome()
      }
      unstashAll()
    }
    case other => stash()
  }
}

The example defines two data types, Cmd and Evt, to represent commands and events, respectively. The state of the ExampleProcessor is a list of persisted event data contained in ExampleState.

The processor's receiveReplay method defines how state is updated during recovery by handling Evt and SnapshotOffer messages. The processor's receiveCommand method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling persist with an event (or a sequence of events) as first argument and an event handler as second argument.

The persist method persists events asynchronously and the event handler is executed for successfully persisted events. Successfully persisted events are internally sent back to the processor as separate messages which trigger the event handler execution. An event handler may therefore close over processor state and mutate it. The sender of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender of a command (not shown).
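Because persist is asynchronous, neither event handler has run when both events of a Cmd are created, which is why the example computes the second event's suffix as numEvents + 1. The following plain-Scala simulation (no Akka involved; CmdSimulation.handleCmd is a hypothetical synchronous stand-in for the Cmd branch of receiveCommand) reuses the example's data types to show which events a command produces.

```scala
// Copies of the data types from the example above
case class Cmd(data: String)
case class Evt(data: String)

case class ExampleState(events: List[String] = Nil) {
  def update(evt: Evt) = copy(evt.data :: events)
  def size = events.length
  override def toString: String = events.reverse.toString
}

object CmdSimulation {
  // Both events are created before either event handler runs,
  // so both see the same numEvents value.
  def handleCmd(state: ExampleState, cmd: Cmd): (List[Evt], ExampleState) = {
    val numEvents = state.size
    val events = List(Evt(s"${cmd.data}-${numEvents}"), Evt(s"${cmd.data}-${numEvents + 1}"))
    // Event handlers run afterwards, once per successfully persisted event
    (events, events.foldLeft(state)(_ update _))
  }
}
```

Starting from an empty state, Cmd("foo") yields the events foo-0 and foo-1, and a following Cmd("bar") yields bar-2 and bar-3.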

The main responsibility of an event handler is changing processor state using event data and notifying others about successful state changes by publishing events.

When persisting events with persist it is guaranteed that the processor will not receive new commands between the persist call and the execution(s) of the associated event handler. This also holds for multiple persist calls in context of a single command.

The example also demonstrates how to change the processor's default behavior, defined by receiveCommand, to another behavior, defined by otherCommandHandler, and back using context.become() and context.unbecome(). See also the API docs of persist for further details.

Storage plugins

Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem. Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following imports:

import scala.concurrent.Future
import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._

Journal plugin API

A journal plugin either extends SyncWriteJournal or AsyncWriteJournal. SyncWriteJournal is an actor that should be extended when the storage backend API only supports synchronous, blocking writes. The methods to be implemented in this case are:

/**
 * Plugin API.
 *
 * Synchronously writes a `persistent` message to the journal.
 */
def write(persistent: PersistentImpl): Unit

/**
 * Plugin API.
 *
 * Synchronously marks a `persistent` message as deleted.
 */
def delete(persistent: PersistentImpl): Unit

/**
 * Plugin API.
 *
 * Synchronously writes a delivery confirmation to the journal.
 */
def confirm(processorId: String, sequenceNr: Long, channelId: String): Unit

AsyncWriteJournal is an actor that should be extended if the storage backend API supports asynchronous, non-blocking writes. The methods to be implemented in that case are:

/**
 * Plugin API.
 *
 * Asynchronously writes a `persistent` message to the journal.
 */
def writeAsync(persistent: PersistentImpl): Future[Unit]

/**
 * Plugin API.
 *
 * Asynchronously marks a `persistent` message as deleted.
 */
def deleteAsync(persistent: PersistentImpl): Future[Unit]

/**
 * Plugin API.
 *
 * Asynchronously writes a delivery confirmation to the journal.
 */
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit]

Message replays are always asynchronous; therefore, any journal plugin must implement:

/**
 * Plugin API.
 *
 * Asynchronously replays persistent messages. Implementations replay a message
 * by calling `replayCallback`. The returned future must be completed when all
 * messages (matching the sequence number bounds) have been replayed. The future
 * `Long` value must be the highest stored sequence number in the journal for the
 * specified processor. The future must be completed with a failure if any of
 * the persistent messages could not be replayed.
 *
 * The `replayCallback` must also be called with messages that have been marked
 * as deleted. In this case a replayed message's `deleted` field must be set to
 * `true`.
 *
 * The channel ids of delivery confirmations that are available for a replayed
 * message must be contained in that message's `confirms` sequence.
 *
 * @param processorId processor id.
 * @param fromSequenceNr sequence number where replay should start.
 * @param toSequenceNr sequence number where replay should end (inclusive).
 * @param replayCallback called to replay a single message. Can be called from any
 *                       thread.
 *
 * @see [[AsyncWriteJournal]]
 * @see [[SyncWriteJournal]]
 */
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl => Unit): Future[Long]
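To illustrate the replay contract documented above (inclusive bounds, deleted messages replayed with a deleted flag, confirmation channel ids attached, future completed with the highest stored sequence number), here is a minimal in-memory model. It uses a hypothetical Entry type instead of PersistentImpl and is not a working plugin; a real plugin would extend SyncWriteJournal or AsyncWriteJournal and use their actual types.

```scala
import scala.collection.immutable.TreeMap
import scala.collection.mutable
import scala.concurrent.Future

// Hypothetical stand-in for PersistentImpl (not the Akka type)
final case class Entry(processorId: String, sequenceNr: Long, payload: Any,
                       deleted: Boolean = false, confirms: Seq[String] = Nil)

final class InMemoryJournalModel {
  // processorId -> entries ordered by sequence number
  private val store = mutable.Map.empty[String, TreeMap[Long, Entry]]

  private def entries(pid: String): TreeMap[Long, Entry] =
    store.getOrElse(pid, TreeMap.empty[Long, Entry])

  private def modify(pid: String, seqNr: Long)(f: Entry => Entry): Unit =
    entries(pid).get(seqNr).foreach(e => store(pid) = entries(pid).updated(seqNr, f(e)))

  def write(e: Entry): Unit = store(e.processorId) = entries(e.processorId).updated(e.sequenceNr, e)

  // Mark as deleted instead of removing, so that replay can still report the entry
  def delete(pid: String, seqNr: Long): Unit = modify(pid, seqNr)(_.copy(deleted = true))

  def confirm(pid: String, seqNr: Long, channelId: String): Unit =
    modify(pid, seqNr)(e => e.copy(confirms = e.confirms :+ channelId))

  def replayAsync(pid: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: Entry => Unit): Future[Long] = {
    val es = entries(pid)
    // Both bounds inclusive; deleted entries are replayed too, with deleted = true
    es.valuesIterator
      .filter(e => e.sequenceNr >= fromSequenceNr && e.sequenceNr <= toSequenceNr)
      .foreach(replayCallback)
    // Highest stored sequence number for this processor (0L if none)
    Future.successful(if (es.isEmpty) 0L else es.lastKey)
  }
}
```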

A journal plugin can be activated with the following minimal configuration:

# Path to the journal plugin to be used
akka.persistence.journal.plugin = "my-journal"

# My custom journal plugin
my-journal {
  # Class name of the plugin.
  class = "docs.persistence.MyJournal"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.actor.default-dispatcher"
}

The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher for SyncWriteJournal plugins and akka.actor.default-dispatcher for AsyncWriteJournal plugins.

Snapshot store plugin API

A snapshot store plugin must extend the SnapshotStore actor and implement the following methods:

/**
 * Plugin API.
 *
 * Asynchronously loads a snapshot.
 *
 * @param processorId processor id.
 * @param criteria selection criteria for loading.
 */
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

/**
 * Plugin API.
 *
 * Asynchronously saves a snapshot.
 *
 * @param metadata snapshot metadata.
 * @param snapshot snapshot.
 */
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]

/**
 * Plugin API.
 *
 * Called after successful saving of a snapshot.
 *
 * @param metadata snapshot metadata.
 */
def saved(metadata: SnapshotMetadata)

/**
 * Plugin API.
 *
 * Deletes the snapshot identified by `metadata`.
 *
 * @param metadata snapshot metadata.
 */
def delete(metadata: SnapshotMetadata)

A snapshot store plugin can be activated with the following minimal configuration:

# Path to the snapshot store plugin to be used
akka.persistence.snapshot-store.plugin = "my-snapshot-store"

# My custom snapshot store plugin
my-snapshot-store {
  # Class name of the plugin.
  class = "docs.persistence.MySnapshotStore"
  # Dispatcher for the plugin actor.
  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}

The specified plugin class must have a no-arg constructor. The plugin-dispatcher is the dispatcher used for the plugin actor. If not specified, it defaults to akka.persistence.dispatchers.default-plugin-dispatcher.

Custom serialization

Serialization of snapshots and payloads of Persistent messages is configurable with Akka's Serialization infrastructure. For example, if an application wants to serialize

  • payloads of type MyPayload with a custom MyPayloadSerializer and
  • snapshots of type MySnapshot with a custom MySnapshotSerializer

it must add

akka.actor {
  serializers {
    my-payload = "docs.persistence.MyPayloadSerializer"
    my-snapshot = "docs.persistence.MySnapshotSerializer"
  }
  serialization-bindings {
    "docs.persistence.MyPayload" = my-payload
    "docs.persistence.MySnapshot" = my-snapshot
  }
}

to the application configuration. If not specified, a default serializer is used, which is the JavaSerializer in this example.
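A minimal sketch of what a payload serializer could look like, assuming MyPayload is a hypothetical case class with a single String field. It implements Akka's akka.serialization.Serializer trait (identifier, includeManifest, toBinary, fromBinary). The identifier value is an arbitrary assumption: it must be unique among the serializers of an actor system, and small values are reserved by Akka itself. The configuration above binds the fully-qualified names docs.persistence.MyPayload and docs.persistence.MyPayloadSerializer, so in a real project these classes would live in that package.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import akka.serialization.Serializer

// Hypothetical payload type
final case class MyPayload(data: String)

class MyPayloadSerializer extends Serializer {
  // Must be unique across all serializers in the system; this value is arbitrary
  def identifier: Int = 77123

  // Only one payload type is handled, so no class manifest is needed
  def includeManifest: Boolean = false

  def toBinary(o: AnyRef): Array[Byte] = o match {
    case MyPayload(data) => data.getBytes(UTF_8)
    case other           => throw new IllegalArgumentException(s"Cannot serialize ${other.getClass}")
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
    MyPayload(new String(bytes, UTF_8))
}
```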

Miscellaneous

State machines

State machines can be persisted by mixing the FSM trait into processors.

import akka.actor.FSM
import akka.persistence.{ Processor, Persistent }

class PersistentDoor extends Processor with FSM[String, Int] {
  startWith("closed", 0)

  when("closed") {
    case Event(Persistent("open", _), counter) => {
      goto("open") using (counter + 1) replying (counter)
    }
  }

  when("open") {
    case Event(Persistent("close", _), counter) => {
      goto("closed") using (counter + 1) replying (counter)
    }
  }
}

Upcoming features

  • Reliable channels
  • Extended deletion of messages and snapshots
  • ...
