This example uses the Cassandra plugin for Akka Persistence, but the same code can be used with other Akka Persistence plugins by replacing CassandraReadJournal.Identifier. For example, the JDBC plugin can be used. Use the same plugin that you have configured for the write side, which is used by the EventSourcedBehavior.
This source consumes all events from the ShoppingCart EventSourcedBehavior that are tagged with "cart-1".
The EventEnvelope[ShoppingCart.Event] (Scala) or EventEnvelope<ShoppingCart.Event> (Java) is what the Projection handler will process. It contains the event and additional metadata, such as the offset that will be stored by the Projection. See EventEnvelope for full details of what the envelope contains.
Scala:
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.SourceProvider

// Split the slices into 4 ranges
val numberOfSliceRanges: Int = 4
val sliceRanges = EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier, numberOfSliceRanges)

// Example of using the first slice range
val minSlice: Int = sliceRanges.head.min
val maxSlice: Int = sliceRanges.head.max
val entityType: String = "ShoppingCart"

val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
  EventSourcedProvider.eventsBySlices[ShoppingCart.Event](
    system,
    readJournalPluginId = R2dbcReadJournal.Identifier,
    entityType,
    minSlice,
    maxSlice)
Java:
import akka.japi.Pair;
import akka.persistence.query.Offset;
import akka.persistence.query.typed.EventEnvelope;
import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.SourceProvider;
import java.util.List;

// Split the slices into 4 ranges
int numberOfSliceRanges = 4;
List<Pair<Integer, Integer>> sliceRanges =
    EventSourcedProvider.sliceRanges(
        system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);

// Example of using the first slice range
int minSlice = sliceRanges.get(0).first();
int maxSlice = sliceRanges.get(0).second();
// Entity type of the ShoppingCart EventSourcedBehavior
String entityType = "ShoppingCart";

SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
    EventSourcedProvider.eventsBySlices(
        system, R2dbcReadJournal.Identifier(), entityType, minSlice, maxSlice);
This example uses the R2DBC plugin for Akka Persistence. Use the same plugin that you have configured for the write side, which is used by the EventSourcedBehavior.
This source consumes all events from the ShoppingCart EventSourcedBehavior for the given slice range. In a production application, you would need to start as many Projection instances as there are slice ranges. That way, the events from all entities are consumed.
The EventEnvelope[ShoppingCart.Event] (Scala) or EventEnvelope<ShoppingCart.Event> (Java) is what the Projection handler will process. It contains the event and additional metadata, such as the offset that will be stored by the Projection. See EventEnvelope for full details of what the envelope contains.
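To illustrate why one Projection instance per slice range covers all entities, here is a minimal, self-contained sketch of how a slice space can be divided into equal, contiguous ranges. It assumes 1024 slices in total and requires the number of ranges to divide that total evenly. The class and method names (SliceRangeSketch, sliceRanges) are hypothetical, and this is an illustration of the idea rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Akka.
final class SliceRangeSketch {
  // Assumption: the total number of slices is 1024.
  static final int NUMBER_OF_SLICES = 1024;

  // Splits [0, NUMBER_OF_SLICES) into equally sized, contiguous ranges.
  // Each element is {minSlice, maxSlice}, both bounds inclusive.
  static List<int[]> sliceRanges(int numberOfRanges) {
    if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
      throw new IllegalArgumentException(
          "numberOfRanges must be a positive divisor of " + NUMBER_OF_SLICES);
    }
    int rangeSize = NUMBER_OF_SLICES / numberOfRanges;
    List<int[]> ranges = new ArrayList<>();
    for (int i = 0; i < numberOfRanges; i++) {
      int min = i * rangeSize;
      ranges.add(new int[] {min, min + rangeSize - 1});
    }
    return ranges;
  }
}
```

Under these assumptions, sliceRanges(4) yields the ranges 0 to 255, 256 to 511, 512 to 767 and 768 to 1023. Each range would be handled by its own Projection instance, so together the four instances see every slice exactly once.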
SourceProvider for eventsBySlicesStartingFromSnapshots
The Projection can use snapshots as starting points and thereby reduce the number of events that have to be loaded. This can be useful if the consumer starts from zero without any previously processed offset, or if it has been disconnected for a long while and its offset is far behind.
You need to define the snapshot to event transformation function in EventSourcedProvider.eventsBySlicesStartingFromSnapshots.
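As a rough sketch of what such a transformation could look like, the following self-contained example maps a snapshot of cart state to a single event that carries the complete state. All types and names here (SnapshotTransformSketch, CartState, CartReplaced, transformSnapshot) are hypothetical and chosen only for illustration. In a real application the result must be of the event type that the Projection handler consumes, and the function would be supplied to EventSourcedProvider.eventsBySlicesStartingFromSnapshots.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical types for illustration; a real application would use its own
// state (snapshot) and event classes.
final class SnapshotTransformSketch {

  // Hypothetical snapshot: the full cart state (item id to quantity).
  static final class CartState {
    final Map<String, Integer> items;

    CartState(Map<String, Integer> items) {
      this.items = items;
    }
  }

  // Hypothetical event carrying a complete state, so that a consumer can
  // use it as a starting point instead of processing all earlier events.
  static final class CartReplaced {
    final Map<String, Integer> items;

    CartReplaced(Map<String, Integer> items) {
      this.items = items;
    }
  }

  // The snapshot-to-event transformation: a pure function that copies the
  // snapshot content into an event.
  static CartReplaced transformSnapshot(CartState state) {
    return new CartReplaced(new LinkedHashMap<>(state.items));
  }
}
```

The method can be passed as a function reference, for example SnapshotTransformSketch::transformSnapshot. Keeping it free of side effects makes it safe to apply whenever a snapshot is used as a starting point.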
EventsBySliceFirehoseQuery can give better scalability when many consumers retrieve the same events, for example many Projections of the same entity type. Its purpose is to share the stream of events from the database and fan it out to the connected consumer streams, which results in fewer queries and less loading of events from the database.
EventsBySliceFirehoseQuery is used in place of EventsBySliceQuery with the EventSourcedProvider.