Migrating existing data
First, read Shred or delete? and Encrypting data to understand which existing events you might want to modify. Remember that in general, when using event sourcing, it is not recommended to modify events—they are meant to be immutable.
However, if you have an existing application that you want to move to use data encryption for payloads, and eventually also make use of data shredding, you can use akka-persistence-update to update (modify in place) events that have been stored using Akka Persistence or Lagom's Persistent Entities.
Dependencies
To create such a migration you will want to depend on the akka-persistence-update module in addition to the akka-gdpr module, since we will be using its encryption capabilities:
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-persistence-update" % "1.1.16"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-persistence-update_2.11</artifactId>
  <version>1.1.16</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-persistence-update_2.11', version: '1.1.16'
}
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-gdpr" % "1.1.16"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-gdpr_2.11</artifactId>
  <version>1.1.16</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-gdpr_2.11', version: '1.1.16'
}
Using JournalUpdater to encrypt data for a given persistenceId
The JournalUpdater API can perform sweeping changes to events in place. These APIs are intended for offline migrations of data; specifically in this case, for GDPR compliance, you may want to encrypt events using GDPR for Akka Persistence. The akka-gdpr module automatically encrypts events that are wrapped using akka.persistence.gdpr.WithDataSubjectId. See Wrapping data in WithDataSubjectId for more information.
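For reference, wrapping a payload at write time looks roughly like the following minimal sketch; the actor, event type and subject id scheme here are made up for illustration:
- Scala
-
import akka.persistence.PersistentActor
import akka.persistence.gdpr.WithDataSubjectId

// Sketch (hypothetical actor): akka-gdpr encrypts the wrapped payload with the
// key belonging to the given data subject id.
class UserActor(userId: String) extends PersistentActor {
  override def persistenceId: String = s"user-$userId"

  override def receiveCommand: Receive = {
    case update: String =>
      persist(WithDataSubjectId(s"subject-$userId", update)) { _ => }
  }

  override def receiveRecover: Receive = {
    case _ => // recovery handling elided in this sketch
  }
}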
In order to encrypt all persisted events you may use the transformAllPersistenceIds operation and replace each existing payload with the payload wrapped in a WithDataSubjectId. From there on, GDPR for Akka Persistence handles the key lookup and the encryption of such wrapped events transparently for you.
When using Lagom it is recommended to use WithDataSubjectId inside the events rather than wrapping the events with WithDataSubjectId, as described in the section Lagom PersistentEntity.
Note that for the updater to be able to perform its duties, the underlying Akka Persistence journal has to implement these update operations. Currently only the Cassandra and JDBC journals implement this feature.
The Cassandra journal must be version 0.85 or later. That version is not used by default by Lagom, but upgrading is possible and existing applications can be migrated. Contact Lightbend Support for detailed instructions and help with updating the Cassandra journal version with Lagom.
The JDBC journal must be version 3.4.0 or later, which is the version that Lagom 1.4.6 uses.
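If you manage these plugin versions yourself, the corresponding dependencies would look roughly like this (sbt shown; treat the coordinates as a sketch and align the versions with the rest of your setup):
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.85"
// or, for JDBC:
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0"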
- Scala
-
import java.security.MessageDigest

import akka.Done
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.gdpr.WithDataSubjectId
import akka.persistence.query.PersistenceQuery
import akka.persistence.update.JournaledEvent
import akka.persistence.update.cassandra.scaladsl.CassandraJournalTransformer
import akka.persistence.update.scaladsl.{ JournalTransformer, JournalUpdater }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

class GdprMigrationExamples {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  val identifier: String = CassandraJournalTransformer.Identifier
  // or, for JDBC, the following:
  // val identifier: String = JdbcJournalTransformer.Identifier

  val transformer: JournalTransformer =
    JournalUpdater(system).journalUpdaterFor[JournalTransformer](identifier)

  def run(): Unit = {
    transformer.transformAllPersistenceIds(updateEvent)
  }

  private def updateEvent(event: JournaledEvent): Future[JournaledEvent] = {
    // determine the encryption key for the given event:
    val encryptionDataSubjectId = determineEncryptionKey(event)
    encryptionDataSubjectId match {
      case Some(key) =>
        // the payload is typed as Any, so cast it to AnyRef before wrapping it for encryption
        val withSubjectId = event.withPayload(WithDataSubjectId[AnyRef](key, event.payload.asInstanceOf[AnyRef]))
        Future.successful(withSubjectId)
      case None =>
        // not encrypting this event:
        Future.successful(event)
    }
  }

  // only share the instance when using parallelism = 1
  private val sha1 = MessageDigest.getInstance("SHA-1")

  /**
   * Implement your logic for determining a stable data subject id for each event here.
   *
   * For example, it could be based on masking a known user identifier that exists in all
   * events related to a given user. Or it could be *based on* the persistenceId of the event
   * passed in, which is a simple and effective solution.
   */
  private def determineEncryptionKey(event: JournaledEvent): Option[String] = {
    if (event.persistenceId startsWith "user") {
      try {
        sha1.update("my-app-secrets".getBytes)
        sha1.update(event.persistenceId.getBytes)
        Some(new String(sha1.digest()))
      } finally {
        sha1.reset()
      }
    } else {
      None
    }
  }
}
- Java
-
import akka.Done;
import akka.actor.ActorSystem;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.gdpr.WithDataSubjectId;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.update.JournaledEvent;
import akka.persistence.update.cassandra.javadsl.CassandraJournalTransformer;
import akka.persistence.update.javadsl.JournalTransformer;
import akka.persistence.update.javadsl.JournalUpdater;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class GdprMigration {

  final ActorSystem system;
  final Materializer materializer;
  final MessageDigest SHA1; // only share the instance when using parallelism = 1

  String identifier = CassandraJournalTransformer.Identifier();
  // or, for JDBC, the following:
  // String identifier = JdbcJournalTransformer.Identifier();

  final JournalTransformer transformer;

  public GdprMigration() {
    system = ActorSystem.create();
    materializer = ActorMaterializer.create(system);
    transformer = JournalUpdater.get(system)
        .getJournalUpdaterFor(JournalTransformer.class, identifier);
    try {
      SHA1 = MessageDigest.getInstance("SHA-1");
    } catch (NoSuchAlgorithmException ex) {
      throw new RuntimeException(ex); // should not happen
    }
  }

  public static void main(String... args) throws Exception {
    new GdprMigration().run();
  }

  private void run() {
    transformer.transformAllPersistenceIds(this::updateEvent);
  }

  private CompletionStage<JournaledEvent> updateEvent(JournaledEvent event) {
    // determine the encryption key for the given event:
    Optional<String> encryptionDataSubjectId = determineEncryptionKey(event);
    if (encryptionDataSubjectId.isPresent()) {
      String key = encryptionDataSubjectId.get();
      JournaledEvent withSubjectId = event.withPayload(WithDataSubjectId.create(key, event.payload()));
      return CompletableFuture.completedFuture(withSubjectId);
    } else {
      // not encrypting this event:
      return CompletableFuture.completedFuture(event);
    }
  }

  /**
   * Implement your logic for determining a stable data subject id for each event here.
   *
   * For example, it could be based on masking a known user identifier that exists in all
   * events related to a given user. Or it could be *based on* the persistenceId of the event
   * passed in, which is a simple and effective solution.
   */
  private Optional<String> determineEncryptionKey(JournaledEvent event) {
    if (event.persistenceId().startsWith("user")) {
      try {
        SHA1.update("my-app-secrets".getBytes());
        SHA1.update(event.persistenceId().getBytes());
        return Optional.of(new String(SHA1.digest()));
      } catch (Exception ex) {
        return Optional.empty();
      } finally {
        SHA1.reset();
      }
    } else {
      return Optional.empty();
    }
  }
}
You can use the snippet above to create a runnable migration app that you can execute against your systems. We recommend first doing so in a staging environment, to be sure that the key assignment logic is sound and that replays of the migrated system continue to work properly.
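A minimal sketch of such an app, driving the GdprMigrationExamples class shown above from a plain main method (the object name is illustrative):
- Scala
-
// Sketch: a plain entry point around the migration class above.
object GdprMigrationApp {
  def main(args: Array[String]): Unit = {
    val migration = new GdprMigrationExamples()
    // starts the full-journal transformation; if you adapt run() to return the
    // Future completed by transformAllPersistenceIds, you can terminate the
    // ActorSystem once the migration has finished
    migration.run()
  }
}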
You can also selectively run the migration for a specific persistenceId like this:
- Scala
-
def runSingle(persistenceId: String): Future[Done] = { transformer.transformPersistenceId(persistenceId, updateEvent) }
- Java
-
private CompletionStage<Done> runSingle(String persistenceId) { return transformer.transformPersistenceId(persistenceId, this::updateEvent); }
Finally, you can run the migration id-by-id rather than scanning through all events (a full scan does not guarantee that events are grouped and ordered by persistence id). This approach may be more useful if you had to abort a migration and have a number of events that are known to be migrated already, or if you know which persistenceIds need to be migrated to the encrypted format:
- Scala
-
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery

val queryJournal: CassandraReadJournal =
  PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

def runIdById(): Future[Done] = {
  // obtain all persistence ids (you could implement "skip known already migrated ids" here by filtering)
  queryJournal.persistenceIds()
    .mapAsync(parallelism = 1) { id =>
      // for each of the ids perform a one-off per-id migration:
      transformer.transformPersistenceId(id, updateEvent)
    }
    .log("completion")
    .runWith(Sink.ignore)
}
- Java
-
final CassandraReadJournal queryJournal = PersistenceQuery.get(system)
    .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());

public CompletionStage<Done> runIdById() {
  int parallelism = 1; // since we reuse the SHA1 instance, we have to use 1 here
  return queryJournal.persistenceIds()
      .mapAsync(parallelism, persistenceId ->
          transformer.transformPersistenceId(persistenceId, this::updateEvent))
      .log("completion")
      .runWith(Sink.ignore(), materializer);
}
See also the discussion about Data Subject Ids and how to treat them.
Migrating snapshots
In addition you may want to delete, or migrate to their encrypted forms, any snapshots a persistent actor has created. The following sections discuss both approaches.
Deleting snapshots
Note that snapshots in general are a performance optimization and, unlike events, it is fine to drop them all up-front when deciding to move to encrypted payloads in the snapshot store. During normal operations the system will then continue as usual, and any snapshot that is persisted wrapped in an akka.persistence.gdpr.WithDataSubjectId will be encrypted with the proper key as well. This in turn enables you to use the data shredding technique, instead of deleting snapshots, for any future removal requests.
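For example, once the snapshot payload is wrapped, akka-gdpr stores it encrypted; a minimal sketch (hypothetical actor, state, and subject id scheme):
- Scala
-
import akka.persistence.PersistentActor
import akka.persistence.gdpr.WithDataSubjectId

// Sketch (hypothetical actor): the snapshot payload is wrapped in
// WithDataSubjectId, so it is stored encrypted and can be shredded later.
class AccountActor(accountId: String) extends PersistentActor {
  override def persistenceId: String = s"account-$accountId"

  private var state: List[String] = Nil

  override def receiveCommand: Receive = {
    case event: String =>
      persist(event) { _ =>
        state = event :: state
        saveSnapshot(WithDataSubjectId(s"subject-$accountId", state))
      }
  }

  override def receiveRecover: Receive = {
    case _ => // recovery handling elided in this sketch
  }
}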
Here is a sample app that you can use to delete snapshots for all of your persistent actors, or for a subset of them (by filtering the stream of all persistence ids).
First we prepare an actor that is designed only to delete snapshots and ignores all existing events:
- Scala
-
import akka.actor.{ ActorLogging, ActorRef }
import akka.actor.Status.Failure
import akka.persistence._

object SnapshotDeleter {
  final case object DeleteSnapshots
  final case class SnapshotsDeleted(criteria: SnapshotSelectionCriteria)
}

class SnapshotDeleter(val persistenceId: String) extends PersistentActor with ActorLogging {
  import SnapshotDeleter._

  private var notifyOnceSnapshotsDeleted: Option[ActorRef] = None

  override def recovery: Recovery = Recovery()

  override def receiveRecover: Receive = {
    // snapshot offer
    case SnapshotOffer(meta, snap) => // ignore the actual snapshot, we only want to delete them
    // events
    case event => // ignore all events ...
  }

  override def receiveCommand: Receive = {
    case DeleteSnapshots =>
      notifyOnceSnapshotsDeleted = Some(sender())
      deleteSnapshots(SnapshotSelectionCriteria())

    // handle snapshot deletion response
    case DeleteSnapshotsSuccess(criteria) =>
      notifyOnceSnapshotsDeleted foreach { _ ! SnapshotsDeleted(criteria) }
      context stop self

    case DeleteSnapshotsFailure(criteria, cause) =>
      // replying with a failure will terminate the outer stream
      log.error(cause, "Failed to delete snapshots for {} {}", persistenceId, criteria)
      notifyOnceSnapshotsDeleted foreach { _ ! Failure(cause) }
      context stop self
  }
}
And then we make use of it by executing it for all existing persistence ids:
- Scala
-
import akka.Done
import akka.actor.{ ActorSystem, Props }
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
val log = system.log

val queryJournal: CassandraReadJournal =
  PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

def deleteAllSnapshots(): Future[Done] = {
  import akka.pattern.ask
  implicit val timeout: Timeout = Timeout(60.seconds)

  // obtain all persistence ids (you could implement "skip known already migrated ids" here by filtering)
  queryJournal.persistenceIds()
    .mapAsync(parallelism = 1) { id =>
      val deleter = system.actorOf(Props(new SnapshotDeleter(id)), s"delete-snapshots-$id")
      val deleted = (deleter ? SnapshotDeleter.DeleteSnapshots).mapTo[SnapshotDeleter.SnapshotsDeleted]
      deleted.map { d =>
        log.info("Deleted snapshots for {} {}", id, d.criteria)
      }
    }
    .log("completion")
    .runWith(Sink.ignore)
}
Replacing with a new encrypted snapshot
Rather than deleting all snapshots and relying on the running application to recreate them during normal operation, you may want to proactively make all actors create new snapshots using GDPR for Akka Persistence encryption, so that those snapshots can be subject to data shredding.
To do this you can use the following pattern; instead of the example actor shown here, use your actual actors. You have to make sure to start the right type of actor for each specific persistence id though.
Note that the persistent actor performs an actual replay and recovery of its state, unlike in the “only delete the snapshots” application. It does so because, once it has completed recovery, it persists a new snapshot wrapped in WithDataSubjectId, the same way we wrapped events previously. GDPR for Akka Persistence takes care of transparently encrypting the payload with the right key associated with the given subject id. Once this has completed successfully, you can issue a deleteSnapshots similar to how this was done in the “deleting snapshots” section; this time, however, we want to delete all but the last snapshot, since that one is already encrypted and we want to keep it for future use.
When using this technique, remember to start the right kind of actor for each of the ids; otherwise you would accidentally attempt to replay and recover state using the wrong logic (logic that would not align with the data persisted for the given persistence id).
Also note that some actors rely on the name of the actor; for those you must use the correct name when starting the actor from the migration stream, just as in the normal application. Persistent actors running in Cluster Sharding may use the actor name as the entity identifier, and that is also the case for Lagom PersistentEntity. Typically such a name can be derived from the persistenceId, as sketched below.
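A sketch of such a derivation, assuming a hypothetical “<prefix><entityId>” naming scheme; verify the convention your application (or Lagom) actually uses before relying on it:
- Scala
-
// Sketch: derive the entity/actor name from the persistenceId, assuming a
// hypothetical "<prefix><entityId>" scheme (check your own convention).
def entityIdFor(persistenceId: String, prefix: String): String =
  persistenceId.stripPrefix(prefix)

// e.g. start the updater actor under the derived name:
// system.actorOf(
//   Props(new DeleteSnapshotAndSnapNewEncryptedOne(persistenceId)),
//   name = entityIdFor(persistenceId, "UserEntity"))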
- Scala
-
import akka.actor.{ ActorLogging, ActorRef }
import akka.actor.Status.Failure
import akka.persistence._
import akka.persistence.gdpr.WithDataSubjectId

object DeleteSnapshotAndSnapNewEncryptedOne {
  final case object AwaitCompletion
  final case class SnapshotUpdateComplete(criteria: SnapshotSelectionCriteria, newSnapshotSeqNr: Long)

  // example state, this would be your application state (like "wallet state" etc)
  final case class ExampleState(data: Int)
  final case class ExampleEvent(update: Int)
}

class DeleteSnapshotAndSnapNewEncryptedOne(val persistenceId: String) extends PersistentActor with ActorLogging {
  import DeleteSnapshotAndSnapNewEncryptedOne._

  private var state: ExampleState = ExampleState(0)
  private var snapshotUpdated = false
  private var notifyOnceDone: Option[ActorRef] = None

  override def recovery: Recovery = Recovery()

  override def receiveRecover: Receive = {
    // snapshot offer
    case SnapshotOffer(meta, snap: ExampleState) =>
      state = snap // we do use the previous snapshot

    // events, we process them all since we will want to store a new snapshot
    case ExampleEvent(update) =>
      state = updateState(update)

    // time to store our new snapshot!
    case RecoveryCompleted =>
      saveSnapshot(WithDataSubjectId(s"MY_SPECIAL_ID_$persistenceId", state))
  }

  // this is only an example; your usual business logic updating the state would be here
  private def updateState(update: Int): ExampleState = {
    state.copy(state.data + update)
  }

  override def receiveCommand: Receive = {
    case AwaitCompletion =>
      notifyOnceDone = Some(sender())

    // handle snapshot update result (the new, encrypted one)
    case SaveSnapshotSuccess(metadata) =>
      snapshotUpdated = true
      // delete all snapshots except the new, encrypted one
      deleteSnapshots(SnapshotSelectionCriteria().copy(maxSequenceNr = snapshotSequenceNr - 1))

    case SaveSnapshotFailure(metadata, cause) =>
      // replying with a failure will terminate the outer stream
      log.error(cause, "Failed to save snapshot for {} {}", persistenceId, metadata)
      notifyOnceDone foreach { _ ! Failure(cause) }
      context stop self

    // handle snapshot deletion response
    case DeleteSnapshotsSuccess(criteria) =>
      if (snapshotUpdated) {
        notifyOnceDone foreach { _ ! SnapshotUpdateComplete(criteria, snapshotSequenceNr) }
        context stop self
      }

    case DeleteSnapshotsFailure(criteria, cause) =>
      // replying with a failure will terminate the outer stream
      log.error(cause, "Failed to delete snapshots for {} {}", persistenceId, criteria)
      notifyOnceDone foreach { _ ! Failure(cause) }
      context stop self
  }
}
Running the application is similar to what we did previously: we start the “updater” actor for each of the ids and use the ask pattern to wait for it to reply that it has completed its task:
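A minimal sketch of such a runner, modeled on the deleteAllSnapshots example above and reusing its system, queryJournal and log values (the method name, actor naming and timeout are illustrative):
- Scala
-
import akka.Done
import akka.actor.Props
import akka.stream.scaladsl.Sink
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

// Sketch: start the snapshot-updating actor for every persistence id and wait,
// via ask, until it reports that the new encrypted snapshot is in place.
def updateAllSnapshots(): Future[Done] = {
  import akka.pattern.ask
  implicit val timeout: Timeout = Timeout(60.seconds)

  queryJournal.persistenceIds()
    .mapAsync(parallelism = 1) { id =>
      val updater = system.actorOf(
        Props(new DeleteSnapshotAndSnapNewEncryptedOne(id)), s"update-snapshots-$id")
      (updater ? DeleteSnapshotAndSnapNewEncryptedOne.AwaitCompletion)
        .mapTo[DeleteSnapshotAndSnapNewEncryptedOne.SnapshotUpdateComplete]
        .map { done =>
          log.info("Updated snapshot for {} at seqNr {}", id, done.newSnapshotSeqNr)
        }
    }
    .log("completion")
    .runWith(Sink.ignore)
}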