Classic Persistent FSM
Akka Classic refers to the original Actor APIs, which have been superseded by more type-safe and guided Actor APIs. Akka Classic is still fully supported, and existing applications can continue to use the classic APIs. It is also possible to use the new Actor APIs together with classic actors in the same ActorSystem; see coexistence. For new projects we recommend using the new Actor API.
Dependency
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
Persistent FSMs are part of Akka Persistence. To use them, add the following dependency to your project:
- sbt
  val AkkaVersion = "2.8.7"
  libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % AkkaVersion
Persistent FSM is no longer actively developed and will be replaced by Akka Persistence Typed. It is not advised to build new applications with Persistent FSM. Existing users of Persistent FSM should migrate.
PersistentFSM handles incoming messages in an FSM-like fashion. Its internal state is persisted as a sequence of changes, later referred to as domain events. The relationship between incoming messages, the FSM’s states and transitions, and the persistence of domain events is defined by a DSL.
A Simple Example
To demonstrate the features of the PersistentFSM trait, consider an actor which represents a Web store customer. The contract of our “WebStoreCustomerFSMActor” is that it accepts the following commands:
- Scala
  sealed trait Command
  case class AddItem(item: Item) extends Command
  case object Buy extends Command
  case object Leave extends Command
  case object GetCurrentCart extends Command
- AddItem: sent when the customer adds an item to the shopping cart
- Buy: sent when the customer finishes the purchase
- Leave: sent when the customer leaves the store without purchasing anything
- GetCurrentCart: queries the current state of the customer’s shopping cart
The customer can be in one of the following states:
- Scala
  sealed trait UserState extends FSMState
  case object LookingAround extends UserState {
    override def identifier: String = "Looking Around"
  }
  case object Shopping extends UserState {
    override def identifier: String = "Shopping"
  }
  case object Inactive extends UserState {
    override def identifier: String = "Inactive"
  }
  case object Paid extends UserState {
    override def identifier: String = "Paid"
  }
- LookingAround: the customer is browsing the site, but hasn’t added anything to the shopping cart
- Shopping: the customer has recently added items to the shopping cart
- Inactive: the customer has items in the shopping cart, but hasn’t added anything recently
- Paid: the customer has purchased the items
PersistentFSM states must inherit from the trait PersistentFSM.FSMState and implement the def identifier: String method. This is required in order to simplify the serialization of FSM states. String identifiers should be unique!
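Because the identifiers are plain strings, nothing in the type system stops two states from sharing one. A minimal sketch of a uniqueness check, suitable for a unit test, is shown here. The FSMState trait in the block is a hypothetical stand-in for PersistentFSM.FSMState so that the snippet runs without Akka on the classpath, and IdentifierCheck is an illustrative name, not part of Akka.

```scala
// Hypothetical stand-in for akka.persistence.fsm.PersistentFSM.FSMState,
// declared here only so that the sketch compiles without Akka.
trait FSMState { def identifier: String }

sealed trait UserState extends FSMState
case object LookingAround extends UserState { override def identifier: String = "Looking Around" }
case object Shopping extends UserState { override def identifier: String = "Shopping" }
case object Inactive extends UserState { override def identifier: String = "Inactive" }
case object Paid extends UserState { override def identifier: String = "Paid" }

// Illustrative helper, not an Akka API.
object IdentifierCheck {
  val allStates: Seq[UserState] = Seq(LookingAround, Shopping, Inactive, Paid)

  /** Returns every identifier that is used by more than one state. */
  def duplicateIdentifiers(states: Seq[FSMState]): Set[String] =
    states
      .groupBy(_.identifier)
      .collect { case (id, group) if group.size > 1 => id }
      .toSet
}
```

Running such a check in a test catches an accidental copy-paste of an identifier before any data has been persisted under it.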
The customer’s actions are “recorded” as a sequence of “domain events” which are persisted. Those events are replayed when the actor starts in order to restore the customer’s latest state:
- Scala
  sealed trait DomainEvent
  case class ItemAdded(item: Item) extends DomainEvent
  case object OrderExecuted extends DomainEvent
  case object OrderDiscarded extends DomainEvent
  case object CustomerInactive extends DomainEvent
Customer state data represents the items in a customer’s shopping cart:
- Scala
  case class Item(id: String, name: String, price: Float)

  sealed trait ShoppingCart {
    def addItem(item: Item): ShoppingCart
    def empty(): ShoppingCart
  }
  case object EmptyShoppingCart extends ShoppingCart {
    def addItem(item: Item) = NonEmptyShoppingCart(item :: Nil)
    def empty() = this
  }
  case class NonEmptyShoppingCart(items: Seq[Item]) extends ShoppingCart {
    def addItem(item: Item) = NonEmptyShoppingCart(items :+ item)
    def empty() = EmptyShoppingCart
  }
Here is how everything is wired together:
- Scala
  startWith(LookingAround, EmptyShoppingCart)

  when(LookingAround) {
    case Event(AddItem(item), _) =>
      goto(Shopping).applying(ItemAdded(item)).forMax(1 seconds)
    case Event(GetCurrentCart, data) =>
      stay().replying(data)
  }

  when(Shopping) {
    case Event(AddItem(item), _) =>
      stay().applying(ItemAdded(item)).forMax(1 seconds)
    case Event(Buy, _) =>
      goto(Paid).applying(OrderExecuted).andThen {
        case NonEmptyShoppingCart(items) =>
          reportActor ! PurchaseWasMade(items)
          saveStateSnapshot()
        case EmptyShoppingCart => saveStateSnapshot()
      }
    case Event(Leave, _) =>
      stop().applying(OrderDiscarded).andThen {
        case _ =>
          reportActor ! ShoppingCardDiscarded
          saveStateSnapshot()
      }
    case Event(GetCurrentCart, data) =>
      stay().replying(data)
    case Event(StateTimeout, _) =>
      goto(Inactive).forMax(2 seconds)
  }

  when(Inactive) {
    case Event(AddItem(item), _) =>
      goto(Shopping).applying(ItemAdded(item)).forMax(1 seconds)
    case Event(StateTimeout, _) =>
      stop().applying(OrderDiscarded).andThen {
        case _ => reportActor ! ShoppingCardDiscarded
      }
  }

  when(Paid) {
    case Event(Leave, _)             => stop()
    case Event(GetCurrentCart, data) => stay().replying(data)
  }
State data can only be modified directly on initialization. After that, it is modified only as a result of applying domain events. Override the applyEvent method to define how state data is affected by domain events, as in the example below:
- Scala
  override def applyEvent(event: DomainEvent, cartBeforeEvent: ShoppingCart): ShoppingCart = {
    event match {
      case ItemAdded(item)  => cartBeforeEvent.addItem(item)
      case OrderExecuted    => cartBeforeEvent
      case OrderDiscarded   => cartBeforeEvent.empty()
      case CustomerInactive => cartBeforeEvent
    }
  }
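The fragments above live inside an actor class that this page does not show. The following is a minimal sketch of what such a class might look like, assuming akka-persistence is on the classpath. The class name, the constructor shape and the trimmed-down copies of the domain types (two states, one event, a single-class cart) are assumptions made to keep the snippet compilable on its own. PersistentFSM requires a persistenceId, an implicit ClassTag for the domain event type, and the applyEvent method.

```scala
import akka.persistence.fsm.PersistentFSM
import akka.persistence.fsm.PersistentFSM.FSMState
import scala.reflect.ClassTag

// Trimmed-down copies of the types from this page (assumption: simplified for brevity).
final case class Item(id: String, name: String, price: Float)

sealed trait Command
final case class AddItem(item: Item) extends Command
case object GetCurrentCart extends Command

sealed trait UserState extends FSMState
case object LookingAround extends UserState { override def identifier: String = "Looking Around" }
case object Shopping extends UserState { override def identifier: String = "Shopping" }

sealed trait DomainEvent
final case class ItemAdded(item: Item) extends DomainEvent

final case class ShoppingCart(items: Seq[Item]) {
  def addItem(item: Item): ShoppingCart = copy(items = items :+ item)
}

// Hypothetical class name and constructor; only the overridden members are required by PersistentFSM.
class WebStoreCustomerFSM(_persistenceId: String)(
    implicit val domainEventClassTag: ClassTag[DomainEvent])
    extends PersistentFSM[UserState, ShoppingCart, DomainEvent] {

  // Identifies this actor's event stream in the journal.
  override def persistenceId: String = _persistenceId

  // Defines how each persisted domain event changes the state data.
  override def applyEvent(event: DomainEvent, cartBeforeEvent: ShoppingCart): ShoppingCart =
    event match {
      case ItemAdded(item) => cartBeforeEvent.addItem(item)
    }

  startWith(LookingAround, ShoppingCart(Nil))

  when(LookingAround) {
    case Event(AddItem(item), _)     => goto(Shopping).applying(ItemAdded(item))
    case Event(GetCurrentCart, data) => stay().replying(data)
  }

  when(Shopping) {
    case Event(AddItem(item), _)     => stay().applying(ItemAdded(item))
    case Event(GetCurrentCart, data) => stay().replying(data)
  }
}
```

An instance could then be created with something like system.actorOf(Props(new WebStoreCustomerFSM("customer-1"))), where the ClassTag is supplied implicitly by the compiler.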
andThen can be used to define actions which will be executed after the event has been persisted. This is convenient for “side effects” like sending a message or logging. Note that actions defined in an andThen block are not executed on recovery:
- Scala
  goto(Paid).applying(OrderExecuted).andThen {
    case NonEmptyShoppingCart(items) =>
      reportActor ! PurchaseWasMade(items)
  }
A snapshot of state data can be persisted by calling the saveStateSnapshot() method:
- Scala
  stop().applying(OrderDiscarded).andThen {
    case _ =>
      reportActor ! ShoppingCardDiscarded
      saveStateSnapshot()
  }
On recovery, state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the applyEvent method.
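Conceptually, recovery of the state data is a left fold of applyEvent over the events recorded after the snapshot. The sketch below models only that idea in plain Scala and is not how Akka implements recovery internally. RecoverySketch and recover are illustrative names, and the cart is simplified to a single case class instead of the EmptyShoppingCart/NonEmptyShoppingCart pair.

```scala
final case class Item(id: String, name: String, price: Float)

sealed trait DomainEvent
final case class ItemAdded(item: Item) extends DomainEvent
case object OrderExecuted extends DomainEvent
case object OrderDiscarded extends DomainEvent
case object CustomerInactive extends DomainEvent

// Simplified cart (assumption): one case class rather than the Empty/NonEmpty pair.
final case class ShoppingCart(items: Seq[Item]) {
  def addItem(item: Item): ShoppingCart = copy(items = items :+ item)
  def empty(): ShoppingCart = ShoppingCart(Nil)
}

// Illustrative model of recovery, not an Akka API.
object RecoverySketch {
  // Same rules as the applyEvent override shown earlier.
  def applyEvent(event: DomainEvent, cartBeforeEvent: ShoppingCart): ShoppingCart =
    event match {
      case ItemAdded(item)  => cartBeforeEvent.addItem(item)
      case OrderExecuted    => cartBeforeEvent
      case OrderDiscarded   => cartBeforeEvent.empty()
      case CustomerInactive => cartBeforeEvent
    }

  /** Starts from the snapshot, or the initial data if there is none, and folds the later events over it. */
  def recover(
      snapshot: Option[ShoppingCart],
      initial: ShoppingCart,
      eventsAfterSnapshot: Seq[DomainEvent]): ShoppingCart =
    eventsAfterSnapshot.foldLeft(snapshot.getOrElse(initial)) { (cart, event) =>
      applyEvent(event, cart)
    }
}
```

Since only applyEvent runs during this fold, side effects placed in andThen blocks are naturally skipped.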
Migration to EventSourcedBehavior
Persistent FSMs can be represented using Persistence Typed. The data stored by Persistent FSM can be read by an EventSourcedBehavior using a snapshot adapter and an event adapter. The adapters are required because Persistent FSM doesn’t store snapshots and user data directly; it wraps them in internal types that include state transition information.
Before reading the migration guide it is advised to understand Persistence Typed.
Migration steps
- Modify or create new commands to include a replyTo ActorRef
- Typically persisted events will remain the same
- Create an EventSourcedBehavior that mimics the old PersistentFSM
- Replace any state timeouts with Behaviors.withTimers, either hard coded or stored in the state
- Add an EventAdapter to convert state transition events added by PersistentFSM into private events or filter them
- If snapshots are used, add a SnapshotAdapter to convert PersistentFSM snapshots into the EventSourcedBehavior’s State
The following is the shopping cart example above converted to an EventSourcedBehavior.
The new commands are shown first. Note the replyTo field for getting the current cart.
- Scala
  sealed trait Command
  case class AddItem(item: Item) extends Command
  case object Buy extends Command
  case object Leave extends Command
  case class GetCurrentCart(replyTo: ActorRef[ShoppingCart]) extends Command
  private case object Timeout extends Command
The states of the FSM are represented using the EventSourcedBehavior’s state parameter along with the event and command handlers. Here are the states:
- Scala
  sealed trait State
  case class LookingAround(cart: ShoppingCart) extends State
  case class Shopping(cart: ShoppingCart) extends State
  case class Inactive(cart: ShoppingCart) extends State
  case class Paid(cart: ShoppingCart) extends State
The command handler has a separate section for each of the PersistentFSM’s states:
- Scala
  def commandHandler(timers: TimerScheduler[Command])(state: State, command: Command): Effect[DomainEvent, State] =
    state match {
      case LookingAround(cart) =>
        command match {
          case AddItem(item) =>
            Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
          case GetCurrentCart(replyTo) =>
            replyTo ! cart
            Effect.none
          case _ =>
            Effect.none
        }
      case Shopping(cart) =>
        command match {
          case AddItem(item) =>
            Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
          case Buy =>
            Effect.persist(OrderExecuted).thenRun(_ => timers.cancel(StateTimeout))
          case Leave =>
            Effect.persist(OrderDiscarded).thenStop()
          case GetCurrentCart(replyTo) =>
            replyTo ! cart
            Effect.none
          case Timeout =>
            Effect.persist(CustomerInactive)
        }
      case Inactive(_) =>
        command match {
          case AddItem(item) =>
            Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
          case Timeout =>
            Effect.persist(OrderDiscarded)
          case _ =>
            Effect.none
        }
      case Paid(cart) =>
        command match {
          case Leave =>
            Effect.stop()
          case GetCurrentCart(replyTo) =>
            replyTo ! cart
            Effect.none
          case _ =>
            Effect.none
        }
    }
Note that there is no explicit support for state timeouts as there is with PersistentFSM, but the same behavior can be achieved using Behaviors.withTimers. If the timeout is the same for all events then it can be hard coded. Otherwise the old PersistentFSM timeout can be taken from the StateChangeEvent in the event adapter, and it is also available when constructing a SnapshotAdapter. It can be added to an internal event and then stored in the State. Care must also be taken to restart timers on recovery in the signal handler:
- Scala
  .receiveSignal {
    case (state, RecoveryCompleted) =>
      state match {
        case _: Shopping | _: Inactive =>
          timers.startSingleTimer(StateTimeout, Timeout, 1.second)
        case _ =>
      }
  }
Then the event handler:
- Scala
  def eventHandler(state: State, event: DomainEvent): State = {
    state match {
      case la @ LookingAround(cart) =>
        event match {
          case ItemAdded(item) => Shopping(cart.addItem(item))
          case _               => la
        }
      case Shopping(cart) =>
        event match {
          case ItemAdded(item)  => Shopping(cart.addItem(item))
          case OrderExecuted    => Paid(cart)
          case OrderDiscarded   => state // will be stopped
          case CustomerInactive => Inactive(cart)
        }
      case i @ Inactive(cart) =>
        event match {
          case ItemAdded(item) => Shopping(cart.addItem(item))
          case OrderDiscarded  => i // will be stopped
          case _               => i
        }
      case Paid(_) => state // no events after paid
    }
  }
The last step is the adapters that will allow the new EventSourcedBehavior to read the old data:
- Scala
  class PersistentFsmEventAdapter extends EventAdapter[DomainEvent, Any] {
    override def toJournal(e: DomainEvent): Any = e
    override def manifest(event: DomainEvent): String = ""

    @nowarn("msg=deprecated")
    override def fromJournal(journalEvent: Any, manifest: String): EventSeq[DomainEvent] = {
      journalEvent match {
        case _: StateChangeEvent =>
          // In this example the state transitions can be inferred from the events
          // Alternatively the StateChangeEvent can be converted to a private event if either the StateChangeEvent.stateIdentifier
          // or StateChangeEvent.timeout is required
          // Many use cases have the same timeout so it can be hard coded, otherwise it can be stored in the state
          EventSeq.empty
        case other =>
          // If using a new domain event model the conversion would happen here
          EventSeq.single(other.asInstanceOf[DomainEvent])
      }
    }
  }
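To see why dropping the state change events is safe in this example, it can help to model the replay in plain Scala. The sketch below is an illustration under simplifying assumptions, not Akka code: StateChangeMarker is a hypothetical stand-in for PersistentFSM’s internal StateChangeEvent, the cart is reduced to a Seq[String], and the event handler is a condensed form of the one above. It filters the markers out of a mixed journal and folds the remaining domain events through the event handler.

```scala
import scala.concurrent.duration.FiniteDuration

// Hypothetical stand-in for PersistentFSM's internal StateChangeEvent.
final case class StateChangeMarker(stateIdentifier: String, timeout: Option[FiniteDuration])

sealed trait DomainEvent
final case class ItemAdded(item: String) extends DomainEvent
case object OrderExecuted extends DomainEvent
case object OrderDiscarded extends DomainEvent
case object CustomerInactive extends DomainEvent

// Simplified states (assumption): the cart is just a sequence of item names.
sealed trait State
final case class LookingAround(cart: Seq[String]) extends State
final case class Shopping(cart: Seq[String]) extends State
final case class Inactive(cart: Seq[String]) extends State
final case class Paid(cart: Seq[String]) extends State

object JournalReplaySketch {
  // Mirrors the adapter's fromJournal: markers yield no events, domain events pass through.
  def fromJournal(journalEvent: Any): Seq[DomainEvent] = journalEvent match {
    case _: StateChangeMarker => Seq.empty
    case event: DomainEvent   => Seq(event)
    case unknown              => throw new IllegalArgumentException(s"Unexpected journal entry $unknown")
  }

  // Condensed event handler: the target state follows from the current state and the event.
  def eventHandler(state: State, event: DomainEvent): State = (state, event) match {
    case (LookingAround(cart), ItemAdded(item)) => Shopping(cart :+ item)
    case (Shopping(cart), ItemAdded(item))      => Shopping(cart :+ item)
    case (Shopping(cart), OrderExecuted)        => Paid(cart)
    case (Shopping(cart), CustomerInactive)     => Inactive(cart)
    case (Inactive(cart), ItemAdded(item))      => Shopping(cart :+ item)
    case (other, _)                             => other
  }

  /** Replays a journal written in the old format, ignoring the state change markers. */
  def replay(journal: Seq[Any]): State =
    journal.flatMap(fromJournal).foldLeft[State](LookingAround(Nil))(eventHandler)
}
```

If a transition could not be derived from the domain events alone, the marker would have to be converted into a private event instead of being dropped, as the comments in the adapter point out.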
The snapshot adapter needs to adapt an internal type of PersistentFSM, so a helper function is provided to build the SnapshotAdapter:
- Scala
  val persistentFSMSnapshotAdapter: SnapshotAdapter[State] = PersistentFSMMigration.snapshotAdapter[State] {
    case (stateIdentifier, data, _) =>
      val cart = data.asInstanceOf[ShoppingCart]
      stateIdentifier match {
        case "Looking Around" => LookingAround(cart)
        case "Shopping"       => Shopping(cart)
        case "Inactive"       => Inactive(cart)
        case "Paid"           => Paid(cart)
        case id               => throw new IllegalStateException(s"Unexpected state identifier $id")
      }
  }
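The mapping from an old state identifier to a new State is ordinary Scala, so one option is to write it as a named function and unit test it without starting an actor system. The sketch below assumes simplified stand-in types (a cart holding item names), and SnapshotMapping.fromFsmSnapshot is a hypothetical helper name. The third parameter is the state timeout stored with the old snapshot, which this example ignores because its timeouts are hard coded.

```scala
import scala.concurrent.duration.FiniteDuration

// Simplified stand-ins for the types on this page (assumption).
final case class ShoppingCart(items: Seq[String])

sealed trait State
final case class LookingAround(cart: ShoppingCart) extends State
final case class Shopping(cart: ShoppingCart) extends State
final case class Inactive(cart: ShoppingCart) extends State
final case class Paid(cart: ShoppingCart) extends State

// Hypothetical helper object, not part of Akka.
object SnapshotMapping {
  /** Maps the identifier and data stored in an old snapshot to the new State type. */
  def fromFsmSnapshot(stateIdentifier: String, data: Any, timeout: Option[FiniteDuration]): State = {
    // The old snapshot stores the state data untyped, hence the cast.
    val cart = data.asInstanceOf[ShoppingCart]
    stateIdentifier match {
      case "Looking Around" => LookingAround(cart)
      case "Shopping"       => Shopping(cart)
      case "Inactive"       => Inactive(cart)
      case "Paid"           => Paid(cart)
      case id               => throw new IllegalStateException(s"Unexpected state identifier $id")
    }
  }
}
```

Assuming the helper accepts a three-argument function, as the example above suggests, such a function could then be passed along the lines of PersistentFSMMigration.snapshotAdapter[State](SnapshotMapping.fromFsmSnapshot).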
That concludes all the steps to allow an EventSourcedBehavior to read a PersistentFSM’s data. Once the new code has been running you cannot roll back, as the PersistentFSM will not be able to read data written by Persistence Typed.
There is one case where a full shutdown and startup is required.