The section about Changing Behavior described how commands and events can be handled differently depending on the state. One can take that one step further and define the event handler inside the state classes. In the next section the command handlers are also defined in the state.
The state can be seen as your domain object and it should contain the core business logic. Then it’s a matter of taste if event handlers and command handlers should be defined in the state or be kept outside it.
Here we use a bank account as the example domain. It has three state classes that represent the lifecycle of the account: EmptyAccount, OpenedAccount, and ClosedAccount.

    import akka.actor.typed.ActorRef
    import akka.actor.typed.Behavior
    import akka.persistence.typed.ExpectingReply
    import akka.persistence.typed.PersistenceId
    import akka.persistence.typed.scaladsl.Effect
    import akka.persistence.typed.scaladsl.EventSourcedBehavior
    import akka.persistence.typed.scaladsl.ReplyEffect

    object AccountEntity {
      // Command
      sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
      final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
          extends AccountCommand[CurrentBalance]
      final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]

      // Reply
      sealed trait AccountCommandReply
      sealed trait OperationResult extends AccountCommandReply
      case object Confirmed extends OperationResult
      final case class Rejected(reason: String) extends OperationResult
      final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply

      // Event
      sealed trait AccountEvent
      case object AccountCreated extends AccountEvent
      case class Deposited(amount: BigDecimal) extends AccountEvent
      case class Withdrawn(amount: BigDecimal) extends AccountEvent
      case object AccountClosed extends AccountEvent

      val Zero = BigDecimal(0)

      // State
      sealed trait Account {
        def applyEvent(event: AccountEvent): Account
      }
      case object EmptyAccount extends Account {
        override def applyEvent(event: AccountEvent): Account = event match {
          case AccountCreated => OpenedAccount(Zero)
          case _              => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
        }
      }
      case class OpenedAccount(balance: BigDecimal) extends Account {
        require(balance >= Zero, "Account balance can't be negative")

        override def applyEvent(event: AccountEvent): Account =
          event match {
            case Deposited(amount) => copy(balance = balance + amount)
            case Withdrawn(amount) => copy(balance = balance - amount)
            case AccountClosed     => ClosedAccount
            case AccountCreated    => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
          }

        def canWithdraw(amount: BigDecimal): Boolean = {
          balance - amount >= Zero
        }
      }
      case object ClosedAccount extends Account {
        override def applyEvent(event: AccountEvent): Account =
          throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
      }

      // Note that after defining command, event and state classes you would probably start here when writing this.
      // When filling in the parameters of EventSourcedBehavior.apply you can use IntelliJ alt+Enter > createValue
      // to generate the stub with types for the command and event handlers.

      def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
        EventSourcedBehavior.withEnforcedReplies(
          PersistenceId(s"Account|$accountNumber"),
          EmptyAccount,
          commandHandler,
          eventHandler)
      }

      private val commandHandler: (Account, AccountCommand[_]) => ReplyEffect[AccountEvent, Account] = { (state, cmd) =>
        state match {
          case EmptyAccount =>
            cmd match {
              case c: CreateAccount => createAccount(c)
              case _                => Effect.unhandled.thenNoReply() // CreateAccount before handling any other commands
            }

          case acc @ OpenedAccount(_) =>
            cmd match {
              case c: Deposit       => deposit(c)
              case c: Withdraw      => withdraw(acc, c)
              case c: GetBalance    => getBalance(acc, c)
              case c: CloseAccount  => closeAccount(acc, c)
              case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created"))
            }

          case ClosedAccount =>
            cmd match {
              case c @ (_: Deposit | _: Withdraw) =>
                Effect.reply(c)(Rejected("Account is closed"))
              case c: GetBalance =>
                Effect.reply(c)(CurrentBalance(Zero))
              case c: CloseAccount =>
                Effect.reply(c)(Rejected("Account is already closed"))
              case c: CreateAccount =>
                Effect.reply(c)(Rejected("Account is already created"))
            }
        }
      }

      private val eventHandler: (Account, AccountEvent) => Account = { (state, event) =>
        state.applyEvent(event)
      }

      private def createAccount(cmd: CreateAccount): ReplyEffect[AccountEvent, Account] = {
        Effect.persist(AccountCreated).thenReply(cmd)(_ => Confirmed)
      }

      private def deposit(cmd: Deposit): ReplyEffect[AccountEvent, Account] = {
        Effect.persist(Deposited(cmd.amount)).thenReply(cmd)(_ => Confirmed)
      }

      private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = {
        if (acc.canWithdraw(cmd.amount)) {
          Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd)(_ => Confirmed)
        } else {
          Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
        }
      }

      private def getBalance(acc: OpenedAccount, cmd: GetBalance): ReplyEffect[AccountEvent, Account] = {
        Effect.reply(cmd)(CurrentBalance(acc.balance))
      }

      private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[AccountEvent, Account] = {
        if (acc.balance == Zero)
          Effect.persist(AccountClosed).thenReply(cmd)(_ => Confirmed)
        else
          Effect.reply(cmd)(Rejected("Can't close account with non-zero balance"))
      }
    }

Notice how the eventHandler delegates to the applyEvent in the Account (state), which is implemented in the concrete EmptyAccount, OpenedAccount, and ClosedAccount.
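Because applyEvent is a plain method on the state, the state transitions can be exercised without an actor system. The following is a minimal sketch under that assumption, not part of the example above: it repeats trimmed-down copies of the event and state classes, and the `ReplaySketch` object and `replay` helper are hypothetical names introduced for illustration. Folding the events over the empty state mirrors, conceptually, what happens when the persisted events are replayed.

```scala
object ReplaySketch {
  // Trimmed-down copies of the events from the example above.
  sealed trait AccountEvent
  case object AccountCreated extends AccountEvent
  final case class Deposited(amount: BigDecimal) extends AccountEvent
  final case class Withdrawn(amount: BigDecimal) extends AccountEvent
  case object AccountClosed extends AccountEvent

  val Zero = BigDecimal(0)

  // Trimmed-down copies of the state classes, event handling only.
  sealed trait Account {
    def applyEvent(event: AccountEvent): Account
  }
  case object EmptyAccount extends Account {
    override def applyEvent(event: AccountEvent): Account = event match {
      case AccountCreated => OpenedAccount(Zero)
      case _              => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
    }
  }
  final case class OpenedAccount(balance: BigDecimal) extends Account {
    override def applyEvent(event: AccountEvent): Account = event match {
      case Deposited(amount) => copy(balance = balance + amount)
      case Withdrawn(amount) => copy(balance = balance - amount)
      case AccountClosed     => ClosedAccount
      case AccountCreated    => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
    }
  }
  case object ClosedAccount extends Account {
    override def applyEvent(event: AccountEvent): Account =
      throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
  }

  // Hypothetical helper: applies the events in order, starting from the empty state.
  def replay(events: Seq[AccountEvent]): Account =
    events.foldLeft(EmptyAccount: Account)((state, event) => state.applyEvent(event))
}
```

For instance, `ReplaySketch.replay(Seq(AccountCreated, Deposited(BigDecimal(100)), Withdrawn(BigDecimal(30))))` yields an OpenedAccount with a balance of 70, while replaying a Deposited event against the empty state throws the IllegalStateException.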
Command handlers in the state
We can take the previous bank account example one step further by handling the commands in the state too.

    import akka.actor.typed.ActorRef
    import akka.actor.typed.Behavior
    import akka.persistence.typed.ExpectingReply
    import akka.persistence.typed.PersistenceId
    import akka.persistence.typed.scaladsl.Effect
    import akka.persistence.typed.scaladsl.EventSourcedBehavior

    object AccountEntity {
      // Command
      sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
      final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
          extends AccountCommand[CurrentBalance]
      final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]

      // Reply
      sealed trait AccountCommandReply
      sealed trait OperationResult extends AccountCommandReply
      case object Confirmed extends OperationResult
      final case class Rejected(reason: String) extends OperationResult
      final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply

      // Event
      sealed trait AccountEvent
      case object AccountCreated extends AccountEvent
      case class Deposited(amount: BigDecimal) extends AccountEvent
      case class Withdrawn(amount: BigDecimal) extends AccountEvent
      case object AccountClosed extends AccountEvent

      val Zero = BigDecimal(0)

      // type alias to reduce boilerplate
      type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Account]

      // State
      sealed trait Account {
        def applyCommand(cmd: AccountCommand[_]): ReplyEffect
        def applyEvent(event: AccountEvent): Account
      }
      case object EmptyAccount extends Account {
        override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
          cmd match {
            case c: CreateAccount =>
              Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed)
            case _ =>
              // CreateAccount before handling any other commands
              Effect.unhandled.thenNoReply()
          }

        override def applyEvent(event: AccountEvent): Account =
          event match {
            case AccountCreated => OpenedAccount(Zero)
            case _              => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
          }
      }
      case class OpenedAccount(balance: BigDecimal) extends Account {
        require(balance >= Zero, "Account balance can't be negative")

        override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
          cmd match {
            case c @ Deposit(amount) =>
              Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed)

            case c @ Withdraw(amount) =>
              if (canWithdraw(amount)) {
                Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed)
              } else {
                Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
              }

            case c: GetBalance =>
              Effect.reply(c)(CurrentBalance(balance))

            case c: CloseAccount =>
              if (balance == Zero)
                Effect.persist(AccountClosed).thenReply(c)(_ => Confirmed)
              else
                Effect.reply(c)(Rejected("Can't close account with non-zero balance"))

            case c: CreateAccount =>
              Effect.reply(c)(Rejected("Account is already created"))
          }

        override def applyEvent(event: AccountEvent): Account =
          event match {
            case Deposited(amount) => copy(balance = balance + amount)
            case Withdrawn(amount) => copy(balance = balance - amount)
            case AccountClosed     => ClosedAccount
            case AccountCreated    => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
          }

        def canWithdraw(amount: BigDecimal): Boolean = {
          balance - amount >= Zero
        }
      }
      case object ClosedAccount extends Account {
        override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
          cmd match {
            case c @ (_: Deposit | _: Withdraw) =>
              Effect.reply(c)(Rejected("Account is closed"))
            case c: GetBalance =>
              Effect.reply(c)(CurrentBalance(Zero))
            case c: CloseAccount =>
              Effect.reply(c)(Rejected("Account is already closed"))
            case c: CreateAccount =>
              Effect.reply(c)(Rejected("Account is already created"))
          }

        override def applyEvent(event: AccountEvent): Account =
          throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
      }

      def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
        EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account](
          PersistenceId(s"Account|$accountNumber"),
          EmptyAccount,
          (state, cmd) => state.applyCommand(cmd),
          (state, event) => state.applyEvent(event))
      }
    }

Notice how the command handler delegates to applyCommand in the Account (state), which is implemented in the concrete EmptyAccount, OpenedAccount, and ClosedAccount.
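One consequence of keeping the decision logic in the state is that the business rules read as a single match per state. The sketch below isolates that idea without any Akka dependency. It is an illustration under stated assumptions, not the library's API: `Decision`, `Persist`, and `Reject` are hypothetical stand-ins for ReplyEffect, the commands carry no replyTo, and only the opened state is shown.

```scala
object CommandInStateSketch {
  // Simplified commands without replyTo (assumption made for this sketch).
  sealed trait Command
  final case class Deposit(amount: BigDecimal) extends Command
  final case class Withdraw(amount: BigDecimal) extends Command
  case object CloseAccount extends Command

  sealed trait Event
  final case class Deposited(amount: BigDecimal) extends Event
  final case class Withdrawn(amount: BigDecimal) extends Event
  case object AccountClosed extends Event

  // Hypothetical stand-in for ReplyEffect: persist one event or reject the command.
  sealed trait Decision
  final case class Persist(event: Event) extends Decision
  final case class Reject(reason: String) extends Decision

  val Zero = BigDecimal(0)

  final case class OpenedAccount(balance: BigDecimal) {
    def canWithdraw(amount: BigDecimal): Boolean = balance - amount >= Zero

    // The state decides what a command means; guards express the business rules.
    def applyCommand(cmd: Command): Decision = cmd match {
      case Deposit(amount)                         => Persist(Deposited(amount))
      case Withdraw(amount) if canWithdraw(amount) => Persist(Withdrawn(amount))
      case Withdraw(amount)                        => Reject(s"Insufficient balance $balance to be able to withdraw $amount")
      case CloseAccount if balance == Zero         => Persist(AccountClosed)
      case CloseAccount                            => Reject("Can't close account with non-zero balance")
    }
  }
}
```

With this shape, `OpenedAccount(BigDecimal(10)).applyCommand(Withdraw(BigDecimal(50)))` returns a `Reject`, while withdrawing 5 returns `Persist(Withdrawn(5))`.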
Optional initial state
Sometimes it’s not desirable to use a separate state class for the empty initial state; instead, the entity is treated as having no state yet. Option[State] can then be used as the state type and None as the emptyState. Pattern matching is then used in the command and event handlers at the outer layer before delegating to the state or other methods.

    import akka.actor.typed.ActorRef
    import akka.actor.typed.Behavior
    import akka.persistence.typed.ExpectingReply
    import akka.persistence.typed.PersistenceId
    import akka.persistence.typed.scaladsl.Effect
    import akka.persistence.typed.scaladsl.EventSourcedBehavior

    object AccountEntity {
      // Command
      sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
      final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]
      final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
          extends AccountCommand[CurrentBalance]
      final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
          extends AccountCommand[OperationResult]

      // Reply
      sealed trait AccountCommandReply
      sealed trait OperationResult extends AccountCommandReply
      case object Confirmed extends OperationResult
      final case class Rejected(reason: String) extends OperationResult
      final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply

      // Event
      sealed trait AccountEvent
      case object AccountCreated extends AccountEvent
      case class Deposited(amount: BigDecimal) extends AccountEvent
      case class Withdrawn(amount: BigDecimal) extends AccountEvent
      case object AccountClosed extends AccountEvent

      val Zero = BigDecimal(0)

      // type alias to reduce boilerplate
      type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Option[Account]]

      // State
      sealed trait Account {
        def applyCommand(cmd: AccountCommand[_]): ReplyEffect
        def applyEvent(event: AccountEvent): Account
      }
      case class OpenedAccount(balance: BigDecimal) extends Account {
        require(balance >= Zero, "Account balance can't be negative")

        override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
          cmd match {
            case c @ Deposit(amount) =>
              Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed)

            case c @ Withdraw(amount) =>
              if (canWithdraw(amount)) {
                Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed)
              } else {
                Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
              }

            case c: GetBalance =>
              Effect.reply(c)(CurrentBalance(balance))

            case c: CloseAccount =>
              if (balance == Zero)
                Effect.persist(AccountClosed).thenReply(c)(_ => Confirmed)
              else
                Effect.reply(c)(Rejected("Can't close account with non-zero balance"))

            case c: CreateAccount =>
              Effect.reply(c)(Rejected("Account is already created"))
          }

        override def applyEvent(event: AccountEvent): Account =
          event match {
            case Deposited(amount) => copy(balance = balance + amount)
            case Withdrawn(amount) => copy(balance = balance - amount)
            case AccountClosed     => ClosedAccount
            case AccountCreated    => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
          }

        def canWithdraw(amount: BigDecimal): Boolean = {
          balance - amount >= Zero
        }
      }
      case object ClosedAccount extends Account {
        override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
          cmd match {
            case c @ (_: Deposit | _: Withdraw) =>
              Effect.reply(c)(Rejected("Account is closed"))
            case c: GetBalance =>
              Effect.reply(c)(CurrentBalance(Zero))
            case c: CloseAccount =>
              Effect.reply(c)(Rejected("Account is already closed"))
            case c: CreateAccount =>
              Effect.reply(c)(Rejected("Account is already created"))
          }

        override def applyEvent(event: AccountEvent): Account =
          throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
      }

      def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
        EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]](
          PersistenceId(s"Account|$accountNumber"),
          None,
          (state, cmd) =>
            state match {
              case None          => onFirstCommand(cmd)
              case Some(account) => account.applyCommand(cmd)
            },
          (state, event) =>
            state match {
              case None          => Some(onFirstEvent(event))
              case Some(account) => Some(account.applyEvent(event))
            })
      }

      def onFirstCommand(cmd: AccountCommand[_]): ReplyEffect = {
        cmd match {
          case c: CreateAccount =>
            Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed)
          case _ =>
            // CreateAccount before handling any other commands
            Effect.unhandled.thenNoReply()
        }
      }

      def onFirstEvent(event: AccountEvent): Account = {
        event match {
          case AccountCreated => OpenedAccount(Zero)
          case _              => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
        }
      }
    }

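The transition from None to Some on the first event can also be looked at in isolation. The following is a small, dependency-free sketch of the event side only; `OptionalStateSketch` and `replay` are hypothetical names, and the state is reduced to a single OpenedAccount class for brevity.

```scala
object OptionalStateSketch {
  sealed trait AccountEvent
  case object AccountCreated extends AccountEvent
  final case class Deposited(amount: BigDecimal) extends AccountEvent

  val Zero = BigDecimal(0)

  // Reduced state: only the opened account is modelled in this sketch.
  final case class OpenedAccount(balance: BigDecimal) {
    def applyEvent(event: AccountEvent): OpenedAccount = event match {
      case Deposited(amount) => copy(balance = balance + amount)
      case AccountCreated    => throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
    }
  }

  // Handles the very first event, when there is no state yet.
  def onFirstEvent(event: AccountEvent): OpenedAccount = event match {
    case AccountCreated => OpenedAccount(Zero)
    case _              => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
  }

  // Same shape as the event handler above: match on the Option at the outer layer.
  val eventHandler: (Option[OpenedAccount], AccountEvent) => Option[OpenedAccount] = { (state, event) =>
    state match {
      case None          => Some(onFirstEvent(event))
      case Some(account) => Some(account.applyEvent(event))
    }
  }

  // Hypothetical helper: starts from None, as the emptyState does.
  def replay(events: Seq[AccountEvent]): Option[OpenedAccount] =
    events.foldLeft(Option.empty[OpenedAccount])(eventHandler)
}
```

Replaying no events leaves the state as None; the first AccountCreated event produces Some(OpenedAccount(0)), and every later event is delegated to the state itself.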
The state can be mutable or immutable. When it is immutable the event handler returns a new instance of the state for each change.
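As a small illustration of the immutable case, the sketch below applies an event to a case class state and shows that the original instance is left untouched. The `ImmutableStateSketch` object and its `before` and `after` values are hypothetical names used only for this illustration.

```scala
object ImmutableStateSketch {
  final case class Deposited(amount: BigDecimal)

  final case class OpenedAccount(balance: BigDecimal) {
    // Returns a new instance; the instance it is called on is not modified.
    def applyEvent(event: Deposited): OpenedAccount =
      copy(balance = balance + event.amount)
  }

  val before: OpenedAccount = OpenedAccount(BigDecimal(10))
  val after: OpenedAccount = before.applyEvent(Deposited(BigDecimal(5)))
}
```

Here `before` still holds a balance of 10 after the event has been applied, and `after` is a separate instance with a balance of 15.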
When using mutable state it’s important to not send the full state instance as a message to another actor, e.g. as a reply to a command. Messages must be immutable to avoid concurrency problems.
The above examples use immutable state classes; below is the corresponding example with mutable state.