Query Plugin

Event sourced queries

DynamoDBReadJournalDynamoDBReadJournal implements the following Persistence Queries:

  • eventsBySlices, currentEventsBySlices

Accessing the DynamoDBReadJournal:

Java
sourceimport akka.persistence.dynamodb.query.javadsl.DynamoDBReadJournal;
import akka.persistence.query.PersistenceQuery;

DynamoDBReadJournal eventQueries =
    PersistenceQuery.get(system)
        .getReadJournalFor(DynamoDBReadJournal.class, DynamoDBReadJournal.Identifier());
Scala
sourceimport akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
import akka.persistence.query.PersistenceQuery

val eventQueries = PersistenceQuery(system)
  .readJournalFor[DynamoDBReadJournal](DynamoDBReadJournal.Identifier)

eventsBySlices

The eventsBySlices and currentEventsBySlices queries are useful for retrieving all events for a given entity type. eventsBySlices should be used via Akka Projection.

Note

This has historically been done with eventsByTag but the DynamoDB plugin is instead providing eventsBySlices as an improved solution.

The usage of eventsByTag for Projections has the drawback that the number of tags must be decided up-front and can’t easily be changed afterwards. Starting with too many tags means a lot of overhead, since many projection instances would be running on each node in a small Akka Cluster, with each projection instance polling the database periodically. Starting with too few tags means that it can’t be scaled later to more Akka nodes.

With eventsBySlices more Projection instances can be added when needed and still reuse the offsets for the previous slice distributions.

A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all persistence ids over the slices. The eventsBySlices query is for a range of the slices. For example if using 1024 slices and running 4 Projection instances the slice ranges would be 0-255, 256-511, 512-767, 768-1023. Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, …, 768-895, 896-1023.

Example of currentEventsBySlices:

Java
sourceimport akka.NotUsed;
import akka.japi.Pair;
import akka.persistence.query.NoOffset;
import akka.persistence.query.typed.EventEnvelope;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Split the slices into 4 ranges
int numberOfSliceRanges = 4;
List<Pair<Integer, Integer>> sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges);

// Example of using the first slice range
int minSlice = sliceRanges.get(0).first();
int maxSlice = sliceRanges.get(0).second();
String entityType = "MyEntity";
Source<EventEnvelope<MyEvent>, NotUsed> source =
    eventQueries.currentEventsBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance());
source
    .map(
        envelope ->
            "event from persistenceId "
                + envelope.persistenceId()
                + " with seqNr "
                + envelope.sequenceNr()
                + ": "
                + envelope.event())
    .runWith(Sink.foreach(System.out::println), system);
Scala
sourceimport akka.persistence.query.NoOffset
import akka.stream.scaladsl.Sink

// Split the slices into 4 ranges
val numberOfSliceRanges: Int = 4
val sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges)

// Example of using the first slice range
val minSlice: Int = sliceRanges.head.min
val maxSlice: Int = sliceRanges.head.max
val entityType: String = "MyEntity"
eventQueries
  .currentEventsBySlices[MyEvent](entityType, minSlice, maxSlice, NoOffset)
  .map(envelope =>
    s"event from persistenceId ${envelope.persistenceId} with " +
    s"seqNr ${envelope.sequenceNr}: ${envelope.event}")
  .runWith(Sink.foreach(println))

eventsBySlices should be used via DynamoDBProjection, which will automatically handle some challenges around ordering and missed events.

When using DynamoDBProjection the events will be delivered in sequence number order without duplicates.

The consumer can keep track of its current position in the event stream by storing the offset and restarting the query from a given offset after a crash/restart.

The offset is a TimestampOffset, based on when the event was stored. This means that a “later” event may be visible first. When retrieving events after the previously seen timestamp we may miss some events and emit an event with a later sequence number for a persistence id, without emitting all preceding events. eventsBySlices will perform additional backtracking queries to catch missed events. Events from backtracking will typically be duplicates of previously emitted events. It’s the responsibility of the consumer to filter duplicates and make sure that events are processed in exact sequence number order for each persistence id, and such de-duplication is provided by the DynamoDB Projection.

Events emitted by backtracking don’t contain the event payload (EventBySliceEnvelope.event is None) and the consumer can load the full EventBySliceEnvelope with DynamoDBReadJournal.loadEnvelopeDynamoDBReadJournal.loadEnvelope.

