Using akka-gdpr

The akka-gdpr module contains utilities to help you manage data shredding, including:

  • A GdprEncryption extension point that you can use to plug in your own encryption. Alternatively, you can use the AbstractGdprEncryption class, which provides encryption but requires you to implement key management.
  • A serializer that encrypts events and adds a data subject id, when you wrap events with WithDataSubjectId. You must provide your own serializer for events that reference multiple data subjects. If you are using Jackson or Play JSON for serialization, the akka-gdpr module allows you to use them for encryption.

Dependency

To use the GDPR for Akka Persistence feature, add a dependency for the akka-gdpr artifact:

sbt
// Add Reactive Platform to your build as documented at https://developer.lightbend.com/docs/reactive-platform/2.0/setup/setup-sbt.html
"com.lightbend.akka" %% "akka-gdpr" % "1.1.10"
Gradle
// Add Reactive Platform to your build as documented at https://developer.lightbend.com/docs/reactive-platform/2.0/setup/setup-gradle.html
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-gdpr_2.11', version: '1.1.10'
}
Maven
<!-- Add Reactive Platform to your build as documented at https://developer.lightbend.com/docs/reactive-platform/2.0/setup/setup-maven.html -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-gdpr_2.11</artifactId>
  <version>1.1.10</version>
</dependency>

You can obtain your credentials at https://www.lightbend.com/product/lightbend-reactive-platform/credentials. That page also links to information on how to add the credentials to your build.

How the module works

Once you have implemented the GdprEncryption extension, for example by extending AbstractGdprEncryption and implementing the KeyManagement interface, the GdprSerializer class in akka-gdpr handles the encryption for you. The GdprSerializer is automatically bound for serialization of WithDataSubjectId classes:

akka.actor {
  serializers {
    gdpr-serializer = "akka.persistence.gdpr.GdprSerializer"
  }

  serialization-bindings {
    "akka.persistence.gdpr.WithDataSubjectId" = gdpr-serializer
  }
}

The GdprSerializer will do the following:

  1. Each of your events is first serialized via Akka’s serialization extension.

  2. Then the payload is encrypted with AES in GCM mode with no padding. Initialization vectors are created for each new payload using a SecureRandom and are the same length as the chosen key.

  3. The initialization vector is then stored at the start of the encrypted payload.

  4. The payload is then added to a protobuf message:

option java_package = "akka.persistence.gdpr.serialization";
option optimize_for = SPEED;

message WithDataSubjectId {
  required string dataSubjectId = 1;
  optional bytes payload = 2;
  optional int32 serializerId = 3;
  optional string manifest = 4;
}

We provide these details so that you can read the events and validate the cryptographic approach.
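The encryption steps above can be sketched with the JDK's own crypto classes. This is a minimal illustration, not the module's actual code: the class GcmPayloadSketch and its methods are hypothetical names, the 128-bit key size and 128-bit authentication tag length are assumptions (the text above does not state them), and the protobuf wrapping of step 4 is left out.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical illustration class; it is not part of akka-gdpr.
final class GcmPayloadSketch {
  private static final String TRANSFORMATION = "AES/GCM/NoPadding";
  // Assumption: the documentation does not state the tag length.
  private static final int TAG_LENGTH_BITS = 128;
  private static final SecureRandom RANDOM = new SecureRandom();

  // Assumption: a 128-bit AES key; the real key comes from your key management.
  static SecretKey newKey() throws GeneralSecurityException {
    KeyGenerator generator = KeyGenerator.getInstance("AES");
    generator.init(128);
    return generator.generateKey();
  }

  // Encrypts an already serialized event and returns IV followed by ciphertext.
  static byte[] encrypt(SecretKey key, byte[] serializedEvent) throws GeneralSecurityException {
    // A fresh IV per payload, with the same length as the key.
    byte[] iv = new byte[key.getEncoded().length];
    RANDOM.nextBytes(iv);
    Cipher cipher = Cipher.getInstance(TRANSFORMATION);
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_LENGTH_BITS, iv));
    byte[] ciphertext = cipher.doFinal(serializedEvent);
    // Store the IV at the start of the encrypted payload.
    byte[] out = Arrays.copyOf(iv, iv.length + ciphertext.length);
    System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
    return out;
  }

  // Splits the IV off the front of the payload and decrypts the remainder.
  static byte[] decrypt(SecretKey key, byte[] payload) throws GeneralSecurityException {
    int ivLength = key.getEncoded().length;
    byte[] iv = Arrays.copyOfRange(payload, 0, ivLength);
    Cipher cipher = Cipher.getInstance(TRANSFORMATION);
    cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_LENGTH_BITS, iv));
    return cipher.doFinal(payload, ivLength, payload.length - ivLength);
  }

  public static void main(String[] args) throws Exception {
    SecretKey key = newKey();
    byte[] stored = encrypt(key, "ItemAdded".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(decrypt(key, stored), StandardCharsets.UTF_8));
  }
}
```

Decrypting with any other key fails the GCM authentication check, which is why discarding the key is enough to make the stored payload unreadable.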

General steps

General steps to use the akka-gdpr module include:

  1. Add a dependency to your build as described in Adding the akka-gdpr module dependency.

  2. For encryption, choose from the following:

    • Implement the GdprEncryption extension, which gives you full control over the encryption algorithms by implementing three methods. See GdprEncryption for more details.
    • Extend the AbstractGdprEncryption class, which implements encryption and decryption for you, but requires you to use the KeyManagement interface to plug in the logic for creating and retrieving keys. See AbstractGdprEncryption for more details.
    • Use the JavaKeyStoreGdprEncryption class, which implements GdprEncryption with support for PKCS12 and JCEKS keystores. This is only appropriate for single node applications. See JavaKeyStoreGdprEncryption for more details.
  3. Define the akka.persistence.gdpr.encryption-provider setting to point to a configuration block for your encryption implementation, as described in Pointing to your encryption implementation.

  4. Use the WithDataSubjectId class by wrapping events, or inside of events. For events that contain multiple data subjects, you will need to implement your own serializer, as described in Events with multiple subjects.

  5. When there is a request to forget a data subject, call the shred method of the GdprEncryption extension:

Scala
import akka.persistence.gdpr.scaladsl.GdprEncryption

GdprEncryption(system).shred(dataSubjectId)
Java
akka.persistence.gdpr.javadsl.GdprEncryption.get(system).shred(dataSubjectId);

For unit testing during development, you can use TestGdprEncryption, which holds generated keys in memory and can be enabled with configuration. See TestGdprEncryption for more details.

Wrapping data in WithDataSubjectId

The GDPR for Akka Persistence module provides a WithDataSubjectId class. Wrap events that require encryption in this class. When WithDataSubjectId is serialized, the payload (the actual event) is encrypted with the key that is identified by the dataSubjectId in the WithDataSubjectId. To shred the data, as described in The right to be forgotten, remove the encryption key to make the encrypted payload unreadable.

When the encrypted payload of WithDataSubjectId is deserialized and the encryption key no longer exists, the payload is represented as None in Scala, and getPayload() returns Optional.empty() in Java. You need to take this value into account when replaying or processing the events.
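As a rough sketch of what taking the empty payload into account can look like during replay, the following example rebuilds state from a list of events and substitutes a placeholder for shredded ones. The Wrapped class is a hypothetical stand-in that mimics only the getPayload() behaviour described above; it is not the akka-gdpr WithDataSubjectId class, and the "[removed]" placeholder is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration class; it is not part of akka-gdpr.
final class ReplaySketch {

  // Stand-in for a deserialized WithDataSubjectId<String>.
  static final class Wrapped {
    final String dataSubjectId;
    private final String payload; // null models a payload whose key was shredded

    Wrapped(String dataSubjectId, String payload) {
      this.dataSubjectId = dataSubjectId;
      this.payload = payload;
    }

    Optional<String> getPayload() {
      return Optional.ofNullable(payload);
    }
  }

  // Rebuilds the state, keeping a placeholder where the payload is gone.
  static List<String> replay(List<Wrapped> events) {
    List<String> state = new ArrayList<>();
    for (Wrapped event : events) {
      state.add(event.getPayload().orElse("[removed]"));
    }
    return state;
  }

  public static void main(String[] args) {
    List<Wrapped> journal = Arrays.asList(
        new Wrapped("alice", "hello"),
        new Wrapped("bob", null)); // bob's key has been shredded
    System.out.println(replay(journal)); // prints [hello, [removed]]
  }
}
```

Whether to keep a placeholder, skip the event, or adjust derived state depends on the domain; the point is that the empty case must be handled explicitly rather than assumed away.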

If you’re using Tagged events, the Tagged class should remain on the outside, for example:

Tagged(WithDataSubjectId(dataSubjectId, payload), tags)

Events with multiple subjects

As described above, events wrapped in WithDataSubjectId are automatically serialized with encryption. When parts of the data in an event (or, more typically, in a snapshot) belong to different subjects, you have to handle the encryption of each part in your own serializer. Each WithDataSubjectId part must be represented as bytes in the stored representation, and you should use the WithDataSubjectIdSerialization utility to serialize each WithDataSubjectId to and from those bytes.

Here is an example of a serializer for a snapshot that contains two separate WithDataSubjectId parts.

The protobuf definition of the snapshot:

message Snapshot {
  required bytes part1 = 1;
  required bytes part2 = 2;
  required int32 other = 3;
}

The snapshot class:

Scala
final case class Snapshot(
  part1: WithDataSubjectId[String],
  part2: WithDataSubjectId[String],
  other: Int)
Java
public static class Snapshot {
  public final WithDataSubjectId<String> part1;
  public final WithDataSubjectId<String> part2;
  public final int other;

  public Snapshot(WithDataSubjectId<String> part1, WithDataSubjectId<String> part2, int other) {
    this.part1 = part1;
    this.part2 = part2;
    this.other = other;
  }

}

The serializer of the Snapshot:

Scala
import akka.persistence.gdpr.WithDataSubjectId
import akka.serialization.AsyncSerializerWithStringManifest
import akka.actor.ExtendedActorSystem
import akka.persistence.gdpr.serialization.TestMessages // generated from proto
import java.io.NotSerializableException
import com.google.protobuf.ByteString
import scala.concurrent.Future

class SnapshotSerializer(val system: ExtendedActorSystem) extends AsyncSerializerWithStringManifest(system) {

  import system.dispatcher
  private val withDataSubjectIdSerialization = new WithDataSubjectIdSerialization(system)

  private val SnapshotManifest = "S"

  override def identifier: Int = 999

  override def manifest(obj: AnyRef): String = obj match {
    case _: Snapshot => SnapshotManifest
    case _ =>
      throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
  }

  override def toBinaryAsync(obj: AnyRef): Future[Array[Byte]] = {
    obj match {
      case snap: Snapshot =>
        for {
          part1Bytes <- withDataSubjectIdSerialization.toBinaryAsync(snap.part1)
          part2Bytes <- withDataSubjectIdSerialization.toBinaryAsync(snap.part2)
        } yield {
          TestMessages.Snapshot.newBuilder()
            .setPart1(ByteString.copyFrom(part1Bytes))
            .setPart2(ByteString.copyFrom(part2Bytes))
            .setOther(snap.other)
            .build().toByteArray
        }

      case _ =>
        Future.failed(new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]"))
    }
  }

  override def fromBinaryAsync(bytes: Array[Byte], manifest: String): Future[AnyRef] = {
    manifest match {
      case SnapshotManifest =>
        val snap = TestMessages.Snapshot.parseFrom(bytes)
        for {
          part1 <- withDataSubjectIdSerialization.fromBinaryAsync[String](snap.getPart1.toByteArray)
          part2 <- withDataSubjectIdSerialization.fromBinaryAsync[String](snap.getPart2.toByteArray)
        } yield {
          Snapshot(part1, part2, snap.getOther)
        }

      case _ =>
        Future.failed(new NotSerializableException(
          s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]"))
    }
  }
}
Java
import akka.persistence.gdpr.WithDataSubjectId;
import akka.serialization.AsyncSerializerWithStringManifestCS;
import akka.actor.ExtendedActorSystem;
import akka.persistence.gdpr.serialization.TestMessages; // generated from proto
import java.io.NotSerializableException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

public static class SnapshotSerializer extends AsyncSerializerWithStringManifestCS {
  private static final String SNAPSHOT_MANIFEST = "S";

  private final ExtendedActorSystem system;
  private final WithDataSubjectIdSerialization withDataSubjectIdSerialization;

  public SnapshotSerializer(ExtendedActorSystem system) {
    super(system);
    this.system = system;
    this.withDataSubjectIdSerialization = new WithDataSubjectIdSerialization(system);
  }

  @Override
  public int identifier() {
    return 999;
  }

  @Override
  public String manifest(Object obj) {
    if (obj instanceof Snapshot)
      return SNAPSHOT_MANIFEST;
    else
      throw new IllegalArgumentException("Can't serialize object of type " + obj.getClass() +
          " in [" + getClass().getName() + "]");
  }

  @Override
  public CompletionStage<byte[]> toBinaryAsyncCS(Object obj) {
    if (obj instanceof Snapshot) {
      Snapshot snap = (Snapshot) obj;
      return withDataSubjectIdSerialization.toBinaryAsync(snap.part1).thenCompose(part1Bytes -> {
        return withDataSubjectIdSerialization.toBinaryAsync(snap.part2).thenApply(part2Bytes -> {
          return TestMessages.Snapshot.newBuilder()
              .setPart1(ByteString.copyFrom(part1Bytes))
              .setPart2(ByteString.copyFrom(part2Bytes))
              .setOther(snap.other)
              .build().toByteArray();
        });
      });

    } else {
      CompletableFuture<byte[]> result = new CompletableFuture<>();
      result.completeExceptionally(new IllegalArgumentException(
          "Can't serialize object of type " + obj.getClass() + " in [" + getClass().getName() + "]"));
      return result;
    }
  }

  @Override
  public CompletionStage<Object> fromBinaryAsyncCS(byte[] bytes, String manifest) {
    if (SNAPSHOT_MANIFEST.equals(manifest)) {
      try {
        TestMessages.Snapshot snap = TestMessages.Snapshot.parseFrom(bytes);
        return withDataSubjectIdSerialization.fromBinaryAsync(String.class, snap.getPart1().toByteArray())
            .thenCompose(part1 -> {
              return withDataSubjectIdSerialization.fromBinaryAsync(String.class, snap.getPart2().toByteArray())
                  .thenApply(part2 -> {
                    return new Snapshot(part1, part2, snap.getOther());
                  });
            });
      } catch (InvalidProtocolBufferException e) {
        CompletableFuture<Object> result = new CompletableFuture<>();
        result.completeExceptionally(e);
        return result;
      }
    } else {
      CompletableFuture<Object> result = new CompletableFuture<>();
      result.completeExceptionally(new NotSerializableException(
          "Unimplemented deserialization of message with manifest [" + manifest + "] in [" +
              getClass().getName() + "]"));
      return result;
    }
  }
}

Note that the serializer should extend AsyncSerializerWithStringManifest in Scala, or AsyncSerializerWithStringManifestCS in Java, and implement the methods toBinaryAsync and fromBinaryAsync (toBinaryAsyncCS and fromBinaryAsyncCS in Java), which return a Future or a CompletionStage respectively. The reason for using the asynchronous API instead of the ordinary SerializerWithStringManifest is that encryption typically involves file or network IO, and blocking the serialization thread while waiting for such operations to complete should be avoided. The encryption calls are managed by GDPR for Akka Persistence and run on a separate dispatcher dedicated to blocking tasks, to avoid starving other parts of the system. To compose those asynchronous encryption tasks, the serializer must also be asynchronous and chain the calls using the map and flatMap operations of Future in Scala, or the thenApply and thenCompose operations of CompletionStage in Java. The Scala example above uses a for comprehension to do that.
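The chaining pattern can be tried in isolation with plain JDK classes. In this minimal sketch, encryptAsync is a hypothetical stand-in for a blocking encryption call, and the fixed thread pool plays the role of the dedicated dispatcher. Neither is the akka-gdpr API, and no real encryption takes place.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; it is not part of akka-gdpr.
final class AsyncChainSketch {

  // Stand-in for a blocking encryption call, run on the supplied executor
  // so that the calling thread is never blocked.
  static CompletionStage<byte[]> encryptAsync(String part, Executor blockingPool) {
    return CompletableFuture.supplyAsync(
        () -> part.getBytes(StandardCharsets.UTF_8), blockingPool);
  }

  // thenCompose sequences the second asynchronous call after the first;
  // thenApply transforms the final result without blocking.
  static CompletionStage<byte[]> combine(String part1, String part2, Executor blockingPool) {
    return encryptAsync(part1, blockingPool).thenCompose(bytes1 ->
        encryptAsync(part2, blockingPool).thenApply(bytes2 -> {
          byte[] out = Arrays.copyOf(bytes1, bytes1.length + bytes2.length);
          System.arraycopy(bytes2, 0, out, bytes1.length, bytes2.length);
          return out;
        }));
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // Blocking with get() is acceptable in a demo main, not in a serializer.
      byte[] combined = combine("alice", "bob", pool).toCompletableFuture().get();
      System.out.println(combined.length); // prints 8
    } finally {
      pool.shutdown();
    }
  }
}
```

Because the second call is started inside thenCompose, the two parts are processed one after the other. Starting both stages up front and joining them with thenCombine would be an alternative if the two calls are independent.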