Choosing a Source Provider
A SourceProvider
SourceProvider
will provide the data to our projection. In Projections each element that’s processed is an Envelope
and each Envelope
contains an Event
. An Envelope
must include an Offset
, but it can also contain other information such as creation timestamp, a topic name, an entity tag, etc. There are several supported Source Provider’s available (or you can build your own), but in this example we will use the Akka Persistence EventSourced
Source Provider.
Add the following dependencies to your project:
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-eventsourced" % "1.6.5"
- Maven
<properties> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-projection-eventsourced_${scala.binary.version}</artifactId> <version>1.6.5</version> </dependency> </dependencies>
- Gradle
def versions = [ ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-projection-eventsourced_${versions.ScalaBinary}:1.6.5" }
Add the following imports to ShoppingCartApp
:
- Scala
-
source
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal import akka.persistence.query.Offset import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.scaladsl.SourceProvider
- Java
-
source
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal; import akka.persistence.query.Offset; import akka.projection.eventsourced.javadsl.EventSourcedProvider; import akka.projection.javadsl.SourceProvider;
Create the SourceProvider
SourceProvider
. The Event Sourced Source Provider is using Akka Persistence internally (specifically the eventsByTag API). To initialize the Source Provider we need to set parameters to choose the Akka Persistence plugin (Cassandra) to use as well as the name of the tag used for events we’re interested in from the journal.
Setup the SourceProvider
in the Guardian Behavior
defined in ShoppingCartApp
:
- Scala
-
source
val sourceProvider: SourceProvider[Offset, EventEnvelope[ShoppingCartEvents.Event]] = EventSourcedProvider .eventsByTag[ShoppingCartEvents.Event]( system, readJournalPluginId = CassandraReadJournal.Identifier, tag = ShoppingCartTags.Single)
- Java
-
source
SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider = EventSourcedProvider.eventsByTag( system, CassandraReadJournal.Identifier(), ShoppingCartTags.SINGLE);
Finally, we must configure Akka Persistence by adding a configuration file guide-shopping-cart-app.conf
to the src/main/resources/
directory of the project:
sourcedatastax-java-driver {
# basic.contact-points = ["127.0.0.1:9042"]
# basic.load-balancing-policy.local-datacenter = "datacenter1"
advanced {
# reconnect to c* if down when app is started
reconnect-on-init = true
}
}
akka {
loglevel = DEBUG
persistence.journal {
plugin = "akka.persistence.cassandra.journal"
auto-start-journals = ["akka.persistence.cassandra.journal"]
}
persistence.snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
persistence {
cassandra {
journal {
# to create the schema
keyspace-autocreate = true
tables-autocreate = true
}
snapshot {
# to create the schema
keyspace-autocreate = true
tables-autocreate = true
}
query {
refresh-interval = 2s
}
events-by-tag {
# for lower latency
eventual-consistency-delay = 25ms
flush-interval = 25ms
pubsub-notification = on
}
}
}
}