Persistence - coding style

Event handlers in the state

The section about Changing Behavior described how commands and events can be handled differently depending on the state. One can take that one step further and define the event handler inside the state classes. In the next section the command handlers are also defined in the state.

The state can be seen as your domain object and it should contain the core business logic. Then it’s a matter of taste if event handlers and command handlers should be defined in the state or be kept outside it.

Here we are using a bank account as the example domain. It has 3 state classes that represent the lifecycle of the account: EmptyAccount, OpenedAccount, and ClosedAccount.

Scala
/**
 * Bank account entity in which the event handlers are defined in the state classes
 * (`applyEvent`), while the command handlers are functions kept outside the state.
 */
object AccountEntity {
  // Command
  /** Command sent to the account; `Reply` is the type of the message sent back to `replyTo`. */
  sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
  final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
    extends AccountCommand[CurrentBalance]
  final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]

  // Reply
  sealed trait AccountCommandReply
  /** Outcome of a command that tries to change the account: either `Confirmed` or `Rejected`. */
  sealed trait OperationResult extends AccountCommandReply
  case object Confirmed extends OperationResult
  final case class Rejected(reason: String) extends OperationResult
  final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply

  // Event
  sealed trait AccountEvent
  case object AccountCreated extends AccountEvent
  final case class Deposited(amount: BigDecimal) extends AccountEvent
  final case class Withdrawn(amount: BigDecimal) extends AccountEvent
  case object AccountClosed extends AccountEvent

  val Zero: BigDecimal = BigDecimal(0)

  // State
  /** State of the account. Each concrete state decides which events it accepts. */
  sealed trait Account {
    /**
     * Applies a persisted event to this state.
     *
     * @param event the event to apply
     * @return the state after the event
     * @throws IllegalStateException if the event is not expected in this state
     */
    def applyEvent(event: AccountEvent): Account
  }

  /** Initial state: the only accepted event is `AccountCreated`. */
  case object EmptyAccount extends Account {
    override def applyEvent(event: AccountEvent): Account = event match {
      case AccountCreated ⇒ OpenedAccount(Zero)
      case _              ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
    }
  }

  /** An open account. Construction fails (via `require`) if the balance is negative. */
  final case class OpenedAccount(balance: BigDecimal) extends Account {
    require(balance >= Zero, "Account balance can't be negative")

    override def applyEvent(event: AccountEvent): Account =
      event match {
        case Deposited(amount) ⇒ copy(balance = balance + amount)
        case Withdrawn(amount) ⇒ copy(balance = balance - amount)
        case AccountClosed     ⇒ ClosedAccount
        case _                 ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
      }

    /** @return true if withdrawing `amount` leaves a non-negative balance */
    def canWithdraw(amount: BigDecimal): Boolean = {
      balance - amount >= Zero
    }

  }

  /** Final state: no event is accepted any more. */
  case object ClosedAccount extends Account {
    override def applyEvent(event: AccountEvent): Account =
      throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
  }

  // Note that after defining command, event and state classes you would probably start here when writing this.
  // When filling in the parameters of PersistentBehavior.withEnforcedReplies you can use IntelliJ
  // alt+Enter > createValue to generate the stub with types for the command and event handlers.

  /**
   * Creates the behavior of one account, starting in the `EmptyAccount` state.
   *
   * @param accountNumber used to build the persistence id `Account|<accountNumber>`
   */
  def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
    PersistentBehavior.withEnforcedReplies(
      PersistenceId(s"Account|$accountNumber"),
      EmptyAccount,
      commandHandler,
      eventHandler
    )
  }

  // Dispatches on the current state first and on the command second; the actual work is done
  // by the private helper methods below.
  private val commandHandler: (Account, AccountCommand[_]) ⇒ ReplyEffect[AccountEvent, Account] = {
    (state, cmd) ⇒
      state match {
        case EmptyAccount ⇒ cmd match {
          case c: CreateAccount ⇒ createAccount(c)
          case _                ⇒ Effect.unhandled.thenNoReply() // CreateAccount before handling any other commands
        }

        case acc @ OpenedAccount(_) ⇒ cmd match {
          case c: Deposit       ⇒ deposit(c)
          case c: Withdraw      ⇒ withdraw(acc, c)
          case c: GetBalance    ⇒ getBalance(acc, c)
          case c: CloseAccount  ⇒ closeAccount(acc, c)
          case c: CreateAccount ⇒ Effect.reply(c)(Rejected("Account is already created"))
        }

        case ClosedAccount ⇒
          cmd match {
            case c @ (_: Deposit | _: Withdraw) ⇒
              Effect.reply(c)(Rejected("Account is closed"))
            case c: GetBalance ⇒
              // closeAccount only closes an account whose balance is Zero
              Effect.reply(c)(CurrentBalance(Zero))
            case c: CloseAccount ⇒
              Effect.reply(c)(Rejected("Account is already closed"))
            case c: CreateAccount ⇒
              Effect.reply(c)(Rejected("Account is already created"))
          }
      }
  }

  // The event handling itself is implemented by the concrete state classes.
  private val eventHandler: (Account, AccountEvent) ⇒ Account = {
    (state, event) ⇒ state.applyEvent(event)
  }

  private def createAccount(cmd: CreateAccount): ReplyEffect[AccountEvent, Account] = {
    Effect.persist(AccountCreated)
      .thenReply(cmd)(_ ⇒ Confirmed)
  }

  /**
   * Persists `Deposited` and confirms, unless the amount is negative.
   *
   * A negative amount is rejected before anything is persisted: applying such an event could
   * make the balance negative, which the `require` in `OpenedAccount` refuses.
   */
  private def deposit(cmd: Deposit): ReplyEffect[AccountEvent, Account] = {
    if (cmd.amount < Zero) {
      Effect.reply(cmd)(Rejected(s"Can't deposit negative amount ${cmd.amount}"))
    } else {
      Effect.persist(Deposited(cmd.amount))
        .thenReply(cmd)(_ ⇒ Confirmed)
    }
  }

  /**
   * Persists `Withdrawn` and confirms if the amount is not negative and the balance is sufficient.
   *
   * A negative amount would always pass `canWithdraw` and increase the balance, so it is
   * rejected explicitly.
   */
  private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = {
    if (cmd.amount < Zero) {
      Effect.reply(cmd)(Rejected(s"Can't withdraw negative amount ${cmd.amount}"))
    } else if (acc.canWithdraw(cmd.amount)) {
      Effect.persist(Withdrawn(cmd.amount))
        .thenReply(cmd)(_ ⇒ Confirmed)

    } else {
      Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
    }
  }

  private def getBalance(acc: OpenedAccount, cmd: GetBalance): ReplyEffect[AccountEvent, Account] = {
    Effect.reply(cmd)(CurrentBalance(acc.balance))
  }

  /** Closes the account only when its balance is exactly `Zero`; otherwise the command is rejected. */
  private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[AccountEvent, Account] = {
    if (acc.balance == Zero)
      Effect.persist(AccountClosed)
        .thenReply(cmd)(_ ⇒ Confirmed)
    else
      Effect.reply(cmd)(Rejected("Can't close account with non-zero balance"))
  }

}

