Persistence

Dependency

To use Akka Persistence Typed, add the module to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-typed" % "2.5.18"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence-typed_2.12</artifactId>
  <version>2.5.18</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-persistence-typed_2.12', version: '2.5.18'
}

Introduction

Akka Persistence is a library for building event sourced actors. For background about how it works see the untyped Akka Persistence section. This documentation shows how the typed API for persistence works and assumes you know what is meant by Command, Event and State.

Warning

This module is currently marked as may change in the sense of being the subject of active research. This means that API or semantics can change without warning or deprecation period and it is not recommended to use this module in production just yet—you have been warned.

Example

Let’s start with a simple example. The minimum required for a PersistentBehavior is:

Scala
sealed trait Command
sealed trait Event
case class State()

val behavior: Behavior[Command] =
  PersistentBehavior[Command, Event, State](
    persistenceId = PersistenceId("abc"),
    emptyState = State(),
    commandHandler =
      (state, cmd) ⇒
        throw new RuntimeException("TODO: process the command & return an Effect"),
    eventHandler =
      (state, evt) ⇒
        throw new RuntimeException("TODO: process the event return the next state")
  )
Java
public interface Command {}
public interface Event {}
public static class State {}

public static class MyPersistentBehavior extends PersistentBehavior<Command, Event, State> {
  public MyPersistentBehavior(PersistenceId persistenceId) {
    super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
  }

  @Override
  public State emptyState() {
    return new State();
  }

  @Override
  public CommandHandler<Command, Event, State> commandHandler() {
    return (state, command) -> {
      throw new RuntimeException("TODO: process the command & return an Effect");
    };
  }

  @Override
  public EventHandler<State, Event>  eventHandler() {
    return (state, event) -> {
      throw new RuntimeException("TODO: process the event return the next state");
    };
  }

  @Override
  public void onRecoveryCompleted(State state) {
    throw new RuntimeException("TODO: add some end-of-recovery side-effect here");
  }

  @Override
  public Set<String> tagsFor(Event event) {
    throw new RuntimeException("TODO: inspect the event and return any tags it should have");
  }
}

static PersistentBehavior<Command, Event, State> persistentBehavior =
    new MyPersistentBehavior(new PersistenceId("pid"));

The first important thing to notice is the Behavior of a persistent actor is typed to the type of the Command because this is the type of message a persistent actor should receive. In Akka Typed this is now enforced by the type system. The event and state are only used internally.

The components that make up a PersistentBehavior are:

  • persistenceId is the stable unique identifier for the persistent actor.
  • emptyState defines the State when the entity is first created e.g. a Counter would start with 0 as state.
  • commandHandler defines how to handle command by producing Effects e.g. persisting events, stopping the persistent actor.
  • eventHandler returns the new state given the current state when an event has been persisted.

Next we’ll discuss each of these in detail.

Command handler

The command handler is a function with 2 parameters, the current State and the incoming Command.

A command handler returns an Effect directive that defines what event or events, if any, to persist. Effects are created using a factory that is returned via the Effect() method the Effect factory and can be one of:

  • persist will persist one single event or several events atomically, i.e. all events are stored or none of them are stored if there is an error
  • none no events are to be persisted, for example a read-only command
  • unhandled the command is unhandled (not supported) in current state
  • stop stop this actor

In addition to returning the primary Effect for the command PersistentBehaviors can also chain side effects (SideEffects) are to be performed after successful persist which is achieved with the andThen and thenRun function e.g Effect.persist(..).andThenEffect().persist(..).andThen. The thenRun function is a convenience around creating a SideEffect.

In the example below a reply is sent to the replyTo ActorRef. Note that the new state after applying the event is passed as parameter to the thenRun function. All thenRun registered callbacks are executed sequentially after successful execution of the persist statement (or immediately, in case of none and unhandled).

Event handler

When an event has been persisted successfully the new state is created by applying the event to the current state with the eventHandler.

