Persistence

Warning

This module is currently marked as may change in the sense of being the subject of active research. This means that API or semantics can change without warning or deprecation period and it is not recommended to use this module in production just yet—you have been warned.

Warning

This module only has a Scala DSL. See #24193 to track progress and to contribute to the Java DSL.

To use typed persistence add the following dependency:

sbt
libraryDependencies += "com.typesafe.akka" % "akka-persistence-typed_2.11" % "2.5-SNAPSHOT"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence-typed_2.11</artifactId>
  <version>2.5-SNAPSHOT</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-persistence-typed_2.11', version: '2.5-SNAPSHOT'
}

Akka Persistence is a library for building event sourced actors. For background about how it works see the untyped Akka Persistence section. This documentation shows how the typed API for persistence works and assumes you know what is meant by Command, Event and State.

Let’s start with a simple example. The minimum required for a PersistentBehavior is:

Scala
sealed trait Command
sealed trait Event
case class State()

val behavior: Behavior[Command] =
  PersistentBehaviors.immutable[Command, Event, State](
    persistenceId = "abc",
    initialState = State(),
    commandHandler = (ctx, state, cmd) ⇒ ???,
    eventHandler = (state, evt) ⇒ ???)

The first important thing to notice is the Behavior of a persistent actor is typed to the type of the Command because this type of message a persistent actor should receive. In Akka Typed this is now enforced by the type system. The event and state are only used internally.

The parameters to PersistentBehaviors.immutable are::

  • persistenceId is the unique identifier for the persistent actor.
  • initialState defines the State when the entity is first created e.g. a Counter would start wiht 0 as state.
  • commandHandler defines how to handle command and optional functions for other signals, e.g. Termination messages if watch is used.
  • eventHandler updates the current state when an event has been persisted.

Next we’ll discuss each of these in detail.

Command handler

The command handler is a function with 3 parameters for the ActorContext, current State, and Command.

A command handler returns an Effect directive that defines what event or events, if any, to persist.

  • Effect.persist will persist one single event or several events atomically, i.e. all events are stored or none of them are stored if there is an error
  • Effect.none no events are to be persisted, for example a read-only command
  • Effect.unhandled the command is unhandled (not supported) in current state

External side effects can be performed after successful persist with the andThen function e.g Effect.persist(..).andThen. In the example below a reply is sent to the replyTo ActorRef. Note that the new state after applying the event is passed as parameter to the andThen function.

Event handler

When an event has been persisted successfully the current state is updated by applying the event to the current state with the eventHandler function.

The event handler returns the new state, which must be immutable so you return a new instance of the state. The same event handler is also used when the entity is started up to recover its state from the stored events.

It is not recommended to perform side effects in the event handler, as those are also executed during recovery of an persistent actor

Basic example

Command and event:

Scala
sealed trait SimpleCommand
case class Cmd(data: String) extends SimpleCommand

sealed trait SimpleEvent
case class Evt(data: String) extends SimpleEvent

State is a List containing all the events:

Scala
case class ExampleState(events: List[String] = Nil)

The command handler just persists the Cmd payload in an Evt:

Scala
val commandHandler: CommandHandler[SimpleCommand, SimpleEvent, ExampleState] =
  CommandHandler.command {
    case Cmd(data) ⇒ Effect.persist(Evt(data))
  }

The event handler appends the event to the state. This is called after successfully persisting the event in the database:

Scala
val eventHandler: (ExampleState, SimpleEvent) ⇒ (ExampleState) = {
  case (state, Evt(data)) ⇒ state.copy(data :: state.events)
}

These are used to create a PersistentBehavior:

Scala
val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] =
  PersistentBehaviors.immutable[SimpleCommand, SimpleEvent, ExampleState](
    persistenceId = "sample-id-1",
    initialState = ExampleState(Nil),
    commandHandler = commandHandler,
    eventHandler = eventHandler)

The behavior can then be run as with any normal typed actor as described in typed actors documentation.

Larger example

After processing a message plain typed actors are able to return the Behavior that is used for next message.

As you can see in the above examples this is not supported by typed persistent actors. Instead, the state is returned by eventHandler. The reason a new behavior can’t be returned is that behavior is part of the actor’s state and must also carefully be reconstructed during recovery. If it would have been supported it would mean that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used. That would be very prone to mistakes and thus not allowed in Typed Persistence.

For simple actors you can use the same set of command handlers independent of what state the entity is in, as shown in above example. For more complex actors it’s useful to be able to change the behavior in the sense that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities.

The next example shows how to define different behavior based on the current State. It is an actor that represents the state of a blog post. Before a post is started the only command it can process is to AddPost. Once it is started then it we can look it up with GetPost, modify it with ChangeBody or publish it with Publish.

The state is captured by:

Scala
object BlogState {
  val empty = BlogState(None, published = false)
}