TODO include corresponding example in Java

Notice how the eventHandler delegates to the applyEvent in the Account (state), which is implemented in the concrete EmptyAccount, OpenedAccount, and ClosedAccount.

Command handlers in the state

We can take the previous bank account example one step further by handling the commands in the state too.

Scala
/**
 * Bank account entity in which both the command handlers (`applyCommand`) and the event
 * handlers (`applyEvent`) are defined in the state classes.
 */
object AccountEntity {
  // Command
  /** Command sent to the account; `Reply` is the type of the message sent back to `replyTo`. */
  sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
  final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
    extends AccountCommand[CurrentBalance]
  final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]

  // Reply
  sealed trait AccountCommandReply
  /** Outcome of a command that tries to change the account: either `Confirmed` or `Rejected`. */
  sealed trait OperationResult extends AccountCommandReply
  case object Confirmed extends OperationResult
  final case class Rejected(reason: String) extends OperationResult
  final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply

  // Event
  sealed trait AccountEvent
  case object AccountCreated extends AccountEvent
  final case class Deposited(amount: BigDecimal) extends AccountEvent
  final case class Withdrawn(amount: BigDecimal) extends AccountEvent
  case object AccountClosed extends AccountEvent

  val Zero: BigDecimal = BigDecimal(0)

  // type alias to reduce boilerplate
  type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Account]

  // State
  /** State of the account. Each concrete state handles both commands and events itself. */
  sealed trait Account {
    /**
     * Decides what to do with a command in this state.
     *
     * @param cmd the incoming command
     * @return the effect to run, including the reply to send (if any)
     */
    def applyCommand(cmd: AccountCommand[_]): ReplyEffect

    /**
     * Applies a persisted event to this state.
     *
     * @param event the event to apply
     * @return the state after the event
     * @throws IllegalStateException if the event is not expected in this state
     */
    def applyEvent(event: AccountEvent): Account
  }

  /** Initial state: only `CreateAccount` is handled and only `AccountCreated` is accepted. */
  case object EmptyAccount extends Account {
    override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
      cmd match {
        case c: CreateAccount ⇒
          Effect.persist(AccountCreated)
            .thenReply(c)(_ ⇒ Confirmed)
        case _ ⇒
          // CreateAccount before handling any other commands
          Effect.unhandled.thenNoReply()
      }

    override def applyEvent(event: AccountEvent): Account =
      event match {
        case AccountCreated ⇒ OpenedAccount(Zero)
        case _              ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
      }
  }

  /** An open account. Construction fails (via `require`) if the balance is negative. */
  final case class OpenedAccount(balance: BigDecimal) extends Account {
    require(balance >= Zero, "Account balance can't be negative")

    override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
      cmd match {
        case c @ Deposit(amount) ⇒
          // Reject before persisting: applying a negative deposit could make the balance
          // negative, which the `require` above refuses.
          if (amount < Zero) {
            Effect.reply(c)(Rejected(s"Can't deposit negative amount $amount"))
          } else {
            Effect.persist(Deposited(amount))
              .thenReply(c)(_ ⇒ Confirmed)
          }

        case c @ Withdraw(amount) ⇒
          // A negative amount would always pass canWithdraw and increase the balance.
          if (amount < Zero) {
            Effect.reply(c)(Rejected(s"Can't withdraw negative amount $amount"))
          } else if (canWithdraw(amount)) {
            Effect.persist(Withdrawn(amount))
              .thenReply(c)(_ ⇒ Confirmed)

          } else {
            Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
          }

        case c: GetBalance ⇒
          Effect.reply(c)(CurrentBalance(balance))

        case c: CloseAccount ⇒
          if (balance == Zero)
            Effect.persist(AccountClosed)
              .thenReply(c)(_ ⇒ Confirmed)
          else
            Effect.reply(c)(Rejected("Can't close account with non-zero balance"))

        case c: CreateAccount ⇒
          Effect.reply(c)(Rejected("Account is already created"))

      }

    override def applyEvent(event: AccountEvent): Account =
      event match {
        case Deposited(amount) ⇒ copy(balance = balance + amount)
        case Withdrawn(amount) ⇒ copy(balance = balance - amount)
        case AccountClosed     ⇒ ClosedAccount
        case _                 ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
      }

    /** @return true if withdrawing `amount` leaves a non-negative balance */
    def canWithdraw(amount: BigDecimal): Boolean = {
      balance - amount >= Zero
    }

  }

  /** Final state: every modifying command is rejected and no event is accepted any more. */
  case object ClosedAccount extends Account {
    override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
      cmd match {
        case c @ (_: Deposit | _: Withdraw) ⇒
          Effect.reply(c)(Rejected("Account is closed"))
        case c: GetBalance ⇒
          // OpenedAccount only closes when its balance is Zero
          Effect.reply(c)(CurrentBalance(Zero))
        case c: CloseAccount ⇒
          Effect.reply(c)(Rejected("Account is already closed"))
        case c: CreateAccount ⇒
          Effect.reply(c)(Rejected("Account is already created"))
      }

    override def applyEvent(event: AccountEvent): Account =
      throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
  }

  /**
   * Creates the behavior of one account, starting in the `EmptyAccount` state.
   * Both handlers delegate to the current state.
   *
   * @param accountNumber used to build the persistence id `Account|<accountNumber>`
   */
  def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
    PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account](
      PersistenceId(s"Account|$accountNumber"),
      EmptyAccount,
      (state, cmd) ⇒ state.applyCommand(cmd),
      (state, event) ⇒ state.applyEvent(event)
    )
  }

}