The event handler returns the new state, which must be immutable so you return a new instance of the state. The same event handler is also used when the entity is started up to recover its state from the stored events.

It is not recommended to perform side effects in the event handler, as those are also executed during recovery of an persistent actor

Basic example

Command and event:

Scala
sealed trait SimpleCommand
case class Cmd(data: String) extends SimpleCommand

sealed trait SimpleEvent
case class Evt(data: String) extends SimpleEvent
Java
public static class SimpleCommand {
  public final String data;

  public SimpleCommand(String data) {
    this.data = data;
  }
}

State is a List containing all the events:

Scala
case class ExampleState(events: List[String] = Nil)
Java
static class SimpleState {
  private final List<String> events;

  SimpleState(List<String> events) {
    this.events = events;
  }

  SimpleState() {
    this.events = new ArrayList<>();
  }


  SimpleState addEvent(SimpleEvent event) {
    List<String> newEvents = new ArrayList<>(events);
    newEvents.add(event.data);
    return new SimpleState(newEvents);
  }
}

The command handler persists the Cmd payload in an Evt. In this simple example the command handler is defined using a lambda, for the more complicated example below a CommandHandlerBuilder is used:

Scala
val commandHandler: CommandHandler[SimpleCommand, SimpleEvent, ExampleState] =
  CommandHandler.command {
    case Cmd(data) ⇒ Effect.persist(Evt(data))
  }
Java
@Override
public CommandHandler<SimpleCommand, SimpleEvent, SimpleState> commandHandler() {
  return (state, cmd) -> Effect().persist(new SimpleEvent(cmd.data));
}

The event handler appends the event to the state. This is called after successfully persisting the event in the database . As with the command handler the event handler is defined using a lambda, see below for a more complicated example using the EventHandlerBuilder:

Scala
val eventHandler: (ExampleState, SimpleEvent) ⇒ ExampleState = {
  case (state, Evt(data)) ⇒ state.copy(data :: state.events)
}
Java
@Override
public EventHandler<SimpleState, SimpleEvent> eventHandler() {
  return (state, event) -> state.addEvent(event);
}

These are used to create a PersistentBehavior:

Scala
val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] =
  PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState](
    persistenceId = PersistenceId("sample-id-1"),
    emptyState = ExampleState(Nil),
    commandHandler = commandHandler,
    eventHandler = eventHandler)
Java
public static PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState> pb =
    new PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState>(new PersistenceId("p1")) {

  @Override
  public SimpleState emptyState() {
    return new SimpleState();
  }

  @Override
  public CommandHandler<SimpleCommand, SimpleEvent, SimpleState> commandHandler() {
    return (state, cmd) -> Effect().persist(new SimpleEvent(cmd.data));
  }

  @Override
  public EventHandler<SimpleState, SimpleEvent> eventHandler() {
    return (state, event) -> state.addEvent(event);
  }

  @Override
  public EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> eventAdapter() {
    return new EventAdapterExample();
  }
};

Cluster Sharding and persistence

In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can resume operations Cluster Sharding is an excellent fit to spread persistent actors over a cluster and address them by id.

The PersistentBehavior can then be run as with any plain typed actor as described in actors documentation, but since Akka Persistence is based on the single-writer principle the persistent actors are typically used together with Cluster Sharding. For a particular persistenceId only one persistent actor instance should be active at one time. If multiple instances were to persist events at the same time, the events would be interleaved and might not be interpreted correctly on replay. Cluster Sharding ensures that there is only one active entity for each id. The Cluster Sharding example illustrates this common combination.

Accessing the ActorContext

If the persistent behavior needs to use the ActorContext, for example to spawn child actors, it can be obtained by wrapping construction with Behaviors.setup:

