Migrating existing data

First, read Shred or delete? and Encrypting data to understand which existing events you might want to modify. Remember that in general, when using event sourcing, it is not recommended to modify events; they are meant to be immutable.

However, if you have an existing application that you want to move to use data encryption for payloads, and eventually also make use of data shredding, you can use akka-persistence-update to update (modify, in place) events that have been stored using Akka Persistence or Lagom’s Persistent Entities.

Dependencies

To create such a migration you will want to depend on the akka-persistence-update module in addition to the akka-gdpr module, since we will be using the encryption capabilities of the latter:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-persistence-update" % "1.1.16"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-persistence-update_2.11</artifactId>
  <version>1.1.16</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-persistence-update_2.11', version: '1.1.16'
}
sbt
libraryDependencies += "com.lightbend.akka" %% "akka-gdpr" % "1.1.16"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-gdpr_2.11</artifactId>
  <version>1.1.16</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-gdpr_2.11', version: '1.1.16'
}

Using JournalUpdater to encrypt data for a given persistenceId

The JournalUpdater API can perform sweeping changes to events in-place. These APIs are intended to be used for offline migrations of data. Specifically in this case, for GDPR compliance, you may want to encrypt events using GDPR for Akka Persistence. The akka-gdpr module automatically encrypts events that are wrapped using akka.persistence.gdpr.WithDataSubjectId. See Wrapping data in WithDataSubjectId for more information.
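For example, when persisting new events in a running application, wrapping could look like the following minimal sketch (the UserEmailChanged event and the subject id are illustrative assumptions, not part of the API):

Scala
// a minimal sketch, inside some PersistentActor; UserEmailChanged is a hypothetical event
case class UserEmailChanged(newEmail: String)

persist(WithDataSubjectId("user-42", UserEmailChanged("jane@example.com"))) { _ =>
  // akka-gdpr encrypts the wrapped payload with the key for "user-42" before journaling it
}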

In order to encrypt all persisted events you may use the transformAllPersistenceIds operation and replace each existing payload with the payload wrapped in a WithDataSubjectId. From then on, GDPR for Akka Persistence will handle the key lookup and encryption of such wrapped events transparently for you.

When using Lagom it’s recommended to use the WithDataSubjectId inside the events rather than wrapping the events with WithDataSubjectId as described in the section Lagom PersistentEntity.

Note

Note that for the updater to be able to perform its duties, the underlying Akka Persistence Journal has to implement such update operations. Currently only the Cassandra and JDBC journals implement this feature.

The Cassandra journal must be version 0.85 or later. That version is not used by default by Lagom, but it is possible to upgrade, and existing applications can be migrated. Contact Lightbend Support for detailed instructions and help with updating the Cassandra journal version with Lagom.

The JDBC journal must be version 3.4.0 or later, and that is the version that Lagom 1.4.6 is using.
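As a sketch, pinning the journal plugin version in sbt could look like the following (these coordinates are assumptions based on the commonly published artifacts; verify the exact versions and compatibility with your Akka and Lagom setup before upgrading):

sbt
// assumed coordinates; confirm against your build and Lagom version first
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.85"
// or, for the JDBC journal:
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0"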

Scala
import java.security.MessageDigest

import akka.Done
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.gdpr.WithDataSubjectId
import akka.persistence.query.PersistenceQuery
import akka.persistence.update.JournaledEvent
import akka.persistence.update.cassandra.scaladsl.CassandraJournalTransformer
import akka.persistence.update.scaladsl.{ JournalTransformer, JournalUpdater }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

class GdprMigrationExamples {

  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  val identifier: String = CassandraJournalTransformer.Identifier
  // or, for JDBC, the following:
  //  val identifier: String = JdbcJournalTransformer.Identifier

  val transformer: JournalTransformer = JournalUpdater(system)
    .journalUpdaterFor[JournalTransformer](identifier)

  def run(): Unit = {
    transformer.transformAllPersistenceIds(updateEvent)
  }

  private def updateEvent(event: JournaledEvent): Future[JournaledEvent] = {
    // determine the data subject id (and thereby the encryption key) for the given event:
    val dataSubjectId = determineEncryptionKey(event)

    dataSubjectId match {
      case Some(id) =>
        // the payload is typed as Any, so we cast to AnyRef for wrapping
        val withSubjectId = event.withPayload(WithDataSubjectId[AnyRef](id, event.payload.asInstanceOf[AnyRef]))
        Future.successful(withSubjectId)

      case None =>
        // not encrypting this event:
        Future.successful(event)
    }
  }


  // only share the instance when using parallelism = 1
  private val sha1 = MessageDigest.getInstance("SHA-1")

  /**
   * Implement your logic for determining a stable data subject id for each event here.
   *
   * For example, it could be based on masking a known user identifier that exists in all
   * events related to a given user. Or it could be *based on* the persistenceId of the event passed in,
   * which is a simple and effective solution.
   */
  private def determineEncryptionKey(event: JournaledEvent): Option[String] = {
    if (event.persistenceId startsWith "user") {
      try {
        sha1.update("my-app-secrets".getBytes)
        sha1.update(event.persistenceId.getBytes)

        // hex-encode the digest bytes so the resulting id is a stable, printable String
        Some(sha1.digest().map("%02x".format(_)).mkString)
      } finally {
        sha1.reset()
      }
    } else {
      None
    }
  }
}
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.gdpr.WithDataSubjectId;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.update.JournaledEvent;
import akka.persistence.update.cassandra.javadsl.CassandraJournalTransformer;
import akka.persistence.update.javadsl.JournalTransformer;
import akka.persistence.update.javadsl.JournalUpdater;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class GdprMigration {

  final ActorSystem system;
  final Materializer materializer;

  final MessageDigest SHA1; // only share the instance when using parallelism = 1


  String identifier = CassandraJournalTransformer.Identifier();
  // or, for JDBC, the following:
  //  String identifier = JdbcJournalTransformer.Identifier();

  final JournalTransformer transformer;

  public GdprMigration() {
    system = ActorSystem.create();
    materializer = ActorMaterializer.create(system);
    transformer = JournalUpdater.get(system)
      .getJournalUpdaterFor(JournalTransformer.class, identifier);

    try {
      SHA1 = MessageDigest.getInstance("SHA-1");
    } catch (NoSuchAlgorithmException ex) {
      throw new RuntimeException(ex); // should not happen
    }
  }

  public static void main(String... args) throws Exception {
    new GdprMigration().run();
  }

  private void run() {
    transformer.transformAllPersistenceIds(this::updateEvent);
  }

  private CompletionStage<JournaledEvent> updateEvent(JournaledEvent event) {
    // determine the data subject id (and thereby the encryption key) for the given event:
    Optional<String> dataSubjectId = determineEncryptionKey(event);

    if (dataSubjectId.isPresent()) {
      String key = dataSubjectId.get();
      JournaledEvent withSubjectId = event.withPayload(WithDataSubjectId.create(key, event.payload()));
      return CompletableFuture.completedFuture(withSubjectId);
    } else {
      // not encrypting this event:
      return CompletableFuture.completedFuture(event);
    }
  }

  /**
   * Implement your logic for determining a stable data subject id for each event here.
   *
   * For example, it could be based on masking a known user identifier that exists in all
   * events related to a given user. Or it could be *based on* the persistenceId of the event passed in,
   * which is a simple and effective solution.
   */
  private Optional<String> determineEncryptionKey(JournaledEvent event) {
    if (event.persistenceId().startsWith("user")) {
      try {
        SHA1.update("my-app-secrets".getBytes());
        SHA1.update(event.persistenceId().getBytes());
        // hex-encode the digest bytes so the resulting id is a stable, printable String
        StringBuilder hex = new StringBuilder();
        for (byte b : SHA1.digest()) hex.append(String.format("%02x", b));
        return Optional.of(hex.toString());
      } finally {
        SHA1.reset();
      }
    } else {
      return Optional.empty();
    }
  }
}

You can use the snippet above to create a runnable migration app that you can execute against your systems. We recommend first doing so in a staging environment, to make sure that the key assignment logic is sound and that replays of the migrated system continue to work properly.
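One way to perform such a staging check is to force a full replay of a few migrated persistenceIds through the read journal; a sketch (assuming the ActorSystem and materializer from the example above, and a Cassandra journal):

Scala
// a sketch: spot-check that a migrated persistenceId still replays (and decrypts) correctly
val verificationJournal: CassandraReadJournal =
  PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

def verifyReplay(persistenceId: String): Future[Done] =
  verificationJournal.currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
    .runWith(Sink.foreach(env => system.log.info("Replayed seqNr {} OK", env.sequenceNr)))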

You can also selectively run migrations on a specific persistenceId like this:

Scala
def runSingle(persistenceId: String): Future[Done] = {
  transformer.transformPersistenceId(persistenceId, updateEvent)
}
Java
private CompletionStage<Done> runSingle(String persistenceId) {
  return transformer.transformPersistenceId(persistenceId, this::updateEvent);
}

Finally, you can run the migration id-by-id rather than scanning through all events (a scan does not guarantee that events arrive grouped by persistence id). This approach may be more useful if you had to abort a migration and a number of events are known to be already migrated, or if you know which persistenceIds need to be migrated to the encrypted format:

Scala
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery

val queryJournal: CassandraReadJournal =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

def runIdById(): Future[Done] = {
  // obtain all persistence Ids (you could implement "skip known already migrated ids" here by filtering)
  queryJournal.persistenceIds().mapAsync(parallelism = 1) { id =>
    // for each of the ids perform an one-off per-id migration:
    transformer.transformPersistenceId(id, updateEvent)
  }.log("completion")
    .runWith(Sink.ignore)
}
Java
final CassandraReadJournal queryJournal =
    PersistenceQuery.get(system)
        .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

public CompletionStage<Done> runIdById() {
  int parallelism = 1; // since we reuse the SHA1 instance, we have to use 1 here
  return queryJournal.persistenceIds().mapAsync(parallelism, persistenceId ->
      transformer.transformPersistenceId(persistenceId, this::updateEvent)
  ).log("completion")
  .runWith(Sink.ignore(), materializer);
}
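If you keep track of which ids have already been migrated, the filtering mentioned in the comment above could look like the following sketch (loadMigratedIds is hypothetical bookkeeping on your side):

Scala
// a sketch: skip persistenceIds that are known to be migrated already
val alreadyMigrated: Set[String] = loadMigratedIds() // hypothetical: your own bookkeeping

def runRemaining(): Future[Done] =
  queryJournal.persistenceIds()
    .filterNot(alreadyMigrated.contains)
    .mapAsync(parallelism = 1)(id => transformer.transformPersistenceId(id, updateEvent))
    .runWith(Sink.ignore)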
Note

See also the discussion about Data Subject Ids and how to treat them.

Migrating snapshots

In addition, you may want to delete any snapshots a persistent actor has created, or migrate them to their encrypted form. The following sections discuss both approaches.

Deleting snapshots

Note that snapshots in general are a performance optimization and, unlike events, it is fine to drop them all up-front when deciding to move to encrypted payloads in the snapshot store. During normal operations the system will then continue as usual, and any snapshot that is persisted wrapped in an akka.persistence.gdpr.WithDataSubjectId will be encrypted with the proper key as well. This will in turn enable you to use the data shredding technique, instead of deleting snapshots, for any future data removal requests.

Here is a sample app that you can use to delete snapshots for all of your persistent actors, or for a subset of them (by filtering the stream of all persistence ids).

First we prepare an actor that is designed only to delete snapshots, and ignore all existing events:

Scala
object SnapshotDeleter {
  final case object DeleteSnapshots
  final case class SnapshotsDeleted(criteria: SnapshotSelectionCriteria)
}
class SnapshotDeleter(val persistenceId: String) extends PersistentActor with ActorLogging {
  import SnapshotDeleter._

  private var notifyOnceSnapshotsDeleted: Option[ActorRef] = None

  override def recovery: Recovery = Recovery()

  override def receiveRecover: Receive = {
    // snapshot offer
    case SnapshotOffer(meta, snap) =>
    // ignore the actual snapshot, we only want to delete it

    // events
    case event =>
    // ignore all events ...
  }

  override def receiveCommand: Receive = {
    case DeleteSnapshots =>
      notifyOnceSnapshotsDeleted = Some(sender())
      deleteSnapshots(SnapshotSelectionCriteria())

    // handle snapshot deletion response
    case DeleteSnapshotsSuccess(criteria) =>
      notifyOnceSnapshotsDeleted foreach { _ ! SnapshotsDeleted(criteria) }
      context stop self

    case DeleteSnapshotsFailure(criteria, cause) =>
      // replying with a failure will terminate the outer stream
      log.error(cause, "Failed to delete snapshot for {} {}", persistenceId, criteria)
      notifyOnceSnapshotsDeleted foreach { _ ! Failure(cause) }
      context stop self
  }
}

And then we make use of it by executing it for all existing persistence ids:

Scala
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
val log = system.log

val queryJournal: CassandraReadJournal =
  PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)


def deleteAllSnapshots(): Future[Done] = {
  import akka.pattern.ask
  implicit val timeout: Timeout = Timeout(60.seconds)

  // obtain all persistence Ids (you could implement "skip known already migrated ids" here by filtering)
  queryJournal.persistenceIds()
    .mapAsync(parallelism = 1) { id =>
      val deleter = system.actorOf(Props(new SnapshotDeleter(id)), s"delete-snapshots-$id")
      val deleted = (deleter ? SnapshotDeleter.DeleteSnapshots).mapTo[SnapshotDeleter.SnapshotsDeleted]
      deleted.map { d => log.info("Deleted snapshots for {} {}", id, d.criteria) }
    }.log("completion")
    .runWith(Sink.ignore)
}

Replacing with a new encrypted snapshot

Rather than deleting all snapshots and relying on the running application to recreate them during normal operation, you may want to proactively make all actors create new snapshots using GDPR for Akka Persistence encryption, so that they can be subject to data shredding.

To do this you can use the following pattern, substituting your actual actors for the example actor shown here. Note that you have to make sure to start the right type of actor for each specific persistence id.

Note that, unlike in the “only delete the snapshots” application, the persistent actor performs an actual replay and recovery of its state. It does so because, once it has completed recovery, it persists a new snapshot, wrapped in WithDataSubjectId just as we did for events previously. GDPR for Akka Persistence takes care of transparently encrypting the payload with the right key for the given subject id. Once this has completed successfully, the actor issues a deleteSnapshots similar to the “deleting snapshots” section; this time, however, we want to delete all but the latest snapshot, since that one is already encrypted and we want to keep it for future use.

When using this technique, remember to start the right kind of actor for each of the ids, as otherwise you would accidentally attempt to replay and recover state using the wrong logic (logic that does not align with the data persisted for the given persistence id).

Also note that some actors may rely on the name of the actor, and for those you must use the correct name when starting the actor from the migration stream rather than from the normal application. Persistent actors running in Cluster Sharding may use the actor name as the entity identifier; that is also the case for Lagom PersistentEntity. Typically such a name can be derived from the persistenceId, as sketched after the example below.

Scala
object DeleteSnapshotAndSnapNewEncryptedOne {
  final case object AwaitCompletion
  final case class SnapshotUpdateComplete(criteria: SnapshotSelectionCriteria, newSnapshotSeqNr: Long)

  // example state, this would be your application state (like "wallet state" etc)
  final case class ExampleState(data: Int)
  final case class ExampleEvent(update: Int)
}
class DeleteSnapshotAndSnapNewEncryptedOne(val persistenceId: String) extends PersistentActor with ActorLogging {
  import DeleteSnapshotAndSnapNewEncryptedOne._

  private var state: ExampleState = ExampleState(0)
  private var snapshotUpdated = false
  private var notifyOnceDone: Option[ActorRef] = None

  override def recovery: Recovery = Recovery()

  override def receiveRecover: Receive = {
    // snapshot offer
    case SnapshotOffer(meta, snap: ExampleState) =>
      state = snap // we do use the previous snapshot

    // events, we process them all since we will want to store a new snapshot
    case ExampleEvent(update) =>
      state = updateState(update)

    // time to store our new snapshot!
    case RecoveryCompleted =>
      saveSnapshot(WithDataSubjectId(s"MY_SPECIAL_ID_$persistenceId", state))
  }

  // this is only an example; your usual business logic updating the state would be here
  private def updateState(update: Int): ExampleState = {
    state.copy(state.data + update)
  }

  override def receiveCommand: Receive = {
    case AwaitCompletion =>
      notifyOnceDone = Some(sender())

    // handle snapshot update result (the new, encrypted one)
    case SaveSnapshotSuccess(metadata) =>
      snapshotUpdated = true
      // delete all snapshots except the newly stored, encrypted one
      deleteSnapshots(SnapshotSelectionCriteria().copy(maxSequenceNr = metadata.sequenceNr - 1))

    case SaveSnapshotFailure(metadata, cause) =>
      // replying with a failure will terminate the outer stream
      log.error(cause, "Failed to save snapshot for {} {}", persistenceId, metadata)
      notifyOnceDone foreach { _ ! Status.Failure(cause) }
      context stop self

    // handle snapshot deletion response
    case DeleteSnapshotsSuccess(criteria) =>
      if (snapshotUpdated) {
        notifyOnceDone foreach { _ ! SnapshotUpdateComplete(criteria, snapshotSequenceNr) }
        context stop self
      }

    case DeleteSnapshotsFailure(criteria, cause) =>
      // replying with a failure will terminate the outer stream
      log.error(cause, "Failed to delete snapshots for {} {}", persistenceId, criteria)
      notifyOnceDone foreach { _ ! Status.Failure(cause) }
      context stop self
  }
}
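As a small sketch, such per-id dispatch could look like this (the "user" prefix check mirrors the earlier example and is an assumption about your id scheme; java.net.URLEncoder makes the persistenceId safe to use as an actor name):

Scala
import java.net.URLEncoder

// a sketch: start the matching actor type, named after its persistenceId
def startUpdaterFor(persistenceId: String): ActorRef =
  if (persistenceId startsWith "user")
    system.actorOf(
      Props(new DeleteSnapshotAndSnapNewEncryptedOne(persistenceId)),
      URLEncoder.encode(persistenceId, "UTF-8"))
  else
    throw new IllegalArgumentException(s"No updater known for: $persistenceId")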

Running the application is similar to what we did previously: we start the “updater” actor for each of the ids and, using the ask pattern, wait for it to reply that it has completed its task. A sketch, reusing the system, log and queryJournal setup from the snapshot-deletion example above:

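Scala
def updateAllSnapshots(): Future[Done] = {
  import akka.pattern.ask
  implicit val timeout: Timeout = Timeout(60.seconds)

  // obtain all persistence ids (filter here if only some ids need new snapshots)
  queryJournal.persistenceIds()
    .mapAsync(parallelism = 1) { id =>
      val updater = system.actorOf(Props(new DeleteSnapshotAndSnapNewEncryptedOne(id)), s"update-snapshots-$id")
      val updated = (updater ? DeleteSnapshotAndSnapNewEncryptedOne.AwaitCompletion)
        .mapTo[DeleteSnapshotAndSnapNewEncryptedOne.SnapshotUpdateComplete]
      updated.map { u => log.info("Stored new encrypted snapshot for {} at seqNr {}", id, u.newSnapshotSeqNr) }
    }.log("completion")
    .runWith(Sink.ignore)
}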