Using akka-gdpr
The akka-gdpr module contains utilities to help you manage data shredding, including:
- A GdprEncryption extension point that you can use to plug in your own encryption. Alternatively, you can use the AbstractGdprEncryption class, which provides encryption but requires you to implement key management.
- A serializer that encrypts events and adds a data subject id when you wrap events with WithDataSubjectId. You must provide your own serializer for events that reference multiple data subjects.
If you are using Jackson or Play JSON for serialization, the akka-gdpr module allows you to use them for encryption.
Dependency
To use the GDPR for Akka Persistence feature, add a dependency for the akka-gdpr artifact:
- sbt
// Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html
"com.lightbend.akka" %% "akka-gdpr" % "1.1.16"
- Gradle
// Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-gdpr_2.11', version: '1.1.16'
}
- Maven
<!-- Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-gdpr_2.11</artifactId>
  <version>1.1.16</version>
</dependency>
Before you can access this library, you’ll need to configure the Lightbend repository and credentials in your build.
How the module works
Once you have implemented the GdprEncryption extension, for example by extending AbstractGdprEncryption and implementing KeyManagement, the GdprSerializer class in akka-gdpr handles the encryption for you. The GdprSerializer is automatically bound for serialization of WithDataSubjectId classes:
akka.actor {
  serializers {
    gdpr-serializer = "akka.persistence.gdpr.GdprSerializer"
  }
  serialization-bindings {
    "akka.persistence.gdpr.WithDataSubjectId" = gdpr-serializer
  }
}
The GdprSerializer does the following:
- Each of your events is first serialized via Akka’s serialization extension.
- The payload is then encrypted with AES in GCM mode with no padding. An initialization vector is created for each new payload using a SecureRandom and is the same length as the chosen key.
- The initialization vector is stored at the start of the encrypted payload.
- The payload is then added to a protobuf message:
option java_package = "akka.persistence.gdpr.serialization";
option optimize_for = SPEED;

message WithDataSubjectId {
  required string dataSubjectId = 1;
  optional bytes payload = 2;
  optional int32 serializerId = 3;
  optional string manifest = 4;
}
We provide these details so that you can read the events and validate the cryptographic approach.
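To make the encryption step concrete, here is a minimal JDK-only sketch of AES in GCM mode with a random initialization vector stored at the start of the encrypted payload. It is not the akka-gdpr implementation: the class name GcmSketch, the 128-bit key size, and the 128-bit authentication tag are assumptions chosen for illustration, and the IV length simply follows the "same length as the key" description above.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;

// Hypothetical illustration; names and sizes are assumptions, not akka-gdpr API.
final class GcmSketch {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int TAG_BITS = 128; // assumed authentication tag length

    // Generates a 128-bit AES key (key size is an assumption).
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns IV || ciphertext (the ciphertext includes the GCM tag).
    static byte[] encrypt(SecretKey key, byte[] plaintext) {
        try {
            byte[] iv = new byte[key.getEncoded().length]; // IV as long as the key
            RANDOM.nextBytes(iv);                          // fresh IV per payload
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            byte[] stored = new byte[iv.length + ciphertext.length];
            System.arraycopy(iv, 0, stored, 0, iv.length);
            System.arraycopy(ciphertext, 0, stored, iv.length, ciphertext.length);
            return stored;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads the IV from the start of the stored bytes and decrypts the rest.
    static byte[] decrypt(SecretKey key, byte[] stored) {
        try {
            int ivLength = key.getEncoded().length;
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                new GCMParameterSpec(TAG_BITS, stored, 0, ivLength));
            return cipher.doFinal(stored, ivLength, stored.length - ivLength);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the IV travels with the payload, only the key has to be looked up at decryption time.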
General steps
General steps to use the akka-gdpr module include:
- Add a dependency to your build as described in Adding the akka-gdpr module dependency.
- For encryption, choose from the following:
  - The GdprEncryption extension, which gives you full control over encryption algorithms through three methods that you implement. See GdprEncryption for more details.
  - The AbstractGdprEncryption class, which you extend. It implements encryption and decryption for you, but requires you to use the KeyManagement interface to plug in the logic for creating and retrieving keys. See AbstractGdprEncryption for more details.
  - The JavaKeyStoreGdprEncryption class, which implements GdprEncryption with support for PKCS12 and JCEKS keystores. This is only appropriate for single-node applications. See JavaKeyStoreGdprEncryption for more details.
- Define the akka.persistence.gdpr.encryption-provider setting to point to a configuration block for your encryption implementation, as described in Pointing to your encryption implementation.
- Use the WithDataSubjectId class by wrapping data, or inside of events. For events that contain multiple data subjects, you will need to implement your own serializer, as described in Events with multiple subjects.
- When there is a request to forget a data subject, call the shred method of the GdprEncryption extension:
- Scala
import akka.persistence.gdpr.scaladsl.GdprEncryption
GdprEncryption(system).shred(dataSubjectId)
- Java
akka.persistence.gdpr.javadsl.GdprEncryption.get(system).shred(dataSubjectId);
For unit testing during development, you can use TestGdprEncryption, which holds generated keys in memory and can be enabled with configuration. See TestGdprEncryption for more details.
Wrapping data in WithDataSubjectId
The GDPR for Akka Persistence module provides a WithDataSubjectId class. Wrap events that require encryption in this class. When WithDataSubjectId is serialized, the payload (the actual event) is encrypted with the key that is identified by the dataSubjectId in the WithDataSubjectId. To shred the data, as described in The right to be forgotten, remove the encryption key to make the encrypted payload unreadable.
When the encrypted payload of WithDataSubjectId is deserialized and the encryption key no longer exists, the missing payload is represented as an empty value: in Scala, payload is None; in Java, getPayload() returns Optional.empty(). You need to take this value into account when replaying or processing the events.
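The following JDK-only sketch illustrates the idea behind shredding: one key per data subject, and an empty result once that key has been removed. It is a simplified stand-in, not the akka-gdpr implementation. The class ShreddingSketch, its in-memory key map, the 12-byte IV, and the method names are all assumptions made for this example.

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; keys live in memory only, which is not
// suitable for production use.
final class ShreddingSketch {
    private static final int IV_BYTES = 12;  // assumption: common GCM IV size
    private static final int TAG_BITS = 128; // assumed authentication tag length
    private final SecureRandom random = new SecureRandom();
    private final Map<String, SecretKey> keys = new ConcurrentHashMap<>();

    // Encrypts with the subject's key, creating the key on first use.
    byte[] encrypt(String dataSubjectId, byte[] payload) {
        SecretKey key = keys.computeIfAbsent(dataSubjectId, id -> newKey());
        try {
            byte[] iv = new byte[IV_BYTES];
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(payload);
            byte[] stored = new byte[IV_BYTES + ciphertext.length];
            System.arraycopy(iv, 0, stored, 0, IV_BYTES);
            System.arraycopy(ciphertext, 0, stored, IV_BYTES, ciphertext.length);
            return stored;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns Optional.empty() when the subject's key has been shredded.
    Optional<byte[]> decrypt(String dataSubjectId, byte[] stored) {
        SecretKey key = keys.get(dataSubjectId);
        if (key == null) {
            return Optional.empty();
        }
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                new GCMParameterSpec(TAG_BITS, stored, 0, IV_BYTES));
            return Optional.of(cipher.doFinal(stored, IV_BYTES, stored.length - IV_BYTES));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Forgetting a subject means removing the key; stored bytes stay as they are.
    void shred(String dataSubjectId) {
        keys.remove(dataSubjectId);
    }

    private static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Code that replays events would treat the empty result like any other missing optional value, for example by skipping the personal data while still applying the rest of the event.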
If you’re using Tagged events, the Tagged class should remain on the outside, for example:
Tagged(WithDataSubjectId(dataSubjectId, payload), tags)
Events with multiple subjects
As described above, events wrapped in WithDataSubjectId are automatically serialized with encryption. When parts of the data in an event (or, more typically, in a snapshot) belong to different subjects, you have to handle that in your own serializer. Each WithDataSubjectId part must be represented as bytes in the stored representation, and you should use the WithDataSubjectIdSerialization utility to convert each WithDataSubjectId to and from those bytes.
Here is an example of a serializer for a snapshot that contains two separate WithDataSubjectId parts.
The protobuf definition of the snapshot:
message Snapshot {
  required bytes part1 = 1;
  required bytes part2 = 2;
  required int32 other = 3;
}
The snapshot class:
- Scala
final case class Snapshot(
  part1: WithDataSubjectId[String],
  part2: WithDataSubjectId[String],
  other: Int)
- Java
public static class Snapshot {
  public final WithDataSubjectId<String> part1;
  public final WithDataSubjectId<String> part2;
  public final int other;

  public Snapshot(WithDataSubjectId<String> part1, WithDataSubjectId<String> part2, int other) {
    this.part1 = part1;
    this.part2 = part2;
    this.other = other;
  }
}
The serializer of the Snapshot:
- Scala
import akka.persistence.gdpr.WithDataSubjectId
import akka.serialization.AsyncSerializerWithStringManifest
import akka.actor.ExtendedActorSystem
import akka.persistence.gdpr.serialization.TestMessages // generated from proto
import java.io.NotSerializableException
import scala.concurrent.Future
import com.google.protobuf.ByteString
// The import for WithDataSubjectIdSerialization (part of akka-gdpr) is not shown here.

class SnapshotSerializer(val system: ExtendedActorSystem) extends AsyncSerializerWithStringManifest(system) {
  import system.dispatcher // execution context for composing the futures

  private val withDataSubjectIdSerialization = new WithDataSubjectIdSerialization(system)
  private val SnapshotManifest = "S"

  override def identifier: Int = 999

  override def manifest(obj: AnyRef): String = obj match {
    case _: Snapshot => SnapshotManifest
    case _ =>
      throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
  }

  override def toBinaryAsync(obj: AnyRef): Future[Array[Byte]] = {
    obj match {
      case snap: Snapshot =>
        // Encrypt each part separately, then store the resulting bytes in the protobuf message
        for {
          part1Bytes <- withDataSubjectIdSerialization.toBinaryAsync(snap.part1)
          part2Bytes <- withDataSubjectIdSerialization.toBinaryAsync(snap.part2)
        } yield {
          TestMessages.Snapshot.newBuilder()
            .setPart1(ByteString.copyFrom(part1Bytes))
            .setPart2(ByteString.copyFrom(part2Bytes))
            .setOther(snap.other)
            .build().toByteArray
        }
      case _ =>
        Future.failed(new IllegalArgumentException(
          s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]"))
    }
  }

  override def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] = {
    manifest match {
      case SnapshotManifest =>
        val snap = TestMessages.Snapshot.parseFrom(bytes)
        // Decrypt each part separately; a shredded part comes back without a payload
        for {
          part1 <- withDataSubjectIdSerialization.fromBinaryAsync[String](snap.getPart1.toByteArray)
          part2 <- withDataSubjectIdSerialization.fromBinaryAsync[String](snap.getPart2.toByteArray)
        } yield {
          Snapshot(part1, part2, snap.getOther)
        }
      case _ =>
        Future.failed(new NotSerializableException(
          s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]"))
    }
  }
}
- Java
import akka.persistence.gdpr.WithDataSubjectId;
import akka.serialization.AsyncSerializerWithStringManifestCS;
import akka.actor.ExtendedActorSystem;
import akka.persistence.gdpr.serialization.TestMessages; // generated from proto
import java.io.NotSerializableException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
// The import for WithDataSubjectIdSerialization (part of akka-gdpr) is not shown here.

public static class SnapshotSerializer extends AsyncSerializerWithStringManifestCS {
  private static final String SNAPSHOT_MANIFEST = "S";

  private final ExtendedActorSystem system;
  private final WithDataSubjectIdSerialization withDataSubjectIdSerialization;

  public SnapshotSerializer(ExtendedActorSystem system) {
    super(system);
    this.system = system;
    this.withDataSubjectIdSerialization = new WithDataSubjectIdSerialization(system);
  }

  @Override
  public int identifier() {
    return 999;
  }

  @Override
  public String manifest(Object obj) {
    if (obj instanceof Snapshot)
      return SNAPSHOT_MANIFEST;
    else
      throw new IllegalArgumentException(
        "Can't serialize object of type " + obj.getClass() + " in [" + getClass().getName() + "]");
  }

  @Override
  public CompletionStage<byte[]> toBinaryAsyncCS(Object obj) {
    if (obj instanceof Snapshot) {
      Snapshot snap = (Snapshot) obj;
      // Encrypt each part separately, then store the resulting bytes in the protobuf message
      return withDataSubjectIdSerialization.toBinaryAsync(snap.part1).thenCompose(part1Bytes -> {
        return withDataSubjectIdSerialization.toBinaryAsync(snap.part2).thenApply(part2Bytes -> {
          return TestMessages.Snapshot.newBuilder()
            .setPart1(ByteString.copyFrom(part1Bytes))
            .setPart2(ByteString.copyFrom(part2Bytes))
            .setOther(snap.other)
            .build().toByteArray();
        });
      });
    } else {
      CompletableFuture<byte[]> result = new CompletableFuture<>();
      result.completeExceptionally(new IllegalArgumentException(
        "Can't serialize object of type " + obj.getClass() + " in [" + getClass().getName() + "]"));
      return result;
    }
  }

  @Override
  public CompletionStage<Object> fromBinaryAsyncCS(byte[] bytes, String manifest) {
    if (SNAPSHOT_MANIFEST.equals(manifest)) {
      try {
        TestMessages.Snapshot snap = TestMessages.Snapshot.parseFrom(bytes);
        // Decrypt each part separately; a shredded part comes back without a payload
        return withDataSubjectIdSerialization.fromBinaryAsync(String.class, snap.getPart1().toByteArray())
          .thenCompose(part1 -> {
            return withDataSubjectIdSerialization.fromBinaryAsync(String.class, snap.getPart2().toByteArray())
              .thenApply(part2 -> {
                return new Snapshot(part1, part2, snap.getOther());
              });
          });
      } catch (InvalidProtocolBufferException e) {
        CompletableFuture<Object> result = new CompletableFuture<>();
        result.completeExceptionally(e);
        return result;
      }
    } else {
      CompletableFuture<Object> result = new CompletableFuture<>();
      result.completeExceptionally(new NotSerializableException(
        "Unimplemented deserialization of message with manifest [" + manifest + "] in [" + getClass().getName() + "]"));
      return result;
    }
  }
}
Note that the serializer should extend AsyncSerializerWithStringManifest (Scala) or AsyncSerializerWithStringManifestCS (Java) and implement the asynchronous methods: toBinaryAsync and fromBinaryAsync, which return a Future, in Scala; toBinaryAsyncCS and fromBinaryAsyncCS, which return a CompletionStage, in Java. The reason for using the asynchronous API instead of the ordinary SerializerWithStringManifest is that encryption typically involves file or network IO, and blocking the serialization thread while waiting for such operations to complete should be avoided. The encryption calls are managed by GDPR for Akka Persistence and run on a separate dispatcher dedicated to blocking tasks, to avoid starvation of other parts of the system. To compose those asynchronous encryption tasks, the serializer must also be asynchronous and chain the calls using the map and flatMap operations of Future (Scala) or the thenApply and thenCompose operations of CompletionStage (Java). The Scala example above uses a for comprehension to do that.
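The chaining pattern can be tried in isolation with plain JDK classes. The sketch below is only an illustration: AsyncComposeSketch, encryptPart, and serializeBoth are hypothetical names, and the string concatenation stands in for real encryption work that would run on a dispatcher for blocking tasks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

// Hypothetical illustration of composing two asynchronous steps.
final class AsyncComposeSketch {

    // Stand-in for an encryption call that may block on file or network IO;
    // it runs on the executor passed in rather than on the caller's thread.
    static CompletionStage<String> encryptPart(String part, Executor blockingExecutor) {
        return CompletableFuture.supplyAsync(() -> "enc(" + part + ")", blockingExecutor);
    }

    // thenCompose chains the second asynchronous call after the first;
    // thenApply transforms the final result without blocking any thread.
    static CompletionStage<String> serializeBoth(String part1, String part2, Executor blockingExecutor) {
        return encryptPart(part1, blockingExecutor).thenCompose(encrypted1 ->
            encryptPart(part2, blockingExecutor).thenApply(encrypted2 ->
                encrypted1 + "|" + encrypted2));
    }
}
```

The caller receives a CompletionStage immediately and never waits on the encryption work itself, which is the property the asynchronous serializer API is designed to preserve.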