Scala
val behavior: Behavior[String] =
  Behaviors.setup { ctx ⇒
    PersistentBehavior[String, String, State](
      persistenceId = PersistenceId("myPersistenceId"),
      emptyState = new State,
      commandHandler = CommandHandler.command {
        cmd ⇒
          ctx.log.info("Got command {}", cmd)
          Effect.persist(cmd).thenRun { state ⇒
            ctx.log.info("event persisted, new state {}", state)
          }
      },
      eventHandler = {
        case (state, _) ⇒ state
      })
  }
Java
public Behavior<Command> behavior(PersistenceId persistenceId) {
  return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx));
}

class MyPersistentBehavior extends PersistentBehavior<Command, Event, RecoveryComplete.EventsInFlight> {

  // this makes the context available to the command handler etc.
  private final ActorContext<Command> ctx;

  public MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> ctx) {
    super(persistenceId);
    this.ctx = ctx;
  }

Changing Behavior

After processing a message, plain typed actors are able to return the Behavior that is used for next message.

As you can see in the above examples this is not supported by typed persistent actors. Instead, the state is returned by eventHandler. The reason a new behavior can’t be returned is that behavior is part of the actor’s state and must also carefully be reconstructed during recovery. If it would have been supported it would mean that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used. That would be very prone to mistakes and thus not allowed in Typed Persistence.

For basic actors you can use the same set of command handlers independent of what state the entity is in, as shown in above example. For more complex actors it’s useful to be able to change the behavior in the sense that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities.

The next example shows how to define different behavior based on the current State. It is an actor that represents the state of a blog post. Before a post is started the only command it can process is to AddPost. Once it is started then it we can look it up with GetPost, modify it with ChangeBody or publish it with Publish.

The state is captured by:

Scala
sealed trait BlogState

case object BlankState extends BlogState

final case class DraftState(content: PostContent) extends BlogState {
  def withBody(newBody: String): DraftState =
    copy(content = content.copy(body = newBody))

  def postId: String = content.postId
}

final case class PublishedState(content: PostContent) extends BlogState {
  def postId: String = content.postId
}
Java
interface BlogState {}

public static enum BlankState implements BlogState {
  INSTANCE
}

public static class DraftState implements BlogState {
  final PostContent postContent;

  DraftState(PostContent postContent) {
    this.postContent = postContent;
  }

  public DraftState withContent(PostContent newContent) {
    return new DraftState(newContent);
  }

  public String postId() {
    return postContent.postId;
  }
}

public static class PublishedState implements BlogState {
  final PostContent postContent;

  PublishedState(PostContent postContent) {
    this.postContent = postContent;
  }

  public PublishedState withContent(PostContent newContent) {
    return new PublishedState(newContent);
  }

  public String postId() {
    return postContent.postId;
  }
}

The commands, of which only a subset are valid depending on the state:

Scala
sealed trait BlogCommand
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
final case class AddPostDone(postId: String)
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
final case class PostContent(postId: String, title: String, body: String)
Java
public interface BlogCommand {
}
public static class AddPost implements BlogCommand {
  final PostContent content;
  final ActorRef<AddPostDone> replyTo;

  public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
    this.content = content;
    this.replyTo = replyTo;
  }
}
public static class AddPostDone implements BlogCommand {
  final String postId;

  public AddPostDone(String postId) {
    this.postId = postId;
  }
}
public static class GetPost implements BlogCommand {
  final ActorRef<PostContent> replyTo;

  public GetPost(ActorRef<PostContent> replyTo) {
    this.replyTo = replyTo;
  }
}
public static class ChangeBody implements BlogCommand {
  final String newBody;
  final ActorRef<Done> replyTo;

  public ChangeBody(String newBody, ActorRef<Done> replyTo) {
    this.newBody = newBody;
    this.replyTo = replyTo;
  }
}
public static class Publish implements BlogCommand {
  final ActorRef<Done> replyTo;

  public Publish(ActorRef<Done> replyTo) {
    this.replyTo = replyTo;
  }
}
public static class PostContent implements BlogCommand {
  final String postId;
  final String title;
  final String body;

