Persistence
Dependency
To use Akka Persistence Typed, add the module to your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-typed" % "2.5.32"
- Maven
<dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-persistence-typed_2.12</artifactId> <version>2.5.32</version> </dependency> </dependencies>
- Gradle
dependencies { implementation "com.typesafe.akka:akka-persistence-typed_2.12:2.5.32" }
Introduction
Akka Persistence is a library for building event sourced actors. For background about how it works see the untyped Akka Persistence section. This documentation shows how the typed API for persistence works and assumes you know what is meant by Command
, Event
and State
.
This module is ready to be used in production, but it is still marked as may change. This means that API or semantics can change without warning or deprecation period, but such changes will be collected and be performed in Akka 2.6.0 rather than in 2.5.x patch releases.
Example
Let’s start with a simple example. The minimum required for a EventSourcedBehavior
is:
- Scala
-
source
sealed trait Command sealed trait Event final case class State() val behavior: Behavior[Command] = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
- Java
-
source
public class MyPersistentBehavior extends EventSourcedBehavior< MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> { static EventSourcedBehavior<Command, Event, State> eventSourcedBehavior = new MyPersistentBehavior(new PersistenceId("pid")); interface Command {} interface Event {} public static class State {} public MyPersistentBehavior(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { return new State(); } @Override public CommandHandler<Command, Event, State> commandHandler() { return (state, command) -> { throw new RuntimeException("TODO: process the command & return an Effect"); }; } @Override public EventHandler<State, Event> eventHandler() { return (state, event) -> { throw new RuntimeException("TODO: process the event return the next state"); }; } }
The first important thing to notice is the Behavior
of a persistent actor is typed to the type of the Command
because this is the type of message a persistent actor should receive. In Akka Typed this is now enforced by the type system. The event and state are only used internally.
The components that make up a EventSourcedBehavior are:
persistenceId
is the stable unique identifier for the persistent actor.emptyState
defines theState
when the entity is first created e.g. a Counter would start with 0 as state.commandHandler
defines how to handle command by producing Effects e.g. persisting events, stopping the persistent actor.eventHandler
returns the new state given the current state when an event has been persisted.
Next we’ll discuss each of these in detail.
Command handler
The command handler is a function with 2 parameters, the current State
and the incoming Command
.
A command handler returns an Effect
directive that defines what event or events, if any, to persist. Effects are created using a factory that is returned via the Effect()
method the Effect
factory and can be one of:
persist
will persist one single event or several events atomically, i.e. all events are stored or none of them are stored if there is an errornone
no events are to be persisted, for example a read-only commandunhandled
the command is unhandled (not supported) in current statestop
stop this actor
In addition to returning the primary Effect
for the command EventSourcedBehavior
s can also chain side effects (SideEffect
s) are to be performed after successful persist which is achieved with the andThen
and thenRun
function e.g Effect.persist(..).andThen
Effect().persist(..).andThen
. The thenRun
function is a convenience around creating a SideEffect
.
In the example below a reply is sent to the replyTo
ActorRef. Note that the new state after applying the event is passed as parameter to the thenRun
function. All thenRun
registered callbacks are executed sequentially after successful execution of the persist statement (or immediately, in case of none
and unhandled
).
Event handler
When an event has been persisted successfully the new state is created by applying the event to the current state with the eventHandler
.
The state is typically defined as an immutable class and then the event handler returns a new instance of the state. You may choose to use a mutable class for the state, and then the event handler may update the state instance and return the same instance. Both immutable and mutable state is supported.
The same event handler is also used when the entity is started up to recover its state from the stored events.
The event handler should only update the state and never perform side effects, as those would also be executed during recovery of the persistent actor.
Completing the example
Let’s fill in the details of the example.
Command and event:
- Scala
-
source
sealed trait Command final case class Add(data: String) extends Command case object Clear extends Command sealed trait Event final case class Added(data: String) extends Event case object Cleared extends Event
- Java
-
source
interface Command {} public static class Add implements Command { public final String data; public Add(String data) { this.data = data; } } public enum Clear implements Command { INSTANCE } interface Event {} public static class Added implements Event { public final String data; public Added(String data) { this.data = data; } } public enum Cleared implements Event { INSTANCE }
State is a List containing the 5 latest items:
- Scala
-
source
final case class State(history: List[String] = Nil)
- Java
-
source
public static class State { private final List<String> items; private State(List<String> items) { this.items = items; } public State() { this.items = new ArrayList<>(); } public State addItem(String data) { List<String> newItems = new ArrayList<>(items); newItems.add(0, data); // keep 5 items List<String> latest = newItems.subList(0, Math.min(4, newItems.size() - 1)); return new State(latest); } }
The command handler persists the Add
payload in an Added
event:
- Scala
-
source
import akka.persistence.typed.scaladsl.Effect val commandHandler: (State, Command) => Effect[Event, State] = { (state, command) => command match { case Add(data) => Effect.persist(Added(data)) case Clear => Effect.persist(Cleared) } }
- Java
-
source
@Override public CommandHandler<Command, Event, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand(Add.class, command -> Effect().persist(new Added(command.data))) .onCommand(Clear.class, command -> Effect().persist(Cleared.INSTANCE)) .build(); }
The event handler appends the item to the state and keeps 5 items. This is called after successfully persisting the event in the database:
- Scala
-
source
val eventHandler: (State, Event) => State = { (state, event) => event match { case Added(data) => state.copy((data :: state.history).take(5)) case Cleared => State(Nil) } }
- Java
-
source
@Override public EventHandler<State, Event> eventHandler() { return newEventHandlerBuilder() .forAnyState() .onEvent(Added.class, (state, event) -> state.addItem(event.data)) .onEvent(Cleared.class, () -> new State()) .build(); }
These are used to create a EventSourcedBehavior
: These are defined in an EventSourcedBehavior
:
- Scala
-
source
def behavior(id: String): EventSourcedBehavior[Command, Event, State] = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId(id), emptyState = State(Nil), commandHandler = commandHandler, eventHandler = eventHandler)
- Java
-
source
public class MyPersistentBehavior extends EventSourcedBehavior< MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> { public MyPersistentBehavior(PersistenceId persistenceId) { super(persistenceId); } @Override public State emptyState() { return new State(); } @Override public CommandHandler<Command, Event, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand(Add.class, command -> Effect().persist(new Added(command.data))) .onCommand(Clear.class, command -> Effect().persist(Cleared.INSTANCE)) .build(); } @Override public EventHandler<State, Event> eventHandler() { return newEventHandlerBuilder() .forAnyState() .onEvent(Added.class, (state, event) -> state.addItem(event.data)) .onEvent(Cleared.class, () -> new State()) .build(); } }
Cluster Sharding and persistence
In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can resume operations Cluster Sharding is an excellent fit to spread persistent actors over a cluster and address them by id.
The EventSourcedBehavior
can then be run as with any plain typed actor as described in actors documentation, but since Akka Persistence is based on the single-writer principle the persistent actors are typically used together with Cluster Sharding. For a particular persistenceId
only one persistent actor instance should be active at one time. If multiple instances were to persist events at the same time, the events would be interleaved and might not be interpreted correctly on replay. Cluster Sharding ensures that there is only one active entity for each id. The Cluster Sharding example illustrates this common combination.
Accessing the ActorContext
If the persistent behavior needs to use the ActorContext
, for example to spawn child actors, it can be obtained by wrapping construction with Behaviors.setup
:
- Scala
-
source
import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler val behaviorWithContext: Behavior[String] = Behaviors.setup { context => EventSourcedBehavior[String, String, State]( persistenceId = PersistenceId("myPersistenceId"), emptyState = new State, commandHandler = CommandHandler.command { cmd => context.log.info("Got command {}", cmd) Effect.persist(cmd).thenRun { state => context.log.info("event persisted, new state {}", state) } }, eventHandler = { case (state, _) => state }) }
- Java
-
source
public class MyPersistentBehavior extends EventSourcedBehavior<Command, Event, State> { public static Behavior<Command> behavior(PersistenceId persistenceId) { return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx)); } // this makes the context available to the command handler etc. private final ActorContext<Command> ctx; public MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> ctx) { super(persistenceId); this.ctx = ctx; } }
Changing Behavior
After processing a message, plain typed actors are able to return the Behavior
that is used for next message.
As you can see in the above examples this is not supported by typed persistent actors. Instead, the state is returned by eventHandler
. The reason a new behavior can’t be returned is that behavior is part of the actor’s state and must also carefully be reconstructed during recovery. If it would have been supported it would mean that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used. That would be very prone to mistakes and thus not allowed in Typed Persistence.
For basic actors you can use the same set of command handlers independent of what state the entity is in, as shown in above example. For more complex actors it’s useful to be able to change the behavior in the sense that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities.
The next example shows how to define different behavior based on the current State
. It is an actor that represents the state of a blog post. Before a post is started the only command it can process is to AddPost
. Once it is started then it we can look it up with GetPost
, modify it with ChangeBody
or publish it with Publish
.
The state is captured by:
- Scala
-
source
sealed trait BlogState case object BlankState extends BlogState final case class DraftState(content: PostContent) extends BlogState { def withBody(newBody: String): DraftState = copy(content = content.copy(body = newBody)) def postId: String = content.postId } final case class PublishedState(content: PostContent) extends BlogState { def postId: String = content.postId }
- Java
-
source
interface BlogState {} public enum BlankState implements BlogState { INSTANCE } public class DraftState implements BlogState { final PostContent content; DraftState(PostContent content) { this.content = content; } public DraftState withContent(PostContent newContent) { return new DraftState(newContent); } public DraftState withBody(String newBody) { return withContent(new PostContent(postId(), content.title, newBody)); } public String postId() { return content.postId; } } public class PublishedState implements BlogState { final PostContent content; PublishedState(PostContent content) { this.content = content; } public PublishedState withContent(PostContent newContent) { return new PublishedState(newContent); } public PublishedState withBody(String newBody) { return withContent(new PostContent(postId(), content.title, newBody)); } public String postId() { return content.postId; } }
The commands, of which only a subset are valid depending on the state:
- Scala
-
source
sealed trait BlogCommand final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand final case class AddPostDone(postId: String) final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand final case class PostContent(postId: String, title: String, body: String)
- Java
-
source
public interface BlogCommand {} public class AddPost implements BlogCommand { final PostContent content; final ActorRef<AddPostDone> replyTo; public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) { this.content = content; this.replyTo = replyTo; } } public class AddPostDone implements BlogCommand { final String postId; public AddPostDone(String postId) { this.postId = postId; } } public class GetPost implements BlogCommand { final ActorRef<PostContent> replyTo; public GetPost(ActorRef<PostContent> replyTo) { this.replyTo = replyTo; } } public class ChangeBody implements BlogCommand { final String newBody; final ActorRef<Done> replyTo; public ChangeBody(String newBody, ActorRef<Done> replyTo) { this.newBody = newBody; this.replyTo = replyTo; } } public class Publish implements BlogCommand { final ActorRef<Done> replyTo; public Publish(ActorRef<Done> replyTo) { this.replyTo = replyTo; } } public class PostContent implements BlogCommand { final String postId; final String title; final String body; public PostContent(String postId, String title, String body) { this.postId = postId; this.title = title; this.body = body; } }
The commandler handler to process each command is decided by the state class (or state predicate) that is given to the forStateType
of the CommandHandlerBuilder
and the match cases in the builders. The command handler to process each command is decided by first looking at the state and then the command. It typically becomes two levels of pattern matching, first on the state and then on the command. Delegating to methods is a good practice because the one-line cases give a nice overview of the message dispatch.
- Scala
-
source
private val commandHandler: (BlogState, BlogCommand) => Effect[BlogEvent, BlogState] = { (state, command) => state match { case BlankState => command match { case cmd: AddPost => addPost(cmd) case _ => Effect.unhandled } case draftState: DraftState => command match { case cmd: ChangeBody => changeBody(draftState, cmd) case Publish(replyTo) => publish(draftState, replyTo) case GetPost(replyTo) => getPost(draftState, replyTo) case _: AddPost => Effect.unhandled } case publishedState: PublishedState => command match { case GetPost(replyTo) => getPost(publishedState, replyTo) case _ => Effect.unhandled } } } private def addPost(cmd: AddPost): Effect[BlogEvent, BlogState] = { val evt = PostAdded(cmd.content.postId, cmd.content) Effect.persist(evt).thenRun { _ => // After persist is done additional side effects can be performed cmd.replyTo ! AddPostDone(cmd.content.postId) } } private def changeBody(state: DraftState, cmd: ChangeBody): Effect[BlogEvent, BlogState] = { val evt = BodyChanged(state.postId, cmd.newBody) Effect.persist(evt).thenRun { _ => cmd.replyTo ! Done } } private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[BlogEvent, BlogState] = { Effect.persist(Published(state.postId)).thenRun { _ => println(s"Blog post ${state.postId} was published") replyTo ! Done } } private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = { replyTo ! state.content Effect.none } private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = { replyTo ! state.content Effect.none }
- Java
-
source
@Override public CommandHandler<BlogCommand, BlogEvent, BlogState> commandHandler() { CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState> builder = newCommandHandlerBuilder(); builder.forStateType(BlankState.class).onCommand(AddPost.class, this::addPost); builder .forStateType(DraftState.class) .onCommand(ChangeBody.class, this::changeBody) .onCommand(Publish.class, this::publish) .onCommand(GetPost.class, this::getPost); builder .forStateType(PublishedState.class) .onCommand(ChangeBody.class, this::changeBody) .onCommand(GetPost.class, this::getPost); builder.forAnyState().onCommand(AddPost.class, (state, cmd) -> Effect().unhandled()); return builder.build(); } private Effect<BlogEvent, BlogState> addPost(AddPost cmd) { PostAdded event = new PostAdded(cmd.content.postId, cmd.content); return Effect() .persist(event) .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); } private Effect<BlogEvent, BlogState> changeBody(DraftState state, ChangeBody cmd) { BodyChanged event = new BodyChanged(state.postId(), cmd.newBody); return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance())); } private Effect<BlogEvent, BlogState> changeBody(PublishedState state, ChangeBody cmd) { BodyChanged event = new BodyChanged(state.postId(), cmd.newBody); return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance())); } private Effect<BlogEvent, BlogState> publish(DraftState state, Publish cmd) { return Effect() .persist(new Published(state.postId())) .thenRun( () -> { System.out.println("Blog post published: " + state.postId()); cmd.replyTo.tell(Done.getInstance()); }); } private Effect<BlogEvent, BlogState> getPost(DraftState state, GetPost cmd) { cmd.replyTo.tell(state.content); return Effect().none(); } private Effect<BlogEvent, BlogState> getPost(PublishedState state, GetPost cmd) { cmd.replyTo.tell(state.content); return Effect().none(); }
The event handler:
- Scala
-
source
private val eventHandler: (BlogState, BlogEvent) => BlogState = { (state, event) => state match { case BlankState => event match { case PostAdded(_, content) => DraftState(content) case _ => throw new IllegalStateException(s"unexpected event [$event] in state [$state]") } case draftState: DraftState => event match { case BodyChanged(_, newBody) => draftState.withBody(newBody) case Published(_) => PublishedState(draftState.content) case _ => throw new IllegalStateException(s"unexpected event [$event] in state [$state]") } case _: PublishedState => // no more changes after published throw new IllegalStateException(s"unexpected event [$event] in state [$state]") } }
- Java
-
source
@Override public EventHandler<BlogState, BlogEvent> eventHandler() { EventHandlerBuilder<BlogState, BlogEvent> builder = newEventHandlerBuilder(); builder .forStateType(BlankState.class) .onEvent(PostAdded.class, event -> new DraftState(event.content)); builder .forStateType(DraftState.class) .onEvent(BodyChanged.class, (state, chg) -> state.withBody(chg.newBody)) .onEvent(Published.class, (state, event) -> new PublishedState(state.content)); builder .forStateType(PublishedState.class) .onEvent(BodyChanged.class, (state, chg) -> state.withBody(chg.newBody)); return builder.build(); }
And finally the behavior is created from the EventSourcedBehavior.apply
:
- Scala
-
source
def behavior(entityId: String): Behavior[BlogCommand] = EventSourcedBehavior[BlogCommand, BlogEvent, BlogState]( persistenceId = PersistenceId(s"Blog-$entityId"), emptyState = BlankState, commandHandler, eventHandler)
- Java
-
source
public class BlogBehavior extends EventSourcedBehavior<BlogCommand, BlogEvent, BlogState> { public BlogBehavior(PersistenceId persistenceId) { super(persistenceId); } public static Behavior<BlogCommand> behavior(String entityId) { return Behaviors.setup(ctx -> new BlogBehavior(new PersistenceId("Blog-" + entityId))); } @Override public BlogState emptyState() { return BlankState.INSTANCE; } // commandHandler, eventHandler as in above snippets }
This can be taken one or two steps further by defining the event and command handlers in the state class as illustrated in event handlers in the state and command handlers in the state.
There is also an example illustrating an optional initial state.
Effects and Side Effects
Each command has a single Effect
which can be:
- Persist events
- None: Accept the command but no effects
- Unhandled: Don’t handle this command
- Stash: the current command is placed in a buffer and can be unstashed and processed later
Note that there is only one of these. It is not possible to both persist and say none/unhandled. These are created using a factory that is returned via the Effect()
method the Effect
factory and once created additional SideEffects
can be added.
Most of them time this will be done with the thenRun
method on the Effect
above. You can factor out common side effects into functions and reuse for several commands. For example:
- Scala
-
source
// Example factoring out a chained effect to use in several places with `thenRun` val commonChainedEffects: Mood => Unit = _ => println("Command processed") // Then in a command handler: Effect .persist(Remembered("Yep")) // persist event .thenRun(commonChainedEffects) // add on common chained effect
- Java
-
source
// Example factoring out a chained effect to use in several places with `thenRun` static final Procedure<ExampleState> commonChainedEffect = state -> System.out.println("Command handled!"); return newCommandHandlerBuilder() .forStateType(ExampleState.class) .onCommand( Cmd.class, (state, cmd) -> Effect() .persist(new Evt(cmd.data)) .thenRun(() -> cmd.sender.tell(new Ack())) .thenRun(commonChainedEffect)) .build();
Side effects ordering and guarantees
Any SideEffect
s are executed on an at-once basis and will not be executed if the persist fails. The SideEffect
s are executed sequentially, it is not possible to execute SideEffect
s in parallel.
Replies
The Request-Response interaction pattern is very common for persistent actors, because you typically want to know if the command was rejected due to validation errors and when accepted you want a confirmation when the events have been successfully stored.
Therefore you typically include a ActorRef[ReplyMessageType]
ActorRef<ReplyMessageType>
in the commands. After validation errors or after persisting events, using a thenRun
side effect, the reply message can be sent to the ActorRef
.
- Scala
-
source
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand final case class AddPostDone(postId: String)
- Java
-
source
public class AddPost implements BlogCommand { final PostContent content; final ActorRef<AddPostDone> replyTo; public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) { this.content = content; this.replyTo = replyTo; } } public class AddPostDone implements BlogCommand { final String postId; public AddPostDone(String postId) { this.postId = postId; } }
- Scala
-
source
val evt = PostAdded(cmd.content.postId, cmd.content) Effect.persist(evt).thenRun { _ => // After persist is done additional side effects can be performed cmd.replyTo ! AddPostDone(cmd.content.postId) }
- Java
-
source
PostAdded event = new PostAdded(cmd.content.postId, cmd.content); return Effect() .persist(event) .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that it can be used to enforce that replies are not forgotten when implementing the EventSourcedBehavior
. If it’s defined with EventSourcedBehavior.withEnforcedReplies
EventSourcedBehaviorWithEnforcedReplies
there will be compilation errors if the returned effect isn’t a ReplyEffect
, which can be created with Effect.reply
Effects().reply
, Effect.noReply
Effects().noReply
, Effect.thenReply
Effects().thenReply
, or Effect.thenNoReply
Effects().thenNoReply
.
- Scala
-
source
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { EventSourcedBehavior.withEnforcedReplies( PersistenceId(s"Account|$accountNumber"), EmptyAccount, commandHandler, eventHandler) }
- Java
-
source
public class AccountEntity extends EventSourcedBehaviorWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
The commands must implement ExpectingReply
to include the ActorRef[ReplyMessageType]
ActorRef<ReplyMessageType>
in a standardized way.
- Scala
-
source
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] sealed trait AccountCommandReply sealed trait OperationResult extends AccountCommandReply case object Confirmed extends OperationResult final case class Rejected(reason: String) extends OperationResult
- Java
-
source
interface AccountCommand<Reply> extends ExpectingReply<Reply> {} interface AccountCommandReply {} interface OperationResult extends AccountCommandReply {} enum Confirmed implements OperationResult { INSTANCE } public static class Rejected implements OperationResult { public final String reason; public Rejected(String reason) { this.reason = reason; } }
The ReplyEffect
is created with Effect.reply
Effects().reply
, Effect.noReply
Effects().noReply
, Effect.thenReply
Effects().thenReply
, or Effect.thenNoReply
Effects().thenNoReply
.
Note that command handlers are defined with newCommandHandlerWithReplyBuilder
when using EventSourcedBehaviorWithEnforcedReplies
, as opposed to newCommandHandlerBuilder when using EventSourcedBehavior
.]
- Scala
-
source
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = { if (acc.canWithdraw(cmd.amount)) { Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd)(_ => Confirmed) } else { Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}")) } }
- Java
-
source
private ReplyEffect<AccountEvent, Account> withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } }
These effects will send the reply message even when EventSourcedBehavior.withEnforcedReplies
EventSourcedBehaviorWithEnforcedReplies
is not used, but then there will be no compilation errors if the reply decision is left out.
Note that the noReply
is a way of making conscious decision that a reply shouldn’t be sent for a specific command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services.
Serialization
The same serialization mechanism as for untyped actors is also used in Akka Typed, also for persistent actors. When picking serialization solution for the events you should also consider that it must be possible read old events when the application has evolved. Strategies for that can be found in the schema evolution.
Recovery
It is strongly discouraged to perform side effects in applyEvent
, so side effects should be performed once recovery has completed as a reaction to the RecoveryCompleted
signal receiveSignal
handler by overriding receiveSignal
- Scala
-
source
val recoveryBehavior: Behavior[Command] = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state")) .receiveSignal { case (state, RecoveryCompleted) => throw new RuntimeException("TODO: add some end-of-recovery side-effect here") }
- Java
-
source
@Override public SignalHandler signalHandler() { return newSignalHandlerBuilder() .onSignal( RecoveryCompleted.instance(), state -> { throw new RuntimeException("TODO: add some end-of-recovery side-effect here"); }) .build(); }
The RecoveryCompleted
contains the current State
.
@ref[Snapshots)[persistence-snapshot.md] can be used for optimizing recovery times.
Tagging
Persistence typed allows you to use event tags without using EventAdapter
:
- Scala
-
source
val taggingBehavior: Behavior[Command] = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state")) .withTagger(_ => Set("tag1", "tag2"))
- Java
-
source
@Override public Set<String> tagsFor(Event event) { throw new RuntimeException("TODO: inspect the event and return any tags it should have"); }
Event adapters
Event adapters can be programmatically added to your EventSourcedBehavior
s that can convert from your Event
type to another type that is then passed to the journal.
Defining an event adapter is done by extending an EventAdapter:
- Scala
-
source
case class Wrapper[T](t: T) class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] { override def toJournal(e: T): Wrapper[T] = Wrapper(e) override def fromJournal(p: Wrapper[T]): T = p.t }
- Java
-
source
public static class Wrapper<T> { private final T t; public Wrapper(T t) { this.t = t; } public T getT() { return t; } } public static class EventAdapterExample extends EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> { @Override public Wrapper<SimpleEvent> toJournal(SimpleEvent simpleEvent) { return new Wrapper<>(simpleEvent); } @Override public SimpleEvent fromJournal(Wrapper<SimpleEvent> simpleEventWrapper) { return simpleEventWrapper.getT(); } }
Then install it on a persistent behavior:
- Scala
-
source
persistentBehavior.eventAdapter(new WrapperEventAdapter[Event])
- Java
-
source
@Override public EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> eventAdapter() { return new EventAdapterExample(); }
Wrapping Persistent Behaviors
When creating a EventSourcedBehavior
, it is possible to wrap EventSourcedBehavior
in other behaviors such as Behaviors.setup
in order to access the ActorContext
object. For instance to access the actor logging upon taking snapshots for debug purpose.
- Scala
-
source
val samplePersistentBehavior = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state")) .receiveSignal { case (state, RecoveryCompleted) => throw new RuntimeException("TODO: add some end-of-recovery side-effect here") } val debugAlwaysSnapshot: Behavior[Command] = Behaviors.setup { context => samplePersistentBehavior.snapshotWhen((state, _, _) => { context.log.info("Snapshot actor {} => state: {}", context.self.path.name, state) true }) }
- Java
-
source
Behavior<Command> debugAlwaysSnapshot = Behaviors.setup( (context) -> { return new MyPersistentBehavior(new PersistenceId("pid")) { @Override public boolean shouldSnapshot(State state, Event event, long sequenceNr) { context .getLog() .info( "Snapshot actor {} => state: {}", context.getSelf().path().name(), state); return true; } }; });
Journal failures
By default a EventSourcedBehavior
will stop if an exception is thrown from the journal. It is possible to override this with any BackoffSupervisorStrategy
. It is not possible to use the normal supervision wrapping for this as it isn’t valid to resume
a behavior on a journal failure as it is not known if the event was persisted.
- Scala
-
source
val supervisedBehavior = samplePersistentBehavior.onPersistFailure( SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))
- Java
-
source
public class MyPersistentBehavior extends EventSourcedBehavior<Command, Event, State> { public MyPersistentBehavior(PersistenceId persistenceId) { super( persistenceId, SupervisorStrategy.restartWithBackoff( Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2)); }
Journal rejections
Journals can reject events. The difference from a failure is that the journal must decide to reject an event before trying to persist it e.g. because of a serialization exception. If an event is rejected it definitely won’t be in the journal. This is signalled to a EventSourcedBehavior
via a EventRejectedException
and can be handled with a supervisor.
Stash
When persisting events with persist
or persistAll
it is guaranteed that the persistent actor will not receive further commands until after the events have been confirmed to be persisted and additional side effects have been run. Incoming messages are stashed automatically until the persist
is completed.
Commands are also stashed during recovery and will not interfere with replayed events. Commands will be received when recovery has been completed.
The stashing described above is handled automatically, but there is also a possibility to stash commands when they are received to defer processing of them until later. One example could be waiting for some external condition or interaction to complete before processing additional commands. That is accomplished by returning a stash
effect and later use thenUnstashAll
.
Let’s use an example of a task manager to illustrate how the stashing effects can be used. It handles three commands; StartTask
, NextStep
and EndTask
. Those commands are associated with a given taskId
and the manager process one taskId
at a time. A task is started when receiving StartTask
, and continues when receiving NextStep
commands until the final EndTask
is received. Commands with another taskId
than the one in progress are deferred by stashing them. When EndTask
is processed a new task can start and the stashed commands are processed.
- Scala
-
source
object TaskManager { sealed trait Command final case class StartTask(taskId: String) extends Command final case class NextStep(taskId: String, instruction: String) extends Command final case class EndTask(taskId: String) extends Command sealed trait Event final case class TaskStarted(taskId: String) extends Event final case class TaskStep(taskId: String, instruction: String) extends Event final case class TaskCompleted(taskId: String) extends Event final case class State(taskIdInProgress: Option[String]) def apply(persistenceId: PersistenceId): Behavior[Command] = EventSourcedBehavior[Command, Event, State]( persistenceId = persistenceId, emptyState = State(None), commandHandler = (state, command) => onCommand(state, command), eventHandler = (state, event) => applyEvent(state, event)) .onPersistFailure(SupervisorStrategy.restartWithBackoff(1.second, 30.seconds, 0.2)) private def onCommand(state: State, command: Command): Effect[Event, State] = { state.taskIdInProgress match { case None => command match { case StartTask(taskId) => Effect.persist(TaskStarted(taskId)) case _ => Effect.unhandled } case Some(inProgress) => command match { case StartTask(taskId) => if (inProgress == taskId) Effect.none // duplicate, already in progress else // other task in progress, wait with new task until later Effect.stash() case NextStep(taskId, instruction) => if (inProgress == taskId) Effect.persist(TaskStep(taskId, instruction)) else // other task in progress, wait with new task until later Effect.stash() case EndTask(taskId) => if (inProgress == taskId) Effect.persist(TaskCompleted(taskId)).thenUnstashAll() // continue with next task else // other task in progress, wait with new task until later Effect.stash() } } } private def applyEvent(state: State, event: Event): State = { event match { case TaskStarted(taskId) => State(Option(taskId)) case TaskStep(_, _) => state case TaskCompleted(_) => State(None) } } }
- Java
-
source
public static class TaskManager extends EventSourcedBehavior<TaskManager.Command, TaskManager.Event, TaskManager.State> { public interface Command {} public static final class StartTask implements Command { public final String taskId; public StartTask(String taskId) { this.taskId = taskId; } } public static final class NextStep implements Command { public final String taskId; public final String instruction; public NextStep(String taskId, String instruction) { this.taskId = taskId; this.instruction = instruction; } } public static final class EndTask implements Command { public final String taskId; public EndTask(String taskId) { this.taskId = taskId; } } public interface Event {} public static final class TaskStarted implements Event { public final String taskId; public TaskStarted(String taskId) { this.taskId = taskId; } } public static final class TaskStep implements Event { public final String taskId; public final String instruction; public TaskStep(String taskId, String instruction) { this.taskId = taskId; this.instruction = instruction; } } public static final class TaskCompleted implements Event { public final String taskId; public TaskCompleted(String taskId) { this.taskId = taskId; } } public static class State { public final Optional<String> taskIdInProgress; public State(Optional<String> taskIdInProgress) { this.taskIdInProgress = taskIdInProgress; } } public static Behavior<Command> createBehavior(PersistenceId persistenceId) { return new TaskManager(persistenceId); } public TaskManager(PersistenceId persistenceId) { super( persistenceId, SupervisorStrategy.restartWithBackoff( Duration.ofSeconds(1), Duration.ofSeconds(30), 0.2)); } @Override public State emptyState() { return new State(Optional.empty()); } @Override public CommandHandler<Command, Event, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand(StartTask.class, this::onStartTask) .onCommand(NextStep.class, this::onNextStep) .onCommand(EndTask.class, this::onEndTask) .build(); } private Effect<Event, State> onStartTask(State state, StartTask command) { if (state.taskIdInProgress.isPresent()) { if (state.taskIdInProgress.get().equals(command.taskId)) return Effect().none(); // duplicate, already in progress else return Effect().stash(); // other task in progress, wait with new task until later } else { return Effect().persist(new TaskStarted(command.taskId)); } } private Effect<Event, State> onNextStep(State state, NextStep command) { if (state.taskIdInProgress.isPresent()) { if (state.taskIdInProgress.get().equals(command.taskId)) return Effect().persist(new TaskStep(command.taskId, command.instruction)); else return Effect().stash(); // other task in progress, wait with new task until later } else { return Effect().unhandled(); } } private Effect<Event, State> onEndTask(State state, EndTask command) { if (state.taskIdInProgress.isPresent()) { if (state.taskIdInProgress.get().equals(command.taskId)) return Effect().persist(new TaskCompleted(command.taskId)); else return Effect().stash(); // other task in progress, wait with new task until later } else { return Effect().unhandled(); } } @Override public EventHandler<State, Event> eventHandler() { return newEventHandlerBuilder() .forAnyState() .onEvent(TaskStarted.class, (state, event) -> new State(Optional.of(event.taskId))) .onEvent(TaskStep.class, (state, event) -> state) .onEvent(TaskCompleted.class, (state, event) -> new State(Optional.empty())) .build(); } }
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the stash buffer will fill up and when reaching its maximum capacity the commands will be dropped. The capacity can be configured with:
akka.persistence.typed.stash-capacity = 10000
Note that the stashed commands are kept in an in-memory buffer, so in case of a crash they will not be processed.
- Stashed commands are discarded if the actor (entity) is passivated or rebalanced by Cluster Sharding.
- Stashed commands are discarded if the actor is restarted (or stopped) due to that an exception was thrown from processing a command or side effect after persisting.
- Stashed commands are preserved and processed later in case of failure in storing events if an
onPersistFailure
backoff supervisor strategy is defined.
It’s allowed to stash messages while unstashing. Those newly added commands will not be processed by the unstashAll
effect that was in progress and have to be unstashed by another unstashAll
.