Events will be emitted in timestamp order with the caveat of duplicate events as described above. Events with the same timestamp are ordered by sequence number.

currentEventsBySlices doesn’t perform backtracking queries, will not emit duplicates, and the event payload is always fully loaded.

Note

The journal table must be created with a global secondary index to index events by slice.

eventsBySlicesStartingFromSnapshots

The eventsBySlicesStartingFromSnapshots and currentEventsBySlicesStartingFromSnapshots queries are like the eventsBySlices queries, but use snapshots as starting points to reduce the number of events that need to be loaded. This can be useful if a consumer starts from zero without any previously processed offsets, or if it has been disconnected for a long while and its offsets are far behind.

These queries first load all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one snapshot per persistenceId. The snapshots are transformed into events with the given transformSnapshot function. After emitting the snapshot events, the ordinary events with sequence numbers after the snapshots are emitted.

To use the start-from-snapshot queries you must also enable configuration:

akka.persistence.dynamodb.query.start-from-snapshot.enabled = true
Note

The snapshot table must be created with a global secondary index to index snapshots by slice.

Publish events for lower latency of eventsBySlices

The eventsBySlices query polls the database periodically to find new events. By default, this interval is a few seconds, see akka.persistence.dynamodb.query.refresh-interval in the Configuration. This interval can be reduced for lower latency, with the drawback of querying the database more frequently.

To reduce the latency there is a feature that will publish the events within the Akka Cluster. Running eventsBySlices will subscribe to the events and emit them directly without waiting for the next query poll. The trade-off is that more CPU and network resources are used. The events must still be retrieved from the database, but at a lower polling frequency, because delivery of published messages are not guaranteed.

This feature is enabled by default and it will measure the throughput and automatically disable/enable if the exponentially weighted moving average of measured throughput exceeds the configured threshold.

akka.persistence.dynamodb.journal.publish-events-dynamic.throughput-threshold = 400

Disable publishing of events with configuration:

akka.persistence.dynamodb.journal.publish-events = off

If you use many queries or Projection instances you should consider adjusting the akka.persistence.dynamodb.journal.publish-events-number-of-topics configuration, see Configuration.

Configuration

Query configuration is under akka.persistence.dynamodb.query.

The query plugin shares the DynamoDB client configuration with other plugins.

Reference configuration

The following can be overridden in your application.conf for query specific settings:

sourceakka.persistence.dynamodb {
  query {
    class = "akka.persistence.dynamodb.query.DynamoDBReadJournalProvider"

    # When live queries return no results or <= 10% of buffer-size, the next query
    # to db will be delayed for this duration.
    # When the number of rows from previous query is >= 90% of buffer-size, the next
    # query will be emitted immediately.
    # Otherwise, between 10% - 90% of buffer-size, the next query will be delayed
    # for half of this duration.
    refresh-interval = 3s

    # Live queries read events up to this duration from the current time.
    behind-current-time = 100 millis

    backtracking {
      enabled = on
      # Backtracking queries will look back for this amount of time. It should
      # not be larger than the akka.projection.dynamodb.offset-store.time-window.
      window = 2 minutes
      # Backtracking queries read events up to this duration from the current time.
      behind-current-time = 10 seconds
    }

    # In-memory buffer holding events when reading from DynamoDB.
    buffer-size = 100

    # When journal publish-events is enabled a best effort deduplication can be enabled by setting
    # this property to the size of the deduplication buffer in the `eventsBySlices` query.
    # It keeps track of this number of entries and 5000 is recommended capacity. The drawback
    # of enabling this is that when the sequence numbers received via publish-events are out of sync
    # after some error scenarios it will take longer to receive those events, since it will rely on
    # the backtracking queries.
    deduplicate-capacity = 0

    # Settings for `eventsBySlicesStartingFromSnapshots` and `currentEventsBySlicesStartingFromSnapshots`.
    start-from-snapshot {
      # Enable if `eventsBySlicesStartingFromSnapshots` or `currentEventsBySlicesStartingFromSnapshots` are used.
      # This adds some small overhead when storing snapshots because the timestamp and tags of the corresponding event
      # are retrieved when storing each snapshot.
      enabled = false
    }
  }
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.