  public PostContent(String postId, String title, String body) {
    this.postId = postId;
    this.title = title;
    this.body = body;
  }
}

The commandler handler to process each command is decided by the state class (or state predicate) that is given to the commandHandlerBuilder and the match cases in the builders. Several builders can be composed with orElse: The command handler to process each command is decided by first looking at the state and then the command. It typically becomes two levels of pattern matching, first on the state and then on the command. Delegating to methods is a good practise because the one-line cases give a nice overview of the message dispatch.

Scala
private val commandHandler: (BlogState, BlogCommand) ⇒ Effect[BlogEvent, BlogState] = { (state, command) ⇒
  state match {

    case BlankState ⇒ command match {
      case cmd: AddPost ⇒ addPost(cmd)
      case _            ⇒ Effect.unhandled
    }

    case draftState: DraftState ⇒ command match {
      case cmd: ChangeBody  ⇒ changeBody(draftState, cmd)
      case Publish(replyTo) ⇒ publish(draftState, replyTo)
      case GetPost(replyTo) ⇒ getPost(draftState, replyTo)
      case _: AddPost       ⇒ Effect.unhandled
    }

    case publishedState: PublishedState ⇒ command match {
      case GetPost(replyTo) ⇒ getPost(publishedState, replyTo)
      case _                ⇒ Effect.unhandled
    }
  }
}

private def addPost(cmd: AddPost): Effect[BlogEvent, BlogState] = {
  val evt = PostAdded(cmd.content.postId, cmd.content)
  Effect.persist(evt).thenRun { _ ⇒
    // After persist is done additional side effects can be performed
    cmd.replyTo ! AddPostDone(cmd.content.postId)
  }
}

private def changeBody(state: DraftState, cmd: ChangeBody): Effect[BlogEvent, BlogState] = {
  val evt = BodyChanged(state.postId, cmd.newBody)
  Effect.persist(evt).thenRun { _ ⇒
    cmd.replyTo ! Done
  }
}

private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[BlogEvent, BlogState] = {
  Effect.persist(Published(state.postId)).thenRun { _ ⇒
    println(s"Blog post ${state.postId} was published")
    replyTo ! Done
  }
}

private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = {
  replyTo ! state.content
  Effect.none
}

private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = {
  replyTo ! state.content
  Effect.none
}

TODO rewrite this example to be more like the Scala example

Java
@Override
public CommandHandler<BlogCommand, BlogEvent, BlogState> commandHandler() {
  return
      initialCommandHandler()
          .orElse(draftCommandHandler())
          .orElse(publishedCommandHandler())
          .orElse(commonCommandHandler())
          .build();
}

The CommandHandlerBuilder for a post that hasn’t been initialized with content:

Java
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlankState, BlogState> initialCommandHandler() {
  return commandHandlerBuilder(BlankState.class)
      .matchCommand(AddPost.class, (state, cmd) -> {
        PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
        return Effect().persist(event)
            .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
      });
}

And a different CommandHandlerBuilder for after the post content has been added:

Java
private CommandHandlerBuilder<BlogCommand, BlogEvent, DraftState, BlogState> draftCommandHandler() {
  return commandHandlerBuilder(DraftState.class)
      .matchCommand(ChangeBody.class, (state, cmd) -> {
        BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
        return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
      })
      .matchCommand(Publish.class, (state, cmd) -> Effect()
          .persist(new Published(state.postId())).thenRun(() -> {
            System.out.println("Blog post published: " + state.postId());
            cmd.replyTo.tell(Done.getInstance());
          }))
      .matchCommand(GetPost.class, (state, cmd) -> {
        cmd.replyTo.tell(state.postContent);
        return Effect().none();
      });
}

private CommandHandlerBuilder<BlogCommand, BlogEvent, PublishedState, BlogState> publishedCommandHandler() {
  return commandHandlerBuilder(PublishedState.class)
      .matchCommand(ChangeBody.class, (state, cmd) -> {
        BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
        return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
      })
      .matchCommand(GetPost.class, (state, cmd) -> {
        cmd.replyTo.tell(state.postContent);
        return Effect().none();
      });
}

private CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState, BlogState> commonCommandHandler() {
  return commandHandlerBuilder(BlogState.class)
      .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled());
}

The event handler:

Scala
private val eventHandler: (BlogState, BlogEvent) ⇒ BlogState = { (state, event) ⇒
  state match {

    case BlankState ⇒ event match {
      case PostAdded(_, content) ⇒
        DraftState(content)
      case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
    }

    case draftState: DraftState ⇒ event match {

      case BodyChanged(_, newBody) ⇒
        draftState.withBody(newBody)

      case Published(_) ⇒
        PublishedState(draftState.content)

      case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
    }

    case _: PublishedState ⇒
      // no more changes after published
      throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
  }
}
Java
@Override
public EventHandler<BlogState, BlogEvent> eventHandler() {
  return eventHandlerBuilder()
      .matchEvent(PostAdded.class, (state, event) ->
          new DraftState(event.content))
      .matchEvent(BodyChanged.class, DraftState.class, (state, chg) ->
          state.withContent(new PostContent(state.postId(), state.postContent.title, chg.newBody)))
      .matchEvent(BodyChanged.class, PublishedState.class, (state, chg) ->
          state.withContent(new PostContent(state.postId(), state.postContent.title, chg.newBody)))
      .matchEvent(Published.class, DraftState.class, (state, event) ->
          new PublishedState(state.postContent))
      .build();
}

And finally the behavior is created from the PersistentBehavior.apply:

Scala
def behavior(entityId: String): Behavior[BlogCommand] =
  PersistentBehavior[BlogCommand, BlogEvent, BlogState](
    persistenceId = PersistenceId(s"Blog-$entityId"),
    emptyState = BlankState,
    commandHandler,
    eventHandler)
Java
public static class BlogBehavior extends PersistentBehavior<BlogCommand, BlogEvent, BlogState> {
  public static Behavior<BlogCommand> behavior(String entityId) {
    return Behaviors.setup(ctx ->
        new BlogBehavior(new PersistenceId("Blog-" + entityId), ctx)
    );
  }

  @Override
  public BlogState emptyState() {
    return BlankState.INSTANCE;
  }

  // commandHandler, eventHandler as in above snippets
}

This can be taken one or two steps further by defining the event and command handlers in the state class as illustrated in event handlers in the state and command handlers in the state.

There is also an example illustrating an optional initial state.

Effects and Side Effects

Each command has a single Effect which can be:

  • Persist events
  • None: Accept the comment but no effects
  • Unhandled: Don’t handle this message

Note that there is only one of these. It is not possible to both persist and say none/unhandled. These are created using a factory that is returned via the Effect() method the Effect factory and once created additional SideEffects can be added.

Most of them time this will be done with the thenRun method on the Effect above. It is also possible factor out common SideEffects. For example:

Scala
// Example factoring out a chained effect rather than using `andThen`
val commonChainedEffects = SideEffect[Mood](_ ⇒ println("Command processed"))
// Then in a command handler:
Effect.persist(Remembered("Yep")) // persist event
  .andThen(commonChainedEffects) // add on common chained effect
Java
// Factored out Chained effect
static final SideEffect<ExampleState>  commonChainedEffect =
    SideEffect.create(s -> System.out.println("Command handled!"));

 return commandHandlerBuilder(ExampleState.class)
   .matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data))
     .thenRun(() -> cmd.sender.tell(new Ack()))
     .andThen(commonChainedEffect)
   )
   .build();

Side effects ordering and guarantees

Any SideEffects are executed on an at-once basis and will not be executed if the persist fails. The SideEffects are executed sequentially, it is not possible to execute SideEffects in parallel.

Replies

The Request-Response interaction pattern is very common for persistent actors, because you typically want to know if the command was rejected due to validation errors and when accepted you want a confirmation when the events have been successfully stored.

