Persistence - coding style
Event handlers in the state
The section about Changing Behavior described how commands and events can be handled differently depending on the state. One can take that one step further and define the event handler inside the state classes. In next section the command handlers are also defined in the state.
The state can be seen as your domain object and it should contain the core business logic. Then it’s a matter of taste if event handlers and command handlers should be defined in the state or be kept outside it.
Here we are using a bank account as the example domain. It has 3 state classes that are representing the lifecycle of the account; EmptyAccount
, OpenedAccount
, and ClosedAccount
.
- Scala
-
source
object AccountEntity { // Command sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] final case class CreateAccount()(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] final case class CloseAccount()(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] // Reply sealed trait AccountCommandReply sealed trait OperationResult extends AccountCommandReply case object Confirmed extends OperationResult final case class Rejected(reason: String) extends OperationResult final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply // Event sealed trait AccountEvent case object AccountCreated extends AccountEvent case class Deposited(amount: BigDecimal) extends AccountEvent case class Withdrawn(amount: BigDecimal) extends AccountEvent case object AccountClosed extends AccountEvent val Zero = BigDecimal(0) // State sealed trait Account { def applyEvent(event: AccountEvent): Account } case object EmptyAccount extends Account { override def applyEvent(event: AccountEvent): Account = event match { case AccountCreated => OpenedAccount(Zero) case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]") } } case class OpenedAccount(balance: BigDecimal) extends Account { require(balance >= Zero, "Account balance can't be negative") override def applyEvent(event: AccountEvent): Account = event match { case Deposited(amount) => copy(balance = balance + amount) case Withdrawn(amount) => copy(balance = balance - amount) case AccountClosed => ClosedAccount case AccountCreated => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]") } def canWithdraw(amount: BigDecimal): Boolean = { balance - amount >= Zero } } case object ClosedAccount extends Account { override def applyEvent(event: AccountEvent): Account = throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") } // Note that after defining command, event and state classes you would probably start here when writing this. // When filling in the parameters of EventSourcedBehavior.apply you can use IntelliJ alt+Enter > createValue // to generate the stub with types for the command and event handlers. def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { EventSourcedBehavior.withEnforcedReplies( PersistenceId(s"Account|$accountNumber"), EmptyAccount, commandHandler, eventHandler) } private val commandHandler: (Account, AccountCommand[_]) => ReplyEffect[AccountEvent, Account] = { (state, cmd) => state match { case EmptyAccount => cmd match { case c: CreateAccount => createAccount(c) case _ => Effect.unhandled.thenNoReply() // CreateAccount before handling any other commands } case acc @ OpenedAccount(_) => cmd match { case c: Deposit => deposit(c) case c: Withdraw => withdraw(acc, c) case c: GetBalance => getBalance(acc, c) case c: CloseAccount => closeAccount(acc, c) case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created")) } case ClosedAccount => cmd match { case c @ (_: Deposit | _: Withdraw) => Effect.reply(c)(Rejected("Account is closed")) case c: GetBalance => Effect.reply(c)(CurrentBalance(Zero)) case c: CloseAccount => Effect.reply(c)(Rejected("Account is already closed")) case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created")) } } } private val eventHandler: (Account, AccountEvent) => Account = { (state, event) => state.applyEvent(event) } private def createAccount(cmd: CreateAccount): ReplyEffect[AccountEvent, Account] = { Effect.persist(AccountCreated).thenReply(cmd)(_ => Confirmed) } private def deposit(cmd: Deposit): ReplyEffect[AccountEvent, Account] = { Effect.persist(Deposited(cmd.amount)).thenReply(cmd)(_ => Confirmed) } private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = { if (acc.canWithdraw(cmd.amount)) { Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd)(_ => Confirmed) } else { Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}")) } } private def getBalance(acc: OpenedAccount, cmd: GetBalance): ReplyEffect[AccountEvent, Account] = { Effect.reply(cmd)(CurrentBalance(acc.balance)) } private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[AccountEvent, Account] = { if (acc.balance == Zero) Effect.persist(AccountClosed).thenReply(cmd)(_ => Confirmed) else Effect.reply(cmd)(Rejected("Can't close account with non-zero balance")) } }
- Java
-
source
public class AccountEntity extends EventSourcedBehaviorWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { // Command interface AccountCommand<Reply> extends ExpectingReply<Reply> {} public static class CreateAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CreateAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Deposit implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Deposit(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; this.amount = amount; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Withdraw implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Withdraw(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.amount = amount; this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class GetBalance implements AccountCommand<CurrentBalance> { private final ActorRef<CurrentBalance> replyTo; public GetBalance(ActorRef<CurrentBalance> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<CurrentBalance> replyTo() { return replyTo; } } public static class CloseAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CloseAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } // Reply interface AccountCommandReply {} interface OperationResult extends AccountCommandReply {} enum Confirmed implements OperationResult { INSTANCE } public static class Rejected implements OperationResult { public final String reason; public Rejected(String reason) { this.reason = reason; } } public static class CurrentBalance implements AccountCommandReply { public final BigDecimal balance; public CurrentBalance(BigDecimal balance) { this.balance = balance; } } // Event interface AccountEvent {} public static class AccountCreated implements AccountEvent {} public static class Deposited implements AccountEvent { public final BigDecimal amount; Deposited(BigDecimal amount) { this.amount = amount; } } public static class Withdrawn implements AccountEvent { public final BigDecimal amount; Withdrawn(BigDecimal amount) { this.amount = amount; } } public static class AccountClosed implements AccountEvent {} // State interface Account {} public static class EmptyAccount implements Account { OpenedAccount openedAccount() { return new OpenedAccount(BigDecimal.ZERO); } } public static class OpenedAccount implements Account { private final BigDecimal balance; public OpenedAccount(BigDecimal balance) { this.balance = balance; } OpenedAccount makeDeposit(BigDecimal amount) { return new OpenedAccount(balance.add(amount)); } boolean canWithdraw(BigDecimal amount) { return (balance.subtract(amount).compareTo(BigDecimal.ZERO) >= 0); } OpenedAccount makeWithdraw(BigDecimal amount) { if (!canWithdraw(amount)) throw new IllegalStateException("Account balance can't be negative"); return new OpenedAccount(balance.subtract(amount)); } ClosedAccount closedAccount() { return new ClosedAccount(); } } public static class ClosedAccount implements Account {} public static Behavior<AccountCommand> behavior(String accountNumber) { return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); } public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) { super(new PersistenceId(accountNumber)); } @Override public Account emptyState() { return new EmptyAccount(); } @Override public CommandHandlerWithReply<AccountCommand, AccountEvent, Account> commandHandler() { CommandHandlerWithReplyBuilder<AccountCommand, AccountEvent, Account> builder = newCommandHandlerWithReplyBuilder(); builder.forStateType(EmptyAccount.class).onCommand(CreateAccount.class, this::createAccount); builder .forStateType(OpenedAccount.class) .onCommand(Deposit.class, this::deposit) .onCommand(Withdraw.class, this::withdraw) .onCommand(GetBalance.class, this::getBalance) .onCommand(CloseAccount.class, this::closeAccount); builder .forStateType(ClosedAccount.class) .onAnyCommand(() -> Effect().unhandled().thenNoReply()); return builder.build(); } private ReplyEffect<AccountEvent, Account> createAccount( EmptyAccount account, CreateAccount command) { return Effect() .persist(new AccountCreated()) .thenReply(command, account2 -> Confirmed.INSTANCE); } private ReplyEffect<AccountEvent, Account> deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } private ReplyEffect<AccountEvent, Account> withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } } private ReplyEffect<AccountEvent, Account> getBalance( OpenedAccount account, GetBalance command) { return Effect().reply(command, new CurrentBalance(account.balance)); } private ReplyEffect<AccountEvent, Account> closeAccount( OpenedAccount account, CloseAccount command) { if (account.balance.equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) .thenReply(command, account2 -> Confirmed.INSTANCE); } else { return Effect().reply(command, new Rejected("balance must be zero for closing account")); } } @Override public EventHandler<Account, AccountEvent> eventHandler() { EventHandlerBuilder<Account, AccountEvent> builder = newEventHandlerBuilder(); builder .forStateType(EmptyAccount.class) .onEvent(AccountCreated.class, (account, created) -> account.openedAccount()); builder .forStateType(OpenedAccount.class) .onEvent(Deposited.class, (account, deposited) -> account.makeDeposit(deposited.amount)) .onEvent(Withdrawn.class, (account, withdrawn) -> account.makeWithdraw(withdrawn.amount)) .onEvent(AccountClosed.class, (account, closed) -> account.closedAccount()); return builder.build(); } }
Notice how the eventHandler
delegates to the applyEvent
in the Account
(state), which is implemented in the concrete EmptyAccount
, OpenedAccount
, and ClosedAccount
. Notice how the eventHandler
delegates to methods in the concrete Account
(state) classes; EmptyAccount
, OpenedAccount
, and ClosedAccount
.
Command handlers in the state
We can take the previous bank account example one step further by handling the commands in the state too.
- Scala
-
source
object AccountEntity { // Command sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] final case class CreateAccount()(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] final case class CloseAccount()(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] // Reply sealed trait AccountCommandReply sealed trait OperationResult extends AccountCommandReply case object Confirmed extends OperationResult final case class Rejected(reason: String) extends OperationResult final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply // Event sealed trait AccountEvent case object AccountCreated extends AccountEvent case class Deposited(amount: BigDecimal) extends AccountEvent case class Withdrawn(amount: BigDecimal) extends AccountEvent case object AccountClosed extends AccountEvent val Zero = BigDecimal(0) // type alias to reduce boilerplate type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Account] // State sealed trait Account { def applyCommand(cmd: AccountCommand[_]): ReplyEffect def applyEvent(event: AccountEvent): Account } case object EmptyAccount extends Account { override def applyCommand(cmd: AccountCommand[_]): ReplyEffect = cmd match { case c: CreateAccount => Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed) case _ => // CreateAccount before handling any other commands Effect.unhandled.thenNoReply() } override def applyEvent(event: AccountEvent): Account = event match { case AccountCreated => OpenedAccount(Zero) case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]") } } case class OpenedAccount(balance: BigDecimal) extends Account { require(balance >= Zero, "Account balance can't be negative") override def applyCommand(cmd: AccountCommand[_]): ReplyEffect = cmd match { case c @ Deposit(amount) => Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed) case c @ Withdraw(amount) => if (canWithdraw(amount)) { Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed) } else { Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) } case c: GetBalance => Effect.reply(c)(CurrentBalance(balance)) case c: CloseAccount => if (balance == Zero) Effect.persist(AccountClosed).thenReply(c)(_ => Confirmed) else Effect.reply(c)(Rejected("Can't close account with non-zero balance")) case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created")) } override def applyEvent(event: AccountEvent): Account = event match { case Deposited(amount) => copy(balance = balance + amount) case Withdrawn(amount) => copy(balance = balance - amount) case AccountClosed => ClosedAccount case AccountCreated => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]") } def canWithdraw(amount: BigDecimal): Boolean = { balance - amount >= Zero } } case object ClosedAccount extends Account { override def applyCommand(cmd: AccountCommand[_]): ReplyEffect = cmd match { case c @ (_: Deposit | _: Withdraw) => Effect.reply(c)(Rejected("Account is closed")) case c: GetBalance => Effect.reply(c)(CurrentBalance(Zero)) case c: CloseAccount => Effect.reply(c)(Rejected("Account is already closed")) case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created")) } override def applyEvent(event: AccountEvent): Account = throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") } def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account]( PersistenceId(s"Account|$accountNumber"), EmptyAccount, (state, cmd) => state.applyCommand(cmd), (state, event) => state.applyEvent(event)) } }
- Java
-
source
public class AccountEntity extends EventSourcedBehaviorWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { // Command interface AccountCommand<Reply> extends ExpectingReply<Reply> {} public static class CreateAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CreateAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Deposit implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Deposit(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; this.amount = amount; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Withdraw implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Withdraw(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.amount = amount; this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class GetBalance implements AccountCommand<CurrentBalance> { private final ActorRef<CurrentBalance> replyTo; public GetBalance(ActorRef<CurrentBalance> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<CurrentBalance> replyTo() { return replyTo; } } public static class CloseAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CloseAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } // Reply interface AccountCommandReply {} interface OperationResult extends AccountCommandReply {} enum Confirmed implements OperationResult { INSTANCE } public static class Rejected implements OperationResult { public final String reason; public Rejected(String reason) { this.reason = reason; } } public static class CurrentBalance implements AccountCommandReply { public final BigDecimal balance; public CurrentBalance(BigDecimal balance) { this.balance = balance; } } // Event interface AccountEvent {} public static class AccountCreated implements AccountEvent {} public static class Deposited implements AccountEvent { public final BigDecimal amount; Deposited(BigDecimal amount) { this.amount = amount; } } public static class Withdrawn implements AccountEvent { public final BigDecimal amount; Withdrawn(BigDecimal amount) { this.amount = amount; } } public static class AccountClosed implements AccountEvent {} // State interface Account {} public class EmptyAccount implements Account { ReplyEffect<AccountEvent, Account> createAccount(CreateAccount command) { return Effect() .persist(new AccountCreated()) .thenReply(command, account2 -> Confirmed.INSTANCE); } OpenedAccount openedAccount() { return new OpenedAccount(BigDecimal.ZERO); } } public class OpenedAccount implements Account { public final BigDecimal balance; public OpenedAccount(BigDecimal balance) { this.balance = balance; } ReplyEffect<AccountEvent, Account> deposit(Deposit command) { return Effect() .persist(new Deposited(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } ReplyEffect<AccountEvent, Account> withdraw(Withdraw command) { if (!canWithdraw(command.amount)) { return Effect() .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } } ReplyEffect<AccountEvent, Account> getBalance(GetBalance command) { return Effect().reply(command, new CurrentBalance(balance)); } ReplyEffect<AccountEvent, Account> closeAccount(CloseAccount command) { if (balance.equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) .thenReply(command, account2 -> Confirmed.INSTANCE); } else { return Effect().reply(command, new Rejected("balance must be zero for closing account")); } } OpenedAccount makeDeposit(BigDecimal amount) { return new OpenedAccount(balance.add(amount)); } boolean canWithdraw(BigDecimal amount) { return (balance.subtract(amount).compareTo(BigDecimal.ZERO) >= 0); } OpenedAccount makeWithdraw(BigDecimal amount) { if (!canWithdraw(amount)) throw new IllegalStateException("Account balance can't be negative"); return new OpenedAccount(balance.subtract(amount)); } ClosedAccount closedAccount() { return new ClosedAccount(); } } public static class ClosedAccount implements Account {} public static Behavior<AccountCommand> behavior(String accountNumber) { return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); } public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) { super(new PersistenceId(accountNumber)); } @Override public Account emptyState() { return new EmptyAccount(); } @Override public CommandHandlerWithReply<AccountCommand, AccountEvent, Account> commandHandler() { CommandHandlerWithReplyBuilder<AccountCommand, AccountEvent, Account> builder = newCommandHandlerWithReplyBuilder(); builder .forStateType(EmptyAccount.class) .onCommand(CreateAccount.class, EmptyAccount::createAccount); builder .forStateType(OpenedAccount.class) .onCommand(Deposit.class, OpenedAccount::deposit) .onCommand(Withdraw.class, OpenedAccount::withdraw) .onCommand(GetBalance.class, OpenedAccount::getBalance) .onCommand(CloseAccount.class, OpenedAccount::closeAccount); builder .forStateType(ClosedAccount.class) .onAnyCommand(() -> Effect().unhandled().thenNoReply()); return builder.build(); } @Override public EventHandler<Account, AccountEvent> eventHandler() { EventHandlerBuilder<Account, AccountEvent> builder = newEventHandlerBuilder(); builder .forStateType(EmptyAccount.class) .onEvent(AccountCreated.class, (account, event) -> account.openedAccount()); builder .forStateType(OpenedAccount.class) .onEvent( Deposited.class, (account, deposited) -> { account.makeDeposit(deposited.amount); return account; }) .onEvent( Withdrawn.class, (account, withdrawn) -> { account.makeWithdraw(withdrawn.amount); return account; }) .onEvent(AccountClosed.class, (account, closed) -> account.closedAccount()); return builder.build(); } }
Notice how the command handler is delegating to applyCommand
in the Account
(state), which is implemented in the concrete EmptyAccount
, OpenedAccount
, and ClosedAccount
. Notice how the command handler delegates to methods in the concrete Account
(state) classes; EmptyAccount
, OpenedAccount
, and ClosedAccount
.
Optional initial state
Sometimes it’s not desirable to use a separate state class for the empty initial state, but rather treat that as there is no state yet. null
can then be used as the emptyState
, but be aware of that the state
parameter will then be null
for the first commands and events until the first event has be persisted to create the non-null state. It’s possible to use Optional
instead of null
but that results in rather much boilerplate to unwrap the Optional
state parameter and therefore null
is probably preferred. The following example illustrates using null
as the emptyState
. Option[State]
can be used as the state type and None
as the emptyState
. Pattern matching is then used in command and event handlers at the outer layer before delegating to the state or other methods.
- Scala
-
source
object AccountEntity { // Command sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] final case class CreateAccount()(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] final case class CloseAccount()(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] // Reply sealed trait AccountCommandReply sealed trait OperationResult extends AccountCommandReply case object Confirmed extends OperationResult final case class Rejected(reason: String) extends OperationResult final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply // Event sealed trait AccountEvent case object AccountCreated extends AccountEvent case class Deposited(amount: BigDecimal) extends AccountEvent case class Withdrawn(amount: BigDecimal) extends AccountEvent case object AccountClosed extends AccountEvent val Zero = BigDecimal(0) // type alias to reduce boilerplate type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Option[Account]] // State sealed trait Account { def applyCommand(cmd: AccountCommand[_]): ReplyEffect def applyEvent(event: AccountEvent): Account } case class OpenedAccount(balance: BigDecimal) extends Account { require(balance >= Zero, "Account balance can't be negative") override def applyCommand(cmd: AccountCommand[_]): ReplyEffect = cmd match { case c @ Deposit(amount) => Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed) case c @ Withdraw(amount) => if (canWithdraw(amount)) { Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed) } else { Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) } case c: GetBalance => Effect.reply(c)(CurrentBalance(balance)) case c: CloseAccount => if (balance == Zero) Effect.persist(AccountClosed).thenReply(c)(_ => Confirmed) else Effect.reply(c)(Rejected("Can't close account with non-zero balance")) case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created")) } override def applyEvent(event: AccountEvent): Account = event match { case Deposited(amount) => copy(balance = balance + amount) case Withdrawn(amount) => copy(balance = balance - amount) case AccountClosed => ClosedAccount case AccountCreated => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]") } def canWithdraw(amount: BigDecimal): Boolean = { balance - amount >= Zero } } case object ClosedAccount extends Account { override def applyCommand(cmd: AccountCommand[_]): ReplyEffect = cmd match { case c @ (_: Deposit | _: Withdraw) => Effect.reply(c)(Rejected("Account is closed")) case c: GetBalance => Effect.reply(c)(CurrentBalance(Zero)) case c: CloseAccount => Effect.reply(c)(Rejected("Account is already closed")) case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created")) } override def applyEvent(event: AccountEvent): Account = throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") } def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]]( PersistenceId(s"Account|$accountNumber"), None, (state, cmd) => state match { case None => onFirstCommand(cmd) case Some(account) => account.applyCommand(cmd) }, (state, event) => state match { case None => Some(onFirstEvent(event)) case Some(account) => Some(account.applyEvent(event)) }) } def onFirstCommand(cmd: AccountCommand[_]): ReplyEffect = { cmd match { case c: CreateAccount => Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed) case _ => // CreateAccount before handling any other commands Effect.unhandled.thenNoReply() } } def onFirstEvent(event: AccountEvent): Account = { event match { case AccountCreated => OpenedAccount(Zero) case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]") } } }
- Java
-
source
public class AccountEntity extends EventSourcedBehaviorWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { // Command interface AccountCommand<Reply> extends ExpectingReply<Reply> {} public static class CreateAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CreateAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Deposit implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Deposit(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; this.amount = amount; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Withdraw implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Withdraw(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.amount = amount; this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class GetBalance implements AccountCommand<CurrentBalance> { private final ActorRef<CurrentBalance> replyTo; public GetBalance(ActorRef<CurrentBalance> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<CurrentBalance> replyTo() { return replyTo; } } public static class CloseAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CloseAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } // Reply interface AccountCommandReply {} interface OperationResult extends AccountCommandReply {} enum Confirmed implements OperationResult { INSTANCE } public static class Rejected implements OperationResult { public final String reason; public Rejected(String reason) { this.reason = reason; } } public static class CurrentBalance implements AccountCommandReply { public final BigDecimal balance; public CurrentBalance(BigDecimal balance) { this.balance = balance; } } // Event interface AccountEvent {} public static class AccountCreated implements AccountEvent {} public static class Deposited implements AccountEvent { public final BigDecimal amount; Deposited(BigDecimal amount) { this.amount = amount; } } public static class Withdrawn implements AccountEvent { public final BigDecimal amount; Withdrawn(BigDecimal amount) { this.amount = amount; } } public static class AccountClosed implements AccountEvent {} // State interface Account {} public static class OpenedAccount implements Account { public final BigDecimal balance; public OpenedAccount() { this.balance = BigDecimal.ZERO; } public OpenedAccount(BigDecimal balance) { this.balance = balance; } OpenedAccount makeDeposit(BigDecimal amount) { return new OpenedAccount(balance.add(amount)); } boolean canWithdraw(BigDecimal amount) { return (balance.subtract(amount).compareTo(BigDecimal.ZERO) >= 0); } OpenedAccount makeWithdraw(BigDecimal amount) { if (!canWithdraw(amount)) throw new IllegalStateException("Account balance can't be negative"); return new OpenedAccount(balance.subtract(amount)); } ClosedAccount closedAccount() { return new ClosedAccount(); } } public static class ClosedAccount implements Account {} public static Behavior<AccountCommand> behavior(String accountNumber) { return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); } public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) { super(new PersistenceId(accountNumber)); } @Override public Account emptyState() { return null; } @Override public CommandHandlerWithReply<AccountCommand, AccountEvent, Account> commandHandler() { CommandHandlerWithReplyBuilder<AccountCommand, AccountEvent, Account> builder = newCommandHandlerWithReplyBuilder(); builder.forNullState().onCommand(CreateAccount.class, this::createAccount); builder .forStateType(OpenedAccount.class) .onCommand(Deposit.class, this::deposit) .onCommand(Withdraw.class, this::withdraw) .onCommand(GetBalance.class, this::getBalance) .onCommand(CloseAccount.class, this::closeAccount); builder .forStateType(ClosedAccount.class) .onAnyCommand(() -> Effect().unhandled().thenNoReply()); return builder.build(); } private ReplyEffect<AccountEvent, Account> createAccount(CreateAccount command) { return Effect() .persist(new AccountCreated()) .thenReply(command, account2 -> Confirmed.INSTANCE); } private ReplyEffect<AccountEvent, Account> deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } private ReplyEffect<AccountEvent, Account> withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } } private ReplyEffect<AccountEvent, Account> getBalance( OpenedAccount account, GetBalance command) { return Effect().reply(command, new CurrentBalance(account.balance)); } private ReplyEffect<AccountEvent, Account> closeAccount( OpenedAccount account, CloseAccount command) { if (account.balance.equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) .thenReply(command, account2 -> Confirmed.INSTANCE); } else { return Effect().reply(command, new Rejected("balance must be zero for closing account")); } } @Override public EventHandler<Account, AccountEvent> eventHandler() { EventHandlerBuilder<Account, AccountEvent> builder = newEventHandlerBuilder(); builder.forNullState().onEvent(AccountCreated.class, () -> new OpenedAccount()); builder .forStateType(OpenedAccount.class) .onEvent( Deposited.class, (account, deposited) -> { account.makeDeposit(deposited.amount); return account; }) .onEvent( Withdrawn.class, (account, withdrawn) -> { account.makeWithdraw(withdrawn.amount); return account; }) .onEvent(AccountClosed.class, (account, closed) -> account.closedAccount()); return builder.build(); } }
Mutable state
The state can be mutable or immutable. When it is immutable the event handler returns a new instance of the state for each change.
When using mutable state it’s important to not send the full state instance as a message to another actor, e.g. as a reply to a command. Messages must be immutable to avoid concurrency problems.
The above examples are using immutable state classes and below is corresponding example with mutable state.
- Java
-
source
public class AccountEntity extends EventSourcedBehaviorWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { // Command interface AccountCommand<Reply> extends ExpectingReply<Reply> {} public static class CreateAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CreateAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Deposit implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Deposit(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; this.amount = amount; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class Withdraw implements AccountCommand<OperationResult> { public final BigDecimal amount; private final ActorRef<OperationResult> replyTo; public Withdraw(BigDecimal amount, ActorRef<OperationResult> replyTo) { this.amount = amount; this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } public static class GetBalance implements AccountCommand<CurrentBalance> { private final ActorRef<CurrentBalance> replyTo; public GetBalance(ActorRef<CurrentBalance> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<CurrentBalance> replyTo() { return replyTo; } } public static class CloseAccount implements AccountCommand<OperationResult> { private final ActorRef<OperationResult> replyTo; public CloseAccount(ActorRef<OperationResult> replyTo) { this.replyTo = replyTo; } @Override public ActorRef<OperationResult> replyTo() { return replyTo; } } // Reply interface AccountCommandReply {} interface OperationResult extends AccountCommandReply {} enum Confirmed implements OperationResult { INSTANCE } public static class Rejected implements OperationResult { public final String reason; public Rejected(String reason) { this.reason = reason; } } public static class CurrentBalance implements AccountCommandReply { public final BigDecimal balance; public CurrentBalance(BigDecimal balance) { this.balance = balance; } } // Event interface AccountEvent {} public static class AccountCreated implements AccountEvent {} public static class Deposited implements AccountEvent { public final BigDecimal amount; Deposited(BigDecimal amount) { this.amount = amount; } } public static class Withdrawn implements AccountEvent { public final BigDecimal amount; Withdrawn(BigDecimal amount) { this.amount = amount; } } public static class AccountClosed implements AccountEvent {} // State interface Account {} public static class EmptyAccount implements Account { OpenedAccount openedAccount() { return new OpenedAccount(); } } public static class OpenedAccount implements Account { private BigDecimal balance = BigDecimal.ZERO; public BigDecimal getBalance() { return balance; } void makeDeposit(BigDecimal amount) { balance = balance.add(amount); } boolean canWithdraw(BigDecimal amount) { return (balance.subtract(amount).compareTo(BigDecimal.ZERO) >= 0); } void makeWithdraw(BigDecimal amount) { if (!canWithdraw(amount)) throw new IllegalStateException("Account balance can't be negative"); balance = balance.subtract(amount); } ClosedAccount closedAccount() { return new ClosedAccount(); } } public static class ClosedAccount implements Account {} public static Behavior<AccountCommand> behavior(String accountNumber) { return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); } public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) { super(new PersistenceId(accountNumber)); } @Override public Account emptyState() { return new EmptyAccount(); } @Override public CommandHandlerWithReply<AccountCommand, AccountEvent, Account> commandHandler() { CommandHandlerWithReplyBuilder<AccountCommand, AccountEvent, Account> builder = newCommandHandlerWithReplyBuilder(); builder.forStateType(EmptyAccount.class).onCommand(CreateAccount.class, this::createAccount); builder .forStateType(OpenedAccount.class) .onCommand(Deposit.class, this::deposit) .onCommand(Withdraw.class, this::withdraw) .onCommand(GetBalance.class, this::getBalance) .onCommand(CloseAccount.class, this::closeAccount); builder .forStateType(ClosedAccount.class) .onAnyCommand(() -> Effect().unhandled().thenNoReply()); return builder.build(); } private ReplyEffect<AccountEvent, Account> createAccount( EmptyAccount account, CreateAccount command) { return Effect() .persist(new AccountCreated()) .thenReply(command, account2 -> Confirmed.INSTANCE); } private ReplyEffect<AccountEvent, Account> deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } private ReplyEffect<AccountEvent, Account> withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) .thenReply(command, account2 -> Confirmed.INSTANCE); } } private ReplyEffect<AccountEvent, Account> getBalance( OpenedAccount account, GetBalance command) { return Effect().reply(command, new CurrentBalance(account.balance)); } private ReplyEffect<AccountEvent, Account> closeAccount( OpenedAccount account, CloseAccount command) { if (account.getBalance().equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) .thenReply(command, account2 -> Confirmed.INSTANCE); } else { return Effect().reply(command, new Rejected("balance must be zero for closing account")); } } @Override public EventHandler<Account, AccountEvent> eventHandler() { EventHandlerBuilder<Account, AccountEvent> builder = newEventHandlerBuilder(); builder .forStateType(EmptyAccount.class) .onEvent(AccountCreated.class, (account, event) -> account.openedAccount()); builder .forStateType(OpenedAccount.class) .onEvent( Deposited.class, (account, deposited) -> { account.makeDeposit(deposited.amount); return account; }) .onEvent( Withdrawn.class, (account, withdrawn) -> { account.makeWithdraw(withdrawn.amount); return account; }) .onEvent(AccountClosed.class, (account, closed) -> account.closedAccount()); return builder.build(); } }