Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x)
Loading

Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x)

Akka Persistence is an experimental module, which means that neither Binary Compatibility nor API stability is provided for Persistence while under the experimental flag. The goal of this phase is to gather user feedback before we freeze the APIs in a major release.

Renamed EventsourcedProcessor to PersistentActor

EventsourcedProcessor is now deprecated and replaced by PersistentActor which provides the same (and more) API. Migrating to 2.4.x is as simple as changing all your classes to extending PersistentActor.

Replace all classes like:

class DeprecatedProcessor extends EventsourcedProcessor {
  def processorId = "id"
  /*...*/
}

To extend PersistentActor:

class NewPersistentActor extends PersistentActor {
  def persistenceId = "id"
  /*...*/
}

Read more about the persistent actor in the documentation for Scala and documentation for Java.

Changed processorId to (abstract) persistenceId

In Akka Persistence 2.3.3 and previously, the main building block of applications were Processors. Persistent messages, as well as processors implemented the processorId method to identify which persistent entity a message belonged to.

This concept remains the same in Akka 2.3.4, yet we rename processorId to persistenceId because Processors will be removed, and persistent messages can be used from different classes not only PersistentActor (Views, directly from Journals etc).

Please note that persistenceId is abstract in the new API classes (PersistentActor and PersistentView), and we do not provide a default (actor-path derrived) value for it like we did for processorId. The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical "which persistent entity this actor represents". A longer discussion on this subject can be found on issue #15436 on github.

In case you want to perserve the old behavior of providing the actor's path as the default persistenceId, you can easily implement it yourself either as a helper trait or simply by overriding persistenceId as follows:

override def persistenceId = self.path.toStringWithoutAddress

We provided the renamed method also on already deprecated classes (Channels), so you can simply apply a global rename of processorId to persistenceId.

Removed Processor in favour of extending PersistentActor with persistAsync

The Processor is now deprecated since 2.3.4 and will be removed in 2.4.x. It's semantics replicated in PersistentActor in the form of an additional persist method: persistAsync.

In essence, the difference betwen persist and persistAsync is that the former will stash all incomming commands until all persist callbacks have been processed, whereas the latter does not stash any commands. The new persistAsync should be used in cases of low consistency yet high responsiveness requirements, the Actor can keep processing incomming commands, even though not all previous events have been handled.

When these persist and persistAsync are used together in the same PersistentActor, the persist logic will win over the async version so that all guarantees concerning persist still hold. This will however lower the throughput

Now deprecated code using Processor:

class OldProcessor extends Processor {
  override def processorId = "user-wallet-1337"

  def receive = {
    case Persistent(cmd) => sender() ! cmd
  }
}

Replacement code, with the same semantics, using PersistentActor:

class NewProcessor extends PersistentActor {
  override def persistenceId = "user-wallet-1337"

  def receiveCommand = {
    case cmd =>
      persistAsync(cmd) { e => sender() ! e }
  }

  def receiveRecover = {
    case _ => // logic for handling replay
  }
}

It is worth pointing out that using sender() inside the persistAsync callback block is valid, and does not suffer any of the problems Futures have when closing over the sender reference.

Using the PersistentActor instead of Processor also shifts the responsibility of deciding if a message should be persisted to the receiver instead of the sender of the message. Previously, using Processor, clients would have to wrap messages as Persistent(cmd) manually, as well as have to be aware of the receiver being a Processor, which didn't play well with transparency of the ActorRefs in general.

Removed deleteMessage

deleteMessage is deprecated and will be removed. When using command sourced Processor the command was stored before it was received and could be validated and then there was a reason to remove faulty commands to avoid repeating the error during replay. When using PersistentActor you can always validate the command before persisting and therefore the stored event (or command) should always be valid for replay.

deleteMessages can still be used for pruning of all messages up to a sequence number.

Renamed View to PersistentView, which receives plain messages (Persistent() wrapper is gone)

Views used to receive messages wrapped as Persistent(payload, seqNr), this is no longer the case and views receive the payload as message from the Journal directly. The rationale here is that the wrapper aproach was inconsistent with the other Akka Persistence APIs and also is not easily "discoverable" (you have to know you will be getting this Persistent wrapper).

Instead, since 2.3.4, views get plain messages, and can use additional methods provided by the View to identify if a message was sent from the Journal (had been played back to the view). So if you had code like this:

class AverageView extends View {
  override def processorId = "average-view"

  def receive = {
    case Persistent(msg, seqNr) =>
      // from Journal

    case msg =>
      // from user-land
}

You should update it to extend PersistentView instead:

class AverageView extends PersistentView {
  override def persistenceId = "persistence-sample"
  override def viewId = "persistence-sample-average"

  def receive = {
    case msg if isPersistent =>
      // from Journal
      val seqNr = lastSequenceNr // in case you require the sequence number

    case msg =>
      // from user-land
  }
}

In case you need to obtain the current sequence number the view is looking at, you can use the lastSequenceNr method. It is equivalent to "current sequence number", when isPersistent returns true, otherwise it yields the sequence number of the last persistent message that this view was updated with.

Removed Channel and PersistentChannel in favour of AtLeastOnceDelivery trait

One of the primary tasks of a Channel was to de-duplicate messages that were sent from a Processor during recovery. Performing external side effects during recovery is not encouraged with event sourcing and therefore the Channel is not needed for this purpose.

The Channel and PersistentChannel also performed at-least-once delivery of messages, but it did not free a sending actor from implementing retransmission or timeouts, since the acknowledgement from the channel is needed to guarantee safe hand-off. Therefore at-least-once delivery is provided in a new AtLeastOnceDelivery trait that is mixed-in to the persistent actor on the sending side.

Read more about at-least-once delivery in the documentation for Scala and documentation for Java.

Contents