Therefore you typically include a ActorRef[ReplyMessageType]ActorRef<ReplyMessageType> in the commands. After validation errors or after persisting events, using a thenRun side effect, the reply message can be sent to the ActorRef.

Scala
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
final case class AddPostDone(postId: String)
Java
public static class AddPost implements BlogCommand {
  final PostContent content;
  final ActorRef<AddPostDone> replyTo;

  public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
    this.content = content;
    this.replyTo = replyTo;
  }
}
public static class AddPostDone implements BlogCommand {
  final String postId;

  public AddPostDone(String postId) {
    this.postId = postId;
  }
}
Scala
val evt = PostAdded(cmd.content.postId, cmd.content)
Effect.persist(evt).thenRun { _ ⇒
  // After persist is done additional side effects can be performed
  cmd.replyTo ! AddPostDone(cmd.content.postId)
}
Java
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect().persist(event)
    .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));

Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that it can be used to enforce that replies are not forgotten when implementing the PersistentBehavior. If it’s defined with PersistentBehavior.withEnforcedRepliesPersistentBehaviorWithEnforcedReplies there will be compilation errors if the returned effect isn’t a ReplyEffect, which can be created with Effect.replyEffects().reply, Effect.noReplyEffects().noReply, Effect.thenReplyEffects().thenReply, or Effect.thenNoReplyEffects().thenNoReply.

These effects will send the reply message even when PersistentBehavior.withEnforcedRepliesPersistentBehaviorWithEnforcedReplies is not used, but then there will be no compilation errors if the reply decision is left out.

Note that the noReply is a way of making conscious decision that a reply shouldn’t be sent for a specific command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services.

Scala
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
  extends AccountCommand[OperationResult]
sealed trait AccountCommandReply
sealed trait OperationResult extends AccountCommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult

TODO include corresponding example in Java

When using the reply effect the commands must implement ExpectingReply to include the ActorRef[ReplyMessageType]ActorRef<ReplyMessageType> in a standardized way.

Scala
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = {
  if (acc.canWithdraw(cmd.amount)) {
    Effect.persist(Withdrawn(cmd.amount))
      .thenReply(cmd)(_ ⇒ Confirmed)

  } else {
    Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
  }
}

TODO include corresponding example in Java

Scala
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
  PersistentBehavior.withEnforcedReplies(
    PersistenceId(s"Account|$accountNumber"),
    EmptyAccount,
    commandHandler,
    eventHandler
  )
}

TODO include corresponding example in Java

Serialization

The same serialization mechanism as for untyped actors is also used in Akka Typed, also for persistent actors. When picking serialization solution for the events you should also consider that it must be possible read old events when the application has evolved. Strategies for that can be found in the schema evolution.

Recovery

It is strongly discouraged to perform side effects in applyEvent, so side effects should be performed once recovery has completed in the onRecoveryCompleted callback. by overriding onRecoveryCompleted

Scala
val recoveryBehavior: Behavior[Command] =
  PersistentBehavior[Command, Event, State](
    persistenceId = PersistenceId("abc"),
    emptyState = State(),
    commandHandler =
      (state, cmd) ⇒
        throw new RuntimeException("TODO: process the command & return an Effect"),
    eventHandler =
      (state, evt) ⇒
        throw new RuntimeException("TODO: process the event return the next state")
  ).onRecoveryCompleted { state ⇒
      throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
    }
Java
@Override
public void onRecoveryCompleted(State state) {
  throw new RuntimeException("TODO: add some end-of-recovery side-effect here");
}

The onRecoveryCompleted takes an ActorContext and the current State, and doesn’t return anything.

Tagging

Persistence typed allows you to use event tags without using EventAdapter:

Scala
val taggingBehavior: Behavior[Command] =
  PersistentBehavior[Command, Event, State](
    persistenceId = PersistenceId("abc"),
    emptyState = State(),
    commandHandler =
      (state, cmd) ⇒
        throw new RuntimeException("TODO: process the command & return an Effect"),
    eventHandler =
      (state, evt) ⇒
        throw new RuntimeException("TODO: process the event return the next state")
  ).withTagger(_ ⇒ Set("tag1", "tag2"))
Java
@Override
public Set<String> tagsFor(Event event) {
  throw new RuntimeException("TODO: inspect the event and return any tags it should have");
}

Event adapters

Event adapters can be programmatically added to your PersistentBehaviors that can convert from your Event type to another type that is then passed to the journal.

Defining an event adapter is done by extending an EventAdapter:

Scala
case class Wrapper[T](t: T)
class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
  override def toJournal(e: T): Wrapper[T] = Wrapper(e)
  override def fromJournal(p: Wrapper[T]): T = p.t
}
Java
public static class Wrapper<T> {
  private final T t;
  public Wrapper(T t) {
    this.t = t;
  }
  public T getT() {
    return t;
  }
}

public static class EventAdapterExample extends EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> {
  @Override
  public Wrapper<SimpleEvent> toJournal(SimpleEvent simpleEvent) {
    return new Wrapper<>(simpleEvent);
  }
  @Override
  public SimpleEvent fromJournal(Wrapper<SimpleEvent> simpleEventWrapper) {
    return simpleEventWrapper.getT();
  }
}

Then install it on a persistent behavior:

Scala
persistentBehavior.eventAdapter(new WrapperEventAdapter[Event])
Java
@Override
public EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> eventAdapter() {
  return new EventAdapterExample();
}

Wrapping Persistent Behaviors

When creating a PersistentBehavior, it is possible to wrap PersistentBehavior in other behaviors such as Behaviors.setup in order to access the ActorContext object. For instance to access the actor logging upon taking snapshots for debug purpose.

Scala
val samplePersistentBehavior = PersistentBehavior[Command, Event, State](
  persistenceId = PersistenceId("abc"),
  emptyState = State(),
  commandHandler =
    (state, cmd) ⇒
      throw new RuntimeException("TODO: process the command & return an Effect"),
  eventHandler =
    (state, evt) ⇒
      throw new RuntimeException("TODO: process the event return the next state")
).onRecoveryCompleted { state ⇒
    throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
  }

val debugAlwaysSnapshot: Behavior[Command] = Behaviors.setup {
  context ⇒
    samplePersistentBehavior.snapshotWhen((state, _, _) ⇒ {
      context.log.info(
        "Snapshot actor {} => state: {}",
        context.self.path.name, state)
      true
    })
}
Java
static Behavior<Command> debugAlwaysSnapshot = Behaviors.setup((context) -> {
          return new MyPersistentBehavior(new PersistenceId("pid")) {
            @Override
            public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
              context.getLog().info("Snapshot actor {} => state: {}",
                      context.getSelf().path().name(), state);
              return true;
            }
          };
        }
);

Journal failures

By default a PersistentBehavior will stop if an exception is thrown from the journal. It is possible to override this with any BackoffSupervisorStrategy. It is not possible to use the normal supervision wrapping for this as it isn’t valid to resume a behavior on a journal failure as it is not known if the event was persisted.

Scala
val supervisedBehavior = samplePersistentBehavior.onPersistFailure(
  SupervisorStrategy.restartWithBackoff(
    minBackoff = 10.seconds,
    maxBackoff = 60.seconds,
    randomFactor = 0.1
  ))
Java
public static class MyPersistentBehavior extends PersistentBehavior<Command, Event, State> {
  public MyPersistentBehavior(PersistenceId persistenceId) {
    super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
  }

Journal rejections

Journals can reject events. The difference from a failure is that the journal must decide to reject an event before trying to persist it e.g. because of a serialization exception. If an event is rejected it definitely won’t be in the journal. This is signalled to a PersistentBehavior via a EventRejectedException and can be handled with a supervisor.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.