TODO include corresponding example in Java

Notice how the command handler is delegating to applyCommand in the Account (state), which is implemented in the concrete EmptyAccount, OpenedAccount, and ClosedAccount.

Optional initial state

Sometimes it’s not desirable to use a separate state class for the empty initial state, but rather to treat that as there being no state yet. In Java, null can then be used as the emptyState, but be aware that the state parameter will then be null for the first commands and events until the first event has been persisted to create the non-null state. It’s possible to use Optional instead of null, but that results in rather a lot of boilerplate to unwrap the Optional state parameter, and therefore null is probably preferred. In Scala, Option[State] can be used as the state type and None as the emptyState. Pattern matching is then used in command and event handlers at the outer layer before delegating to the state or other methods. The following example illustrates the Scala approach, using None as the emptyState.

Scala
/**
 * Bank account entity without a dedicated "empty" state class: the state type is
 * `Option[Account]` and `None` is used until the first event has created the account.
 */
object AccountEntity {
  // Command
  /** Command sent to the account; `Reply` is the type of the message sent back to `replyTo`. */
  sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
  final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]
  final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
    extends AccountCommand[CurrentBalance]
  final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
    extends AccountCommand[OperationResult]

  // Reply
  sealed trait AccountCommandReply
  /** Outcome of a command that tries to change the account: either `Confirmed` or `Rejected`. */
  sealed trait OperationResult extends AccountCommandReply
  case object Confirmed extends OperationResult
  final case class Rejected(reason: String) extends OperationResult
  final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply

  // Event
  sealed trait AccountEvent
  case object AccountCreated extends AccountEvent
  final case class Deposited(amount: BigDecimal) extends AccountEvent
  final case class Withdrawn(amount: BigDecimal) extends AccountEvent
  case object AccountClosed extends AccountEvent

  val Zero: BigDecimal = BigDecimal(0)

  // type alias to reduce boilerplate
  type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Option[Account]]

  // State
  /** State of an existing account. The not-yet-created account is represented by `None`. */
  sealed trait Account {
    /**
     * Decides what to do with a command in this state.
     *
     * @param cmd the incoming command
     * @return the effect to run, including the reply to send (if any)
     */
    def applyCommand(cmd: AccountCommand[_]): ReplyEffect

    /**
     * Applies a persisted event to this state.
     *
     * @param event the event to apply
     * @return the state after the event
     * @throws IllegalStateException if the event is not expected in this state
     */
    def applyEvent(event: AccountEvent): Account
  }

  /** An open account. Construction fails (via `require`) if the balance is negative. */
  final case class OpenedAccount(balance: BigDecimal) extends Account {
    require(balance >= Zero, "Account balance can't be negative")

    override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
      cmd match {
        case c @ Deposit(amount) ⇒
          // Reject before persisting: applying a negative deposit could make the balance
          // negative, which the `require` above refuses.
          if (amount < Zero) {
            Effect.reply(c)(Rejected(s"Can't deposit negative amount $amount"))
          } else {
            Effect.persist(Deposited(amount))
              .thenReply(c)(_ ⇒ Confirmed)
          }

        case c @ Withdraw(amount) ⇒
          // A negative amount would always pass canWithdraw and increase the balance.
          if (amount < Zero) {
            Effect.reply(c)(Rejected(s"Can't withdraw negative amount $amount"))
          } else if (canWithdraw(amount)) {
            Effect.persist(Withdrawn(amount))
              .thenReply(c)(_ ⇒ Confirmed)

          } else {
            Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
          }

        case c: GetBalance ⇒
          Effect.reply(c)(CurrentBalance(balance))

        case c: CloseAccount ⇒
          if (balance == Zero)
            Effect.persist(AccountClosed)
              .thenReply(c)(_ ⇒ Confirmed)
          else
            Effect.reply(c)(Rejected("Can't close account with non-zero balance"))

        case c: CreateAccount ⇒
          Effect.reply(c)(Rejected("Account is already created"))

      }

    override def applyEvent(event: AccountEvent): Account =
      event match {
        case Deposited(amount) ⇒ copy(balance = balance + amount)
        case Withdrawn(amount) ⇒ copy(balance = balance - amount)
        case AccountClosed     ⇒ ClosedAccount
        case _                 ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
      }

    /** @return true if withdrawing `amount` leaves a non-negative balance */
    def canWithdraw(amount: BigDecimal): Boolean = {
      balance - amount >= Zero
    }

  }

  /** Final state: every modifying command is rejected and no event is accepted any more. */
  case object ClosedAccount extends Account {
    override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
      cmd match {
        case c @ (_: Deposit | _: Withdraw) ⇒
          Effect.reply(c)(Rejected("Account is closed"))
        case c: GetBalance ⇒
          // OpenedAccount only closes when its balance is Zero
          Effect.reply(c)(CurrentBalance(Zero))
        case c: CloseAccount ⇒
          Effect.reply(c)(Rejected("Account is already closed"))
        case c: CreateAccount ⇒
          Effect.reply(c)(Rejected("Account is already created"))
      }

    override def applyEvent(event: AccountEvent): Account =
      throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
  }

  /**
   * Creates the behavior of one account, starting with `None` as the empty state.
   *
   * While the state is `None` the handlers use `onFirstCommand` / `onFirstEvent`; once an
   * account exists they delegate to it.
   *
   * @param accountNumber used to build the persistence id `Account|<accountNumber>`
   */
  def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
    PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]](
      PersistenceId(s"Account|$accountNumber"),
      None,
      (state, cmd) ⇒ state match {
        case None          ⇒ onFirstCommand(cmd)
        case Some(account) ⇒ account.applyCommand(cmd)
      },
      (state, event) ⇒ state match {
        case None          ⇒ Some(onFirstEvent(event))
        case Some(account) ⇒ Some(account.applyEvent(event))
      }
    )
  }

  /**
   * Command handling while there is no account yet: only `CreateAccount` is handled.
   *
   * @param cmd the incoming command
   * @return an effect persisting `AccountCreated`, or `unhandled` for any other command
   */
  def onFirstCommand(cmd: AccountCommand[_]): ReplyEffect = {
    cmd match {
      case c: CreateAccount ⇒
        Effect.persist(AccountCreated)
          .thenReply(c)(_ ⇒ Confirmed)
      case _ ⇒
        // CreateAccount before handling any other commands
        Effect.unhandled.thenNoReply()
    }
  }

  /**
   * Event handling while there is no account yet: only `AccountCreated` is accepted.
   *
   * @param event the first event
   * @return the newly opened account with a `Zero` balance
   * @throws IllegalStateException for any event other than `AccountCreated`
   */
  def onFirstEvent(event: AccountEvent): Account = {
    event match {
      case AccountCreated ⇒ OpenedAccount(Zero)
      case _              ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
    }
  }

}

TODO include corresponding example in Java

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.