Database Cleanup

Event Sourced cleanup tool

If possible, it is best to keep all events in an event sourced system. That way new projections can be built, and existing ones re-built, with Akka Projection R2DBC. A Projection can also start or continue from a snapshot, and then the events before the snapshot can be deleted.

In some cases keeping all events is not possible, or events must be removed for regulatory reasons, such as compliance with GDPR. EventSourcedBehaviors can automatically snapshot state and delete events, as described in the Akka docs. Snapshotting is useful even if events aren’t deleted, as it speeds up recovery.

Deleting all events immediately when an entity has reached its terminal deleted state has the consequence that Projections might not have consumed all previous events yet, and would then never see the final deleted event. Instead, it’s recommended to emit a final deleted event and store the fact that the entity is deleted separately via a Projection. A background task can then clean up the events and snapshots for the deleted entities by using the EventSourcedCleanup tool. The entity itself knows about the terminal state from the deleted event; it should not emit further events after that, and should typically stop itself if it receives more commands.
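
The ordering described above can be illustrated with a small in-memory model. It is only a sketch under stated assumptions: `journal`, `deletedIds`, `runProjection` and `runCleanup` are hypothetical stand-ins for the event journal, the Projection's record of deleted entities, the Projection itself and the background task. None of them are Akka APIs. The point is that events are removed only after the Projection has seen the final deleted event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DeferredCleanupSketch {
    // Hypothetical in-memory journal: persistence id -> ordered event names.
    final Map<String, List<String>> journal = new HashMap<>();
    // Filled by the projection: ids whose final "Deleted" event has been consumed.
    final Set<String> deletedIds = new HashSet<>();
    // Per persistence id, how many events the projection has consumed so far.
    final Map<String, Integer> projectionOffsets = new HashMap<>();

    void persist(String pid, String event) {
        journal.computeIfAbsent(pid, k -> new ArrayList<>()).add(event);
    }

    // Consumes events not yet seen and records entities that reached the deleted state.
    void runProjection() {
        for (Map.Entry<String, List<String>> entry : journal.entrySet()) {
            String pid = entry.getKey();
            List<String> events = entry.getValue();
            int from = projectionOffsets.getOrDefault(pid, 0);
            for (int i = from; i < events.size(); i++) {
                if (events.get(i).equals("Deleted")) {
                    deletedIds.add(pid);
                }
            }
            projectionOffsets.put(pid, events.size());
        }
    }

    // Background task: removes events only for ids the projection has marked as deleted.
    // Returns how many persistence ids were cleaned up in this run.
    int runCleanup() {
        int cleaned = 0;
        for (String pid : deletedIds) {
            if (journal.remove(pid) != null) {
                cleaned++;
            }
        }
        return cleaned;
    }

    public static void main(String[] args) {
        DeferredCleanupSketch sketch = new DeferredCleanupSketch();
        sketch.persist("cart-1", "ItemAdded");
        sketch.persist("cart-1", "Deleted");
        sketch.persist("cart-2", "ItemAdded");

        // The projection has not run yet, so nothing is eligible for cleanup.
        System.out.println("cleaned before projection: " + sketch.runCleanup());
        sketch.runProjection();
        System.out.println("cleaned after projection: " + sketch.runCleanup());
        System.out.println("remaining ids: " + sketch.journal.keySet());
    }
}
```

In this model the set of deleted ids is kept after the cleanup, so the fact that an entity was deleted survives the removal of its events.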

EventSourcedCleanup operations include:

  • Delete all events and snapshots for one or many persistence ids
  • Delete all events for one or many persistence ids
  • Delete all snapshots for one or many persistence ids
  • Delete events before snapshot for one or many persistence ids
  • Delete events before a timestamp

Warning

When running an operation with EventSourcedCleanup that deletes all events for a persistence id, the actor with that persistence id must not be running! If the actor is restarted afterwards, it would recover to the wrong state, since the stored events have been deleted. The delete events before snapshot operation can still be used while the actor is running.
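
The reasoning behind this warning can be sketched with a simplified recovery model. The `Event` and `Snapshot` classes and the counter state are hypothetical and not part of Akka. The model also assumes that deleting events before a snapshot removes the events up to and including the snapshot's sequence number. Recovery starts from the snapshot and replays only later events, so removing the earlier events does not change the recovered state, whereas removing everything does.

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotRecoverySketch {
    // Hypothetical event: a sequence number plus an amount added to a counter.
    static final class Event {
        final long seqNr;
        final int amount;
        Event(long seqNr, int amount) { this.seqNr = seqNr; this.amount = amount; }
    }

    // Hypothetical snapshot: the counter value after applying all events up to seqNr.
    static final class Snapshot {
        final long seqNr;
        final int state;
        Snapshot(long seqNr, int state) { this.seqNr = seqNr; this.state = state; }
    }

    // Recovery: start from the snapshot (or from zero when there is none)
    // and replay only the events with a higher sequence number.
    static int recover(Snapshot snapshot, List<Event> storedEvents) {
        int state = snapshot == null ? 0 : snapshot.state;
        long fromSeqNr = snapshot == null ? 0 : snapshot.seqNr;
        for (Event e : storedEvents) {
            if (e.seqNr > fromSeqNr) {
                state += e.amount;
            }
        }
        return state;
    }

    // Assumption of this model: events up to and including the snapshot's
    // sequence number are the ones removed; later events are kept.
    static List<Event> deleteBeforeSnapshot(Snapshot snapshot, List<Event> storedEvents) {
        List<Event> kept = new ArrayList<>();
        for (Event e : storedEvents) {
            if (e.seqNr > snapshot.seqNr) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1, 5), new Event(2, 3), new Event(3, 2), new Event(4, 10));
        Snapshot snapshot = new Snapshot(3, 10); // 5 + 3 + 2

        // Full history and trimmed history recover to the same state.
        System.out.println(recover(snapshot, events));
        System.out.println(recover(snapshot, deleteBeforeSnapshot(snapshot, events)));
        // All events and the snapshot deleted: a restarted actor starts from zero.
        System.out.println(recover(null, List.of()));
    }
}
```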

The cleanup tool can be combined with the query plugin which has a query to get all persistence ids.

Java
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.CurrentPersistenceIdsQuery;
// the scaladsl cleanup returns a Scala Future, hence the FutureConverters call below
import akka.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup;
import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
import scala.compat.java8.FutureConverters;

CurrentPersistenceIdsQuery queries = PersistenceQuery.get(system)
    .getReadJournalFor(CurrentPersistenceIdsQuery.class, R2dbcReadJournal.Identifier());
EventSourcedCleanup cleanup = new EventSourcedCleanup(system);

// how many persistence ids to operate on in parallel
int persistenceIdParallelism = 10;

// for all persistence ids, delete all events before the snapshot
queries
    .currentPersistenceIds()
    .mapAsync(persistenceIdParallelism, pid ->
        FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid)))
    .run(system);
Scala
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery
import akka.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal

val queries = PersistenceQuery(system).readJournalFor[CurrentPersistenceIdsQuery](R2dbcReadJournal.Identifier)
val cleanup = new EventSourcedCleanup(system)

// how many persistence ids to operate on in parallel
val persistenceIdParallelism = 10

// for all persistence ids, delete all events before the snapshot
queries
  .currentPersistenceIds()
  .mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid))
  .run()

Durable State cleanup tool

DurableStateCleanup operations include:

  • Delete state for one or many persistence ids

Warning

When running any operation with DurableStateCleanup for a persistence id, the actor with that persistence id must not be running! If the actor is restarted afterwards, it would recover to the wrong state, since the stored state has been deleted.

The cleanup tool can be combined with the query plugin which has a query to get all persistence ids.
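
How a persistence id query and state deletion fit together can be sketched with an in-memory model. Everything here is hypothetical and only illustrates the shape of the loop: the `states` map stands in for the durable state store, `currentPersistenceIds(afterId, limit)` for a paged id query, and `deleteStates` for the cleanup call. The real query and cleanup APIs should be taken from the plugin's API documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Predicate;

class DurableStateCleanupSketch {
    // Hypothetical in-memory state store: persistence id -> latest state, ordered by id.
    final TreeMap<String, String> states = new TreeMap<>();

    // Hypothetical paged query: up to `limit` ids strictly after `afterId`
    // (null means start from the first id).
    List<String> currentPersistenceIds(String afterId, int limit) {
        Iterable<String> ids = afterId == null
            ? states.keySet()
            : states.tailMap(afterId, false).keySet();
        List<String> page = new ArrayList<>();
        for (String id : ids) {
            if (page.size() == limit) {
                break;
            }
            page.add(id);
        }
        return page;
    }

    // Walks all ids page by page and deletes the state of every id that matches.
    // Returns the number of deleted states.
    int deleteStates(Predicate<String> shouldDelete, int pageSize) {
        int deleted = 0;
        List<String> page = currentPersistenceIds(null, pageSize);
        while (!page.isEmpty()) {
            for (String id : page) {
                if (shouldDelete.test(id)) {
                    states.remove(id);
                    deleted++;
                }
            }
            // Continue after the last id of this page, even if it was just deleted.
            String afterId = page.get(page.size() - 1);
            page = currentPersistenceIds(afterId, pageSize);
        }
        return deleted;
    }

    public static void main(String[] args) {
        DurableStateCleanupSketch sketch = new DurableStateCleanupSketch();
        sketch.states.put("cart|1", "open");
        sketch.states.put("cart|2", "checked-out");
        sketch.states.put("cart|3", "open");
        sketch.states.put("order|1", "shipped");

        int deleted = sketch.deleteStates(id -> id.startsWith("cart|"), 2);
        System.out.println("deleted: " + deleted);
        System.out.println("remaining ids: " + sketch.states.keySet());
    }
}
```

In line with the warning above, a real cleanup of this kind would only target persistence ids whose actors are known not to be running.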
