Query Plugin
Event sourced queries
R2dbcReadJournal
R2dbcReadJournal
implements the following Persistence Queries:
eventsByPersistenceId
,currentEventsByPersistenceId
eventsBySlices
,currentEventsBySlices
currentPersistenceIds
Accessing the R2dbcReadJournal
:
- Java
-
source
import akka.persistence.query.PersistenceQuery; import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal; R2dbcReadJournal eventQueries = PersistenceQuery.get(system) .getReadJournalFor(R2dbcReadJournal.class, R2dbcReadJournal.Identifier());
- Scala
-
source
import akka.persistence.query.PersistenceQuery import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal val eventQueries = PersistenceQuery(system) .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
eventsByPersistenceId
The eventsByPersistenceId
and currentEventsByPersistenceId
queries are useful for retrieving events for a single entity with a given persistence id.
Example of currentEventsByPersistenceId
:
- Java
-
source
PersistenceId persistenceId = PersistenceId.of("MyEntity", "id1"); eventQueries .currentEventsByPersistenceId(persistenceId.id(), 1, 101) .map(envelope -> "event with seqNr " + envelope.sequenceNr() + ": " + envelope.event()) .runWith(Sink.foreach(System.out::println), system);
- Scala
-
source
val persistenceId = PersistenceId("MyEntity", "id1") eventQueries .currentEventsByPersistenceId(persistenceId.id, 1, 101) .map(envelope => s"event with seqNr ${envelope.sequenceNr}: ${envelope.event}") .runWith(Sink.foreach(println))
eventsByPersistenceIdStartingFromSnapshot
Same as eventsByPersistenceId
but with the purpose to use snapshot as starting point and thereby reducing number of events that have to be loaded.
First it tries to load the snapshot, if any, with sequence number within the given fromSequenceNr
and toSequenceNr
(inclusive) range. There is at most one snapshot per persistenceId. The snapshot is transformed to an event with the given transformSnapshot
function.
After emitting the snapshot event the ordinary events with sequence numbers after the snapshots are emitted.
To use eventsByPersistenceIdStartingFromSnapshot
or currentEventsByPersistenceIdStartingFromSnapshot
you must follow instructions in migration guide and enable configuration:
akka.persistence.r2dbc.query.start-from-snapshot.enabled = true
eventsBySlices
The eventsBySlices
and currentEventsBySlices
queries are useful for retrieving all events for a given entity type. eventsBySlices
should be used via Akka Projection.
This has historically been done with eventsByTag
but the R2DBC plugin is instead providing eventsBySlices
as an improved solution.
The usage of eventsByTag
for Projections has the drawback that the number of tags must be decided up-front and can’t easily be changed afterwards. Starting with too many tags means much overhead since many projection instances would be running on each node in a small Akka Cluster. Each projection instance polling the database periodically. Starting with too few tags means that it can’t be scaled later to more Akka nodes.
With eventsBySlices
more Projection instances can be added when needed and still reuse the offsets for the previous slice distributions.
A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all persistence ids over the slices. The eventsBySlices
query is for a range of the slices. For example if using 1024 slices and running 4 Projection instances the slice ranges would be 0-255, 256-511, 512-767, 768-1023. Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 256-383, …, 768-895, 896-1023.
Example of currentEventsBySlices
:
- Java
-
source
import akka.stream.javadsl.Source; import akka.persistence.query.typed.EventEnvelope; // Split the slices into 4 ranges int numberOfSliceRanges = 4; List<Pair<Integer, Integer>> sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges); // Example of using the first slice range int minSlice = sliceRanges.get(0).first(); int maxSlice = sliceRanges.get(0).second(); String entityType = "MyEntity"; Source<EventEnvelope<MyEvent>, NotUsed> source = eventQueries.currentEventsBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance()); source .map( envelope -> "event from persistenceId " + envelope.persistenceId() + " with seqNr " + envelope.sequenceNr() + ": " + envelope.event()) .runWith(Sink.foreach(System.out::println), system);
- Scala
-
source
import akka.persistence.query.typed.EventEnvelope // Slit the slices into 4 ranges val numberOfSliceRanges: Int = 4 val sliceRanges = eventQueries.sliceRanges(numberOfSliceRanges) // Example of using the first slice range val minSlice: Int = sliceRanges.head.min val maxSlice: Int = sliceRanges.head.max val entityType: String = "MyEntity" eventQueries .currentEventsBySlices[MyEvent](entityType, minSlice, maxSlice, NoOffset.getInstance) .map(envelope => s"event from persistenceId ${envelope.persistenceId} with " + s"seqNr ${envelope.sequenceNr}: ${envelope.event}") .runWith(Sink.foreach(println))
eventsBySlices
should be used via R2dbcProjection, which will automatically handle the following difficulties. When using R2dbcProjection
the events will be delivered in sequence number order without duplicates.
The consumer can keep track of its current position in the event stream by storing the offset
and restart the query from a given offset
after a crash/restart.
The offset is a TimestampOffset
and it is based on the database CURRENT_TIMESTAMP
when the event was stored. CURRENT_TIMESTAMP
is the time when the transaction started, not when it was committed. This means that a “later” event may be visible first and when retrieving events after the previously seen timestamp we may miss some events and emit event with a later sequence number for a persistence id without emitting all preceding events. In distributed SQL databases there can also be clock skews for the database timestamps. For that reason eventsBySlices
will perform additional backtracking queries to catch missed events. Events from backtracking will typically be duplicates of previously emitted events. It’s the responsibility of the consumer to filter duplicates and make sure that events are processed in exact sequence number order for each persistence id. Such deduplication is provided by the R2DBC Projection.
Events emitted by the backtracking don’t contain the event payload (EventBySliceEnvelope.event
is None) and the consumer can load the full EventBySliceEnvelope
with R2dbcReadJournal.loadEnvelope.
The events will be emitted in the timestamp order with the caveat of duplicate events as described above. Events with the same timestamp are ordered by sequence number.
currentEventsBySlices
doesn’t perform these backtracking queries and will not emit duplicates and the event payload is always full loaded.
eventsBySlicesStartingFromSnapshots
Same as eventsBySlices
but with the purpose to use snapshots as starting points and thereby reducing number of events that have to be loaded. This can be useful if the consumer start from zero without any previously processed offset or if it has been disconnected for a long while and its offset is far behind.
First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one snapshot per persistenceId. The snapshots are transformed to events with the given transformSnapshot
function.
After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted.
To use eventsBySlicesStartingFromSnapshots
or currentEventsBySlicesStartingFromSnapshots
you must follow instructions in migration guide and enable configuration:
akka.persistence.r2dbc.query.start-from-snapshot.enabled = true
Publish events for lower latency of eventsBySlices
The eventsBySlices
query polls the database periodically to find new events. By default, this interval is a few seconds, see akka.persistence.r2dbc.query.refresh-interval
in the Configuration. This interval can be reduced for lower latency, with the drawback of querying the database more frequently.
To reduce the latency there is a feature that will publish the events within the Akka Cluster. Running eventsBySlices
will subscribe to the events and emit them directly without waiting for next query poll. The tradeoff is that more CPU and network resources are used. The events must still be retrieved from the database, but at a lower polling frequency, because delivery of published messages are not guaranteed.
This feature is enabled by default and it will measure the throughput and automatically disable/enable if the exponentially weighted moving average of measured throughput exceeds the configured threshold.
akka.persistence.r2dbc.journal.publish-events-dynamic.throughput-threshold = 400
Disable publishing of events with configuration:
akka.persistence.r2dbc.journal.publish-events = off
If you use many queries or Projection instances you should consider adjusting the akka.persistence.r2dbc.journal.publish-events-number-of-topics
configuration, see Configuration.
Durable state queries
R2dbcDurableStateStore
R2dbcDurableStateStore
implements the following Persistence Queries:
getObject
changesBySlices
,currentChangesBySlices
currentPersistenceIds
Accessing the R2dbcDurableStateStore
:
- Java
-
source
import akka.persistence.r2dbc.state.javadsl.R2dbcDurableStateStore; import akka.persistence.state.DurableStateStoreRegistry; R2dbcDurableStateStore<MyState> stateQueries = DurableStateStoreRegistry.get(system) .getDurableStateStoreFor( R2dbcDurableStateStore.class, R2dbcDurableStateStore.Identifier());
- Scala
-
source
import akka.persistence.state.DurableStateStoreRegistry import akka.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore val stateQueries = DurableStateStoreRegistry(system) .durableStateStoreFor[R2dbcDurableStateStore[MyState]](R2dbcDurableStateStore.Identifier)
changesBySlices
The changesBySlices
and currentChangesBySlices
queries are useful for retrieving updates of the latest durable state for a given entity type.
Example of currentChangesBySlices
:
- Java
-
source
import akka.persistence.query.DurableStateChange; import akka.persistence.query.UpdatedDurableState; // Split the slices into 4 ranges int numberOfSliceRanges = 4; List<Pair<Integer, Integer>> sliceRanges = stateQueries.sliceRanges(numberOfSliceRanges); // Example of using the first slice range int minSlice = sliceRanges.get(0).first(); int maxSlice = sliceRanges.get(0).second(); String entityType = "MyEntity"; Source<DurableStateChange<MyState>, NotUsed> source = stateQueries.currentChangesBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance()); source .collectType(UpdatedDurableState.class) .map( change -> "state change from persistenceId " + change.persistenceId() + " with revision " + change.revision() + ": " + change.value()) .runWith(Sink.foreach(System.out::println), system);
- Scala
-
source
import akka.persistence.query.UpdatedDurableState // Slit the slices into 4 ranges val numberOfSliceRanges: Int = 4 val sliceRanges = stateQueries.sliceRanges(numberOfSliceRanges) // Example of using the first slice range val minSlice: Int = sliceRanges.head.min val maxSlice: Int = sliceRanges.head.max val entityType: String = "MyEntity" stateQueries .currentChangesBySlices(entityType, minSlice, maxSlice, NoOffset.getInstance) .collect { case change: UpdatedDurableState[MyState] => change } .map(change => s"state change from persistenceId ${change.persistenceId} with " + s"revision ${change.revision}: ${change.value}") .runWith(Sink.foreach(println))
The emitted DurableStateChange
can be a UpdatedDurableState
or DeletedDurableState
.
It will emit an UpdatedDurableState
when the durable state is updated. When the state is updated again another UpdatedDurableState
is emitted. It will always emit an UpdatedDurableState
for the latest revision of the state, but there is no guarantee that all intermediate changes are emitted if the state is updated several times. Note that UpdatedDurableState
contains the full current state, and it is not a delta from previous revision of state.
It will emit an DeletedDurableState
when the durable state is deleted. When the state is updated again a new UpdatedDurableState
is emitted. There is no guarantee that all intermediate changes are emitted if the state is updated or deleted several times.
changesBySlices
should be used via R2dbcProjection, which will automatically handle the similar difficulties with duplicates as described for eventsBySlices. When using R2dbcProjection
the changes will be delivered in revision number order without duplicates.
Configuration
Query configuration is under akka.persistence.r2dbc.query
. Here’s the default configuration values for the query plugin:
sourceakka.persistence.r2dbc {
query {
class = "akka.persistence.r2dbc.query.R2dbcReadJournalProvider"
# When live queries return no results or <= 10% of buffer-size, the next query
# to db will be delayed for this duration.
# When the number of rows from previous query is >= 90% of buffer-size, the next
# query will be emitted immediately.
# Otherwise, between 10% - 90% of buffer-size, the next query will be delayed
# for half of this duration.
refresh-interval = 3s
# Live queries read events up to this duration from the current database time.
behind-current-time = 100 millis
backtracking {
enabled = on
# Backtracking queries will look back for this amount of time. It should
# not be larger than the akka.projection.r2dbc.offset-store.time-window.
window = 2 minutes
# Backtracking queries read events up to this duration from the current database time.
behind-current-time = 10 seconds
}
# In-memory buffer holding events when reading from database.
buffer-size = 1000
persistence-ids {
buffer-size = 1000
}
# When journal publish-events is enabled a best effort deduplication can be enabled by setting
# this property to the size of the deduplication buffer in the `eventsBySlices` query.
# It keeps track of this number of entries and 5000 is recommended capacity. The drawback
# of enabling this is that when the sequence numbers received via publish-events are out of sync
# after some error scenarios it will take longer to receive those events, since it will rely on
# the backtracking queries.
deduplicate-capacity = 0
# Settings for eventsBySlicesStartingFromSnapshots and currentEventsBySlicesStartingFromSnapshots
start-from-snapshot {
# Set this to on true if eventsBySlicesStartingFromSnapshots or
# currentEventsBySlicesStartingFromSnapshots are used. That has a small overhead when storing
# snapshots because the timestamp and tags of the corresponding event is retrieved when storing
# a snapshot.
enabled = false
}
}
}
The query plugin shares the connection pool as the rest of the plugin, see Connection configuration.