final case class BlogState(content: Option[PostContent], published: Boolean) {
  def withContent(newContent: PostContent): BlogState =
    copy(content = Some(newContent))
  def isEmpty: Boolean = content.isEmpty
  def postId: String = content match {
    case Some(c) ⇒ c.postId
    case None    ⇒ throw new IllegalStateException("postId unknown before post is created")
  }
}

The commands (only a subset are valid depending on state):

Scala
sealed trait BlogCommand extends Serializable
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
final case class AddPostDone(postId: String)
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
final case object PassivatePost extends BlogCommand
final case class PostContent(postId: String, title: String, body: String)

The command handler to process each command is decided by a CommandHandler.byState command handler, which is a function from State => CommandHandler:

Scala
private def commandHandler: CommandHandler[BlogCommand, BlogEvent, BlogState] = CommandHandler.byState {
  case state if state.isEmpty  ⇒ initial
  case state if !state.isEmpty ⇒ postAdded
}

This can refer to many other CommandHandlers e.g one for a post that hasn’t been started:

Scala
private def initial: CommandHandler[BlogCommand, BlogEvent, BlogState] =
  (ctx, state, cmd) ⇒
    cmd match {
      case AddPost(content, replyTo) ⇒
        val evt = PostAdded(content.postId, content)
        Effect.persist(evt).andThen { state2 ⇒
          // After persist is done additional side effects can be performed
          replyTo ! AddPostDone(content.postId)
        }
      case PassivatePost ⇒
        Effect.stop
      case _ ⇒
        Effect.unhandled
    }

And a different CommandHandler for after the post has been added:

Scala
private def postAdded: CommandHandler[BlogCommand, BlogEvent, BlogState] = {
  (ctx, state, cmd) ⇒
    cmd match {
      case ChangeBody(newBody, replyTo) ⇒
        val evt = BodyChanged(state.postId, newBody)
        Effect.persist(evt).andThen { _ ⇒
          replyTo ! Done
        }
      case Publish(replyTo) ⇒
        Effect.persist(Published(state.postId)).andThen { _ ⇒
          println(s"Blog post ${state.postId} was published")
          replyTo ! Done
        }
      case GetPost(replyTo) ⇒
        replyTo ! state.content.get
        Effect.none
      case _: AddPost ⇒
        Effect.unhandled
      case PassivatePost ⇒
        Effect.stop
    }
}

The event handler is always the same independent of state. The main reason for not making the event handler part of the CommandHandler is that all events must be handled and that is typically independent of what the current state is. The event handler can of course still decide what to do based on the state if that is needed.

Scala
private def eventHandler(state: BlogState, event: BlogEvent): BlogState =
  event match {
    case PostAdded(postId, content) ⇒
      state.withContent(content)

    case BodyChanged(_, newBody) ⇒
      state.content match {
        case Some(c) ⇒ state.copy(content = Some(c.copy(body = newBody)))
        case None    ⇒ state
      }

    case Published(_) ⇒
      state.copy(published = true)
  }

And finally the behavior is created from the byState command handler:

Scala
def behavior: Behavior[BlogCommand] =
  PersistentBehaviors.immutable[BlogCommand, BlogEvent, BlogState](
    persistenceId = "abc",
    initialState = BlogState.empty,
    commandHandler,
    eventHandler)

Serialization

The same serialization mechanism as for untyped actors is also used in Akka Typed, also for persistent actors. When picking serialization solution for the events you should also consider that it must be possible read old events when the application has evolved. Strategies for that can be found in the schema evolution.

Recovery

Since it is strongly discouraged to perform side effects in applyEvent , side effects should be performed once recovery has completed in the onRecoveryCompleted callback

Scala
val recoveryBehavior: Behavior[Command] =
  PersistentBehaviors.immutable[Command, Event, State](
    persistenceId = "abc",
    initialState = State(),
    commandHandler = (ctx, state, cmd) ⇒ ???,
    eventHandler = (state, evt) ⇒ ???)
    .onRecoveryCompleted { (ctx, state) ⇒
      ???
    }

The onRecoveryCompleted takes on an ActorContext and the current State.

Tagging

Persistence typed allows you to use event tags with the following withTagging method, without using EventAdapter.

Scala
val taggingBehavior: Behavior[Command] =
  PersistentBehaviors.immutable[Command, Event, State](
    persistenceId = "abc",
    initialState = State(),
    commandHandler = (ctx, state, cmd) ⇒ ???,
    eventHandler = (state, evt) ⇒ ???
  ).withTagger(_ ⇒ Set("tag1", "tag2"))

Current limitations

  • The PersistentBehavior can’t be wrapped in other behaviors, such as Behaviors.deferred. See #23694
  • Can only tag events with event adapters. See #23817
  • Missing Java DSL. See #24193
  • Snapshot support. See #24196
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.